apigate_core/balancing/
least_time.rs1use std::sync::OnceLock;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3
4use super::{BalanceCtx, Balancer, ResultEvent};
5
/// Smoothing divisor for the latency EWMA: each successful sample moves the
/// stored average by roughly `1 / EWMA_WEIGHT` of the difference between the
/// sample and the current average.
const EWMA_WEIGHT: u64 = 10;

/// Least-time load balancer: routes each request to the candidate backend with
/// the lowest exponentially weighted moving average (EWMA) of head latency.
#[derive(Debug)]
pub struct LeastTime {
    /// Per-backend latency EWMA in microseconds, indexed by backend index.
    /// Allocated lazily; a value of 0 is treated as "no sample yet".
    ewma_us: OnceLock<Box<[AtomicU64]>>,
    /// Rotating start position for the candidate scan, used to spread ties.
    offset: AtomicUsize,
}
12
13impl LeastTime {
14 pub fn new() -> Self {
15 Self {
16 ewma_us: OnceLock::new(),
17 offset: AtomicUsize::new(0),
18 }
19 }
20
21 fn latencies(&self, pool_len: usize) -> &[AtomicU64] {
22 self.ewma_us.get_or_init(|| {
23 (0..pool_len)
24 .map(|_| AtomicU64::new(0))
25 .collect::<Vec<_>>()
26 .into_boxed_slice()
27 })
28 }
29}
30
31impl Balancer for LeastTime {
32 fn pick<'a>(&self, ctx: &'a BalanceCtx<'a>) -> Option<usize> {
33 let len = ctx.candidate_len();
34 if len == 0 {
35 return None;
36 }
37
38 let latencies = self.latencies(ctx.pool.len());
39 let offset = self.offset.fetch_add(1, Ordering::Relaxed);
40 let mut best_index = None;
41 let mut best_latency = u64::MAX;
42
43 for i in 0..len {
44 let nth = (offset + i) % len;
45 if let Some(idx) = ctx.candidate_index(nth) {
46 let latency = latencies
47 .get(idx)
48 .map(|a| a.load(Ordering::Relaxed))
49 .unwrap_or(0);
50 if latency < best_latency {
51 best_latency = latency;
52 best_index = Some(idx);
53 }
54 }
55 }
56
57 best_index
58 }
59
60 fn on_result(&self, event: &ResultEvent<'_>) {
61 if event.error.is_some() {
62 return;
63 }
64
65 if let Some(latencies) = self.ewma_us.get() {
66 if let Some(slot) = latencies.get(event.backend_index) {
67 let sample = event.head_latency.as_micros() as u64;
68 let mut old = slot.load(Ordering::Relaxed);
69 loop {
70 let new = if old == 0 {
71 sample
72 } else {
73 old - old / EWMA_WEIGHT + sample / EWMA_WEIGHT
74 };
75 match slot.compare_exchange_weak(old, new, Ordering::Relaxed, Ordering::Relaxed)
76 {
77 Ok(_) => break,
78 Err(actual) => old = actual,
79 }
80 }
81 }
82 }
83 }
84}