use std::{collections::BTreeMap, sync::Arc};
use nodata_storage::backend::local::LocalStorageBackend;
use tokio::sync::RwLock;
use crate::state::SharedState;
use super::consumers::{Topic, TopicOffset};
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StagingEvent {
pub topic: String,
pub published: chrono::DateTime<chrono::Utc>,
pub value: Vec<u8>,
}
#[derive(Clone)]
pub struct Staging {
#[allow(clippy::complexity)]
store: Arc<RwLock<BTreeMap<Topic, Vec<StagingEvent>>>>,
storage: nodata_storage::Storage,
}
impl Staging {
pub async fn new() -> anyhow::Result<Self> {
Ok(Self {
store: Arc::default(),
storage: nodata_storage::Storage::new_from_env().await?,
})
}
pub async fn publish(
&self,
staging_event: impl Into<StagingEvent>,
) -> anyhow::Result<TopicOffset> {
let staging_event: StagingEvent = staging_event.into();
let mut store = self.store.write().await;
tracing::trace!(topic = staging_event.topic, "moving event to staging");
self.storage
.push_message(&staging_event.topic, &staging_event.value)
.await?;
let offset = match store.get_mut(&staging_event.topic) {
Some(part) => {
part.push(staging_event);
part.len() - 1
}
None => {
tracing::debug!(topic = staging_event.topic, "new topic, creating partition");
store.insert(staging_event.topic.to_owned(), vec![staging_event]);
0
}
};
Ok(offset)
}
pub async fn get_topics(&self) -> anyhow::Result<Vec<String>> {
let store = self.store.read().await;
Ok(store.keys().cloned().collect::<Vec<_>>())
}
pub async fn get_topic_offset(
&self,
topic: impl Into<String>,
start: impl Into<TopicOffset>,
end: impl Into<TopicOffset>,
) -> anyhow::Result<Vec<StagingEvent>> {
let topic = topic.into();
let start = start.into();
let end = end.into();
if start == end {
return Ok(Vec::new());
}
if start > end {
anyhow::bail!(
"start cannot be greater than end, (start={}, end={})",
start,
end
)
}
let store = self.store.read().await;
let partitions = match store.get(&topic) {
Some(partitions) => partitions,
None => {
anyhow::bail!("topic doesn't exist in storage: {}", &topic);
}
};
if partitions.len() < end {
anyhow::bail!(
"partition len is less than the offset, (partition_len={}, offset={})",
partitions.len(),
end
)
}
Ok(partitions[start + 1..=end].to_vec())
}
}
#[allow(dead_code)]
pub trait StagingState {
fn staging(&self) -> Staging;
}
impl StagingState for SharedState {
fn staging(&self) -> Staging {
self.staging.clone()
}
}