nodata 0.1.0

nodata is a Kafka-like message broker that is simple and easy to use, while relying on either local or S3-like data storage for consistency
use std::{collections::BTreeMap, sync::Arc};

use nodata_storage::backend::local::LocalStorageBackend;
use tokio::sync::RwLock;

use crate::state::SharedState;

use super::consumers::{Topic, TopicOffset};

/// A single event held in staging: a raw payload together with the topic it
/// belongs to and a timestamp.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StagingEvent {
    /// Name of the topic the event is filed under; `Staging` uses it as the
    /// key into its per-topic store.
    pub topic: String,
    /// UTC timestamp carried with the event.
    // NOTE(review): presumably the publish time, set by whoever builds the
    // event — confirm against the callers.
    pub published: chrono::DateTime<chrono::Utc>,
    /// Raw payload bytes; passed as-is to the storage layer on publish.
    pub value: Vec<u8>,
}

/// Staging area for published events.
///
/// Keeps an in-memory list of events per topic and forwards each published
/// payload to a `nodata_storage::Storage`. The in-memory store sits behind an
/// `Arc`, so clones of a `Staging` share the same per-topic lists.
#[derive(Clone)]
pub struct Staging {
    // Temporary until we've got an actual file disk store
    //
    // Maps a topic to its events in publish order; an event's offset is its
    // index in the topic's `Vec`.
    #[allow(clippy::complexity)]
    store: Arc<RwLock<BTreeMap<Topic, Vec<StagingEvent>>>>,

    // Storage handle that every published payload is pushed to before the
    // event is appended to `store`.
    storage: nodata_storage::Storage,
}

impl Staging {
    /// Creates a staging area with an empty in-memory store and a storage
    /// handle obtained from `nodata_storage::Storage::new_from_env`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `new_from_env`.
    pub async fn new() -> anyhow::Result<Self> {
        Ok(Self {
            store: Arc::default(),
            storage: nodata_storage::Storage::new_from_env().await?,
        })
    }

    /// Publishes an event: pushes its payload to storage, then appends the
    /// event to the in-memory list of its topic.
    ///
    /// Returns the offset of the event, i.e. its index in the topic's list
    /// (`0` for the first event of a new topic).
    ///
    /// # Errors
    ///
    /// Propagates any error from the storage push. In that case the in-memory
    /// store is left untouched.
    pub async fn publish(
        &self,
        staging_event: impl Into<StagingEvent>,
    ) -> anyhow::Result<TopicOffset> {
        let staging_event: StagingEvent = staging_event.into();
        // NOTE(review): the write lock is acquired before the storage push and
        // held across it; presumably this keeps the storage order and the
        // staging offsets aligned between concurrent publishers — confirm
        // before narrowing the lock scope.
        let mut store = self.store.write().await;
        tracing::trace!(topic = staging_event.topic, "moving event to staging");

        self.storage
            .push_message(&staging_event.topic, &staging_event.value)
            .await?;

        let offset = match store.get_mut(&staging_event.topic) {
            Some(part) => {
                part.push(staging_event);
                // The event was just appended, so its index is the last one.
                part.len() - 1
            }
            None => {
                tracing::debug!(topic = staging_event.topic, "new topic, creating partition");
                store.insert(staging_event.topic.to_owned(), vec![staging_event]);

                0
            }
        };

        Ok(offset)
    }

    /// Returns the names of all topics that currently have staged events,
    /// in the key order of the underlying `BTreeMap`.
    pub async fn get_topics(&self) -> anyhow::Result<Vec<String>> {
        let store = self.store.read().await;

        Ok(store.keys().cloned().collect())
    }

    /// Returns clones of the staged events of `topic` at offsets
    /// `start + 1` through `end`, both inclusive.
    ///
    /// When `start == end` an empty list is returned without looking at the
    /// store.
    ///
    /// # Errors
    ///
    /// Fails when `start > end`, when the topic has no staged events, or when
    /// `end` is not a valid offset for the topic (`end >= partition_len`).
    pub async fn get_topic_offset(
        &self,
        topic: impl Into<String>,
        start: impl Into<TopicOffset>,
        end: impl Into<TopicOffset>,
    ) -> anyhow::Result<Vec<StagingEvent>> {
        let topic = topic.into();
        let start = start.into();
        let end = end.into();

        if start == end {
            return Ok(Vec::new());
        }

        if start > end {
            anyhow::bail!(
                "start cannot be greater than end, (start={}, end={})",
                start,
                end
            )
        }

        let store = self.store.read().await;

        let partitions = store
            .get(&topic)
            .ok_or_else(|| anyhow::anyhow!("topic doesn't exist in storage: {}", &topic))?;

        // `end` is used as an inclusive index below, so it has to be strictly
        // smaller than the length; `end == len` would otherwise panic when
        // slicing.
        if partitions.len() <= end {
            anyhow::bail!(
                "partition len is less than or equal to the offset, (partition_len={}, offset={})",
                partitions.len(),
                end
            )
        }

        // In bounds: start < end < len, hence start + 1 <= end.
        Ok(partitions[start + 1..=end].to_vec())
    }
}

/// Accessor trait for obtaining a [`Staging`] from a state value.
// NOTE(review): `dead_code` is silenced here, presumably because the trait is
// not used by every build of the crate yet — confirm whether it is still needed.
#[allow(dead_code)]
pub trait StagingState {
    /// Returns a `Staging` by value.
    fn staging(&self) -> Staging;
}

impl StagingState for SharedState {
    fn staging(&self) -> Staging {
        self.staging.clone()
    }
}