1use crate::{BoxLoadBalancer, LoadBalancer};
2use async_trait::async_trait;
3use std::future::Future;
4use std::sync::{
5 Arc,
6 atomic::{AtomicUsize, Ordering},
7};
8use tokio::sync::RwLock;
9
/// A single slot in a load balancer's entry list, wrapping one value.
#[derive(Clone)]
pub struct Entry<T: Send + Sync + Clone + 'static> {
    /// The wrapped value.
    pub value: T,
}
19
20pub struct SimpleLoadBalancerRef<T>
22where
23 T: Send + Sync + Clone + 'static,
24{
25 pub entries: RwLock<Vec<Entry<T>>>,
27 pub cursor: AtomicUsize,
29}
30
31#[derive(Clone)]
33pub struct SimpleLoadBalancer<T>
34where
35 T: Send + Sync + Clone + 'static,
36{
37 inner: Arc<SimpleLoadBalancerRef<T>>,
39}
40
41impl<T> SimpleLoadBalancer<T>
42where
43 T: Send + Sync + Clone + 'static,
44{
45 pub fn new(entries: Vec<T>) -> Self {
47 Self {
48 inner: Arc::new(SimpleLoadBalancerRef {
49 entries: RwLock::new(entries.into_iter().map(|v| Entry { value: v }).collect()),
50 cursor: AtomicUsize::new(0),
51 }),
52 }
53 }
54
55 pub async fn update<F, R>(&self, handle: F) -> anyhow::Result<()>
57 where
58 F: Fn(Arc<SimpleLoadBalancerRef<T>>) -> R,
59 R: Future<Output = anyhow::Result<()>>,
60 {
61 handle(self.inner.clone()).await
62 }
63}
64
65impl<T> LoadBalancer<T> for SimpleLoadBalancer<T>
66where
67 T: Send + Sync + Clone + 'static,
68{
69 async fn alloc(&self) -> T {
71 let entries = self.inner.entries.read().await;
72
73 entries[self.inner.cursor.fetch_add(1, Ordering::Relaxed) % entries.len()]
74 .value
75 .clone()
76 }
77
78 fn try_alloc(&self) -> Option<T> {
80 let entries = self.inner.entries.try_read().ok()?;
81
82 if entries.is_empty() {
83 return None;
84 }
85
86 Some(
87 entries[self.inner.cursor.fetch_add(1, Ordering::Relaxed) % entries.len()]
88 .value
89 .clone(),
90 )
91 }
92}
93
94#[async_trait]
95impl<T> BoxLoadBalancer<T> for SimpleLoadBalancer<T>
96where
97 T: Send + Sync + Clone + 'static,
98{
99 async fn alloc(&self) -> T {
101 LoadBalancer::alloc(self).await
102 }
103
104 fn try_alloc(&self) -> Option<T> {
106 LoadBalancer::try_alloc(self)
107 }
108}