load_balancer/
interval.rs1use async_trait::async_trait;
2use tokio::{sync::Mutex, task::yield_now};
3
4use crate::{BoxLoadBalancer, LoadBalancer};
5use std::{
6 future::Future,
7 sync::{Arc, RwLock},
8 time::{Duration, Instant},
9};
10
11pub struct Entry<T>
17where
18 T: Send + Sync + Clone + 'static,
19{
20 pub interval: Duration,
21 pub last: Mutex<Option<Instant>>,
22 pub value: T,
23}
24
25impl<T> Clone for Entry<T>
26where
27 T: Send + Sync + Clone + 'static,
28{
29 fn clone(&self) -> Self {
30 Self {
31 interval: self.interval.clone(),
32 last: self.last.try_lock().unwrap().clone().into(),
33 value: self.value.clone(),
34 }
35 }
36}
37
38#[derive(Clone)]
41pub struct IntervalLoadBalancer<T>
42where
43 T: Send + Sync + Clone + 'static,
44{
45 inner: Arc<RwLock<Vec<Entry<T>>>>,
46}
47
48impl<T> IntervalLoadBalancer<T>
49where
50 T: Send + Sync + Clone + 'static,
51{
52 pub fn new(entries: Vec<(Duration, T)>) -> Self {
55 Self {
56 inner: Arc::new(RwLock::new(
57 entries
58 .into_iter()
59 .map(|(interval, value)| Entry {
60 interval,
61 last: None.into(),
62 value,
63 })
64 .collect(),
65 )),
66 }
67 }
68
69 pub async fn update<F, R>(&self, handle: F) -> anyhow::Result<()>
72 where
73 F: Fn(Arc<RwLock<Vec<Entry<T>>>>) -> R,
74 R: Future<Output = anyhow::Result<()>>,
75 {
76 handle(self.inner.clone()).await
77 }
78}
79
80impl<T> LoadBalancer<T> for IntervalLoadBalancer<T>
81where
82 T: Send + Sync + Clone + 'static,
83{
84 fn alloc(&self) -> impl Future<Output = T> + Send {
87 async move {
88 loop {
89 match LoadBalancer::try_alloc(self) {
90 Some(v) => return v,
91 None => yield_now().await,
92 }
93 }
94 }
95 }
96
97 fn try_alloc(&self) -> Option<T> {
101 let entries = self.inner.try_read().ok()?;
102
103 for entry in entries.iter() {
104 if let Ok(mut last) = entry.last.try_lock() {
105 match *last {
106 Some(v) => {
107 let now = Instant::now();
108
109 if now.duration_since(v) >= entry.interval {
110 *last = Some(now);
111 return Some(entry.value.clone());
112 }
113 }
114 None => {
115 *last = Some(Instant::now());
116 return Some(entry.value.clone());
117 }
118 }
119 }
120 }
121
122 None
123 }
124}
125
126#[async_trait]
127impl<T> BoxLoadBalancer<T> for IntervalLoadBalancer<T>
128where
129 T: Send + Sync + Clone + 'static,
130{
131 async fn alloc(&self) -> T {
133 LoadBalancer::alloc(self).await
134 }
135
136 fn try_alloc(&self) -> Option<T> {
138 LoadBalancer::try_alloc(self)
139 }
140}