load_balancer/
interval.rs1use async_trait::async_trait;
2use tokio::{sync::Mutex, sync::RwLock, task::yield_now};
3
4use crate::{BoxLoadBalancer, LoadBalancer};
5use std::{
6 future::Future,
7 sync::Arc,
8 time::{Duration, Instant},
9};
10
11pub struct Entry<T>
17where
18 T: Send + Sync + Clone + 'static,
19{
20 pub interval: Duration,
21 pub last: Mutex<Option<Instant>>,
22 pub value: T,
23}
24
25impl<T> Clone for Entry<T>
26where
27 T: Send + Sync + Clone + 'static,
28{
29 fn clone(&self) -> Self {
30 Self {
31 interval: self.interval.clone(),
32 last: self.last.try_lock().unwrap().clone().into(),
33 value: self.value.clone(),
34 }
35 }
36}
37
38#[derive(Clone)]
41pub struct IntervalLoadBalancer<T>
42where
43 T: Send + Sync + Clone + 'static,
44{
45 inner: Arc<RwLock<Vec<Entry<T>>>>,
46}
47
48impl<T> IntervalLoadBalancer<T>
49where
50 T: Send + Sync + Clone + 'static,
51{
52 pub fn new(entries: Vec<(Duration, T)>) -> Self {
55 Self {
56 inner: Arc::new(RwLock::new(
57 entries
58 .into_iter()
59 .map(|(interval, value)| Entry {
60 interval,
61 last: None.into(),
62 value,
63 })
64 .collect(),
65 )),
66 }
67 }
68
69 pub async fn update<F, R, N>(&self, handle: F) -> anyhow::Result<N>
72 where
73 F: Fn(Arc<RwLock<Vec<Entry<T>>>>) -> R,
74 R: Future<Output = anyhow::Result<N>>,
75 {
76 handle(self.inner.clone()).await
77 }
78}
79
80impl<T> LoadBalancer<T> for IntervalLoadBalancer<T>
81where
82 T: Send + Sync + Clone + 'static,
83{
84 fn alloc(&self) -> impl Future<Output = T> + Send {
87 async move {
88 loop {
89 if let Some(v) = LoadBalancer::try_alloc(self) {
90 return v;
91 }
92
93 let min_remaining = {
94 let entries = self.inner.read().await;
95 let mut min = None;
96
97 for entry in entries.iter() {
98 if entry.interval == Duration::ZERO {
99 continue;
100 }
101
102 if let Some(last_time) = *entry.last.lock().await {
103 let now = Instant::now();
104 let elapsed = now.duration_since(last_time);
105
106 if elapsed < entry.interval {
107 let remaining = entry.interval - elapsed;
108
109 if min.is_none() || remaining < min.unwrap() {
110 min = Some(remaining);
111 }
112 }
113 }
114 }
115
116 min
117 };
118
119 if let Some(duration) = min_remaining {
120 tokio::time::sleep(duration).await;
121 } else {
122 yield_now().await;
123 }
124 }
125 }
126 }
127
128 fn try_alloc(&self) -> Option<T> {
132 let entries = self.inner.try_read().ok()?;
133
134 for entry in entries.iter() {
135 if entry.interval == Duration::ZERO {
136 return Some(entry.value.clone());
137 }
138
139 if let Ok(mut last) = entry.last.try_lock() {
140 match *last {
141 Some(v) => {
142 let now = Instant::now();
143
144 if now.duration_since(v) >= entry.interval {
145 *last = Some(now);
146 return Some(entry.value.clone());
147 }
148 }
149 None => {
150 *last = Some(Instant::now());
151 return Some(entry.value.clone());
152 }
153 }
154 }
155 }
156
157 None
158 }
159}
160
161#[async_trait]
162impl<T> BoxLoadBalancer<T> for IntervalLoadBalancer<T>
163where
164 T: Send + Sync + Clone + 'static,
165{
166 async fn alloc(&self) -> T {
168 LoadBalancer::alloc(self).await
169 }
170
171 fn try_alloc(&self) -> Option<T> {
173 LoadBalancer::try_alloc(self)
174 }
175}