load_balancer/
interval.rs1use async_trait::async_trait;
2use tokio::{sync::Mutex, task::yield_now};
3
4use crate::{BoxLoadBalancer, LoadBalancer};
5use std::{
6 future::Future,
7 sync::{Arc, RwLock},
8 time::{Duration, Instant},
9};
10
11pub struct Entry<T>
17where
18 T: Send + Sync + Clone + 'static,
19{
20 pub interval: Duration,
21 pub last: Mutex<Option<Instant>>,
22 pub value: T,
23}
24
25impl<T> Clone for Entry<T>
26where
27 T: Send + Sync + Clone + 'static,
28{
29 fn clone(&self) -> Self {
30 Self {
31 interval: self.interval.clone(),
32 last: self.last.try_lock().unwrap().clone().into(),
33 value: self.value.clone(),
34 }
35 }
36}
37
38#[derive(Clone)]
41pub struct IntervalLoadBalancer<T>
42where
43 T: Send + Sync + Clone + 'static,
44{
45 inner: Arc<RwLock<Vec<Entry<T>>>>,
46}
47
48impl<T> IntervalLoadBalancer<T>
49where
50 T: Send + Sync + Clone + 'static,
51{
52 pub fn new(entries: Vec<(Duration, T)>) -> Self {
55 Self {
56 inner: Arc::new(RwLock::new(
57 entries
58 .into_iter()
59 .map(|(interval, value)| Entry {
60 interval,
61 last: None.into(),
62 value,
63 })
64 .collect(),
65 )),
66 }
67 }
68
69 pub async fn update<F, R>(&self, handle: F) -> anyhow::Result<()>
72 where
73 F: Fn(Arc<RwLock<Vec<Entry<T>>>>) -> R,
74 R: Future<Output = anyhow::Result<()>>,
75 {
76 handle(self.inner.clone()).await
77 }
78}
79
80impl<T> LoadBalancer<T> for IntervalLoadBalancer<T>
81where
82 T: Send + Sync + Clone + 'static,
83{
84 fn alloc(&self) -> impl Future<Output = T> + Send {
87 async move {
88 loop {
89 match LoadBalancer::try_alloc(self) {
90 Some(v) => return v,
91 None => yield_now().await,
92 }
93 }
94 }
95 }
96
97 fn try_alloc(&self) -> Option<T> {
101 let entries = self.inner.try_read().ok()?;
102
103 for entry in entries.iter() {
104 if entry.interval == Duration::ZERO {
105 return Some(entry.value.clone());
106 }
107
108 if let Ok(mut last) = entry.last.try_lock() {
109 match *last {
110 Some(v) => {
111 let now = Instant::now();
112
113 if now.duration_since(v) >= entry.interval {
114 *last = Some(now);
115 return Some(entry.value.clone());
116 }
117 }
118 None => {
119 *last = Some(Instant::now());
120 return Some(entry.value.clone());
121 }
122 }
123 }
124 }
125
126 None
127 }
128}
129
130#[async_trait]
131impl<T> BoxLoadBalancer<T> for IntervalLoadBalancer<T>
132where
133 T: Send + Sync + Clone + 'static,
134{
135 async fn alloc(&self) -> T {
137 LoadBalancer::alloc(self).await
138 }
139
140 fn try_alloc(&self) -> Option<T> {
142 LoadBalancer::try_alloc(self)
143 }
144}