nodata 0.1.0

nodata is a Kafka-like message broker that is simple and easy to use, while relying on either local or S3-like data storage for consistency.
use std::{collections::BTreeMap, sync::Arc, time::Duration};

use axum::async_trait;
use nodrift::Drifter;
use notmad::Component;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

use crate::{
    services::consumers::{ConsumerGroup, ConsumerId, ConsumersState},
    state::SharedState,
};

/// Component that tracks one [`BrokerHandler`] per consumer group.
///
/// Clones share the same handler map, since it lives behind an `Arc`.
#[derive(Clone)]
pub struct Broker {
    // Shared application state; used to look up the current consumer groups.
    state: SharedState,

    // Handlers created so far, keyed by the id of the consumer group they serve.
    handlers: Arc<RwLock<BTreeMap<ConsumerId, BrokerHandler>>>,
}

impl Broker {
    /// Builds a broker around a clone of `state`, starting with an empty handler map.
    pub fn new(state: &SharedState) -> Self {
        let state = state.clone();
        // The field type drives inference here: an `Arc` around a default (empty) locked map.
        let handlers = Arc::default();

        Self { state, handlers }
    }
}

#[async_trait]
impl Component for Broker {
    /// Schedules this broker as a drifter and waits until either side is cancelled.
    ///
    /// When `cancellation_token` fires, the drifter's token is cancelled as well.
    /// When the drifter's own token is cancelled first, nothing further is done.
    /// In both cases the method returns `Ok(())`.
    async fn run(
        &self,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Result<(), notmad::MadError> {
        let drifter_token = nodrift::schedule_drifter(Duration::from_secs(1), self.clone());

        tokio::select! {
            // Shutdown requested from the outside: propagate it to the drifter.
            _ = cancellation_token.cancelled() => drifter_token.cancel(),
            // The drifter was cancelled on its own: nothing left to wait for.
            _ = drifter_token.cancelled() => {}
        }

        Ok(())
    }
}

#[async_trait]
impl Drifter for Broker {
    /// Reconciles the handler map against the consumer groups reported by the shared state.
    ///
    /// Every consumer group that does not yet have an entry in `handlers` gets a new
    /// [`BrokerHandler`], which is handed a child of `token`. Groups that already have a
    /// handler are left untouched.
    ///
    /// Always returns `Ok(())`.
    async fn execute(&self, token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
        tracing::trace!("reconciling broker");

        // The write lock is held for the whole pass, so the membership check and the
        // insert below are observed atomically by any other user of the map.
        let mut handlers = self.handlers.write().await;
        let consumer_groups = self.state.consumers().get_consumer_groups().await;

        for consumer_group in consumer_groups {
            // Resolve the id once and reuse it for the lookup, the log line and the insert.
            let consumer_id = consumer_group.get_id().await;
            if handlers.contains_key(&consumer_id) {
                continue;
            }

            tracing::debug!(consumer_id = consumer_id, "creating new handler");
            handlers.insert(
                consumer_id,
                BrokerHandler::new(&self.state, consumer_group, token.child_token()),
            );
        }

        // NOTE(review): handlers are never removed. The previous delete pass iterated an
        // always-empty list, so it has been dropped; pruning handlers whose consumer group
        // no longer exists is still to be implemented.

        Ok(())
    }
}

/// Handle to the job that `BrokerHandler::new` schedules for a single consumer group.
///
/// Dropping the handler cancels `token` (see the `Drop` impl).
pub struct BrokerHandler {
    // Token returned by `nodrift::schedule` for the scheduled job.
    token: CancellationToken,
}
impl BrokerHandler {
    /// Schedules a job through `nodrift::schedule` that calls `reconcile_lag` on
    /// `consumer_group`, and keeps the token returned by the scheduler.
    ///
    /// A failed `reconcile_lag` is logged as a warning and otherwise ignored; the job
    /// itself always reports `Ok(())`.
    ///
    /// `_parent_token` is accepted but currently unused.
    pub fn new(
        state: &SharedState,
        consumer_group: ConsumerGroup,
        _parent_token: CancellationToken,
    ) -> Self {
        let shared_state = state.clone();

        let token = nodrift::schedule(Duration::from_secs(1), move || {
            // Each invocation works on its own clones so the returned future owns its data.
            let group = consumer_group.clone();
            let state = shared_state.clone();

            async move {
                if let Err(err) = group.reconcile_lag(&state).await {
                    tracing::warn!(
                        error = err.to_string(),
                        "failed to reconcile lag for consumer_group"
                    );
                }

                // TODO: compare the current offset against the lag offset; when they
                // differ, pick a member and execute the update for that member.

                Ok(())
            }
        });

        Self { token }
    }
}

impl Drop for BrokerHandler {
    /// Cancels the handler's token when the handler goes out of scope.
    fn drop(&mut self) {
        let Self { token } = self;
        token.cancel();
    }
}