1use crate::{BoxLoadBalancer, LoadBalancer};
2use async_trait::async_trait;
3use std::future::Future;
4use std::sync::{
5 Arc,
6 atomic::{AtomicUsize, Ordering},
7};
8use tokio::sync::RwLock;
9use tokio::task::yield_now;
10
11#[derive(Clone)]
13pub struct Entry<T>
14where
15 T: Send + Sync + Clone + 'static,
16{
17 pub value: T,
19}
20
21pub struct SimpleLoadBalancerRef<T>
23where
24 T: Send + Sync + Clone + 'static,
25{
26 pub entries: RwLock<Vec<Entry<T>>>,
28 pub cursor: AtomicUsize,
30}
31
32#[derive(Clone)]
34pub struct SimpleLoadBalancer<T>
35where
36 T: Send + Sync + Clone + 'static,
37{
38 inner: Arc<SimpleLoadBalancerRef<T>>,
40}
41
42impl<T> SimpleLoadBalancer<T>
43where
44 T: Send + Sync + Clone + 'static,
45{
46 pub fn new(entries: Vec<T>) -> Self {
48 Self {
49 inner: Arc::new(SimpleLoadBalancerRef {
50 entries: RwLock::new(entries.into_iter().map(|v| Entry { value: v }).collect()),
51 cursor: AtomicUsize::new(0),
52 }),
53 }
54 }
55
56 pub async fn update<F, R, N>(&self, handle: F) -> anyhow::Result<N>
58 where
59 F: Fn(Arc<SimpleLoadBalancerRef<T>>) -> R,
60 R: Future<Output = anyhow::Result<N>>,
61 {
62 handle(self.inner.clone()).await
63 }
64}
65
66impl<T> LoadBalancer<T> for SimpleLoadBalancer<T>
67where
68 T: Send + Sync + Clone + 'static,
69{
70 async fn alloc(&self) -> T {
72 loop {
73 match LoadBalancer::try_alloc(self) {
74 Some(v) => return v,
75 None => yield_now().await,
76 }
77 }
78 }
79
80 fn try_alloc(&self) -> Option<T> {
82 let entries = self.inner.entries.try_read().ok()?;
83
84 if entries.is_empty() {
85 return None;
86 }
87
88 Some(
89 entries[self.inner.cursor.fetch_add(1, Ordering::Relaxed) % entries.len()]
90 .value
91 .clone(),
92 )
93 }
94}
95
96#[async_trait]
97impl<T> BoxLoadBalancer<T> for SimpleLoadBalancer<T>
98where
99 T: Send + Sync + Clone + 'static,
100{
101 async fn alloc(&self) -> T {
103 LoadBalancer::alloc(self).await
104 }
105
106 fn try_alloc(&self) -> Option<T> {
108 LoadBalancer::try_alloc(self)
109 }
110}