Skip to main content

apigate_core/balancing/
least_time.rs

1use std::sync::OnceLock;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3
4use super::{BalanceCtx, Balancer, ResultEvent};
5
/// Smoothing divisor for the latency average: on each update the stored value
/// gives up `1 / EWMA_WEIGHT` of itself and gains `1 / EWMA_WEIGHT` of the new
/// sample (integer division in both terms).
const EWMA_WEIGHT: u64 = 10;
7
/// Balancer state that keeps one latency average per backend slot and picks
/// the candidate whose average is lowest.
#[derive(Debug)]
pub struct LeastTime {
    /// Per-backend latency averages in microseconds. The table is allocated
    /// lazily on first use; a slot value of `0` means no sample has been
    /// recorded for that backend yet.
    ewma_us: OnceLock<Box<[AtomicU64]>>,
    /// Counter bumped on every pick; it rotates where the candidate scan
    /// starts so that ties are not always resolved in favour of the same
    /// backend.
    offset: AtomicUsize,
}
12
13impl LeastTime {
14    pub fn new() -> Self {
15        Self {
16            ewma_us: OnceLock::new(),
17            offset: AtomicUsize::new(0),
18        }
19    }
20
21    fn latencies(&self, pool_len: usize) -> &[AtomicU64] {
22        self.ewma_us.get_or_init(|| {
23            (0..pool_len)
24                .map(|_| AtomicU64::new(0))
25                .collect::<Vec<_>>()
26                .into_boxed_slice()
27        })
28    }
29}
30
31impl Default for LeastTime {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl Balancer for LeastTime {
38    fn pick(&self, ctx: &BalanceCtx) -> Option<usize> {
39        let len = ctx.candidate_len();
40        if len == 0 {
41            return None;
42        }
43
44        let latencies = self.latencies(ctx.pool.len());
45        let offset = self.offset.fetch_add(1, Ordering::Relaxed);
46        let mut best_index = None;
47        let mut best_latency = u64::MAX;
48
49        for i in 0..len {
50            let nth = (offset + i) % len;
51            if let Some(idx) = ctx.candidate_index(nth) {
52                let latency = latencies
53                    .get(idx)
54                    .map(|a| a.load(Ordering::Relaxed))
55                    .unwrap_or(0);
56                if latency < best_latency {
57                    best_latency = latency;
58                    best_index = Some(idx);
59                }
60            }
61        }
62
63        best_index
64    }
65
66    fn on_result(&self, event: &ResultEvent<'_>) {
67        if event.error.is_some() {
68            return;
69        }
70
71        if let Some(latencies) = self.ewma_us.get() {
72            if let Some(slot) = latencies.get(event.backend_index) {
73                let sample = event.head_latency.as_micros() as u64;
74                let mut old = slot.load(Ordering::Relaxed);
75                loop {
76                    let new = if old == 0 {
77                        sample
78                    } else {
79                        old - old / EWMA_WEIGHT + sample / EWMA_WEIGHT
80                    };
81                    match slot.compare_exchange_weak(old, new, Ordering::Relaxed, Ordering::Relaxed)
82                    {
83                        Ok(_) => break,
84                        Err(actual) => old = actual,
85                    }
86                }
87            }
88        }
89    }
90}