1use std::collections::HashMap;
17use std::sync::Arc;
18
19use parking_lot::RwLock;
20use serde::{Deserialize, Serialize};
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
23pub struct NodeMetrics {
24 pub address: String,
25 pub timestamp: u64,
26 pub cpu_load: f64,
27 pub memory_used: u64,
28 pub memory_max: u64,
29}
30
31impl NodeMetrics {
32 pub fn memory_usage(&self) -> f64 {
35 if self.memory_max == 0 {
36 0.0
37 } else {
38 self.memory_used as f64 / self.memory_max as f64
39 }
40 }
41}
42
43#[derive(Default)]
44pub struct ClusterMetrics {
45 entries: RwLock<HashMap<String, NodeMetrics>>,
46}
47
48impl ClusterMetrics {
49 pub fn new() -> Self {
50 Self::default()
51 }
52
53 pub fn publish(&self, m: NodeMetrics) {
54 self.entries.write().insert(m.address.clone(), m);
55 }
56
57 pub fn snapshot(&self) -> Vec<NodeMetrics> {
58 self.entries.read().values().cloned().collect()
59 }
60
61 pub fn get(&self, address: &str) -> Option<NodeMetrics> {
62 self.entries.read().get(address).cloned()
63 }
64
65 pub fn node_count(&self) -> usize {
66 self.entries.read().len()
67 }
68}
69
70pub trait MetricsProbe: Send + Sync + 'static {
76 fn sample(&self, address: &str, timestamp: u64) -> NodeMetrics;
77}
78
79pub struct StaticProbe {
82 pub cpu_load: f64,
83 pub memory_used: u64,
84 pub memory_max: u64,
85}
86
87impl MetricsProbe for StaticProbe {
88 fn sample(&self, address: &str, timestamp: u64) -> NodeMetrics {
89 NodeMetrics {
90 address: address.into(),
91 timestamp,
92 cpu_load: self.cpu_load,
93 memory_used: self.memory_used,
94 memory_max: self.memory_max,
95 }
96 }
97}
98
99pub struct AdaptiveLoadBalancer {
105 metrics: Arc<ClusterMetrics>,
106}
107
108impl AdaptiveLoadBalancer {
109 pub fn new(metrics: Arc<ClusterMetrics>) -> Self {
110 Self { metrics }
111 }
112
113 pub fn pick<'a>(&self, candidates: &'a [&'a str]) -> Option<&'a str> {
116 if candidates.is_empty() {
117 return None;
118 }
119 let snapshot = self.metrics.snapshot();
120 let lookup: HashMap<&str, &NodeMetrics> = snapshot.iter().map(|m| (m.address.as_str(), m)).collect();
121 let mut sorted: Vec<&&str> = candidates.iter().collect();
122 sorted.sort_by(|a, b| {
123 let load_a = lookup.get(*a).map(|m| m.cpu_load).unwrap_or(f64::INFINITY);
124 let load_b = lookup.get(*b).map(|m| m.cpu_load).unwrap_or(f64::INFINITY);
125 load_a.partial_cmp(&load_b).unwrap_or(std::cmp::Ordering::Equal).then_with(|| a.cmp(b))
126 });
127 sorted.first().copied().copied()
128 }
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a sample from its individual fields.
    fn node_metrics(
        address: &str,
        timestamp: u64,
        cpu_load: f64,
        memory_used: u64,
        memory_max: u64,
    ) -> NodeMetrics {
        NodeMetrics { address: address.to_owned(), timestamp, cpu_load, memory_used, memory_max }
    }

    /// Registry holding one sample per `(address, cpu_load)` pair
    /// (timestamp 0, memory 0 of 1).
    fn registry_with_loads(loads: &[(&str, f64)]) -> Arc<ClusterMetrics> {
        let store = Arc::new(ClusterMetrics::new());
        for &(address, cpu_load) in loads {
            store.publish(node_metrics(address, 0, cpu_load, 0, 1));
        }
        store
    }

    /// Balancer over a registry with no samples at all.
    fn empty_balancer() -> AdaptiveLoadBalancer {
        AdaptiveLoadBalancer::new(Arc::new(ClusterMetrics::new()))
    }

    #[test]
    fn publish_and_fetch() {
        let store = ClusterMetrics::new();
        store.publish(node_metrics("a", 1, 0.5, 100, 1000));
        assert_eq!(store.snapshot().len(), 1);
        assert_eq!(store.get("a").map(|found| found.cpu_load), Some(0.5));
    }

    #[test]
    fn memory_usage_ratio() {
        assert_eq!(node_metrics("a", 0, 0.0, 250, 1000).memory_usage(), 0.25);
    }

    #[test]
    fn memory_usage_handles_zero_max() {
        assert_eq!(node_metrics("a", 0, 0.0, 0, 0).memory_usage(), 0.0);
    }

    #[test]
    fn static_probe_returns_configured_values() {
        let probe = StaticProbe { cpu_load: 0.7, memory_used: 5, memory_max: 10 };
        let got = probe.sample("nodeA", 42);
        assert_eq!(got.address, "nodeA");
        assert_eq!(got.timestamp, 42);
        assert_eq!(got.cpu_load, 0.7);
        assert_eq!(got.memory_used, 5);
    }

    #[test]
    fn adaptive_picks_lowest_load() {
        let store = registry_with_loads(&[("a", 0.9), ("b", 0.1), ("c", 0.5)]);
        let balancer = AdaptiveLoadBalancer::new(store);
        assert_eq!(balancer.pick(&["a", "b", "c"]), Some("b"));
    }

    #[test]
    fn adaptive_falls_back_to_address_order_when_no_metrics() {
        assert_eq!(empty_balancer().pick(&["c", "a", "b"]), Some("a"));
    }

    #[test]
    fn adaptive_returns_none_for_empty_candidates() {
        assert_eq!(empty_balancer().pick(&[]), None);
    }
}
220
/// Exponentially weighted moving average over a stream of `f64` samples.
///
/// Each [`Ewma::update`] computes `alpha * sample + (1 - alpha) * value`, so a
/// larger `alpha` reacts faster to new samples.
#[derive(Debug, Clone, Copy)]
pub struct Ewma {
    /// Smoothing factor in `(0.0, 1.0]`: the weight given to each new sample.
    pub alpha: f64,
    /// Current smoothed value.
    pub value: f64,
}

impl Ewma {
    /// Creates an average starting at `initial` with smoothing factor `alpha`.
    ///
    /// # Panics
    ///
    /// Panics if `alpha` is not in `(0.0, 1.0]`; a NaN `alpha` also panics.
    pub fn new(initial: f64, alpha: f64) -> Self {
        assert!(alpha > 0.0 && alpha <= 1.0, "alpha must be in (0.0, 1.0]");
        Self { alpha, value: initial }
    }

    /// Creates an average whose weight on past data halves every
    /// `half_life_samples` updates: `alpha = 1 - 2^(-1 / half_life_samples)`.
    ///
    /// # Panics
    ///
    /// Panics if `half_life_samples` is not strictly positive (NaN included).
    /// Also panics if it is so large that the derived `alpha` rounds to `0.0`,
    /// for example `f64::INFINITY`.
    pub fn from_half_life(initial: f64, half_life_samples: f64) -> Self {
        assert!(half_life_samples > 0.0, "half_life_samples must be > 0.0");
        let alpha = 1.0 - 2.0_f64.powf(-1.0 / half_life_samples);
        Self::new(initial, alpha)
    }

    /// Folds `sample` into the average and returns the updated value.
    ///
    /// Non-finite samples (NaN, `±inf`) are ignored and the current value is
    /// returned unchanged. Once mixed in, such a sample would leave `value`
    /// NaN or infinite for every later update, whatever samples follow.
    pub fn update(&mut self, sample: f64) -> f64 {
        if sample.is_finite() {
            self.value = self.alpha * sample + (1.0 - self.alpha) * self.value;
        }
        self.value
    }
}
257
258#[derive(Debug, Clone, Copy, PartialEq)]
264pub enum MetricsSelector {
265 Cpu,
267 Heap,
269 Mix,
271}
272
273impl MetricsSelector {
274 pub fn weight(&self, m: &NodeMetrics) -> f64 {
277 let cpu = (1.0 - m.cpu_load).clamp(0.0, 1.0);
278 let heap = (1.0 - m.memory_usage()).clamp(0.0, 1.0);
279 match self {
280 Self::Cpu => cpu,
281 Self::Heap => heap,
282 Self::Mix => 0.5 * (cpu + heap),
283 }
284 }
285}
286
287pub struct WeightedRoutees {
292 metrics: Arc<ClusterMetrics>,
293 selector: MetricsSelector,
294}
295
296impl WeightedRoutees {
297 pub fn new(metrics: Arc<ClusterMetrics>, selector: MetricsSelector) -> Self {
298 Self { metrics, selector }
299 }
300
301 pub fn pick<'a>(&self, candidates: &'a [&'a str], seed: f64) -> Option<&'a str> {
305 if candidates.is_empty() {
306 return None;
307 }
308 let snap = self.metrics.snapshot();
309 let by_addr: HashMap<&str, &NodeMetrics> = snap.iter().map(|m| (m.address.as_str(), m)).collect();
310 let weights: Vec<f64> = candidates
311 .iter()
312 .map(|c| by_addr.get(c).map(|m| self.selector.weight(m)).unwrap_or(0.5))
313 .collect();
314 let total: f64 = weights.iter().sum();
315 if total <= 0.0 {
316 return Some(candidates[0]);
317 }
318 let target = (seed.clamp(0.0, 1.0) * total).min(total);
319 let mut acc = 0.0;
320 for (i, w) in weights.iter().enumerate() {
321 acc += *w;
322 if target <= acc {
323 return Some(candidates[i]);
324 }
325 }
326 candidates.last().copied()
327 }
328}
329
#[cfg(test)]
mod ewma_tests {
    use super::*;

    #[test]
    fn ewma_initial_value_unchanged_until_update() {
        let e = Ewma::new(0.5, 0.3);
        assert_eq!(e.value, 0.5);
    }

    #[test]
    fn ewma_converges_to_steady_signal() {
        let mut e = Ewma::new(0.0, 0.5);
        for _ in 0..30 {
            e.update(1.0);
        }
        assert!(e.value > 0.99, "expected ≈1.0, got {}", e.value);
    }

    #[test]
    fn ewma_rejects_invalid_alpha() {
        let r = std::panic::catch_unwind(|| Ewma::new(0.0, 0.0));
        assert!(r.is_err());
    }

    #[test]
    fn ewma_rejects_alpha_above_one() {
        let r = std::panic::catch_unwind(|| Ewma::new(0.0, 1.5));
        assert!(r.is_err());
    }

    #[test]
    fn ewma_from_half_life_yields_50pct_weight_after_half_life() {
        let mut e = Ewma::from_half_life(0.0, 4.0);
        // After exactly one half-life of unit samples the initial 0.0 keeps
        // weight 0.5, so the average is 0.5. Rounding in `powf` and in the
        // repeated updates can land a few ulps either side, so a one-sided
        // `>= 0.5` check would be fragile; compare with a tolerance instead.
        for _ in 0..4 {
            e.update(1.0);
        }
        assert!((e.value - 0.5).abs() < 1e-9, "expected ≈0.5, got {}", e.value);
    }

    #[test]
    fn cpu_selector_prefers_lower_load() {
        let m =
            NodeMetrics { address: "a".into(), timestamp: 0, cpu_load: 0.2, memory_used: 0, memory_max: 1 };
        let n =
            NodeMetrics { address: "b".into(), timestamp: 0, cpu_load: 0.9, memory_used: 0, memory_max: 1 };
        assert!(MetricsSelector::Cpu.weight(&m) > MetricsSelector::Cpu.weight(&n));
    }

    #[test]
    fn mix_selector_averages_cpu_and_heap() {
        let m = NodeMetrics {
            address: "a".into(),
            timestamp: 0,
            cpu_load: 0.0,
            memory_used: 100,
            memory_max: 200,
        };
        let w = MetricsSelector::Mix.weight(&m);
        assert!((w - 0.75).abs() < 1e-6, "mix weight {w}");
    }

    #[test]
    fn weighted_routees_picks_higher_weight_node_more_often() {
        let m = Arc::new(ClusterMetrics::new());
        m.publish(NodeMetrics {
            address: "fast".into(),
            timestamp: 0,
            cpu_load: 0.1,
            memory_used: 0,
            memory_max: 1,
        });
        m.publish(NodeMetrics {
            address: "slow".into(),
            timestamp: 0,
            cpu_load: 0.9,
            memory_used: 0,
            memory_max: 1,
        });
        let r = WeightedRoutees::new(m, MetricsSelector::Cpu);
        let cands = ["fast", "slow"];
        let mut fast = 0;
        // Sweep the seed evenly over [0, 1) instead of using an RNG.
        for i in 0..100 {
            let seed = i as f64 / 100.0;
            if r.pick(&cands, seed) == Some("fast") {
                fast += 1;
            }
        }
        assert!(fast > 60, "expected >60 fast picks, got {fast}");
    }

    #[test]
    fn weighted_routees_returns_first_when_all_zero() {
        let m = Arc::new(ClusterMetrics::new());
        m.publish(NodeMetrics {
            address: "a".into(),
            timestamp: 0,
            cpu_load: 1.0,
            memory_used: 1,
            memory_max: 1,
        });
        m.publish(NodeMetrics {
            address: "b".into(),
            timestamp: 0,
            cpu_load: 1.0,
            memory_used: 1,
            memory_max: 1,
        });
        let r = WeightedRoutees::new(m, MetricsSelector::Mix);
        assert_eq!(r.pick(&["a", "b"], 0.5), Some("a"));
    }
}
439
#[cfg(feature = "sysinfo-probe")]
/// Host-metrics probe backed by the `sysinfo` crate; compiled only with the
/// `sysinfo-probe` feature.
pub mod sys {
    use super::{MetricsProbe, NodeMetrics};
    use std::sync::Mutex;
    use sysinfo::System;

    /// [`MetricsProbe`] that reads global CPU usage and memory figures from a
    /// `sysinfo::System`.
    ///
    /// The `System` sits behind a `Mutex` because it is refreshed through a
    /// mutable guard, while `MetricsProbe::sample` only receives `&self`.
    pub struct SysinfoProbe {
        sys: Mutex<System>,
    }

    impl Default for SysinfoProbe {
        fn default() -> Self {
            Self::new()
        }
    }

    impl SysinfoProbe {
        /// Creates a probe wrapping `System::new_all()`.
        pub fn new() -> Self {
            Self { sys: Mutex::new(System::new_all()) }
        }
    }

    impl MetricsProbe for SysinfoProbe {
        /// Refreshes CPU and memory readings and returns them as one sample.
        ///
        /// `cpu_load` is `global_cpu_usage() / 100`, clamped to `[0.0, 1.0]`.
        fn sample(&self, address: &str, timestamp: u64) -> NodeMetrics {
            // If an earlier call panicked while holding the lock, the mutex is
            // poisoned and `.unwrap()` would make every later sample panic too.
            // The refresh calls below re-read the counters used here, so the
            // guard is recovered instead.
            // NOTE(review): confirm no sysinfo state relies on the interrupted
            // refresh having completed.
            let mut sys = self.sys.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
            sys.refresh_cpu_usage();
            sys.refresh_memory();
            // Treated as a percentage and normalised to a 0..=1 fraction.
            let cpu_load = (sys.global_cpu_usage() as f64 / 100.0).clamp(0.0, 1.0);
            let memory_max = sys.total_memory();
            let memory_used = sys.used_memory();
            NodeMetrics { address: address.into(), timestamp, cpu_load, memory_used, memory_max }
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn sysinfo_probe_returns_finite_load() {
            let p = SysinfoProbe::new();
            let m = p.sample("a", 1);
            assert!(m.cpu_load.is_finite());
            assert!(m.memory_max >= m.memory_used);
        }
    }
}
492
/// Message exchanged between nodes to share [`NodeMetrics`] samples.
///
/// Marked `#[non_exhaustive]`, so matches on it outside this crate need a
/// wildcard arm.
/// NOTE(review): serde formats that encode enum variants by index (e.g.
/// bincode) change wire format if variants are reordered; append new
/// variants at the end.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub enum MetricsPdu {
    /// A single sample.
    Push(NodeMetrics),
    /// Several samples in one message; `apply_metrics_pdu` publishes them in
    /// vector order.
    PushBatch(Vec<NodeMetrics>),
}

/// Outbound channel that delivers a [`MetricsPdu`] to another node.
///
/// Implementations must be `Send + Sync + 'static`. `send` returns `()`, so
/// this trait gives the caller no way to observe delivery failures.
pub trait MetricsTransport: Send + Sync + 'static {
    /// Sends `pdu` to the node identified by `target_node`.
    fn send(&self, target_node: &str, pdu: MetricsPdu);
}
510
511pub fn apply_metrics_pdu(metrics: &ClusterMetrics, pdu: MetricsPdu) {
513 match pdu {
514 MetricsPdu::Push(m) => metrics.publish(m),
515 MetricsPdu::PushBatch(v) => {
516 for m in v {
517 metrics.publish(m);
518 }
519 }
520 }
521}
522
523pub fn gossip_local_metrics<P: MetricsProbe + ?Sized>(
525 probe: &P,
526 self_address: &str,
527 target_node: &str,
528 transport: &dyn MetricsTransport,
529 now: u64,
530) {
531 let m = probe.sample(self_address, now);
532 transport.send(target_node, MetricsPdu::Push(m));
533}
534
#[cfg(test)]
mod gossip_tests {
    use super::*;
    use std::sync::Mutex;

    /// Test transport that records every `(target, pdu)` pair it is given.
    #[derive(Default)]
    struct CaptureTransport {
        sent: Mutex<Vec<(String, MetricsPdu)>>,
    }

    impl MetricsTransport for CaptureTransport {
        fn send(&self, target: &str, pdu: MetricsPdu) {
            let record = (target.to_owned(), pdu);
            self.sent.lock().unwrap().push(record);
        }
    }

    /// Builds a sample from its individual fields.
    fn node(
        address: &str,
        timestamp: u64,
        cpu_load: f64,
        memory_used: u64,
        memory_max: u64,
    ) -> NodeMetrics {
        NodeMetrics { address: address.to_owned(), timestamp, cpu_load, memory_used, memory_max }
    }

    #[test]
    fn gossip_pushes_local_sample_to_target() {
        let probe = StaticProbe { cpu_load: 0.3, memory_used: 1, memory_max: 4 };
        let net = CaptureTransport::default();
        gossip_local_metrics(&probe, "self", "peer", &net, 1);
        let sent = net.sent.lock().unwrap();
        assert_eq!(sent.len(), 1);
        if let MetricsPdu::Push(pushed) = &sent[0].1 {
            assert_eq!(pushed.address, "self");
        } else {
            panic!("expected Push");
        }
    }

    #[test]
    fn apply_pdu_merges_into_metrics() {
        let store = ClusterMetrics::new();
        apply_metrics_pdu(&store, MetricsPdu::Push(node("x", 7, 0.1, 1, 2)));
        assert_eq!(store.node_count(), 1);
        assert_eq!(store.get("x").unwrap().timestamp, 7);
    }

    #[test]
    fn adaptive_balancer_can_be_used_as_picker_closure() {
        const NODE_A: &str = "akka.tcp://Sys@a:1";
        const NODE_B: &str = "akka.tcp://Sys@b:1";
        let store = Arc::new(ClusterMetrics::new());
        store.publish(node(NODE_A, 0, 0.9, 0, 1));
        store.publish(node(NODE_B, 0, 0.1, 0, 1));
        let balancer = Arc::new(AdaptiveLoadBalancer::new(store));

        type Picker = Arc<dyn Fn(&[String]) -> Option<String> + Send + Sync>;
        let picker: Picker = {
            let balancer = Arc::clone(&balancer);
            Arc::new(move |cands: &[String]| {
                let refs: Vec<&str> = cands.iter().map(String::as_str).collect();
                balancer.pick(&refs).map(str::to_owned)
            })
        };

        let chosen = picker(&[NODE_A.to_string(), NODE_B.to_string()]).unwrap();
        assert_eq!(chosen, NODE_B);
    }

    #[test]
    fn batch_pdu_merges_each() {
        let store = ClusterMetrics::new();
        let batch = vec![node("a", 1, 0.0, 0, 0), node("b", 2, 0.0, 0, 0)];
        apply_metrics_pdu(&store, MetricsPdu::PushBatch(batch));
        assert_eq!(store.node_count(), 2);
    }
}
618}