load_balancer/
simple.rs

1use crate::{BoxLoadBalancer, LoadBalancer};
2use async_trait::async_trait;
3use std::future::Future;
4use std::sync::{
5    Arc,
6    atomic::{AtomicUsize, Ordering},
7};
8use tokio::sync::RwLock;
9use tokio::task::yield_now;
10
11/// A single entry in the simple load balancer.
12#[derive(Clone)]
13pub struct Entry<T>
14where
15    T: Send + Sync + Clone + 'static,
16{
17    /// The underlying value stored in this entry.
18    pub value: T,
19}
20
21/// Internal reference structure for `SimpleLoadBalancer`.
22pub struct SimpleLoadBalancerRef<T>
23where
24    T: Send + Sync + Clone + 'static,
25{
26    /// The list of entries managed by the load balancer.
27    pub entries: RwLock<Vec<Entry<T>>>,
28    /// The current index for sequential allocation.
29    pub cursor: AtomicUsize,
30}
31
32/// A simple load balancer that selects entries in sequential order.
33#[derive(Clone)]
34pub struct SimpleLoadBalancer<T>
35where
36    T: Send + Sync + Clone + 'static,
37{
38    /// Shared inner state.
39    inner: Arc<SimpleLoadBalancerRef<T>>,
40}
41
42impl<T> SimpleLoadBalancer<T>
43where
44    T: Send + Sync + Clone + 'static,
45{
46    /// Create a new `SimpleLoadBalancer` from a list of values.
47    pub fn new(entries: Vec<T>) -> Self {
48        Self {
49            inner: Arc::new(SimpleLoadBalancerRef {
50                entries: RwLock::new(entries.into_iter().map(|v| Entry { value: v }).collect()),
51                cursor: AtomicUsize::new(0),
52            }),
53        }
54    }
55
56    /// Update the inner state using an async callback.
57    pub async fn update<F, R>(&self, handle: F) -> anyhow::Result<()>
58    where
59        F: Fn(Arc<SimpleLoadBalancerRef<T>>) -> R,
60        R: Future<Output = anyhow::Result<()>>,
61    {
62        handle(self.inner.clone()).await
63    }
64}
65
66impl<T> LoadBalancer<T> for SimpleLoadBalancer<T>
67where
68    T: Send + Sync + Clone + 'static,
69{
70    /// Asynchronously allocate the next entry in sequence.
71    async fn alloc(&self) -> T {
72        loop {
73            let entries = self.inner.entries.read().await;
74
75            if entries.is_empty() {
76                drop(entries);
77                yield_now().await;
78            } else {
79                return entries[self.inner.cursor.fetch_add(1, Ordering::Relaxed) % entries.len()]
80                    .value
81                    .clone();
82            }
83        }
84    }
85
86    /// Try to allocate the next entry in sequence without awaiting.
87    fn try_alloc(&self) -> Option<T> {
88        let entries = self.inner.entries.try_read().ok()?;
89
90        if entries.is_empty() {
91            return None;
92        }
93
94        Some(
95            entries[self.inner.cursor.fetch_add(1, Ordering::Relaxed) % entries.len()]
96                .value
97                .clone(),
98        )
99    }
100}
101
102#[async_trait]
103impl<T> BoxLoadBalancer<T> for SimpleLoadBalancer<T>
104where
105    T: Send + Sync + Clone + 'static,
106{
107    /// Asynchronously allocate the next entry (BoxLoadBalancer version).
108    async fn alloc(&self) -> T {
109        LoadBalancer::alloc(self).await
110    }
111
112    /// Try to allocate the next entry (BoxLoadBalancer version).
113    fn try_alloc(&self) -> Option<T> {
114        LoadBalancer::try_alloc(self)
115    }
116}