Skip to main content

wafrift_evolution/
edge_pop_coverage.rs

1//! Cross-region Cloudflare edge-POP coverage map.
2//!
3//! Closes #170. Cloudflare runs an anycast network with 300+ edge
4//! POPs (IATA-coded data centers — `SJC`, `LHR`, `NRT`, `FRA`, etc).
5//! A payload that's *blocked* through one POP can still be *bypassed*
6//! through another if that POP runs a different OpenResty build,
7//! older ruleset compiler, or different geo-specific managed rules.
8//!
9//! `parse_cf_block` in the oracle crate already extracts the
10//! `edge_pop` (IATA suffix of `cf-ray`) from every response. This
11//! module accumulates the (egress_label, target_host) → set-of-pops
12//! mapping so the hunt loop can bias rotation toward
13//! egress-IPs / proxy-routes that have NOT yet been observed hitting
14//! a given POP.
15//!
16//! ## Coverage policy
17//!
18//! - A `(egress, target)` pair that has hit `≥k` distinct POPs is
19//!   considered *exhausted* for cross-region purposes — further
20//!   probes through that egress are unlikely to land in a new POP
21//!   any time soon. (`k` defaults to 8; CF anycast usually pins a
22//!   client IP to a small set of nearby POPs.)
23//! - When *all* egress entries are exhausted, the hunt loop should
24//!   either rotate to a fresh egress pool (different proxy provider
25//!   / different VPN exit) or accept that the current set has
26//!   plateaued.
27//! - POPs are stored as upper-case 3-letter IATA codes for stable
28//!   set semantics (e.g. `SJC`, not `sjc` or `Sjc`).
29//!
30//! ## Why this matters for #170
31//!
32//! Without POP awareness, a hunt loop that rotates egress IPs blindly
33//! often re-hits the same POP many times before stumbling onto a new
34//! one. With POP awareness, the loop can:
35//!
36//! 1. **Detect plateau early** — if the same egress has hit only one
37//!    POP after 50 probes, anycast has pinned it; abandon faster.
38//! 2. **Prioritize gap-filling** — pick egress entries whose seen-POP
39//!    set is smallest, since those have the most room to discover
40//!    new POPs.
41//! 3. **Report coverage** — after a hunt round, surface "we touched
42//!    47 distinct CF edge POPs" so the operator knows the search
43//!    actually fanned out.
44//!
//! The map is persistent — it follows the same atomic save/load
//! contract as [`crate::rule_corpus`].
47
48use std::collections::{BTreeMap, BTreeSet};
49use std::path::Path;
50
/// Default number of distinct POPs at which an `(egress, target)` pair
/// is considered *exhausted* — the anycast network is unlikely to
/// surface new POPs for it without major IP rotation. This is only the
/// threshold used by the exhaustion check; it does not cap how many
/// POPs are stored per pair. Conservative default; override via
/// [`EdgePopCoverage::set_exhaustion_threshold`].
pub const DEFAULT_EXHAUSTION_THRESHOLD: usize = 8;
56
/// Schema version stamped into every newly constructed map. Bump it
/// when the on-disk shape changes. Note that
/// [`EdgePopCoverage::load_or_default`] does not inspect this value:
/// a file that fails to deserialize simply yields an empty default
/// map, so an incompatible file never aborts a hunt round.
pub const SCHEMA_VERSION: u32 = 1;
60
/// Per `(egress_label, target_host)` record of which POPs have been
/// observed and how many probes were spent observing them.
///
/// When populated through [`EdgePopCoverage::record`], POPs are stored
/// as upper-case 3-letter ASCII strings (see [`normalize_pop`]) so set
/// equality is byte-exact. The fields are public, so callers that
/// insert directly are responsible for normalizing first.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct EgressTargetPops {
    /// IATA POPs observed from this (egress, target). Kept in a
    /// `BTreeSet` so iteration — and therefore serialization — is
    /// sorted and deterministic for bench reproducibility.
    pub pops: BTreeSet<String>,
    /// Total probes observed, whether or not they yielded a valid
    /// POP. Useful for computing pop-discovery efficiency
    /// (`pops.len()` relative to `total_probes`).
    pub total_probes: u64,
}
74
/// Coverage map keyed by `(egress_label, target_host)`.
///
/// The composite key is encoded as `egress_label \u{1F} target_host`
/// (ASCII unit separator). `BTreeMap` over the joined key gives stable
/// iteration order for deterministic save/load. Operators read the
/// map via `pops_for` / `pops_covered_for_target` without seeing the
/// internal key encoding.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EdgePopCoverage {
    /// Schema version of the on-disk format.
    pub schema_version: u32,
    /// Joined `(egress_label, target_host)` key → observed POPs and
    /// probe count for that pair.
    pub entries: BTreeMap<String, EgressTargetPops>,
    /// Anycast plateau threshold for [`EdgePopCoverage::is_exhausted`]:
    /// the number of distinct POPs at which a pair counts as exhausted.
    /// Private so that writes go through `set_exhaustion_threshold`,
    /// which clamps the value to at least 1.
    exhaustion_threshold: usize,
}
91
92impl Default for EdgePopCoverage {
93    fn default() -> Self {
94        Self {
95            schema_version: SCHEMA_VERSION,
96            entries: BTreeMap::new(),
97            exhaustion_threshold: DEFAULT_EXHAUSTION_THRESHOLD,
98        }
99    }
100}
101
/// Separator placed between the egress label and the target host in
/// the composite map key (ASCII unit separator, U+001F).
const KEY_SEP: char = '\u{1F}';

/// Build the composite map key for an `(egress, target)` pair:
/// `egress`, then [`KEY_SEP`], then `target`.
fn make_key(egress: &str, target: &str) -> String {
    format!("{egress}{KEY_SEP}{target}")
}

/// Split a composite key back into `(egress, target)`.
///
/// Returns `None` when the key contains no separator at all.
///
/// The split happens at the LAST separator. Egress labels are
/// free-form operator strings and may themselves contain the
/// separator, whereas the target is expected to be a host name, which
/// presumably never contains a control character. Splitting from the
/// right therefore recovers the exact pair that `make_key` was given
/// even for odd egress labels; splitting at the first separator would
/// attribute part of the label to the target.
fn split_key(key: &str) -> Option<(&str, &str)> {
    key.rsplit_once(KEY_SEP)
}
111
/// Canonicalize an IATA-style POP code.
///
/// Surrounding whitespace is ignored. The remaining text must be
/// exactly three ASCII letters; anything else yields `None`. On
/// success the code is returned upper-cased (e.g. `" sjc "` → `SJC`).
///
/// Any 3-letter ASCII code is accepted rather than checking against a
/// list of known POPs: CF expands its POP list often, so a whitelist
/// would rot.
#[must_use]
pub fn normalize_pop(raw: &str) -> Option<String> {
    let code = raw.trim();
    // Three bytes that are all ASCII letters ⇒ three ASCII characters;
    // any multi-byte character fails the per-byte letter check.
    let well_formed = code.len() == 3 && code.bytes().all(|b| b.is_ascii_alphabetic());
    well_formed.then(|| code.to_ascii_uppercase())
}
127
128impl EdgePopCoverage {
129    /// Construct an empty map at the current schema version.
130    #[must_use]
131    pub fn new() -> Self {
132        Self::default()
133    }
134
135    /// Override the exhaustion threshold. Useful for tests, or for
136    /// operators on extremely well-distributed proxy pools.
137    pub fn set_exhaustion_threshold(&mut self, n: usize) {
138        self.exhaustion_threshold = n.max(1);
139    }
140
141    /// Current exhaustion threshold.
142    #[must_use]
143    pub fn exhaustion_threshold(&self) -> usize {
144        self.exhaustion_threshold
145    }
146
147    /// Record that we observed `pop` (must be a valid IATA-style
148    /// string — pass the raw `signal.edge_pop` from `parse_cf_block`).
149    /// Returns `true` if the POP was newly observed for this
150    /// `(egress, target)`, `false` if already known.
151    ///
152    /// Invalid POP strings (wrong length, non-letter) increment
153    /// `total_probes` but don't add to the set — they're noise from
154    /// non-CF responses (origin direct, captive portals, etc).
155    pub fn record(&mut self, egress: &str, target: &str, pop_raw: &str) -> bool {
156        let key = make_key(egress, target);
157        let entry = self.entries.entry(key).or_default();
158        entry.total_probes += 1;
159        match normalize_pop(pop_raw) {
160            Some(canon) => entry.pops.insert(canon),
161            None => false,
162        }
163    }
164
165    /// Record a probe with NO POP observed (e.g. timeout, non-CF
166    /// edge, raw TCP error). Updates only the probe counter.
167    pub fn record_no_pop(&mut self, egress: &str, target: &str) {
168        let key = make_key(egress, target);
169        self.entries.entry(key).or_default().total_probes += 1;
170    }
171
172    /// Look up observed POPs for this pair. Returns an empty set if
173    /// the pair has never been probed.
174    #[must_use]
175    pub fn pops_for(&self, egress: &str, target: &str) -> BTreeSet<String> {
176        self.entries
177            .get(&make_key(egress, target))
178            .map(|e| e.pops.clone())
179            .unwrap_or_default()
180    }
181
182    /// Total probes recorded for this pair. Zero if never probed.
183    #[must_use]
184    pub fn probes_for(&self, egress: &str, target: &str) -> u64 {
185        self.entries
186            .get(&make_key(egress, target))
187            .map(|e| e.total_probes)
188            .unwrap_or(0)
189    }
190
191    /// True if `(egress, target)` has hit at least
192    /// `exhaustion_threshold` distinct POPs and is unlikely to
193    /// surface more without major IP rotation.
194    #[must_use]
195    pub fn is_exhausted(&self, egress: &str, target: &str) -> bool {
196        self.pops_for(egress, target).len() >= self.exhaustion_threshold
197    }
198
199    /// All POPs seen across every egress for this target. Used to
200    /// answer "what's the union of CF POPs our hunt has touched for
201    /// `target.example`".
202    #[must_use]
203    pub fn pops_covered_for_target(&self, target: &str) -> BTreeSet<String> {
204        let mut out = BTreeSet::new();
205        for (key, entry) in &self.entries {
206            if let Some((_, t)) = split_key(key)
207                && t == target
208            {
209                out.extend(entry.pops.iter().cloned());
210            }
211        }
212        out
213    }
214
215    /// All POPs seen across every (egress, target). Useful for the
216    /// global "we touched N distinct POPs this hunt round" headline.
217    #[must_use]
218    pub fn pops_covered_global(&self) -> BTreeSet<String> {
219        let mut out = BTreeSet::new();
220        for entry in self.entries.values() {
221            out.extend(entry.pops.iter().cloned());
222        }
223        out
224    }
225
226    /// All egress labels we have data for. Stable order (BTreeMap
227    /// iteration).
228    #[must_use]
229    pub fn egress_labels(&self) -> BTreeSet<String> {
230        let mut out = BTreeSet::new();
231        for key in self.entries.keys() {
232            if let Some((e, _)) = split_key(key) {
233                out.insert(e.to_string());
234            }
235        }
236        out
237    }
238
239    /// Egress entries that are NOT yet exhausted for `target` —
240    /// these are the candidates the hunt loop should prioritize for
241    /// new probes, sorted ascending by current POP count so we
242    /// favor entries with the most room to grow.
243    #[must_use]
244    pub fn rank_egresses_for_discovery(&self, target: &str) -> Vec<(String, usize)> {
245        let mut all: BTreeMap<String, usize> = BTreeMap::new();
246        for (key, entry) in &self.entries {
247            if let Some((e, t)) = split_key(key)
248                && t == target
249            {
250                all.insert(e.to_string(), entry.pops.len());
251            }
252        }
253        let mut ranked: Vec<(String, usize)> = all
254            .into_iter()
255            .filter(|(_, n)| *n < self.exhaustion_threshold)
256            .collect();
257        ranked.sort_by(|a, b| a.1.cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
258        ranked
259    }
260
261    /// Persist atomically via tempfile + rename. The on-disk format
262    /// is JSON for human-readable diffs across hunt sessions.
263    ///
264    /// R55 pass-19 I4 (CLAUDE.md §7 DEDUP): delegates to
265    /// `wafrift_types::loaders::write_atomic` so the atomic-write
266    /// recipe lives in one place. Pre-fix: 3 evolution modules each
267    /// rolled their own.
268    pub fn save_atomic(&self, path: &Path) -> std::io::Result<()> {
269        let bytes = serde_json::to_vec_pretty(self).map_err(std::io::Error::other)?;
270        wafrift_types::loaders::write_atomic(path, &bytes)
271    }
272
273    /// Load from disk; on missing file or corrupt JSON return
274    /// `default()`. Same forgiveness contract as
275    /// `rule_corpus::load_or_default` — operator data is precious
276    /// but a corrupt coverage map shouldn't crash a hunt round.
277    #[must_use]
278    pub fn load_or_default(path: &Path) -> Self {
279        // Coverage maps grow with each pop+technique combination;
280        // 64 MiB is a soft ceiling above any realistic deployment.
281        const EDGE_POP_COVERAGE_MAX_BYTES: usize = 64 * 1024 * 1024;
282        let bytes = match crate::safe_io::read_capped_bytes(path, EDGE_POP_COVERAGE_MAX_BYTES) {
283            Ok(b) => b,
284            Err(_) => return Self::default(),
285        };
286        serde_json::from_slice(&bytes).unwrap_or_default()
287    }
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeSet;
    use std::path::PathBuf;

    /// Target host used by most tests.
    const TARGET: &str = "target.example";

    /// Build an owned string set from literals.
    fn string_set(items: &[&str]) -> BTreeSet<String> {
        items.iter().map(|s| (*s).to_string()).collect()
    }

    /// Per-process scratch path under the system temp dir. `tag` keeps
    /// tests that run in parallel inside one process from colliding.
    fn scratch_path(tag: &str) -> PathBuf {
        std::env::temp_dir().join(format!("wafrift_pop_cov_{tag}_{}.json", std::process::id()))
    }

    #[test]
    fn normalize_pop_accepts_3_letter_iata() {
        for (raw, want) in [("SJC", "SJC"), ("sjc", "SJC"), ("Lhr", "LHR"), ("  AMS  ", "AMS")] {
            assert_eq!(normalize_pop(raw), Some(want.to_string()), "input {raw:?}");
        }
    }

    #[test]
    fn normalize_pop_rejects_garbage() {
        // The last case is a 3-char Unicode string that is not all
        // ASCII alphabetic.
        for raw in ["", "AB", "ABCD", "12A", "A1A", "---", "a\u{0301}b"] {
            assert_eq!(normalize_pop(raw), None, "input {raw:?}");
        }
    }

    #[test]
    fn record_first_pop_returns_true() {
        let mut c = EdgePopCoverage::new();
        assert!(c.record("egress-a", TARGET, "SJC"));
    }

    #[test]
    fn record_duplicate_pop_returns_false() {
        let mut c = EdgePopCoverage::new();
        c.record("egress-a", TARGET, "SJC");
        assert!(!c.record("egress-a", TARGET, "SJC"));
        // Lower-case duplicate is also detected after normalization.
        assert!(!c.record("egress-a", TARGET, "sjc"));
    }

    #[test]
    fn record_invalid_pop_still_counts_probe() {
        let mut c = EdgePopCoverage::new();
        assert!(!c.record("egress-a", TARGET, "NOT-A-POP"));
        assert_eq!(c.probes_for("egress-a", TARGET), 1);
        assert!(c.pops_for("egress-a", TARGET).is_empty());
    }

    #[test]
    fn record_no_pop_increments_counter_only() {
        let mut c = EdgePopCoverage::new();
        c.record_no_pop("egress-a", TARGET);
        c.record_no_pop("egress-a", TARGET);
        assert_eq!(c.probes_for("egress-a", TARGET), 2);
        assert!(c.pops_for("egress-a", TARGET).is_empty());
    }

    #[test]
    fn pops_per_pair_isolated() {
        let mut c = EdgePopCoverage::new();
        c.record("egress-a", TARGET, "SJC");
        c.record("egress-b", TARGET, "LHR");
        c.record("egress-a", "other.example", "NRT");
        assert_eq!(c.pops_for("egress-a", TARGET), string_set(&["SJC"]));
        assert_eq!(c.pops_for("egress-b", TARGET), string_set(&["LHR"]));
        assert_eq!(c.pops_for("egress-a", "other.example"), string_set(&["NRT"]));
    }

    #[test]
    fn pops_covered_for_target_unions_across_egresses() {
        let mut c = EdgePopCoverage::new();
        c.record("egress-a", TARGET, "SJC");
        c.record("egress-b", TARGET, "LHR");
        c.record("egress-c", TARGET, "AMS");
        // Mixed in a different target — must not contaminate.
        c.record("egress-d", "other.example", "ORD");

        assert_eq!(
            c.pops_covered_for_target(TARGET),
            string_set(&["AMS", "LHR", "SJC"])
        );
    }

    #[test]
    fn pops_covered_global_unions_everything() {
        let mut c = EdgePopCoverage::new();
        c.record("egress-a", TARGET, "SJC");
        c.record("egress-b", "other.example", "LHR");
        assert_eq!(c.pops_covered_global(), string_set(&["LHR", "SJC"]));
    }

    #[test]
    fn is_exhausted_only_after_threshold() {
        let mut c = EdgePopCoverage::new();
        c.set_exhaustion_threshold(3);
        c.record("egress-a", TARGET, "SJC");
        c.record("egress-a", TARGET, "LHR");
        assert!(!c.is_exhausted("egress-a", TARGET));
        c.record("egress-a", TARGET, "NRT");
        assert!(c.is_exhausted("egress-a", TARGET));
    }

    #[test]
    fn is_exhausted_unprobed_pair_is_false() {
        let c = EdgePopCoverage::new();
        assert!(!c.is_exhausted("egress-a", TARGET));
    }

    #[test]
    fn rank_egresses_excludes_exhausted_and_orders_by_pop_count() {
        let mut c = EdgePopCoverage::new();
        c.set_exhaustion_threshold(3);

        // egress-a: 1 POP (most room to grow)
        c.record("egress-a", TARGET, "SJC");
        // egress-b: 2 POPs
        c.record("egress-b", TARGET, "SJC");
        c.record("egress-b", TARGET, "LHR");
        // egress-c: 3 POPs (exhausted, so excluded)
        c.record("egress-c", TARGET, "SJC");
        c.record("egress-c", TARGET, "LHR");
        c.record("egress-c", TARGET, "AMS");

        assert_eq!(
            c.rank_egresses_for_discovery(TARGET),
            vec![("egress-a".to_string(), 1), ("egress-b".to_string(), 2)]
        );
    }

    #[test]
    fn rank_egresses_ignores_other_targets() {
        let mut c = EdgePopCoverage::new();
        c.set_exhaustion_threshold(3);
        c.record("egress-a", TARGET, "SJC");
        c.record("egress-b", "other.example", "LHR");
        assert_eq!(
            c.rank_egresses_for_discovery(TARGET),
            vec![("egress-a".to_string(), 1)]
        );
    }

    #[test]
    fn rank_egresses_breaks_ties_alphabetically() {
        let mut c = EdgePopCoverage::new();
        c.set_exhaustion_threshold(5);
        c.record("egress-z", TARGET, "SJC");
        c.record("egress-a", TARGET, "LHR");
        c.record("egress-m", TARGET, "AMS");
        // All three have 1 POP; alphabetical order wins.
        let labels: Vec<String> = c
            .rank_egresses_for_discovery(TARGET)
            .into_iter()
            .map(|(label, _)| label)
            .collect();
        assert_eq!(labels, ["egress-a", "egress-m", "egress-z"]);
    }

    #[test]
    fn egress_labels_returns_unique_set() {
        let mut c = EdgePopCoverage::new();
        c.record("egress-a", TARGET, "SJC");
        c.record("egress-a", "other.example", "LHR");
        c.record("egress-b", TARGET, "NRT");
        assert_eq!(c.egress_labels(), string_set(&["egress-a", "egress-b"]));
    }

    #[test]
    fn save_load_roundtrip_atomic() {
        let mut c = EdgePopCoverage::new();
        c.set_exhaustion_threshold(5);
        c.record("egress-a", TARGET, "SJC");
        c.record("egress-a", TARGET, "LHR");
        c.record("egress-b", TARGET, "NRT");
        c.record_no_pop("egress-c", TARGET);

        let tmp = scratch_path("roundtrip");
        c.save_atomic(&tmp).unwrap();
        let loaded = EdgePopCoverage::load_or_default(&tmp);
        // Clean up before asserting so a failure doesn't leak the file.
        std::fs::remove_file(&tmp).ok();

        assert_eq!(loaded.schema_version, c.schema_version);
        assert_eq!(loaded.entries, c.entries);
        assert_eq!(loaded.exhaustion_threshold(), 5);
        // 5-threshold means 2 POPs is not enough.
        assert!(!loaded.is_exhausted("egress-a", TARGET));
    }

    #[test]
    fn load_missing_file_returns_default() {
        let nope = scratch_path("missing");
        let _ = std::fs::remove_file(&nope);
        let loaded = EdgePopCoverage::load_or_default(&nope);
        assert_eq!(loaded.schema_version, SCHEMA_VERSION);
        assert!(loaded.entries.is_empty());
    }

    #[test]
    fn load_corrupt_file_returns_default() {
        let tmp = scratch_path("corrupt");
        std::fs::write(&tmp, b"this is not json {{{ ").unwrap();
        let loaded = EdgePopCoverage::load_or_default(&tmp);
        std::fs::remove_file(&tmp).ok();
        assert!(loaded.entries.is_empty());
    }

    #[test]
    fn save_creates_parent_directory() {
        let dir = std::env::temp_dir().join(format!("wafrift_pop_cov_dir_{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        let nested = dir.join("nested").join("coverage.json");
        let mut c = EdgePopCoverage::new();
        c.record("egress-a", TARGET, "SJC");
        c.save_atomic(&nested).unwrap();
        let created = nested.exists();
        std::fs::remove_dir_all(&dir).ok();
        assert!(created);
    }

    #[test]
    fn set_exhaustion_threshold_min_clamped_to_one() {
        let mut c = EdgePopCoverage::new();
        c.set_exhaustion_threshold(0);
        assert_eq!(c.exhaustion_threshold(), 1);
        // With threshold 1, a single POP is enough.
        c.record("egress-a", TARGET, "SJC");
        assert!(c.is_exhausted("egress-a", TARGET));
    }

    #[test]
    fn key_separator_not_observable_in_public_api() {
        // Even if an operator passes the unit-separator char in the
        // egress label, pair lookups must stay consistent — both in
        // memory and after a save/load cycle.
        let mut c = EdgePopCoverage::new();
        let weird_egress = "egress\u{1F}with-sep";
        assert!(c.record(weird_egress, TARGET, "SJC"));
        assert_eq!(c.pops_for(weird_egress, TARGET), string_set(&["SJC"]));
        assert_eq!(c.probes_for(weird_egress, TARGET), 1);

        let tmp = scratch_path("separator");
        c.save_atomic(&tmp).unwrap();
        let loaded = EdgePopCoverage::load_or_default(&tmp);
        std::fs::remove_file(&tmp).ok();
        assert_eq!(loaded.pops_for(weird_egress, TARGET), string_set(&["SJC"]));
        assert_eq!(loaded.probes_for(weird_egress, TARGET), 1);
    }

    #[test]
    fn record_increments_total_probes_per_pair() {
        let mut c = EdgePopCoverage::new();
        c.record("egress-a", TARGET, "SJC");
        c.record("egress-a", TARGET, "LHR");
        c.record("egress-a", TARGET, "NRT");
        assert_eq!(c.probes_for("egress-a", TARGET), 3);
        // Different pair untouched.
        assert_eq!(c.probes_for("egress-b", TARGET), 0);
    }

    #[test]
    fn empty_global_coverage_is_empty_set() {
        let c = EdgePopCoverage::new();
        assert!(c.pops_covered_global().is_empty());
        assert!(c.pops_covered_for_target("any.example").is_empty());
        assert!(c.egress_labels().is_empty());
    }
}