use crate::state::SharedState;
use super::{
consumers::{Consumer, Topic, TopicOffset},
staging::Staging,
};
/// Forwards staged events to consumers.
///
/// The handler owns a [`Staging`] handle and derives `Clone`, so cloning a
/// `Handler` clones that handle.
#[derive(Clone)]
pub struct Handler {
/// Source the handler queries for the events of a topic offset range.
staging: Staging,
}
impl Handler {
pub fn new(staging: Staging) -> Self {
Self { staging }
}
pub async fn handle_offset(
&self,
topic: &Topic,
consumer: &Consumer,
start_offset: TopicOffset,
end_offset: TopicOffset,
) -> anyhow::Result<()> {
let events = self
.staging
.get_topic_offset(topic, start_offset, end_offset)
.await?;
for event in events {
tracing::trace!("handling event: {:?}", event);
consumer.tx.send(event).await?;
}
Ok(())
}
}
/// Implemented by state containers that can hand out a [`Handler`].
pub trait HandlerState {
/// Returns an owned [`Handler`] obtained from this state.
fn handler(&self) -> Handler;
}
impl HandlerState for SharedState {
fn handler(&self) -> Handler {
self.handler.clone()
}
}