apigate_core/balancing/
least_time.rs1use std::sync::OnceLock;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3
4use super::{BalanceCtx, Balancer, ResultEvent};
5
/// Divisor of the integer moving average kept per backend: an update removes
/// `1 / EWMA_WEIGHT` of the stored value and adds `1 / EWMA_WEIGHT` of the new
/// sample, so a larger value makes the average react more slowly.
6const EWMA_WEIGHT: u64 = 10;
7
/// Balancer state that steers requests towards the backend with the lowest
/// recorded latency average.
///
/// Every field uses interior mutability, so the balancer is driven through
/// shared references only.
#[derive(Debug)]
pub struct LeastTime {
    /// Per-backend latency averages in microseconds, indexed by backend index.
    /// Allocated on first use; a stored `0` means no sample has been recorded.
    ewma_us: OnceLock<Box<[AtomicU64]>>,
    /// Monotonically increasing counter that rotates the scan start position so
    /// that backends with equal averages take turns.
    offset: AtomicUsize,
}
12
13impl LeastTime {
14 pub fn new() -> Self {
15 Self {
16 ewma_us: OnceLock::new(),
17 offset: AtomicUsize::new(0),
18 }
19 }
20
21 fn latencies(&self, pool_len: usize) -> &[AtomicU64] {
22 self.ewma_us.get_or_init(|| {
23 (0..pool_len)
24 .map(|_| AtomicU64::new(0))
25 .collect::<Vec<_>>()
26 .into_boxed_slice()
27 })
28 }
29}
30
31impl Default for LeastTime {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl Balancer for LeastTime {
38 fn pick(&self, ctx: &BalanceCtx) -> Option<usize> {
39 let len = ctx.candidate_len();
40 if len == 0 {
41 return None;
42 }
43
44 let latencies = self.latencies(ctx.pool.len());
45 let offset = self.offset.fetch_add(1, Ordering::Relaxed);
46 let mut best_index = None;
47 let mut best_latency = u64::MAX;
48
49 for i in 0..len {
50 let nth = (offset + i) % len;
51 if let Some(idx) = ctx.candidate_index(nth) {
52 let latency = latencies
53 .get(idx)
54 .map(|a| a.load(Ordering::Relaxed))
55 .unwrap_or(0);
56 if latency < best_latency {
57 best_latency = latency;
58 best_index = Some(idx);
59 }
60 }
61 }
62
63 best_index
64 }
65
66 fn on_result(&self, event: &ResultEvent<'_>) {
67 if event.error.is_some() {
68 return;
69 }
70
71 if let Some(latencies) = self.ewma_us.get() {
72 if let Some(slot) = latencies.get(event.backend_index) {
73 let sample = event.head_latency.as_micros() as u64;
74 let mut old = slot.load(Ordering::Relaxed);
75 loop {
76 let new = if old == 0 {
77 sample
78 } else {
79 old - old / EWMA_WEIGHT + sample / EWMA_WEIGHT
80 };
81 match slot.compare_exchange_weak(old, new, Ordering::Relaxed, Ordering::Relaxed)
82 {
83 Ok(_) => break,
84 Err(actual) => old = actual,
85 }
86 }
87 }
88 }
89 }
90}