use crate::state::SharedState;
use super::{
consumers::Consumers,
staging::{Staging, StagingEvent},
};
/// Entry point for publishing events: pairs a [`Staging`] handle with a
/// [`Consumers`] handle.
///
/// `Clone` is derived, so cloning an `Ingest` clones both fields.
#[derive(Clone)]
pub struct Ingest {
/// Where published events are handed off first.
staging: Staging,
/// Notified with the topic and offset after an event has been staged.
consumers: Consumers,
}
impl Ingest {
pub fn new(staging: Staging, consumers: Consumers) -> Self {
Self { staging, consumers }
}
pub async fn publish(&self, event: impl Into<StagingEvent>) -> anyhow::Result<()> {
let event: StagingEvent = event.into();
let topic = event.topic.clone();
let offset = self.staging.publish(event).await?;
self.consumers.notify_update(topic, offset).await?;
Ok(())
}
}
/// Implemented by state types that can hand out an [`Ingest`] handle.
pub trait IngestState {
/// Returns an [`Ingest`] by value, built from this state.
fn ingest(&self) -> Ingest;
}
impl IngestState for SharedState {
    /// Creates an [`Ingest`] from clones of this state's `staging` and
    /// `consumers` fields.
    fn ingest(&self) -> Ingest {
        let staging = self.staging.clone();
        let consumers = self.consumers.clone();
        Ingest::new(staging, consumers)
    }
}