load_balancer/
interval.rs

1use async_trait::async_trait;
2use tokio::task::yield_now;
3
4use crate::{BoxLoadBalancer, LoadBalancer};
5use std::{
6    future::Future,
7    sync::{Arc, RwLock},
8    time::{Duration, Instant},
9};
10
/// A single entry in the interval load balancer.
///
/// Each entry contains:
/// - an interval (minimum duration before it can be reused),
/// - the last time it was used,
/// - and the associated value.
pub struct Entry<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Minimum time that must elapse after an allocation before this entry
    /// may be handed out again.
    pub interval: Duration,
    /// Timestamp of the most recent allocation; `None` until the entry has
    /// been allocated for the first time.
    pub last: Option<RwLock<Instant>>,
    /// The payload handed out to callers (cloned on every allocation).
    pub value: T,
}

impl<T> Clone for Entry<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Produce an independent copy of this entry.
    ///
    /// The copy gets its own fresh lock holding a snapshot of the current
    /// `last` timestamp, so later updates to one entry do not affect the
    /// other.
    ///
    /// This waits for a concurrent writer of `last` to finish instead of
    /// panicking, and it tolerates a poisoned lock: the guarded value is a
    /// plain `Instant`, so there is no invariant a panicking writer could
    /// have left half-updated.
    fn clone(&self) -> Self {
        Self {
            interval: self.interval,
            last: self.last.as_ref().map(|lock| {
                // `Instant` is `Copy`; the read guard is released at the end
                // of this statement.
                let stamp = *lock
                    .read()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                RwLock::new(stamp)
            }),
            value: self.value.clone(),
        }
    }
}
40
/// A load balancer that allocates items based on a fixed interval.
///
/// Each entry can only be reused after its interval has elapsed since the last allocation.
///
/// The derived `Clone` copies the inner `Arc`, so every clone of the balancer
/// shares the same entry list and the same "last used" timestamps.
#[derive(Clone)]
pub struct IntervalLoadBalancer<T>
where
    T: Send + Sync + Clone + 'static,
{
    // Shared, lock-protected list of entries; scanned front to back on allocation.
    inner: Arc<RwLock<Vec<Entry<T>>>>,
}
50
51impl<T> IntervalLoadBalancer<T>
52where
53    T: Send + Sync + Clone + 'static,
54{
55    /// Create a new `IntervalLoadBalancer` with a list of `(interval, value)` pairs.
56    /// Each value will only be available after its interval has passed since the last allocation.
57    pub fn new(entries: Vec<(Duration, T)>) -> Self {
58        Self {
59            inner: Arc::new(RwLock::new(
60                entries
61                    .into_iter()
62                    .map(|(interval, value)| Entry {
63                        interval,
64                        last: None,
65                        value,
66                    })
67                    .collect(),
68            )),
69        }
70    }
71
72    /// Update the internal entries using an async callback.
73    /// This allows dynamic reconfiguration of the load balancer.
74    pub async fn update<F, R>(&self, handle: F) -> anyhow::Result<()>
75    where
76        F: Fn(Arc<RwLock<Vec<Entry<T>>>>) -> R,
77        R: Future<Output = anyhow::Result<()>>,
78    {
79        handle(self.inner.clone()).await
80    }
81}
82
83impl<T> LoadBalancer<T> for IntervalLoadBalancer<T>
84where
85    T: Send + Sync + Clone + 'static,
86{
87    /// Allocate a value asynchronously.
88    /// This will loop until a value becomes available, yielding in between attempts.
89    fn alloc(&self) -> impl Future<Output = T> + Send {
90        async move {
91            loop {
92                match LoadBalancer::try_alloc(self) {
93                    Some(v) => return v,
94                    None => yield_now().await,
95                }
96            }
97        }
98    }
99
100    /// Try to allocate a value immediately without waiting.
101    /// Returns `Some(value)` if an entry is available (interval elapsed),
102    /// otherwise returns `None`.
103    fn try_alloc(&self) -> Option<T> {
104        let mut entries = self.inner.try_write().ok()?;
105
106        for entry in entries.iter_mut() {
107            if entry.last.is_none() {
108                entry.last = Some(RwLock::new(Instant::now()));
109                return Some(entry.value.clone());
110            }
111
112            let last = entry.last.as_ref().and_then(|lock| lock.try_read().ok())?;
113            let now = Instant::now();
114
115            if now.duration_since(*last) >= entry.interval {
116                drop(last);
117                entry.last = Some(RwLock::new(now));
118                return Some(entry.value.clone());
119            }
120        }
121
122        None
123    }
124}
125
126#[async_trait]
127impl<T> BoxLoadBalancer<T> for IntervalLoadBalancer<T>
128where
129    T: Send + Sync + Clone + 'static,
130{
131    /// Asynchronous allocation (boxed trait version).
132    async fn alloc(&self) -> T {
133        LoadBalancer::alloc(self).await
134    }
135
136    /// Immediate allocation attempt (boxed trait version).
137    fn try_alloc(&self) -> Option<T> {
138        LoadBalancer::try_alloc(self)
139    }
140}