Skip to main content

apigate_core/balancing/
least_time.rs

1use std::sync::OnceLock;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3
4use super::{BalanceCtx, Balancer, ResultEvent};
5
/// Smoothing divisor for the latency moving average: every update keeps
/// roughly `(EWMA_WEIGHT - 1) / EWMA_WEIGHT` of the previous estimate and adds
/// `1 / EWMA_WEIGHT` of the new sample.
const EWMA_WEIGHT: u64 = 10;

/// Balancer that favors the backend with the lowest smoothed head latency.
///
/// Estimates are stored per backend index. A stored value of `0` means no
/// sample has been recorded for that backend yet.
pub struct LeastTime {
    /// Per-backend latency estimates in microseconds. Allocated lazily by
    /// [`LeastTime::latencies`]; the length is fixed by the first call.
    ewma_us: OnceLock<Box<[AtomicU64]>>,
    /// Wrapping counter advanced on every pick to rotate where the scan starts.
    offset: AtomicUsize,
}

impl LeastTime {
    /// Creates a balancer whose latency table has not been allocated yet.
    pub fn new() -> Self {
        Self {
            ewma_us: OnceLock::new(),
            offset: AtomicUsize::new(0),
        }
    }

    /// Returns the latency table, allocating `pool_len` zeroed slots on the
    /// first call.
    ///
    /// The table is initialized exactly once: later calls return the same
    /// slice and ignore `pool_len`, so its length never changes afterwards.
    fn latencies(&self, pool_len: usize) -> &[AtomicU64] {
        self.ewma_us
            .get_or_init(|| (0..pool_len).map(|_| AtomicU64::new(0)).collect())
    }
}

impl Default for LeastTime {
    /// Equivalent to [`LeastTime::new`].
    fn default() -> Self {
        Self::new()
    }
}
30
31impl Balancer for LeastTime {
32    fn pick(&self, ctx: &BalanceCtx) -> Option<usize> {
33        let len = ctx.candidate_len();
34        if len == 0 {
35            return None;
36        }
37
38        let latencies = self.latencies(ctx.pool.len());
39        let offset = self.offset.fetch_add(1, Ordering::Relaxed);
40        let mut best_index = None;
41        let mut best_latency = u64::MAX;
42
43        for i in 0..len {
44            let nth = (offset + i) % len;
45            if let Some(idx) = ctx.candidate_index(nth) {
46                let latency = latencies
47                    .get(idx)
48                    .map(|a| a.load(Ordering::Relaxed))
49                    .unwrap_or(0);
50                if latency < best_latency {
51                    best_latency = latency;
52                    best_index = Some(idx);
53                }
54            }
55        }
56
57        best_index
58    }
59
60    fn on_result(&self, event: &ResultEvent<'_>) {
61        if event.error.is_some() {
62            return;
63        }
64
65        if let Some(latencies) = self.ewma_us.get() {
66            if let Some(slot) = latencies.get(event.backend_index) {
67                let sample = event.head_latency.as_micros() as u64;
68                let mut old = slot.load(Ordering::Relaxed);
69                loop {
70                    let new = if old == 0 {
71                        sample
72                    } else {
73                        old - old / EWMA_WEIGHT + sample / EWMA_WEIGHT
74                    };
75                    match slot.compare_exchange_weak(old, new, Ordering::Relaxed, Ordering::Relaxed)
76                    {
77                        Ok(_) => break,
78                        Err(actual) => old = actual,
79                    }
80                }
81            }
82        }
83    }
84}