use std::{collections::BTreeMap, sync::Arc, time::Duration};
use axum::async_trait;
use nodrift::Drifter;
use notmad::Component;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use crate::{
services::consumers::{ConsumerGroup, ConsumerId, ConsumersState},
state::SharedState,
};
#[derive(Clone)]
pub struct Broker {
state: SharedState,
handlers: Arc<RwLock<BTreeMap<ConsumerId, BrokerHandler>>>,
}
impl Broker {
pub fn new(state: &SharedState) -> Self {
Self {
state: state.clone(),
handlers: Arc::default(),
}
}
}
#[async_trait]
impl Component for Broker {
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), notmad::MadError> {
let token = nodrift::schedule_drifter(Duration::from_secs(1), self.clone());
tokio::select! {
_ = token.cancelled() => {},
_ = cancellation_token.cancelled() => {
token.cancel()
},
}
Ok(())
}
}
#[async_trait]
impl Drifter for Broker {
async fn execute(&self, token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
tracing::trace!("reconciling broker");
let mut handlers = self.handlers.write().await;
let consumer_groups = self.state.consumers().get_consumer_groups().await;
let mut new_handlers = Vec::new();
let delete_handlers: Vec<ConsumerGroup> = Vec::new();
for consumer_group in consumer_groups {
if handlers.contains_key(&consumer_group.get_id().await) {
} else {
new_handlers.push(consumer_group);
}
}
for new_handler in new_handlers {
let consumer_id = new_handler.get_id().await;
tracing::debug!(consumer_id = consumer_id, "creating new handler");
handlers.insert(
new_handler.get_id().await,
BrokerHandler::new(&self.state, new_handler, token.child_token()),
);
}
for delete_handler in delete_handlers {
let consumer_id = delete_handler.get_id().await;
tracing::debug!(consumer_id = consumer_id, "deleting consumer");
handlers.remove(&delete_handler.get_id().await);
}
Ok(())
}
}
pub struct BrokerHandler {
token: CancellationToken,
}
impl BrokerHandler {
pub fn new(
state: &SharedState,
consumer_group: ConsumerGroup,
_parent_token: CancellationToken,
) -> Self {
let inner_state = state.clone();
let token = nodrift::schedule(Duration::from_secs(1), move || {
let consumer_group = consumer_group.clone();
let state = inner_state.clone();
async move {
let consumer_group = consumer_group;
if let Err(e) = consumer_group.reconcile_lag(&state).await {
tracing::warn!(
error = e.to_string(),
"failed to reconcile lag for consumer_group"
)
}
Ok(())
}
});
Self { token }
}
}
impl Drop for BrokerHandler {
fn drop(&mut self) {
self.token.cancel();
}
}