1#![forbid(unsafe_code)]
2use std::sync::atomic::{AtomicBool, Ordering};
48use std::sync::Arc;
49use std::thread::JoinHandle;
50use std::time::{Duration, Instant};
51
52use wombatkv_radix::{BlockHash, BlockMeta, MetadataIndex, ModelDigest};
53
/// Wall-clock budget for a single `PrefetchFetcher::fetch_block` call made by
/// `run_cycle_v2`. The duration is measured after the call returns, so the cap
/// cannot interrupt a fetch in progress. When `run_cycle_v2` finds it exceeded,
/// it logs and skips the rest of that cycle's selections.
const FETCH_WALLCLOCK_CAP: Duration = Duration::from_millis(5_000);
58
/// Tuning knobs for the prefetch workers and for `run_cycle` / `run_cycle_v2`.
#[derive(Clone, Debug)]
pub struct PrefetchConfig {
    /// Pause between the end of one cycle and the start of the next.
    pub interval: Duration,
    /// Maximum number of highest-scoring blocks selected per cycle.
    pub top_k: usize,
    /// Digest of the currently active model; blocks carrying the same digest
    /// receive the model-affinity bonus in `score_block`.
    pub model_digest: ModelDigest,
    /// Namespace passed to every `PrefetchFetcher` call made by `run_cycle_v2`.
    pub namespace: String,
}
76
77impl Default for PrefetchConfig {
78 fn default() -> Self {
79 Self {
80 interval: Duration::from_millis(500),
81 top_k: 8,
82 model_digest: [0u8; 24],
83 namespace: String::new(),
84 }
85 }
86}
87
/// Handle to a background prefetch thread started by `spawn_worker` or
/// `spawn_worker_v2`.
///
/// Dropping the handle sets the stop flag and joins the thread.
pub struct PrefetchWorker {
    /// Join handle of the worker thread; taken (left `None`) when joined in `drop`.
    handle: Option<JoinHandle<()>>,
    /// Stop flag shared with the worker thread, which polls it between cycles
    /// and while waiting for the next one.
    stop: Arc<AtomicBool>,
}
97
98impl PrefetchWorker {
99 pub fn signal_stop(&self) {
103 self.stop.store(true, Ordering::SeqCst);
104 }
105
106 #[must_use]
109 pub fn is_running(&self) -> bool {
110 self.handle.as_ref().is_some_and(|h| !h.is_finished())
111 }
112}
113
114impl Drop for PrefetchWorker {
115 fn drop(&mut self) {
116 self.stop.store(true, Ordering::SeqCst);
117 if let Some(h) = self.handle.take() {
118 let _ = h.join();
121 }
122 }
123}
124
/// Callback that `spawn_worker`'s thread invokes once per cycle with the
/// freshly computed plan; `Send + Sync` because it runs on that thread.
pub type PrefetchEmit = Arc<dyn Fn(&PrefetchPlan) + Send + Sync>;
129
/// Result of one `run_cycle` scoring pass.
#[derive(Clone, Debug)]
pub struct PrefetchPlan {
    /// Number of index entries that were scored (the full snapshot size).
    pub scored: usize,
    /// Up to `top_k` entries as `(hash, meta, score)`, highest score first.
    pub selected: Vec<(BlockHash, BlockMeta, f64)>,
    /// Wall-clock time spent producing this plan.
    pub elapsed: Duration,
}
141
/// Storage backend that `run_cycle_v2` uses to warm selected blocks.
///
/// `Send + Sync` because `spawn_worker_v2` moves an `Arc<dyn PrefetchFetcher>`
/// onto its background thread.
pub trait PrefetchFetcher: Send + Sync {
    /// Returns `true` when `key` is already present in the "flat" tier of
    /// `namespace`; `run_cycle_v2` skips such keys without fetching them.
    ///
    /// NOTE(review): what "flat" means (presumably an already-materialized
    /// local copy) is up to the implementor — confirm.
    fn contains_flat(&self, namespace: &str, key: &str) -> bool;

    /// Fetches the block stored under `key` in `namespace`.
    ///
    /// `run_cycle_v2` interprets the result as follows:
    /// - `Ok(Some(n))`: fetched, and `n` is added to `bytes_materialized`.
    /// - `Ok(None)`: counted as failed, presumably because the key was not
    ///   found — TODO confirm.
    /// - `Err(msg)`: counted as failed, and `msg` is logged to stderr.
    fn fetch_block(&self, namespace: &str, key: &str) -> Result<Option<u64>, String>;
}
163
/// Counters describing one `run_cycle_v2` pass.
///
/// `skipped_already_flat + fetched + failed` can be smaller than `selected`
/// when the pass ended early (stop flag or wall-clock cap).
#[derive(Clone, Debug, Default)]
pub struct PrefetchFetchOutcome {
    /// Index entries scored by the underlying `run_cycle`.
    pub scored: usize,
    /// Entries selected for prefetch (at most `top_k`).
    pub selected: usize,
    /// Selected keys skipped because `contains_flat` reported them present.
    pub skipped_already_flat: usize,
    /// `fetch_block` calls that returned `Ok(Some(_))`.
    pub fetched: usize,
    /// `fetch_block` calls that returned `Ok(None)` or `Err(_)`.
    pub failed: usize,
    /// Saturating sum of the byte counts returned by successful fetches.
    pub bytes_materialized: u64,
    /// Wall-clock milliseconds for the whole pass, scoring included.
    pub elapsed_ms: u128,
}
176
177#[must_use]
182pub fn score_block(meta: &BlockMeta, now_ns: u64, active_model: &ModelDigest) -> f64 {
183 const W_RECENCY: f64 = 1.0;
185 const W_CHAIN: f64 = 0.3;
186 const W_MODEL: f64 = 0.2;
187 let decay: f64 = std::f64::consts::LN_2 / 600.0e9_f64;
189
190 let age_ns = now_ns.saturating_sub(meta.last_access_ns) as f64;
194 let recency = (-decay * age_ns).exp();
195
196 let chain_bonus = if meta.block_seq == 0 { 1.0 } else { 0.0 };
197 let model_bonus = if &meta.model_digest == active_model { 1.0 } else { 0.0 };
198
199 W_RECENCY * recency + W_CHAIN * chain_bonus + W_MODEL * model_bonus
200}
201
/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch. The `u128`
/// nanosecond count is narrowed with `as`, which truncates.
fn now_ns() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
207
208#[must_use]
212pub fn block_key_for_hash(hash: &BlockHash) -> String {
213 use wombatkv_radix::BLOCK_KEY_PREFIX;
214 let mut s = String::with_capacity(BLOCK_KEY_PREFIX.len() + 64);
215 s.push_str(BLOCK_KEY_PREFIX);
216 for b in hash {
217 s.push_str(&hex_pair(*b));
218 }
219 s
220}
221
222fn hex_pair(b: u8) -> String {
223 let hi = HEX[(b >> 4) as usize];
224 let lo = HEX[(b & 0x0f) as usize];
225 let mut s = String::with_capacity(2);
226 s.push(hi);
227 s.push(lo);
228 s
229}
230
/// Lowercase hexadecimal digits indexed by nibble value (0..=15); used by
/// `hex_pair` when rendering block keys.
const HEX: [char; 16] =
    ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'];
233
234#[must_use]
240pub fn run_cycle(index: &dyn MetadataIndex, config: &PrefetchConfig) -> PrefetchPlan {
241 let started = Instant::now();
242 let now = now_ns();
243 let snapshot = index.entries();
244 let scored = snapshot.len();
245
246 let mut scored_entries: Vec<(BlockHash, BlockMeta, f64)> = snapshot
251 .into_iter()
252 .map(|(h, m)| {
253 let s = score_block(&m, now, &config.model_digest);
254 (h, m, s)
255 })
256 .collect();
257
258 scored_entries.sort_by(|a, b| {
259 b.2.partial_cmp(&a.2).unwrap_or(std::cmp::Ordering::Equal)
262 });
263
264 let mut selected = scored_entries;
265 selected.truncate(config.top_k);
266
267 PrefetchPlan { scored, selected, elapsed: started.elapsed() }
268}
269
270pub fn spawn_worker(
282 index: Arc<dyn MetadataIndex>,
283 config: PrefetchConfig,
284 emit: PrefetchEmit,
285) -> PrefetchWorker {
286 let stop = Arc::new(AtomicBool::new(false));
287 let stop_for_thread = stop.clone();
288 let handle = std::thread::Builder::new()
289 .name("wombatkv-prefetch".to_string())
290 .spawn(move || {
291 let slice = Duration::from_millis(25);
295 loop {
296 if stop_for_thread.load(Ordering::SeqCst) {
297 break;
298 }
299 let plan = run_cycle(index.as_ref(), &config);
300 emit(&plan);
301
302 let mut remaining = config.interval;
305 while remaining > Duration::ZERO {
306 if stop_for_thread.load(Ordering::SeqCst) {
307 break;
308 }
309 let s = remaining.min(slice);
310 std::thread::sleep(s);
311 remaining = remaining.saturating_sub(s);
312 }
313 }
314 })
315 .expect("spawn prefetch worker");
316
317 PrefetchWorker { handle: Some(handle), stop }
318}
319
320pub fn spawn_worker_v2(
334 index: Arc<dyn MetadataIndex>,
335 config: PrefetchConfig,
336 fetcher: Arc<dyn PrefetchFetcher>,
337 emit_outcome: Arc<dyn Fn(&PrefetchFetchOutcome) + Send + Sync>,
338) -> PrefetchWorker {
339 let stop = Arc::new(AtomicBool::new(false));
340 let stop_for_thread = stop.clone();
341 let handle = std::thread::Builder::new()
342 .name("wombatkv-prefetch-v2".to_string())
343 .spawn(move || {
344 let slice = Duration::from_millis(25);
345 loop {
346 if stop_for_thread.load(Ordering::SeqCst) {
347 break;
348 }
349 let outcome =
350 run_cycle_v2(index.as_ref(), fetcher.as_ref(), &config, &stop_for_thread);
351 emit_outcome(&outcome);
352
353 let mut remaining = config.interval;
354 while remaining > Duration::ZERO {
355 if stop_for_thread.load(Ordering::SeqCst) {
356 break;
357 }
358 let s = remaining.min(slice);
359 std::thread::sleep(s);
360 remaining = remaining.saturating_sub(s);
361 }
362 }
363 })
364 .expect("spawn prefetch worker v2");
365
366 PrefetchWorker { handle: Some(handle), stop }
367}
368
369#[must_use]
376pub fn run_cycle_v2(
377 index: &dyn MetadataIndex,
378 fetcher: &dyn PrefetchFetcher,
379 config: &PrefetchConfig,
380 stop: &AtomicBool,
381) -> PrefetchFetchOutcome {
382 let started = Instant::now();
383 let plan = run_cycle(index, config);
384 let scored = plan.scored;
385 let selected = plan.selected.len();
386
387 let mut skipped_already_flat = 0_usize;
388 let mut fetched = 0_usize;
389 let mut failed = 0_usize;
390 let mut bytes_materialized = 0_u64;
391
392 for (hash, _meta, _score) in plan.selected {
393 if stop.load(Ordering::SeqCst) {
394 break;
395 }
396 let key = block_key_for_hash(&hash);
397 if fetcher.contains_flat(&config.namespace, &key) {
398 skipped_already_flat += 1;
399 continue;
400 }
401 let fetch_started = Instant::now();
402 match fetcher.fetch_block(&config.namespace, &key) {
403 Ok(Some(bytes_len)) => {
404 let cost = fetch_started.elapsed();
405 if cost > FETCH_WALLCLOCK_CAP {
406 eprintln!(
407 "wombatkv[prefetch v2]: get_kv({key}) took {cost:?} \
408 (cap {FETCH_WALLCLOCK_CAP:?}); aborting remainder of cycle"
409 );
410 fetched += 1;
411 bytes_materialized = bytes_materialized.saturating_add(bytes_len);
412 break;
413 }
414 fetched += 1;
415 bytes_materialized = bytes_materialized.saturating_add(bytes_len);
416 }
417 Ok(None) => {
418 failed += 1;
421 }
422 Err(err) => {
423 failed += 1;
424 eprintln!("wombatkv[prefetch v2]: get_kv({key}) failed: {err}");
425 }
426 }
427 }
428
429 PrefetchFetchOutcome {
430 scored,
431 selected,
432 skipped_already_flat,
433 fetched,
434 failed,
435 bytes_materialized,
436 elapsed_ms: started.elapsed().as_millis(),
437 }
438}
439
440#[must_use]
444pub fn default_emit() -> PrefetchEmit {
445 Arc::new(|plan: &PrefetchPlan| {
446 let elapsed_ms = plan.elapsed.as_millis();
447 let scored = plan.scored;
451 let materialized = plan.selected.len();
452 eprintln!(
453 "[MyelonInstr] {{\"scope\":\"wmbt_kv_timing\",\"fn\":\"prefetch_cycle\",\
454 \"stages\":{{\"scored\":{scored},\"materialized\":{materialized},\
455 \"elapsed_ms\":{elapsed_ms}}}}}"
456 );
457 })
458}
459
460#[must_use]
464pub fn default_v2_emit() -> Arc<dyn Fn(&PrefetchFetchOutcome) + Send + Sync> {
465 Arc::new(|o: &PrefetchFetchOutcome| {
466 eprintln!(
467 "[MyelonInstr] {{\"scope\":\"wmbt_kv_timing\",\"fn\":\"prefetch_cycle_v2\",\
468 \"stages\":{{\"scored\":{},\"selected\":{},\"skipped_already_flat\":{},\
469 \"fetched\":{},\"failed\":{},\"bytes_materialized\":{},\"elapsed_ms\":{}}}}}",
470 o.scored,
471 o.selected,
472 o.skipped_already_flat,
473 o.fetched,
474 o.failed,
475 o.bytes_materialized,
476 o.elapsed_ms,
477 );
478 })
479}
480
/// Returns `true` when `WMBT_KV_PREFETCH_DRY_RUN` is set to a truthy value.
///
/// Accepted values are `1`, `true`, `yes` and `on`. Matching ignores ASCII
/// case and surrounding whitespace, so `TRUE`, `True` and ` yes ` all count.
/// Unset, non-UTF-8, or any other value yields `false`. This is a superset of
/// the previously accepted spellings (`1`, `true`, `TRUE`, `yes`, `on`).
#[must_use]
pub fn dry_run_enabled() -> bool {
    std::env::var("WMBT_KV_PREFETCH_DRY_RUN").is_ok_and(|raw| {
        let value = raw.trim();
        ["1", "true", "yes", "on"].iter().any(|truthy| value.eq_ignore_ascii_case(truthy))
    })
}
491
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::Duration;
    use wombatkv_radix::InMemoryMetadataIndex;

    /// Builds block metadata with the given sequence number, last-access time
    /// and model digest; the remaining fields get fixed test values.
    fn mk_meta(seq: u32, last_access_ns: u64, model: ModelDigest) -> BlockMeta {
        BlockMeta {
            parent_hash: BlockMeta::ZERO_HASH,
            block_seq: seq,
            payload_bytes: 1024,
            last_access_ns,
            model_digest: model,
            layout_tag: [0u8; 16],
            ext_flags: 0,
        }
    }

    /// Returns an otherwise-zero hash whose first byte is `seed`.
    fn make_hash(seed: u8) -> BlockHash {
        let mut h = [0u8; 32];
        h[0] = seed;
        h
    }

    #[test]
    fn score_recency_decay_monotone() {
        let model = [7u8; 24];
        let now = 10_000_000_000_000_u64;
        // Ages of 1 s, 600 s (one half-life) and 3600 s; seq 1 so no chain bonus.
        let fresh = mk_meta(1, now - 1_000_000_000, model);
        let stale = mk_meta(1, now - 600_000_000_000, model);
        let ancient = mk_meta(1, now - 3_600_000_000_000, model);
        let s_fresh = score_block(&fresh, now, &model);
        let s_stale = score_block(&stale, now, &model);
        let s_ancient = score_block(&ancient, now, &model);

        assert!(s_fresh > s_stale);
        assert!(s_stale > s_ancient);
        // Remove the constant model-affinity bonus (0.2) before comparing the
        // recency terms; one half-life apart they should differ by about 2x.
        let recency_fresh = s_fresh - 0.2_f64;
        let recency_stale = s_stale - 0.2_f64;
        let ratio = recency_stale / recency_fresh;
        assert!((0.40..0.60).contains(&ratio), "stale/fresh recency ratio {ratio} not near 0.5");
    }

    #[test]
    fn score_chain_head_outranks_successor_at_equal_recency() {
        let model = [7u8; 24];
        let now = 10_000_000_000_000_u64;
        let head = mk_meta(0, now - 1_000_000_000, model);
        let succ = mk_meta(5, now - 1_000_000_000, model);
        assert!(score_block(&head, now, &model) > score_block(&succ, now, &model));
    }

    #[test]
    fn score_model_affinity_outranks_others() {
        let active = [7u8; 24];
        let other = [42u8; 24];
        let now = 10_000_000_000_000_u64;
        let same = mk_meta(1, now - 1_000_000_000, active);
        let diff = mk_meta(1, now - 1_000_000_000, other);
        assert!(score_block(&same, now, &active) > score_block(&diff, now, &active));
    }

    #[test]
    fn run_cycle_picks_top_k_by_score() {
        let idx = InMemoryMetadataIndex::new();
        let active = [7u8; 24];
        let now = now_ns();

        // Blocks 0..50 were last touched an hour ago, blocks 50..100 one second
        // ago; every third fresh block is also a chain head (seq 0).
        let entries: Vec<(BlockHash, BlockMeta)> = (0..100_u8)
            .map(|i| {
                let h = make_hash(i);
                let is_fresh = i >= 50;
                let last = if is_fresh {
                    now.saturating_sub(1_000_000_000)
                } else {
                    now.saturating_sub(3_600_000_000_000)
                };
                let seq = if is_fresh && i % 3 == 0 { 0 } else { u32::from(i) + 1 };
                (h, mk_meta(seq, last, active))
            })
            .collect();
        idx.bulk_load(entries);
        assert_eq!(idx.len(), 100);

        let cfg = PrefetchConfig {
            interval: Duration::from_mins(1),
            top_k: 10,
            model_digest: active,
            namespace: String::new(),
        };
        let plan = run_cycle(&idx, &cfg);
        assert_eq!(plan.scored, 100);
        assert_eq!(plan.selected.len(), 10);

        // Six half-lives leave an hour-old block with a recency term of ~1/64,
        // far below any fresh block, so none may be selected.
        for (h, _, _) in &plan.selected {
            assert!(h[0] >= 50, "selected stale block {}", h[0]);
        }

        for w in plan.selected.windows(2) {
            assert!(w[0].2 >= w[1].2, "scores not descending");
        }
    }

    #[test]
    fn worker_runs_cycle_and_stops_within_one_second() {
        let idx: Arc<dyn MetadataIndex> = Arc::new(InMemoryMetadataIndex::new());
        let active = [7u8; 24];
        idx.insert(make_hash(1), mk_meta(0, now_ns(), active));
        idx.insert(make_hash(2), mk_meta(1, now_ns(), active));

        let cycles = Arc::new(Mutex::new(0_usize));
        let cycles_cb = cycles.clone();
        let emit: PrefetchEmit = Arc::new(move |_plan: &PrefetchPlan| {
            *cycles_cb.lock().unwrap() += 1;
        });

        let cfg = PrefetchConfig {
            interval: Duration::from_millis(50),
            top_k: 4,
            model_digest: active,
            namespace: String::new(),
        };

        let worker = spawn_worker(idx, cfg, emit);

        std::thread::sleep(Duration::from_millis(200));
        let observed = *cycles.lock().unwrap();
        assert!(observed >= 2, "expected ≥ 2 cycles, got {observed}");

        // Time only the shutdown itself, so the bound matches the test name.
        let shutdown_started = Instant::now();
        drop(worker);
        let drop_time = shutdown_started.elapsed();
        assert!(drop_time < Duration::from_secs(1), "worker shutdown took {drop_time:?}");
    }

    #[test]
    fn worker_handles_empty_index_gracefully() {
        let idx: Arc<dyn MetadataIndex> = Arc::new(InMemoryMetadataIndex::new());
        let cycles = Arc::new(Mutex::new(0_usize));
        let cycles_cb = cycles.clone();
        let emit: PrefetchEmit = Arc::new(move |plan: &PrefetchPlan| {
            assert_eq!(plan.scored, 0);
            assert!(plan.selected.is_empty());
            *cycles_cb.lock().unwrap() += 1;
        });

        let cfg = PrefetchConfig {
            interval: Duration::from_millis(30),
            top_k: 8,
            model_digest: [0u8; 24],
            namespace: String::new(),
        };
        let worker = spawn_worker(idx, cfg, emit);
        std::thread::sleep(Duration::from_millis(120));
        let n = *cycles.lock().unwrap();
        drop(worker);
        assert!(n >= 2, "expected ≥ 2 cycles on empty index, got {n}");
    }

    #[test]
    fn signal_stop_makes_drop_fast_even_with_long_interval() {
        let idx: Arc<dyn MetadataIndex> = Arc::new(InMemoryMetadataIndex::new());
        let emit: PrefetchEmit = Arc::new(|_| {});
        let cfg = PrefetchConfig {
            // Far longer than the test; shutdown must not wait for it.
            interval: Duration::from_secs(10),
            top_k: 1,
            model_digest: [0u8; 24],
            namespace: String::new(),
        };
        let worker = spawn_worker(idx, cfg, emit);
        // Let the first cycle finish so the thread is in its inter-cycle wait.
        std::thread::sleep(Duration::from_millis(50));
        let t = Instant::now();
        worker.signal_stop();
        drop(worker);
        assert!(t.elapsed() < Duration::from_millis(500), "drop took {:?}", t.elapsed());
    }

    #[test]
    fn block_key_for_hash_matches_cabi_format() {
        use wombatkv_radix::BLOCK_KEY_PREFIX;
        let mut h = [0u8; 32];
        h[0] = 0xab;
        h[1] = 0xcd;
        h[31] = 0xef;
        let key = block_key_for_hash(&h);
        assert_eq!(key.len(), BLOCK_KEY_PREFIX.len() + 64);
        assert!(key.starts_with(BLOCK_KEY_PREFIX));
        let hex = &key[BLOCK_KEY_PREFIX.len()..];
        assert!(hex.starts_with("abcd"), "got hex prefix {hex:?}");
        assert!(hex.ends_with("ef"));
        assert!(hex.chars().all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()));
    }

    /// In-memory `PrefetchFetcher` that records every `fetch_block` call.
    struct MockFetcher {
        /// Keys that `contains_flat` reports as present.
        flat_keys: Mutex<Vec<String>>,
        /// One-shot errors: the first fetch of a key listed here returns `Err`.
        errors: Mutex<std::collections::HashMap<String, String>>,
        /// Keys that fetch successfully, with the byte count to report.
        bytes: Mutex<std::collections::HashMap<String, u64>>,
        /// Every `(namespace, key)` passed to `fetch_block`, in call order.
        calls: Mutex<Vec<(String, String)>>,
    }

    impl MockFetcher {
        fn new() -> Self {
            Self {
                flat_keys: Mutex::new(Vec::new()),
                errors: Mutex::new(std::collections::HashMap::new()),
                bytes: Mutex::new(std::collections::HashMap::new()),
                calls: Mutex::new(Vec::new()),
            }
        }

        /// Makes `key` fetch successfully, reporting `len` bytes.
        fn stock_hit(&self, key: &str, len: u64) {
            self.bytes.lock().unwrap().insert(key.to_string(), len);
        }

        /// Makes the next fetch of `key` fail with `msg`.
        fn stock_err(&self, key: &str, msg: &str) {
            self.errors.lock().unwrap().insert(key.to_string(), msg.to_string());
        }

        /// Marks `key` as already present, so `run_cycle_v2` skips it.
        fn pre_warm_flat(&self, key: &str) {
            self.flat_keys.lock().unwrap().push(key.to_string());
        }

        /// Snapshot of all recorded `fetch_block` calls.
        fn calls(&self) -> Vec<(String, String)> {
            self.calls.lock().unwrap().clone()
        }
    }

    impl PrefetchFetcher for MockFetcher {
        fn contains_flat(&self, _namespace: &str, key: &str) -> bool {
            self.flat_keys.lock().unwrap().iter().any(|k| k == key)
        }

        fn fetch_block(&self, namespace: &str, key: &str) -> Result<Option<u64>, String> {
            self.calls.lock().unwrap().push((namespace.to_string(), key.to_string()));
            if let Some(err) = self.errors.lock().unwrap().remove(key) {
                return Err(err);
            }
            if let Some(len) = self.bytes.lock().unwrap().get(key).copied() {
                // A successful fetch leaves the block flat, so later cycles
                // skip it (mirrors a real materializing fetch).
                self.flat_keys.lock().unwrap().push(key.to_string());
                return Ok(Some(len));
            }
            Ok(None)
        }
    }

    /// Inserts `count` chain-head blocks, where block `i` was last accessed
    /// `i` seconds ago, and returns their hashes in insertion order (which is
    /// also descending score order).
    fn seed_recent_blocks(
        idx: &InMemoryMetadataIndex,
        count: u8,
        model: ModelDigest,
    ) -> Vec<BlockHash> {
        let now = now_ns();
        let mut hashes = Vec::with_capacity(count as usize);
        for i in 0..count {
            let h = make_hash(i);
            let last = now.saturating_sub(1_000_000_000_u64.saturating_mul(u64::from(i)));
            idx.bulk_load(std::iter::once((h, mk_meta(0, last, model))));
            hashes.push(h);
        }
        hashes
    }

    #[test]
    fn v2_fetches_top_k_against_kvstore() {
        let idx = Arc::new(InMemoryMetadataIndex::new());
        let model = [7u8; 24];
        let hashes = seed_recent_blocks(&idx, 100, model);

        let fetcher = Arc::new(MockFetcher::new());
        for h in &hashes {
            let key = block_key_for_hash(h);
            fetcher.stock_hit(&key, 1024);
        }

        let cfg = PrefetchConfig {
            interval: Duration::from_mins(1),
            top_k: 10,
            model_digest: model,
            namespace: "ns-a".to_string(),
        };

        let stop = AtomicBool::new(false);
        let outcome = run_cycle_v2(
            idx.as_ref() as &dyn MetadataIndex,
            fetcher.as_ref() as &dyn PrefetchFetcher,
            &cfg,
            &stop,
        );

        assert_eq!(outcome.scored, 100);
        assert_eq!(outcome.selected, 10);
        assert_eq!(outcome.skipped_already_flat, 0);
        assert_eq!(outcome.fetched, 10);
        assert_eq!(outcome.failed, 0);
        assert_eq!(outcome.bytes_materialized, 10 * 1024);

        let calls = fetcher.calls();
        assert_eq!(calls.len(), 10);
        for (ns, key) in &calls {
            assert_eq!(ns, "ns-a");
            assert!(key.starts_with(wombatkv_radix::BLOCK_KEY_PREFIX));
        }
    }

    #[test]
    fn v2_skips_already_flat() {
        let idx = Arc::new(InMemoryMetadataIndex::new());
        let model = [7u8; 24];
        let hashes = seed_recent_blocks(&idx, 10, model);

        let fetcher = Arc::new(MockFetcher::new());
        for h in &hashes {
            let key = block_key_for_hash(h);
            fetcher.stock_hit(&key, 256);
        }
        // Half of the blocks are already flat before the cycle runs.
        for h in &hashes[..5] {
            let key = block_key_for_hash(h);
            fetcher.pre_warm_flat(&key);
        }

        let cfg = PrefetchConfig {
            interval: Duration::from_mins(1),
            top_k: 10,
            model_digest: model,
            namespace: "ns-a".to_string(),
        };
        let stop = AtomicBool::new(false);
        let outcome = run_cycle_v2(
            idx.as_ref() as &dyn MetadataIndex,
            fetcher.as_ref() as &dyn PrefetchFetcher,
            &cfg,
            &stop,
        );

        assert_eq!(outcome.scored, 10);
        assert_eq!(outcome.selected, 10);
        assert_eq!(outcome.skipped_already_flat, 5);
        assert_eq!(outcome.fetched, 5);
        assert_eq!(outcome.failed, 0);
        assert_eq!(outcome.bytes_materialized, 5 * 256);

        let calls = fetcher.calls();
        assert_eq!(calls.len(), 5, "should only fetch the 5 not-yet-flat keys");
    }

    #[test]
    fn v2_handles_get_kv_errors_gracefully() {
        let idx = Arc::new(InMemoryMetadataIndex::new());
        let model = [7u8; 24];
        let hashes = seed_recent_blocks(&idx, 6, model);

        let fetcher = Arc::new(MockFetcher::new());
        for (i, h) in hashes.iter().enumerate() {
            let key = block_key_for_hash(h);
            if i % 2 == 0 {
                fetcher.stock_err(&key, "synthetic backend error");
            } else {
                fetcher.stock_hit(&key, 100);
            }
        }

        let cfg = PrefetchConfig {
            interval: Duration::from_mins(1),
            top_k: 6,
            model_digest: model,
            namespace: "ns-a".to_string(),
        };
        let stop = AtomicBool::new(false);
        let outcome = run_cycle_v2(
            idx.as_ref() as &dyn MetadataIndex,
            fetcher.as_ref() as &dyn PrefetchFetcher,
            &cfg,
            &stop,
        );

        assert_eq!(outcome.scored, 6);
        assert_eq!(outcome.selected, 6);
        assert_eq!(outcome.fetched, 3);
        assert_eq!(outcome.failed, 3);
        assert_eq!(outcome.bytes_materialized, 3 * 100);
        assert_eq!(fetcher.calls().len(), 6);
    }

    #[test]
    fn v2_dry_run_does_not_fetch() {
        // NOTE(review): the scoring-only worker (`spawn_worker`) takes no
        // fetcher at all, so the "no calls" assertion below holds by
        // construction. The test documents that the v1 path is fetch-free; it
        // does not exercise `dry_run_enabled`.
        let model = [7u8; 24];
        let idx_concrete = Arc::new(InMemoryMetadataIndex::new());
        seed_recent_blocks(&idx_concrete, 4, model);
        let idx: Arc<dyn MetadataIndex> = idx_concrete;

        let fetcher = Arc::new(MockFetcher::new());
        let plan_count = Arc::new(Mutex::new(0_usize));
        let pc = plan_count.clone();
        let emit: PrefetchEmit = Arc::new(move |_plan: &PrefetchPlan| {
            *pc.lock().unwrap() += 1;
        });

        let cfg = PrefetchConfig {
            interval: Duration::from_millis(40),
            top_k: 4,
            model_digest: model,
            namespace: "ns-dry".to_string(),
        };
        let worker = spawn_worker(idx, cfg, emit);
        std::thread::sleep(Duration::from_millis(150));
        drop(worker);

        assert!(*plan_count.lock().unwrap() >= 1);
        let calls = fetcher.calls();
        assert!(
            calls.is_empty(),
            "dry-run path must not call PrefetchFetcher: got {} calls",
            calls.len()
        );
    }

    #[test]
    fn dry_run_env_helper_reads_truthy_values() {
        // The variable is process-global; no other test in this module reads
        // it. The original value (even if non-UTF-8) is restored at the end.
        let saved = std::env::var_os("WMBT_KV_PREFETCH_DRY_RUN");
        std::env::remove_var("WMBT_KV_PREFETCH_DRY_RUN");
        assert!(!dry_run_enabled());
        std::env::set_var("WMBT_KV_PREFETCH_DRY_RUN", "1");
        assert!(dry_run_enabled());
        std::env::set_var("WMBT_KV_PREFETCH_DRY_RUN", "yes");
        assert!(dry_run_enabled());
        std::env::set_var("WMBT_KV_PREFETCH_DRY_RUN", "0");
        assert!(!dry_run_enabled());
        std::env::remove_var("WMBT_KV_PREFETCH_DRY_RUN");
        if let Some(v) = saved {
            std::env::set_var("WMBT_KV_PREFETCH_DRY_RUN", v);
        }
    }

    #[test]
    fn v2_worker_runs_and_stops() {
        let idx_concrete = Arc::new(InMemoryMetadataIndex::new());
        let model = [3u8; 24];
        let hashes = seed_recent_blocks(&idx_concrete, 5, model);
        let idx: Arc<dyn MetadataIndex> = idx_concrete.clone();

        let fetcher = Arc::new(MockFetcher::new());
        for h in &hashes {
            fetcher.stock_hit(&block_key_for_hash(h), 64);
        }

        let cfg = PrefetchConfig {
            interval: Duration::from_millis(30),
            top_k: 5,
            model_digest: model,
            namespace: "ns-x".to_string(),
        };

        let outcomes = Arc::new(Mutex::new(Vec::<PrefetchFetchOutcome>::new()));
        let outcomes_for_cb = outcomes.clone();
        let emit_outcome: Arc<dyn Fn(&PrefetchFetchOutcome) + Send + Sync> =
            Arc::new(move |o: &PrefetchFetchOutcome| {
                outcomes_for_cb.lock().unwrap().push(o.clone());
            });

        let fetcher_dyn: Arc<dyn PrefetchFetcher> = fetcher.clone();
        let started = Instant::now();
        let worker = spawn_worker_v2(idx, cfg, fetcher_dyn, emit_outcome);
        std::thread::sleep(Duration::from_millis(140));
        drop(worker);
        assert!(started.elapsed() < Duration::from_secs(2));

        let observed = outcomes.lock().unwrap();
        assert!(observed.len() >= 2);
        assert_eq!(observed[0].fetched, 5);
        // The first cycle left every block flat in the mock, so later cycles
        // skip all of them instead of fetching again.
        if observed.len() >= 2 {
            assert_eq!(observed[1].fetched, 0);
            assert_eq!(observed[1].skipped_already_flat, 5);
        }
    }
}