use crate::{
DeadLetterPayload,
EventBusError,
EventBusResult,
EventEnvelope,
IntoEventBusResult,
PublishOptions,
SubscribeOptions,
Subscription,
Topic,
};
use std::time::Duration;
/// Record of a single envelope that failed while publishing a batch.
///
/// Bundles the index supplied by the caller, the event identifier, and the
/// error that was reported for that envelope.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BatchPublishFailure {
    /// Index associated with the failed envelope.
    index: usize,
    /// Identifier of the failed event, stored as an owned string.
    event_id: String,
    /// Error reported for the failed envelope.
    error: EventBusError,
}

impl BatchPublishFailure {
    /// Creates a failure record from its index, event identifier, and error.
    pub(crate) fn new(index: usize, event_id: String, error: EventBusError) -> Self {
        BatchPublishFailure {
            index,
            event_id,
            error,
        }
    }

    /// Returns the index stored in this record.
    pub fn index(&self) -> usize {
        self.index
    }

    /// Returns the identifier of the event that failed.
    pub fn event_id(&self) -> &str {
        self.event_id.as_str()
    }

    /// Returns a reference to the error stored in this record.
    pub fn error(&self) -> &EventBusError {
        &self.error
    }
}
/// Summary of a batch publish: counters plus the list of recorded failures.
///
/// The total is fixed at construction; the accepted and dropped counters and
/// the failure list are updated through the crate-private `record_*` methods.
/// No consistency between the total and the other counters is enforced here.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BatchPublishResult {
    /// Total supplied when the result was created.
    total_count: usize,
    /// Number of times `record_accepted` has been called.
    accepted_count: usize,
    /// Number of times `record_dropped` has been called.
    dropped_count: usize,
    /// Failures in the order they were recorded.
    failures: Vec<BatchPublishFailure>,
}

impl BatchPublishResult {
    /// Creates a result with the given total, zeroed counters, and no failures.
    pub(crate) fn new(total_count: usize) -> Self {
        let failures = Vec::new();
        BatchPublishResult {
            total_count,
            accepted_count: 0,
            dropped_count: 0,
            failures,
        }
    }

    /// Increments the accepted counter by one.
    pub(crate) fn record_accepted(&mut self) {
        self.accepted_count += 1;
    }

    /// Increments the dropped counter by one.
    pub(crate) fn record_dropped(&mut self) {
        self.dropped_count += 1;
    }

    /// Appends a failure to the end of the failure list.
    pub(crate) fn record_failure(&mut self, failure: BatchPublishFailure) {
        self.failures.push(failure);
    }

    /// Returns the total supplied at construction.
    pub fn total_count(&self) -> usize {
        self.total_count
    }

    /// Returns how many envelopes were recorded as accepted.
    pub fn accepted_count(&self) -> usize {
        self.accepted_count
    }

    /// Returns how many envelopes were recorded as dropped.
    pub fn dropped_count(&self) -> usize {
        self.dropped_count
    }

    /// Returns the number of recorded failures.
    pub fn failure_count(&self) -> usize {
        self.failures().len()
    }

    /// Returns the recorded failures, in recording order.
    pub fn failures(&self) -> &[BatchPublishFailure] {
        self.failures.as_slice()
    }

    /// Returns `true` when no failure has been recorded.
    ///
    /// Only the failure list is consulted; the dropped counter has no effect.
    pub fn is_success(&self) -> bool {
        self.failure_count() == 0
    }
}
/// Common interface of an event bus: lifecycle control, publishing,
/// subscribing, and waiting on a topic.
///
/// Implementors must provide `start`, `shutdown`,
/// `publish_envelope_with_options`, `subscribe_with_options`,
/// `wait_for_idle`, and `wait_for_idle_timeout`. Every other method has a
/// default body that funnels into one of those.
pub trait EventBus: Clone + Send + Sync + 'static {
    /// Starts the bus.
    ///
    /// The meaning of the returned flag is defined by the implementation
    /// (NOTE(review): presumably whether this call actually performed the
    /// start — confirm against implementors).
    fn start(&self) -> EventBusResult<bool>;

    /// Shuts the bus down.
    ///
    /// The meaning of the returned flag is defined by the implementation
    /// (NOTE(review): presumably whether this call actually performed the
    /// shutdown — confirm against implementors).
    fn shutdown(&self) -> bool;

    /// Publishes `payload` on `topic`.
    ///
    /// Builds an envelope with `EventEnvelope::create` from a clone of the
    /// topic and the payload, then forwards it to [`EventBus::publish_envelope`].
    fn publish<T>(&self, topic: &Topic<T>, payload: T) -> EventBusResult<()>
    where
        T: Clone + Send + Sync + 'static,
    {
        let envelope = EventEnvelope::create(topic.clone(), payload);
        self.publish_envelope(envelope)
    }

    /// Publishes `payload` on `topic` using the supplied publish options.
    ///
    /// Builds an envelope with `EventEnvelope::create` and forwards it to
    /// [`EventBus::publish_envelope_with_options`].
    fn publish_with_options<T>(
        &self,
        topic: &Topic<T>,
        payload: T,
        options: PublishOptions<T>,
    ) -> EventBusResult<()>
    where
        T: Clone + Send + Sync + 'static,
    {
        let envelope = EventEnvelope::create(topic.clone(), payload);
        self.publish_envelope_with_options(envelope, options)
    }

    /// Publishes an already-built envelope with `PublishOptions::empty()`.
    fn publish_envelope<T>(&self, envelope: EventEnvelope<T>) -> EventBusResult<()>
    where
        T: Clone + Send + Sync + 'static,
    {
        let default_options = PublishOptions::empty();
        self.publish_envelope_with_options(envelope, default_options)
    }

    /// Publishes an already-built envelope with the supplied options.
    ///
    /// This is the required publishing primitive; all other publish methods
    /// of this trait end up here by default.
    fn publish_envelope_with_options<T>(
        &self,
        envelope: EventEnvelope<T>,
        options: PublishOptions<T>,
    ) -> EventBusResult<()>
    where
        T: Clone + Send + Sync + 'static;

    /// Publishes every envelope in `envelopes` with `PublishOptions::empty()`.
    ///
    /// Delegates to [`EventBus::publish_all_with_options`].
    fn publish_all<T>(&self, envelopes: Vec<EventEnvelope<T>>) -> EventBusResult<BatchPublishResult>
    where
        T: Clone + Send + Sync + 'static,
    {
        let default_options = PublishOptions::empty();
        self.publish_all_with_options(envelopes, default_options)
    }

    /// Publishes every envelope in `envelopes`, in order, with a clone of
    /// `options` for each one.
    ///
    /// A failing envelope does not stop the batch: its index, event id, and
    /// error are recorded and publishing continues with the next envelope.
    /// The default body always returns `Ok`; per-envelope errors are reported
    /// only through the returned [`BatchPublishResult`]. It never records an
    /// envelope as dropped.
    fn publish_all_with_options<T>(
        &self,
        envelopes: Vec<EventEnvelope<T>>,
        options: PublishOptions<T>,
    ) -> EventBusResult<BatchPublishResult>
    where
        T: Clone + Send + Sync + 'static,
    {
        let mut summary = BatchPublishResult::new(envelopes.len());
        for (position, envelope) in envelopes.into_iter().enumerate() {
            // Read the id up front: the envelope is moved into the publish call.
            let id = envelope.id().to_string();
            let outcome = self.publish_envelope_with_options(envelope, options.clone());
            if let Err(cause) = outcome {
                summary.record_failure(BatchPublishFailure::new(position, id, cause));
            } else {
                summary.record_accepted();
            }
        }
        Ok(summary)
    }

    /// Subscribes `handler` to `topic` under `subscriber_id`, using
    /// `SubscribeOptions::empty()`.
    ///
    /// Delegates to [`EventBus::subscribe_with_options`].
    fn subscribe<T, S, F, R>(
        &self,
        subscriber_id: S,
        topic: &Topic<T>,
        handler: F,
    ) -> EventBusResult<Subscription<T>>
    where
        T: Clone + Send + Sync + 'static,
        S: Into<String>,
        F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
        R: IntoEventBusResult + 'static,
    {
        let default_options = SubscribeOptions::empty();
        self.subscribe_with_options(subscriber_id, topic, handler, default_options)
    }

    /// Subscribes `handler` to `topic` under `subscriber_id` with the
    /// supplied options.
    ///
    /// This is the required subscription primitive; the other subscribing
    /// methods of this trait end up here by default.
    fn subscribe_with_options<T, S, F, R>(
        &self,
        subscriber_id: S,
        topic: &Topic<T>,
        handler: F,
        options: SubscribeOptions<T>,
    ) -> EventBusResult<Subscription<T>>
    where
        T: Clone + Send + Sync + 'static,
        S: Into<String>,
        F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
        R: IntoEventBusResult + 'static;

    /// Registers `handler` on a dead-letter topic.
    ///
    /// The subscriber id is derived from the topic name as
    /// `dead-letter:<topic name>`; the call is otherwise a plain
    /// [`EventBus::subscribe_with_options`].
    fn add_dead_letter_handler<F, R>(
        &self,
        dead_letter_topic: &Topic<DeadLetterPayload>,
        handler: F,
        options: SubscribeOptions<DeadLetterPayload>,
    ) -> EventBusResult<Subscription<DeadLetterPayload>>
    where
        F: Fn(EventEnvelope<DeadLetterPayload>) -> R + Send + Sync + 'static,
        R: IntoEventBusResult + 'static,
    {
        let subscriber_id = format!("dead-letter:{}", dead_letter_topic.name());
        self.subscribe_with_options(subscriber_id, dead_letter_topic, handler, options)
    }

    /// Waits on `topic` without a time limit.
    ///
    /// NOTE(review): by name this waits until the topic has no outstanding
    /// work; the exact idle condition is implementation-defined — confirm.
    fn wait_for_idle<T>(&self, topic: &Topic<T>) -> EventBusResult<()>
    where
        T: 'static;

    /// Waits on `topic` for at most `timeout`.
    ///
    /// NOTE(review): the returned flag presumably reports whether the topic
    /// became idle before the timeout elapsed — confirm against implementors.
    fn wait_for_idle_timeout<T>(&self, topic: &Topic<T>, timeout: Duration) -> EventBusResult<bool>
    where
        T: 'static;
}