Skip to main content

atomr_cluster_metrics/
lib.rs

//! atomr-cluster-metrics.
//!
//! Phase 10 of `docs/full-port-plan.md`. Three layers:
//!
//! * [`ClusterMetrics`] — the per-node snapshot store (unchanged
//!   from the prior version).
//! * [`MetricsProbe`] — pluggable trait that produces one
//!   [`NodeMetrics`] sample per call. The bundled implementation
//!   ([`StaticProbe`]) is meant for tests; production callers ship a
//!   probe that reads `/proc/loadavg` or calls `sysinfo` themselves
//!   (kept dep-free here so the metrics crate stays slim).
//! * [`AdaptiveLoadBalancer`] — picks the candidate node with the
//!   lowest reported CPU load. Used by `RemoteRouterConfig` once the
//!   metrics gossip wiring lands (Phase 10.B).
15
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use parking_lot::RwLock;
20use serde::{Deserialize, Serialize};
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
23pub struct NodeMetrics {
24    pub address: String,
25    pub timestamp: u64,
26    pub cpu_load: f64,
27    pub memory_used: u64,
28    pub memory_max: u64,
29}
30
31impl NodeMetrics {
32    /// Used memory as a fraction of max [0.0, 1.0]. Returns 0.0 if
33    /// `memory_max` is zero.
34    pub fn memory_usage(&self) -> f64 {
35        if self.memory_max == 0 {
36            0.0
37        } else {
38            self.memory_used as f64 / self.memory_max as f64
39        }
40    }
41}
42
43#[derive(Default)]
44pub struct ClusterMetrics {
45    entries: RwLock<HashMap<String, NodeMetrics>>,
46}
47
48impl ClusterMetrics {
49    pub fn new() -> Self {
50        Self::default()
51    }
52
53    pub fn publish(&self, m: NodeMetrics) {
54        self.entries.write().insert(m.address.clone(), m);
55    }
56
57    pub fn snapshot(&self) -> Vec<NodeMetrics> {
58        self.entries.read().values().cloned().collect()
59    }
60
61    pub fn get(&self, address: &str) -> Option<NodeMetrics> {
62        self.entries.read().get(address).cloned()
63    }
64
65    pub fn node_count(&self) -> usize {
66        self.entries.read().len()
67    }
68}
69
70// -- Probe -----------------------------------------------------------
71
72/// Sample local CPU/memory stats. Implementors decide how — `sysinfo`,
73/// `/proc/loadavg`, or a hand-rolled JNI-style call. Deliberately
74/// dep-free here.
75pub trait MetricsProbe: Send + Sync + 'static {
76    fn sample(&self, address: &str, timestamp: u64) -> NodeMetrics;
77}
78
79/// Static probe — useful for tests and as a baseline when no real
80/// probe is wired. Returns the supplied values.
81pub struct StaticProbe {
82    pub cpu_load: f64,
83    pub memory_used: u64,
84    pub memory_max: u64,
85}
86
87impl MetricsProbe for StaticProbe {
88    fn sample(&self, address: &str, timestamp: u64) -> NodeMetrics {
89        NodeMetrics {
90            address: address.into(),
91            timestamp,
92            cpu_load: self.cpu_load,
93            memory_used: self.memory_used,
94            memory_max: self.memory_max,
95        }
96    }
97}
98
99// -- Adaptive routing ------------------------------------------------
100
101/// Router that picks the node with the lowest `cpu_load` from a
102/// [`ClusterMetrics`] snapshot. Falls back to deterministic-by-address
103/// order when there are no metrics.
104pub struct AdaptiveLoadBalancer {
105    metrics: Arc<ClusterMetrics>,
106}
107
108impl AdaptiveLoadBalancer {
109    pub fn new(metrics: Arc<ClusterMetrics>) -> Self {
110        Self { metrics }
111    }
112
113    /// Pick a candidate from `candidates` weighted by inverse load.
114    /// Ties broken by address.
115    pub fn pick<'a>(&self, candidates: &'a [&'a str]) -> Option<&'a str> {
116        if candidates.is_empty() {
117            return None;
118        }
119        let snapshot = self.metrics.snapshot();
120        let lookup: HashMap<&str, &NodeMetrics> = snapshot.iter().map(|m| (m.address.as_str(), m)).collect();
121        let mut sorted: Vec<&&str> = candidates.iter().collect();
122        sorted.sort_by(|a, b| {
123            let load_a = lookup.get(*a).map(|m| m.cpu_load).unwrap_or(f64::INFINITY);
124            let load_b = lookup.get(*b).map(|m| m.cpu_load).unwrap_or(f64::INFINITY);
125            load_a.partial_cmp(&load_b).unwrap_or(std::cmp::Ordering::Equal).then_with(|| a.cmp(b))
126        });
127        sorted.first().copied().copied()
128    }
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a sample in the tests below.
    fn node(address: &str, timestamp: u64, cpu_load: f64, memory_used: u64, memory_max: u64) -> NodeMetrics {
        NodeMetrics { address: address.to_string(), timestamp, cpu_load, memory_used, memory_max }
    }

    #[test]
    fn publish_and_fetch() {
        let store = ClusterMetrics::new();
        store.publish(node("a", 1, 0.5, 100, 1000));
        assert_eq!(store.snapshot().len(), 1);
        assert_eq!(store.get("a").unwrap().cpu_load, 0.5);
    }

    #[test]
    fn memory_usage_ratio() {
        let quarter_full = node("a", 0, 0.0, 250, 1000);
        assert_eq!(quarter_full.memory_usage(), 0.25);
    }

    #[test]
    fn memory_usage_handles_zero_max() {
        let unknown_capacity = node("a", 0, 0.0, 0, 0);
        assert_eq!(unknown_capacity.memory_usage(), 0.0);
    }

    #[test]
    fn static_probe_returns_configured_values() {
        let probe = StaticProbe { cpu_load: 0.7, memory_used: 5, memory_max: 10 };
        let sampled = probe.sample("nodeA", 42);
        assert_eq!(sampled.address, "nodeA");
        assert_eq!(sampled.timestamp, 42);
        assert_eq!(sampled.cpu_load, 0.7);
        assert_eq!(sampled.memory_used, 5);
    }

    #[test]
    fn adaptive_picks_lowest_load() {
        let store = Arc::new(ClusterMetrics::new());
        for (address, load) in [("a", 0.9), ("b", 0.1), ("c", 0.5)] {
            store.publish(node(address, 0, load, 0, 1));
        }
        let balancer = AdaptiveLoadBalancer::new(store);
        assert_eq!(balancer.pick(&["a", "b", "c"]), Some("b"));
    }

    #[test]
    fn adaptive_falls_back_to_address_order_when_no_metrics() {
        let balancer = AdaptiveLoadBalancer::new(Arc::new(ClusterMetrics::new()));
        assert_eq!(balancer.pick(&["c", "a", "b"]), Some("a"));
    }

    #[test]
    fn adaptive_returns_none_for_empty_candidates() {
        let balancer = AdaptiveLoadBalancer::new(Arc::new(ClusterMetrics::new()));
        assert_eq!(balancer.pick(&[]), None);
    }
}
220
221// -- EWMA smoothing --------------------------------------------------
222
/// Exponentially-weighted moving average.
///
/// `alpha` is the smoothing factor in `(0.0, 1.0]`: the closer it is to
/// `1.0`, the more weight each new sample gets.
#[derive(Debug, Clone, Copy)]
pub struct Ewma {
    pub alpha: f64,
    pub value: f64,
}

impl Ewma {
    /// Start an average at `initial` with smoothing factor `alpha`.
    ///
    /// # Panics
    ///
    /// Panics when `alpha` is not within `(0.0, 1.0]` (NaN included).
    pub fn new(initial: f64, alpha: f64) -> Self {
        let in_range = 0.0 < alpha && alpha <= 1.0;
        assert!(in_range, "alpha must be in (0.0, 1.0]");
        Self { value: initial, alpha }
    }

    /// Derive `alpha` from a half-life expressed in samples: once
    /// `half_life_samples` updates have been folded in, the earlier value
    /// accounts for 50% of the average. Meaningful when samples arrive at a
    /// fixed interval.
    ///
    /// # Panics
    ///
    /// Panics when `half_life_samples` is not greater than zero, or when the
    /// derived `alpha` is rejected by [`Ewma::new`].
    pub fn from_half_life(initial: f64, half_life_samples: f64) -> Self {
        assert!(half_life_samples > 0.0);
        // Per-sample retention is 2^(-1/half_life); alpha is what is left.
        let retention = f64::powf(2.0, -1.0 / half_life_samples);
        Self::new(initial, 1.0 - retention)
    }

    /// Blend `sample` into the average and return the updated value.
    pub fn update(&mut self, sample: f64) -> f64 {
        let fresh = self.alpha * sample;
        let carried = (1.0 - self.alpha) * self.value;
        self.value = fresh + carried;
        self.value
    }
}
257
258// -- Metrics selectors ------------------------------------------------
259
260/// What dimension drives `WeightedRoutees`.
261/// `MetricsSelector` / `CpuMetricsSelector` / `HeapMetricsSelector` /
262/// `MixMetricsSelector`.
263#[derive(Debug, Clone, Copy, PartialEq)]
264pub enum MetricsSelector {
265    /// Higher weight for lower CPU load. Returns `1 - cpu_load`.
266    Cpu,
267    /// Higher weight for lower memory usage. Returns `1 - memory_usage`.
268    Heap,
269    /// Average of CPU and Heap selectors.
270    Mix,
271}
272
273impl MetricsSelector {
274    /// Compute a weight in `[0.0, 1.0]` for `m`. Larger == more
275    /// preferable as a routing target.
276    pub fn weight(&self, m: &NodeMetrics) -> f64 {
277        let cpu = (1.0 - m.cpu_load).clamp(0.0, 1.0);
278        let heap = (1.0 - m.memory_usage()).clamp(0.0, 1.0);
279        match self {
280            Self::Cpu => cpu,
281            Self::Heap => heap,
282            Self::Mix => 0.5 * (cpu + heap),
283        }
284    }
285}
286
287// -- Weighted routees -------------------------------------------------
288
289/// Pick a routee with probability proportional to its
290/// [`MetricsSelector::weight`].
291pub struct WeightedRoutees {
292    metrics: Arc<ClusterMetrics>,
293    selector: MetricsSelector,
294}
295
296impl WeightedRoutees {
297    pub fn new(metrics: Arc<ClusterMetrics>, selector: MetricsSelector) -> Self {
298        Self { metrics, selector }
299    }
300
301    /// Pick a routee using `seed` as the random draw in `[0.0, 1.0)`.
302    /// Splitting the RNG out of the call lets tests be deterministic.
303    /// The returned slice index corresponds to `candidates`.
304    pub fn pick<'a>(&self, candidates: &'a [&'a str], seed: f64) -> Option<&'a str> {
305        if candidates.is_empty() {
306            return None;
307        }
308        let snap = self.metrics.snapshot();
309        let by_addr: HashMap<&str, &NodeMetrics> = snap.iter().map(|m| (m.address.as_str(), m)).collect();
310        let weights: Vec<f64> = candidates
311            .iter()
312            .map(|c| by_addr.get(c).map(|m| self.selector.weight(m)).unwrap_or(0.5))
313            .collect();
314        let total: f64 = weights.iter().sum();
315        if total <= 0.0 {
316            return Some(candidates[0]);
317        }
318        let target = (seed.clamp(0.0, 1.0) * total).min(total);
319        let mut acc = 0.0;
320        for (i, w) in weights.iter().enumerate() {
321            acc += *w;
322            if target <= acc {
323                return Some(candidates[i]);
324            }
325        }
326        candidates.last().copied()
327    }
328}
329
#[cfg(test)]
mod ewma_tests {
    use super::*;

    /// Build a sample with timestamp 0; only load and memory matter here.
    fn node(address: &str, cpu_load: f64, memory_used: u64, memory_max: u64) -> NodeMetrics {
        NodeMetrics { address: address.into(), timestamp: 0, cpu_load, memory_used, memory_max }
    }

    #[test]
    fn ewma_initial_value_unchanged_until_update() {
        let e = Ewma::new(0.5, 0.3);
        assert_eq!(e.value, 0.5);
    }

    #[test]
    fn ewma_converges_to_steady_signal() {
        let mut e = Ewma::new(0.0, 0.5);
        for _ in 0..30 {
            e.update(1.0);
        }
        assert!(e.value > 0.99, "expected ≈1.0, got {}", e.value);
    }

    #[test]
    #[should_panic(expected = "alpha must be in (0.0, 1.0]")]
    fn ewma_rejects_invalid_alpha() {
        let _ = Ewma::new(0.0, 0.0);
    }

    #[test]
    fn ewma_from_half_life_yields_50pct_weight_after_half_life() {
        let mut e = Ewma::from_half_life(0.0, 4.0);
        for _ in 0..4 {
            e.update(1.0);
        }
        // In exact arithmetic the value is precisely 0.5 after one half-life,
        // so a bare `>= 0.5` would hinge on floating-point rounding. Compare
        // with a tolerance instead.
        assert!((e.value - 0.5).abs() < 1e-12, "expected ≈0.5, got {}", e.value);
    }

    #[test]
    fn cpu_selector_prefers_lower_load() {
        let idle = node("a", 0.2, 0, 1);
        let busy = node("b", 0.9, 0, 1);
        assert!(MetricsSelector::Cpu.weight(&idle) > MetricsSelector::Cpu.weight(&busy));
    }

    #[test]
    fn mix_selector_averages_cpu_and_heap() {
        let m = node("a", 0.0, 100, 200);
        let w = MetricsSelector::Mix.weight(&m);
        // cpu weight 1.0, heap weight 0.5 -> mix 0.75
        assert!((w - 0.75).abs() < 1e-6, "mix weight {w}");
    }

    #[test]
    fn weighted_routees_picks_higher_weight_node_more_often() {
        let m = Arc::new(ClusterMetrics::new());
        m.publish(node("fast", 0.1, 0, 1));
        m.publish(node("slow", 0.9, 0, 1));
        let r = WeightedRoutees::new(m, MetricsSelector::Cpu);
        let cands = ["fast", "slow"];
        // 100 deterministic seeds across [0.0, 1.0)
        let fast = (0..100).filter(|&i| r.pick(&cands, f64::from(i) / 100.0) == Some("fast")).count();
        assert!(fast > 60, "expected >60 fast picks, got {fast}");
    }

    #[test]
    fn weighted_routees_returns_first_when_all_zero() {
        let m = Arc::new(ClusterMetrics::new());
        m.publish(node("a", 1.0, 1, 1));
        m.publish(node("b", 1.0, 1, 1));
        let r = WeightedRoutees::new(m, MetricsSelector::Mix);
        assert_eq!(r.pick(&["a", "b"], 0.5), Some("a"));
    }
}
439
440// -- Phase 10.B: optional sysinfo-backed probe -----------------------
441
#[cfg(feature = "sysinfo-probe")]
pub mod sys {
    //! `sysinfo`-backed [`super::MetricsProbe`]. Enabled with the
    //! `sysinfo-probe` feature.
    use super::{MetricsProbe, NodeMetrics};
    use std::sync::{Mutex, PoisonError};
    use sysinfo::System;

    /// Probe that reads CPU and memory figures from a shared
    /// [`sysinfo::System`] handle.
    pub struct SysinfoProbe {
        // `sample` takes `&self` but refreshing needs `&mut System`.
        sys: Mutex<System>,
    }

    impl Default for SysinfoProbe {
        fn default() -> Self {
            Self::new()
        }
    }

    impl SysinfoProbe {
        /// Create a probe backed by `System::new_all()`.
        pub fn new() -> Self {
            Self { sys: Mutex::new(System::new_all()) }
        }
    }

    impl MetricsProbe for SysinfoProbe {
        /// Refresh CPU and memory readings and package them as a sample for
        /// `address` at `timestamp`.
        fn sample(&self, address: &str, timestamp: u64) -> NodeMetrics {
            // A panic in another sampler thread poisons the mutex. Both
            // refresh calls below overwrite the readings we use, so the
            // handle is still usable: take the guard back instead of
            // panicking on every later sample.
            let mut sys = self.sys.lock().unwrap_or_else(PoisonError::into_inner);
            sys.refresh_cpu_usage();
            sys.refresh_memory();
            // NOTE(review): sysinfo derives CPU usage from the delta between
            // two refreshes, so samples taken in quick succession may be
            // imprecise — confirm against the sysinfo version in use.
            // global_cpu_usage() is in [0..100]; normalize to [0..1].
            let cpu_load = (sys.global_cpu_usage() as f64 / 100.0).clamp(0.0, 1.0);
            let memory_max = sys.total_memory();
            let memory_used = sys.used_memory();
            NodeMetrics { address: address.into(), timestamp, cpu_load, memory_used, memory_max }
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn sysinfo_probe_returns_finite_load() {
            let p = SysinfoProbe::new();
            let m = p.sample("a", 1);
            assert!(m.cpu_load.is_finite());
            assert!(m.memory_max >= m.memory_used);
        }
    }
}
492
493// -- Phase 10.C: metrics gossip --------------------------------------
494
495/// Wire shape for cross-node metric exchange.
496#[derive(Debug, Clone, Serialize, Deserialize)]
497#[non_exhaustive]
498pub enum MetricsPdu {
499    /// Push the sender's latest sample.
500    Push(NodeMetrics),
501    /// Push a batch of samples (e.g. for catch-up sync).
502    PushBatch(Vec<NodeMetrics>),
503}
504
505/// Pluggable transport for metrics gossip. Mirrors
506/// [`atomr_cluster::GossipTransport`] in spirit but works on raw addresses.
507pub trait MetricsTransport: Send + Sync + 'static {
508    fn send(&self, target_node: &str, pdu: MetricsPdu);
509}
510
511/// Apply an inbound `MetricsPdu` into a [`ClusterMetrics`].
512pub fn apply_metrics_pdu(metrics: &ClusterMetrics, pdu: MetricsPdu) {
513    match pdu {
514        MetricsPdu::Push(m) => metrics.publish(m),
515        MetricsPdu::PushBatch(v) => {
516            for m in v {
517                metrics.publish(m);
518            }
519        }
520    }
521}
522
523/// Push the local probe sample to a peer. Caller drives this on a tick.
524pub fn gossip_local_metrics<P: MetricsProbe + ?Sized>(
525    probe: &P,
526    self_address: &str,
527    target_node: &str,
528    transport: &dyn MetricsTransport,
529    now: u64,
530) {
531    let m = probe.sample(self_address, now);
532    transport.send(target_node, MetricsPdu::Push(m));
533}
534
#[cfg(test)]
mod gossip_tests {
    use super::*;
    use std::sync::Mutex;

    /// Transport double that records every `(target, pdu)` pair it is given.
    #[derive(Default)]
    struct CaptureTransport {
        sent: Mutex<Vec<(String, MetricsPdu)>>,
    }
    impl MetricsTransport for CaptureTransport {
        fn send(&self, target: &str, pdu: MetricsPdu) {
            let mut log = self.sent.lock().unwrap();
            log.push((target.to_string(), pdu));
        }
    }

    /// Shorthand for building a sample in the tests below.
    fn node(address: &str, timestamp: u64, cpu_load: f64, memory_used: u64, memory_max: u64) -> NodeMetrics {
        NodeMetrics { address: address.into(), timestamp, cpu_load, memory_used, memory_max }
    }

    #[test]
    fn gossip_pushes_local_sample_to_target() {
        let probe = StaticProbe { cpu_load: 0.3, memory_used: 1, memory_max: 4 };
        let net = CaptureTransport::default();
        gossip_local_metrics(&probe, "self", "peer", &net, 1);
        let sent = net.sent.lock().unwrap();
        assert_eq!(sent.len(), 1);
        let MetricsPdu::Push(pushed) = &sent[0].1 else {
            panic!("expected Push");
        };
        assert_eq!(pushed.address, "self");
    }

    #[test]
    fn apply_pdu_merges_into_metrics() {
        let store = ClusterMetrics::new();
        apply_metrics_pdu(&store, MetricsPdu::Push(node("x", 7, 0.1, 1, 2)));
        assert_eq!(store.node_count(), 1);
        assert_eq!(store.get("x").unwrap().timestamp, 7);
    }

    #[test]
    fn adaptive_balancer_can_be_used_as_picker_closure() {
        const BUSY: &str = "akka.tcp://Sys@a:1";
        const IDLE: &str = "akka.tcp://Sys@b:1";

        let store = Arc::new(ClusterMetrics::new());
        store.publish(node(BUSY, 0, 0.9, 0, 1));
        store.publish(node(IDLE, 0, 0.1, 0, 1));
        let lb = Arc::new(AdaptiveLoadBalancer::new(store));

        type Picker = Arc<dyn Fn(&[String]) -> Option<String> + Send + Sync>;
        let picker: Picker = {
            let lb = lb.clone();
            Arc::new(move |cands| {
                let refs: Vec<&str> = cands.iter().map(String::as_str).collect();
                lb.pick(&refs).map(|s| s.to_string())
            })
        };
        let chosen = picker(&[BUSY.to_string(), IDLE.to_string()]).unwrap();
        assert_eq!(chosen, "akka.tcp://Sys@b:1");
    }

    #[test]
    fn batch_pdu_merges_each() {
        let store = ClusterMetrics::new();
        let batch = vec![node("a", 1, 0.0, 0, 0), node("b", 2, 0.0, 0, 0)];
        apply_metrics_pdu(&store, MetricsPdu::PushBatch(batch));
        assert_eq!(store.node_count(), 2);
    }
}