load_balancer/
interval.rs1use async_trait::async_trait;
2use tokio::task::yield_now;
3
4use crate::{BoxLoadBalancer, LoadBalancer};
5use std::{
6 future::Future,
7 sync::{Arc, RwLock},
8 time::{Duration, Instant},
9};
10
11pub struct Entry<T>
17where
18 T: Send + Sync + Clone + 'static,
19{
20 pub interval: Duration,
21 pub last: Option<RwLock<Instant>>,
22 pub value: T,
23}
24
25impl<T> Clone for Entry<T>
26where
27 T: Send + Sync + Clone + 'static,
28{
29 fn clone(&self) -> Self {
30 Self {
31 interval: self.interval,
32 last: self
33 .last
34 .as_ref()
35 .and_then(|v| Some(RwLock::new(*v.try_read().unwrap()))),
36 value: self.value.clone(),
37 }
38 }
39}
40
41#[derive(Clone)]
44pub struct IntervalLoadBalancer<T>
45where
46 T: Send + Sync + Clone + 'static,
47{
48 inner: Arc<RwLock<Vec<Entry<T>>>>,
49}
50
51impl<T> IntervalLoadBalancer<T>
52where
53 T: Send + Sync + Clone + 'static,
54{
55 pub fn new(entries: Vec<(Duration, T)>) -> Self {
58 Self {
59 inner: Arc::new(RwLock::new(
60 entries
61 .into_iter()
62 .map(|(interval, value)| Entry {
63 interval,
64 last: None,
65 value,
66 })
67 .collect(),
68 )),
69 }
70 }
71
72 pub async fn update<F, R>(&self, handle: F) -> anyhow::Result<()>
75 where
76 F: Fn(Arc<RwLock<Vec<Entry<T>>>>) -> R,
77 R: Future<Output = anyhow::Result<()>>,
78 {
79 handle(self.inner.clone()).await
80 }
81}
82
83impl<T> LoadBalancer<T> for IntervalLoadBalancer<T>
84where
85 T: Send + Sync + Clone + 'static,
86{
87 fn alloc(&self) -> impl Future<Output = T> + Send {
90 async move {
91 loop {
92 match LoadBalancer::try_alloc(self) {
93 Some(v) => return v,
94 None => yield_now().await,
95 }
96 }
97 }
98 }
99
100 fn try_alloc(&self) -> Option<T> {
104 let mut entries = self.inner.try_write().ok()?;
105
106 for entry in entries.iter_mut() {
107 if entry.last.is_none() {
108 entry.last = Some(RwLock::new(Instant::now()));
109 return Some(entry.value.clone());
110 }
111
112 let last = entry.last.as_ref().and_then(|lock| lock.try_read().ok())?;
113 let now = Instant::now();
114
115 if now.duration_since(*last) >= entry.interval {
116 drop(last);
117 entry.last = Some(RwLock::new(now));
118 return Some(entry.value.clone());
119 }
120 }
121
122 None
123 }
124}
125
126#[async_trait]
127impl<T> BoxLoadBalancer<T> for IntervalLoadBalancer<T>
128where
129 T: Send + Sync + Clone + 'static,
130{
131 async fn alloc(&self) -> T {
133 LoadBalancer::alloc(self).await
134 }
135
136 fn try_alloc(&self) -> Option<T> {
138 LoadBalancer::try_alloc(self)
139 }
140}