use super::subscription_poller::{PollResult, SubscriptionPoller};
use obzenflow_core::event::payloads::flow_control_payload::FlowControlPayload;
use obzenflow_core::event::{ChainEvent, ChainEventContent, JournalEvent};
use obzenflow_core::journal::journal_reader::JournalReader;
use obzenflow_core::EventEnvelope;
use std::any::Any;
pub struct SystemSubscription<T>
where
T: JournalEvent,
{
reader: Box<dyn JournalReader<T>>,
eof_received: bool,
stage_name: String,
}
impl<T> SystemSubscription<T>
where
T: JournalEvent + 'static,
{
pub fn new(reader: Box<dyn JournalReader<T>>, stage_name: String) -> Self {
Self {
reader,
eof_received: false,
stage_name,
}
}
fn is_eof_event(&self, envelope: &EventEnvelope<T>) -> bool {
if let Some(chain_event) = (&envelope.event as &dyn Any).downcast_ref::<ChainEvent>() {
return matches!(
&chain_event.content,
ChainEventContent::FlowControl(FlowControlPayload::Eof { .. })
);
}
false
}
pub fn stage_name(&self) -> &str {
&self.stage_name
}
pub fn is_eof(&self) -> bool {
self.eof_received
}
}
#[async_trait::async_trait]
impl<T> SubscriptionPoller for SystemSubscription<T>
where
T: JournalEvent + 'static,
{
type Event = T;
async fn poll_next(&mut self) -> PollResult<Self::Event> {
if self.eof_received {
return PollResult::NoEvents;
}
match self.reader.next().await {
Ok(Some(envelope)) => {
if self.is_eof_event(&envelope) {
self.eof_received = true;
tracing::info!("SystemSubscription[{}] received EOF event", self.stage_name);
}
tracing::trace!(
"SystemSubscription[{}] received event: {}",
self.stage_name,
envelope.event.id()
);
PollResult::Event(envelope)
}
Ok(None) => {
tracing::trace!(
"SystemSubscription[{}] no events available",
self.stage_name
);
PollResult::NoEvents
}
Err(e) => {
tracing::error!("SystemSubscription[{}] error: {}", self.stage_name, e);
PollResult::Error(Box::new(e))
}
}
}
fn name(&self) -> &str {
&self.stage_name
}
}