mod construction;
mod contract_checking;
mod polling;
mod types;
#[cfg(test)]
mod tests;
pub use super::subscription_poller::{PollResult, SubscriptionPoller};
pub use types::{
ContractConfig, ContractStatus, ContractTracker, ContractsWiring, EofOutcome, ReaderProgress,
SubscriptionState,
};
use crate::contracts::ContractChain;
use crate::messaging::upstream_subscription_policy::ContractPolicyStack;
use obzenflow_core::control_middleware::ControlMiddlewareProvider;
use obzenflow_core::event::types::SeqNo;
use obzenflow_core::event::{ChainEvent, JournalEvent};
use obzenflow_core::journal::journal_reader::JournalReader;
use obzenflow_core::StageId;
use std::sync::Arc;
pub struct UpstreamSubscription<T>
where
T: JournalEvent,
{
delivery_filter: DeliveryFilter,
owner_label: String,
readers: Vec<(StageId, String, Box<dyn JournalReader<T>>)>,
state: SubscriptionState,
contract_tracker: Option<ContractTracker>,
contract_chains: Vec<Option<ContractChain>>,
contract_policies: Vec<Option<ContractPolicyStack>>,
control_middleware: Arc<dyn ControlMiddlewareProvider>,
last_eof_outcome: Option<EofOutcome>,
last_delivered_upstream_stage: Option<StageId>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DeliveryFilter {
All,
TransportOnly,
}
impl<T> UpstreamSubscription<T>
where
T: JournalEvent + 'static,
{
pub fn last_delivered_upstream_stage(&self) -> Option<StageId> {
self.last_delivered_upstream_stage
}
pub fn notify_delivery_receipt(&mut self, receipt: &ChainEvent, upstream_stage: StageId) {
let Some(reader_stage) = self.contract_tracker.as_ref().and_then(|t| t.reader_stage) else {
return;
};
let Some(index) = self
.readers
.iter()
.position(|(id, _, _)| *id == upstream_stage)
else {
tracing::warn!(
owner = %self.owner_label,
?upstream_stage,
"notify_delivery_receipt: no reader slot for upstream stage"
);
return;
};
let Some(chain_slot) = self.contract_chains.get_mut(index) else {
return;
};
let Some(chain) = chain_slot.as_mut() else {
return;
};
chain.on_write(receipt, reader_stage, SeqNo(0));
}
pub fn take_last_eof_outcome(&mut self) -> Option<EofOutcome> {
self.last_eof_outcome.take()
}
pub fn last_eof_outcome(&self) -> Option<&EofOutcome> {
self.last_eof_outcome.as_ref()
}
pub fn has_pending(&self) -> bool {
self.state.has_pending()
}
pub fn upstream_count(&self) -> usize {
self.readers.len()
}
pub fn all_readers_eof(&self) -> bool {
self.state.eof_count() == self.readers.len()
}
pub fn all_readers_logically_eof(&self) -> bool {
self.state.logical_eof_count() == self.readers.len()
}
pub fn has_upstream(&self) -> bool {
!self.readers.is_empty()
}
}
#[async_trait::async_trait]
impl<T> SubscriptionPoller for UpstreamSubscription<T>
where
T: JournalEvent + 'static,
{
type Event = T;
async fn poll_next(&mut self) -> PollResult<Self::Event> {
UpstreamSubscription::poll_next(self).await
}
fn name(&self) -> &str {
"upstream_subscription"
}
}