use parking_lot::Mutex;
use std::{collections::VecDeque, sync::Arc};
use tokio::sync::Notify;
/// Shared rotation of connection endpoint URIs.
///
/// The URIs live in a mutex-protected queue so the balancer can be used
/// through a shared reference; a [`Notify`] lets async tasks sleep until a
/// URI is registered.
#[derive(Debug, Default)]
pub(crate) struct LoadBalancer {
// Queue of registered endpoint URIs; the front element is the next one handed out.
connection_uris: Mutex<VecDeque<String>>,
// Signalled when a new URI is pushed, to wake tasks parked in `wait_for_connection`.
notify_waiters: Arc<Notify>,
}
impl LoadBalancer {
    /// Creates a load balancer with no registered connection URIs.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `endpoint_uri` at the back of the rotation.
    ///
    /// A URI that is already registered is left in place and not added a
    /// second time. When a URI is actually added, every task currently parked
    /// in [`wait_for_connection`](Self::wait_for_connection) is woken.
    pub fn add_connection(&self, endpoint_uri: String) {
        {
            let mut uris = self.connection_uris.lock();
            if uris.contains(&endpoint_uri) {
                tracing::trace!(uri = %endpoint_uri, "LoadBalancer: Connection URI already present, not adding again.");
                return;
            }
            // Log before the push so the owned string can be moved into the
            // queue without cloning; the lock is held, so nobody can observe
            // the intermediate state.
            tracing::trace!(uri = %endpoint_uri, "LoadBalancer added connection URI");
            uris.push_back(endpoint_uri);
        } // Lock released here so woken tasks do not immediately contend on it.

        // URIs are rotated rather than consumed, so *every* waiter's condition
        // is now satisfied. `notify_one` would wake a single task and leave
        // the others asleep until the next add.
        self.notify_waiters.notify_waiters();
    }

    /// Removes `endpoint_uri` from the rotation if it is registered.
    ///
    /// Removing an unknown URI is a no-op apart from a trace message.
    pub fn remove_connection(&self, endpoint_uri: &str) {
        let mut uris = self.connection_uris.lock();
        match uris.iter().position(|uri| uri == endpoint_uri) {
            Some(index) => {
                uris.remove(index);
                tracing::trace!(uri = %endpoint_uri, "LoadBalancer removed connection URI");
            }
            None => {
                tracing::trace!(uri = %endpoint_uri, "LoadBalancer: Connection URI not found for removal.");
            }
        }
    }

    /// Returns the URI at the front of the rotation and moves it to the back.
    ///
    /// Returns `None` when no URI is registered.
    pub fn get_next_connection_uri(&self) -> Option<String> {
        let mut uris = self.connection_uris.lock();
        let next = uris.front().cloned()?;
        // The `?` above guarantees at least one element, so rotating by one
        // cannot panic.
        uris.rotate_left(1);
        Some(next)
    }

    /// Completes once at least one connection URI is registered.
    ///
    /// Returns immediately if a URI is already present.
    pub async fn wait_for_connection(&self) {
        loop {
            // Create the `Notified` future *before* inspecting the queue: it
            // observes `notify_waiters()` calls from the moment it is created,
            // so an `add_connection` racing with the check below is not lost.
            let notified = self.notify_waiters.notified();
            if self.has_connections() {
                return;
            }
            notified.await;
        }
    }

    /// Returns `true` if at least one connection URI is registered.
    pub fn has_connections(&self) -> bool {
        !self.connection_uris.lock().is_empty()
    }

    /// Returns the number of registered connection URIs.
    ///
    /// This never suspends; it remains `async` so existing callers that
    /// `.await` it keep compiling.
    pub async fn connection_count(&self) -> usize {
        self.connection_uris.lock().len()
    }
}