load_balancer/
interval.rs

1use async_trait::async_trait;
2use tokio::{sync::Mutex, task::yield_now};
3
4use crate::{BoxLoadBalancer, LoadBalancer};
5use std::{
6    future::Future,
7    sync::{Arc, RwLock},
8    time::{Duration, Instant},
9};
10
11/// A single entry in the interval load balancer.
12/// Each entry contains:
13/// - an interval (minimum duration before it can be reused),
14/// - the last time it was used,
15/// - and the associated value.
16pub struct Entry<T>
17where
18    T: Send + Sync + Clone + 'static,
19{
20    pub interval: Duration,
21    pub last: Mutex<Option<Instant>>,
22    pub value: T,
23}
24
25impl<T> Clone for Entry<T>
26where
27    T: Send + Sync + Clone + 'static,
28{
29    fn clone(&self) -> Self {
30        Self {
31            interval: self.interval.clone(),
32            last: self.last.try_lock().unwrap().clone().into(),
33            value: self.value.clone(),
34        }
35    }
36}
37
38/// A load balancer that allocates items based on a fixed interval.
39/// Each entry can only be reused after its interval has elapsed since the last allocation.
40#[derive(Clone)]
41pub struct IntervalLoadBalancer<T>
42where
43    T: Send + Sync + Clone + 'static,
44{
45    inner: Arc<RwLock<Vec<Entry<T>>>>,
46}
47
48impl<T> IntervalLoadBalancer<T>
49where
50    T: Send + Sync + Clone + 'static,
51{
52    /// Create a new `IntervalLoadBalancer` with a list of `(interval, value)` pairs.
53    /// Each value will only be available after its interval has passed since the last allocation.
54    pub fn new(entries: Vec<(Duration, T)>) -> Self {
55        Self {
56            inner: Arc::new(RwLock::new(
57                entries
58                    .into_iter()
59                    .map(|(interval, value)| Entry {
60                        interval,
61                        last: None.into(),
62                        value,
63                    })
64                    .collect(),
65            )),
66        }
67    }
68
69    /// Update the internal entries using an async callback.
70    /// This allows dynamic reconfiguration of the load balancer.
71    pub async fn update<F, R>(&self, handle: F) -> anyhow::Result<()>
72    where
73        F: Fn(Arc<RwLock<Vec<Entry<T>>>>) -> R,
74        R: Future<Output = anyhow::Result<()>>,
75    {
76        handle(self.inner.clone()).await
77    }
78}
79
80impl<T> LoadBalancer<T> for IntervalLoadBalancer<T>
81where
82    T: Send + Sync + Clone + 'static,
83{
84    /// Allocate a value asynchronously.
85    /// This will loop until a value becomes available, yielding in between attempts.
86    fn alloc(&self) -> impl Future<Output = T> + Send {
87        async move {
88            loop {
89                match LoadBalancer::try_alloc(self) {
90                    Some(v) => return v,
91                    None => yield_now().await,
92                }
93            }
94        }
95    }
96
97    /// Try to allocate a value immediately without waiting.
98    /// Returns `Some(value)` if an entry is available (interval elapsed),
99    /// otherwise returns `None`.
100    fn try_alloc(&self) -> Option<T> {
101        let entries = self.inner.try_read().ok()?;
102
103        for entry in entries.iter() {
104            if let Ok(mut last) = entry.last.try_lock() {
105                match *last {
106                    Some(v) => {
107                        let now = Instant::now();
108
109                        if now.duration_since(v) >= entry.interval {
110                            *last = Some(now);
111                            return Some(entry.value.clone());
112                        }
113                    }
114                    None => {
115                        *last = Some(Instant::now());
116                        return Some(entry.value.clone());
117                    }
118                }
119            }
120        }
121
122        None
123    }
124}
125
126#[async_trait]
127impl<T> BoxLoadBalancer<T> for IntervalLoadBalancer<T>
128where
129    T: Send + Sync + Clone + 'static,
130{
131    /// Asynchronous allocation (boxed trait version).
132    async fn alloc(&self) -> T {
133        LoadBalancer::alloc(self).await
134    }
135
136    /// Immediate allocation attempt (boxed trait version).
137    fn try_alloc(&self) -> Option<T> {
138        LoadBalancer::try_alloc(self)
139    }
140}