load_balancer/
interval.rs

1use async_trait::async_trait;
2use tokio::{sync::Mutex, sync::RwLock, task::yield_now};
3
4use crate::{BoxLoadBalancer, LoadBalancer};
5use std::{
6    future::Future,
7    sync::Arc,
8    time::{Duration, Instant},
9};
10
11/// A single entry in the interval load balancer.
12/// Each entry contains:
13/// - an interval (minimum duration before it can be reused),
14/// - the last time it was used,
15/// - and the associated value.
16pub struct Entry<T>
17where
18    T: Send + Sync + Clone + 'static,
19{
20    pub interval: Duration,
21    pub last: Mutex<Option<Instant>>,
22    pub value: T,
23}
24
25impl<T> Clone for Entry<T>
26where
27    T: Send + Sync + Clone + 'static,
28{
29    fn clone(&self) -> Self {
30        Self {
31            interval: self.interval.clone(),
32            last: self.last.try_lock().unwrap().clone().into(),
33            value: self.value.clone(),
34        }
35    }
36}
37
38/// A load balancer that allocates items based on a fixed interval.
39/// Each entry can only be reused after its interval has elapsed since the last allocation.
40#[derive(Clone)]
41pub struct IntervalLoadBalancer<T>
42where
43    T: Send + Sync + Clone + 'static,
44{
45    inner: Arc<RwLock<Vec<Entry<T>>>>,
46}
47
48impl<T> IntervalLoadBalancer<T>
49where
50    T: Send + Sync + Clone + 'static,
51{
52    /// Create a new `IntervalLoadBalancer` with a list of `(interval, value)` pairs.
53    /// Each value will only be available after its interval has passed since the last allocation.
54    pub fn new(entries: Vec<(Duration, T)>) -> Self {
55        Self {
56            inner: Arc::new(RwLock::new(
57                entries
58                    .into_iter()
59                    .map(|(interval, value)| Entry {
60                        interval,
61                        last: None.into(),
62                        value,
63                    })
64                    .collect(),
65            )),
66        }
67    }
68
69    /// Update the internal entries using an async callback.
70    /// This allows dynamic reconfiguration of the load balancer.
71    pub async fn update<F, R, N>(&self, handle: F) -> anyhow::Result<N>
72    where
73        F: Fn(Arc<RwLock<Vec<Entry<T>>>>) -> R,
74        R: Future<Output = anyhow::Result<N>>,
75    {
76        handle(self.inner.clone()).await
77    }
78}
79
80impl<T> LoadBalancer<T> for IntervalLoadBalancer<T>
81where
82    T: Send + Sync + Clone + 'static,
83{
84    /// Allocate a value asynchronously.
85    /// This will loop until a value becomes available, yielding in between attempts.
86    fn alloc(&self) -> impl Future<Output = T> + Send {
87        async move {
88            loop {
89                if let Some(v) = LoadBalancer::try_alloc(self) {
90                    return v;
91                }
92
93                let min_remaining = {
94                    let entries = self.inner.read().await;
95                    let mut min = None;
96
97                    for entry in entries.iter() {
98                        if entry.interval == Duration::ZERO {
99                            continue;
100                        }
101
102                        if let Some(last_time) = *entry.last.lock().await {
103                            let now = Instant::now();
104                            let elapsed = now.duration_since(last_time);
105
106                            if elapsed < entry.interval {
107                                let remaining = entry.interval - elapsed;
108
109                                if min.is_none() || remaining < min.unwrap() {
110                                    min = Some(remaining);
111                                }
112                            }
113                        }
114                    }
115
116                    min
117                };
118
119                if let Some(duration) = min_remaining {
120                    tokio::time::sleep(duration).await;
121                } else {
122                    yield_now().await;
123                }
124            }
125        }
126    }
127
128    /// Try to allocate a value immediately without waiting.
129    /// Returns `Some(value)` if an entry is available (interval elapsed),
130    /// otherwise returns `None`.
131    fn try_alloc(&self) -> Option<T> {
132        let entries = self.inner.try_read().ok()?;
133
134        for entry in entries.iter() {
135            if entry.interval == Duration::ZERO {
136                return Some(entry.value.clone());
137            }
138
139            if let Ok(mut last) = entry.last.try_lock() {
140                match *last {
141                    Some(v) => {
142                        let now = Instant::now();
143
144                        if now.duration_since(v) >= entry.interval {
145                            *last = Some(now);
146                            return Some(entry.value.clone());
147                        }
148                    }
149                    None => {
150                        *last = Some(Instant::now());
151                        return Some(entry.value.clone());
152                    }
153                }
154            }
155        }
156
157        None
158    }
159}
160
161#[async_trait]
162impl<T> BoxLoadBalancer<T> for IntervalLoadBalancer<T>
163where
164    T: Send + Sync + Clone + 'static,
165{
166    /// Asynchronous allocation (boxed trait version).
167    async fn alloc(&self) -> T {
168        LoadBalancer::alloc(self).await
169    }
170
171    /// Immediate allocation attempt (boxed trait version).
172    fn try_alloc(&self) -> Option<T> {
173        LoadBalancer::try_alloc(self)
174    }
175}