//! nodata 0.1.0
//!
//! nodata is a Kafka-like message broker that is simple and easy to use, while
//! relying on either local or S3-like data storage for consistency.
use std::{collections::BTreeMap, sync::Arc};

use tokio::sync::RwLock;

use crate::state::SharedState;

use super::{handler::HandlerState, staging::StagingEvent};

/// Identifier for a consumer group (key into the `Consumers` registry).
pub type ConsumerId = String;
/// Identifier for an individual consumer within a group.
pub type ConsumerIndex = String;
/// Name of a topic that messages are published to.
pub type Topic = String;
/// Partition key within a topic (not used in this module).
pub type PartitionKey = String;
/// Position within a topic's message log.
pub type TopicOffset = usize;

// Consumer will at some point contain an interface to send events over a channel to a lib
/// A single consumer: both halves of the mpsc channel used to deliver
/// [`StagingEvent`]s. The receiver is wrapped in `Arc<Mutex<..>>` so the
/// `Consumer` stays `Clone`-able while only one task reads at a time.
#[derive(Debug, Clone)]
pub struct Consumer {
    // Sending half; used to push staged events to this consumer.
    pub tx: tokio::sync::mpsc::Sender<StagingEvent>,
    // Receiving half; shared and locked so clones observe the same stream.
    pub rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<StagingEvent>>>,
}

/// Cloneable handle to a consumer group; all state lives behind an
/// `Arc<RwLock<..>>` so every clone shares the same [`InnerConsumerGroup`].
#[derive(Default, Clone, Debug)]
pub struct ConsumerGroup {
    inner: Arc<RwLock<InnerConsumerGroup>>,
}

/// The actual consumer-group state, guarded by [`ConsumerGroup`]'s lock.
#[derive(Default, Debug)]
pub struct InnerConsumerGroup {
    // Group identifier.
    pub id: ConsumerId,
    // Topic this group is subscribed to.
    topic: Topic,
    // Latest known topic offset — the target the group should catch up to.
    pub(crate) offset: TopicOffset,
    // Offset the group has actually been handed work up to.
    pub(crate) cur_offset: TopicOffset,

    // Group members keyed by consumer index (BTreeMap => stable ordering).
    members: BTreeMap<ConsumerIndex, Consumer>,
}

impl ConsumerGroup {
    /// Builds a new handle around a fresh [`InnerConsumerGroup`] for `topic`.
    pub fn new(id: impl Into<ConsumerId>, topic: impl Into<Topic>) -> Self {
        let inner = InnerConsumerGroup::new(id, topic);
        Self {
            inner: Arc::new(RwLock::new(inner)),
        }
    }

    /// Registers (or replaces) the consumer stored under `index`.
    pub async fn add_consumer(&self, index: ConsumerIndex, consumer: Consumer) {
        self.inner.write().await.add_consumer(index, consumer);
    }

    /// Moves the group's target offset to `offset`.
    pub async fn update_offset(&self, offset: TopicOffset) {
        self.inner.write().await.update_offset(offset);
    }

    /// Hands any unprocessed offset range to a member consumer.
    pub async fn reconcile_lag(&self, state: &SharedState) -> anyhow::Result<()> {
        self.inner.write().await.reconcile_lag(state).await
    }

    /// Returns the group's current target offset.
    #[allow(dead_code)]
    pub async fn get_offset(&self) -> TopicOffset {
        self.inner.read().await.offset
    }

    /// Returns a clone of the group's identifier.
    pub async fn get_id(&self) -> ConsumerId {
        self.inner.read().await.id.clone()
    }
}

impl InnerConsumerGroup {
    pub fn new(id: impl Into<ConsumerId>, topic: impl Into<Topic>) -> Self {
        Self {
            id: id.into(),
            topic: topic.into(),
            offset: 0,
            cur_offset: 0,
            members: BTreeMap::default(),
        }
    }

    pub fn add_consumer(&mut self, index: ConsumerIndex, consumer: Consumer) {
        if self.members.insert(index.to_owned(), consumer).is_some() {
            tracing::warn!(index = index, "consumer replaced");
        }
    }

    pub fn update_offset(&mut self, offset: TopicOffset) {
        self.offset = offset;
    }

    pub async fn reconcile_lag(&mut self, state: &SharedState) -> anyhow::Result<()> {
        if self.offset <= self.cur_offset {
            return Ok(());
        }

        // TODO: replace with round robin or something;
        let consumer = match self.members.first_key_value() {
            Some((_, consumer)) => consumer,
            None => return Ok(()),
        };

        tracing::debug!(consumer_id = self.id, "sending update for consumer");
        state
            .handler()
            .handle_offset(&self.topic, consumer, self.cur_offset, self.offset)
            .await?;

        self.cur_offset = self.offset;

        Ok(())
    }
}

impl Default for Consumer {
    /// Equivalent to [`Consumer::new`]: a consumer with a fresh channel.
    fn default() -> Self {
        Self::new()
    }
}

impl Consumer {
    /// Default bound for a consumer's event channel.
    const DEFAULT_CHANNEL_CAPACITY: usize = 128;

    /// Creates a consumer with the default channel capacity.
    pub fn new() -> Self {
        Self::with_capacity(Self::DEFAULT_CHANNEL_CAPACITY)
    }

    /// Creates a consumer whose channel buffers up to `capacity` in-flight
    /// [`StagingEvent`]s before senders are back-pressured.
    ///
    /// Note: `tokio::sync::mpsc::channel` panics if `capacity` is zero.
    pub fn with_capacity(capacity: usize) -> Self {
        let (tx, rx) = tokio::sync::mpsc::channel(capacity);

        Self {
            tx,
            rx: Arc::new(tokio::sync::Mutex::new(rx)),
        }
    }
}

/// Registry of all consumer groups and their topic subscriptions.
#[derive(Clone)]
pub struct Consumers {
    // Consumer groups keyed by group id.
    storage: Arc<RwLock<BTreeMap<ConsumerId, ConsumerGroup>>>,
    // For each topic, the ids of the groups subscribed to it.
    subscriptions: Arc<RwLock<BTreeMap<Topic, Vec<ConsumerId>>>>,
}

// Message flow:
// 1. A message arrives in the queue and is stored in staging.
// 2. After staging, the topic (with its key as a partition) notifies active consumers.
// 3. Active consumers listen and receive events; if they process them successfully,
//    the consumer's offset moves ahead in the partition.
// We therefore keep a system of record for each consumer.
impl Consumers {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self {
            storage: Arc::default(),
            subscriptions: Arc::default(),
        }
    }

    /// Registers consumer `index` in group `id` subscribed to `topic`,
    /// creating the group and the subscription entry on first use.
    ///
    /// Returns the newly created [`Consumer`] whose channel the caller can
    /// listen on. (The `Option` is always `Some`; it is kept for interface
    /// compatibility.)
    pub async fn add_consumer(
        &self,
        id: impl Into<ConsumerId>,
        index: impl Into<ConsumerIndex>,
        topic: impl Into<Topic>,
    ) -> anyhow::Result<Option<Consumer>> {
        let id = id.into();
        let index = index.into();
        let topic = topic.into();

        let consumer = {
            let mut storage = self.storage.write().await;

            // Entry API: one lookup instead of contains_key + insert + get_mut/unwrap.
            let consumer_group = storage
                .entry(id.clone())
                .or_insert_with(|| ConsumerGroup::new(&id, &topic));

            let consumer = Consumer::default();
            consumer_group.add_consumer(index, consumer.clone()).await;
            consumer
        };

        {
            let mut subscriptions = self.subscriptions.write().await;
            let subscription_consumers = subscriptions.entry(topic).or_default();
            if !subscription_consumers.contains(&id) {
                subscription_consumers.push(id);
            }
        }

        Ok(Some(consumer))
    }

    /// Looks up a consumer group by id, cloning the handle if present.
    #[allow(dead_code)]
    pub async fn get_consumer_group(&self, id: impl Into<ConsumerId>) -> Option<ConsumerGroup> {
        self.storage.read().await.get(&id.into()).cloned()
    }

    /// Returns handles to every registered consumer group.
    pub async fn get_consumer_groups(&self) -> Vec<ConsumerGroup> {
        self.storage.read().await.values().cloned().collect()
    }

    /// Propagates a new topic `offset` to every group subscribed to `topic`.
    pub async fn notify_update(
        &self,
        topic: impl Into<Topic>,
        offset: impl Into<TopicOffset>,
    ) -> anyhow::Result<()> {
        let topic = topic.into();
        let offset = offset.into();

        // Copy the subscriber ids so the subscriptions read lock is released
        // before the storage write lock is taken below.
        let subscriber_ids: Vec<ConsumerId> = {
            let subscriptions = self.subscriptions.read().await;
            match subscriptions.get(&topic) {
                Some(s) => s.clone(),
                None => {
                    tracing::debug!(topic = &topic, "no subscription for topic");
                    return Ok(());
                }
            }
        };

        let mut storage = self.storage.write().await;
        for consumer_id in &subscriber_ids {
            match storage.get_mut(consumer_id) {
                Some(consumer_group) => {
                    consumer_group.update_offset(offset).await;
                }
                None => {
                    // Subscription exists but the group was never stored (or
                    // was removed) — log and keep notifying the others.
                    tracing::trace!(
                        topic = &topic,
                        consumer_id = &consumer_id,
                        "found no consumer"
                    )
                }
            }
        }

        Ok(())
    }
}

/// `Consumers::new` takes no arguments, so provide `Default` for idiomatic
/// construction (and to satisfy clippy's `new_without_default`).
impl Default for Consumers {
    fn default() -> Self {
        Self::new()
    }
}

/// Access to the shared [`Consumers`] registry from application state.
pub trait ConsumersState {
    /// Returns a handle to the consumer registry.
    fn consumers(&self) -> Consumers;
}

impl ConsumersState for SharedState {
    fn consumers(&self) -> Consumers {
        // Consumers is an Arc-backed handle, so this clone is cheap.
        self.consumers.clone()
    }
}

#[cfg(test)]
mod test {
    use tracing_test::traced_test;

    use crate::services::staging::{Staging, StagingEvent};

    use super::*;

    /// A freshly added consumer group starts at offset 0.
    #[tokio::test]
    async fn can_add_consumer() -> anyhow::Result<()> {
        let consumer_id = "some-consumer-id";
        let consumer_index = "some-consumer-index";
        let topic = "some-topic";

        let consumers = Consumers::new();

        consumers
            .add_consumer(consumer_id, consumer_index, topic)
            .await?;
        let consumer = consumers.get_consumer_group(consumer_id).await.unwrap();

        // No notifications yet, so the target offset is still the default.
        assert_eq!(0, consumer.get_offset().await);

        Ok(())
    }

    /// `notify_update` moves a subscribed group's target offset forward.
    #[tokio::test]
    #[traced_test]
    async fn can_notify_consumer() -> anyhow::Result<()> {
        let consumer_id = "some-consumer-id".to_string();
        let consumer_index = "some-consumer-index".to_string();
        let topic = "some-topic".to_string();
        let offset = 9usize;

        let staging = Staging::new().await?;
        // Publish 10 messages
        for _ in 0..10 {
            let offset = staging
                .publish(StagingEvent {
                    topic: topic.clone(),
                    published: chrono::Utc::now(),
                    value: Vec::new(),
                })
                .await?;

            tracing::trace!("published offset: {}", offset);
        }

        let consumers = Consumers::new();

        consumers
            .add_consumer(&consumer_id, &consumer_index, &topic)
            .await?;
        let consumer = consumers.get_consumer_group(&consumer_id).await.unwrap();
        // Offset starts at the default before any notification.
        assert_eq!(0, consumer.get_offset().await);

        consumers.notify_update(&topic, offset).await?;
        let consumer = consumers.get_consumer_group(&consumer_id).await.unwrap();

        // The group's target offset now matches the notified offset.
        assert_eq!(9, consumer.get_offset().await);

        Ok(())
    }
}