use crate::{EventEnvelope, Result};
use async_trait::async_trait;
pub mod channel;
pub mod worker;
pub use channel::ChannelDispatcher;
pub use worker::{WorkerConfig, WorkerPool};
#[async_trait]
pub trait EventDispatcher: Send + Sync {
async fn start(&mut self) -> Result<()>;
async fn stop(&mut self) -> Result<()>;
async fn shutdown_gracefully(&mut self) -> Result<()> {
self.stop().await
}
async fn dispatch(&self, envelope: EventEnvelope) -> Result<()>;
async fn replay_pending(&self) -> Result<()> {
Ok(())
}
fn is_running(&self) -> bool;
fn stats(&self) -> DispatcherStats;
}
#[derive(Debug, Clone, Default)]
pub struct DispatcherStats {
pub events_dispatched: u64,
pub queue_size: usize,
pub dispatch_errors: u64,
pub avg_dispatch_time_us: u64,
pub max_queue_size: usize,
}
impl DispatcherStats {
pub fn new() -> Self {
Self::default()
}
pub fn update_dispatch_time(&mut self, time_us: u64) {
if self.events_dispatched == 0 {
self.avg_dispatch_time_us = time_us;
} else {
self.avg_dispatch_time_us = (self.avg_dispatch_time_us * self.events_dispatched
+ time_us)
/ (self.events_dispatched + 1);
}
}
}
#[derive(Debug, Clone)]
pub struct DispatcherConfig {
pub max_queue_size: usize,
pub worker_threads: usize,
pub drop_on_full: bool,
pub processing_timeout_ms: u64,
pub enable_metrics: bool,
}
impl Default for DispatcherConfig {
fn default() -> Self {
Self {
max_queue_size: 10_000,
worker_threads: num_cpus::get(),
drop_on_full: false,
processing_timeout_ms: 5_000,
enable_metrics: true,
}
}
}
impl DispatcherConfig {
pub fn new() -> Self {
Self::default()
}
pub fn max_queue_size(mut self, size: usize) -> Self {
self.max_queue_size = size;
self
}
pub fn worker_threads(mut self, threads: usize) -> Self {
self.worker_threads = threads;
self
}
pub fn drop_on_full(mut self, drop: bool) -> Self {
self.drop_on_full = drop;
self
}
pub fn processing_timeout_ms(mut self, timeout: u64) -> Self {
self.processing_timeout_ms = timeout;
self
}
pub fn enable_metrics(mut self, enable: bool) -> Self {
self.enable_metrics = enable;
self
}
}
#[derive(Debug)]
pub struct NoOpDispatcher;
#[async_trait]
impl EventDispatcher for NoOpDispatcher {
async fn start(&mut self) -> Result<()> {
Ok(())
}
async fn stop(&mut self) -> Result<()> {
Ok(())
}
async fn dispatch(&self, _envelope: EventEnvelope) -> Result<()> {
Ok(())
}
fn is_running(&self) -> bool {
true
}
fn stats(&self) -> DispatcherStats {
DispatcherStats::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_dispatcher_config() {
let config = DispatcherConfig::new()
.max_queue_size(5000)
.worker_threads(4)
.drop_on_full(true);
assert_eq!(config.max_queue_size, 5000);
assert_eq!(config.worker_threads, 4);
assert!(config.drop_on_full);
}
#[test]
fn test_dispatcher_stats() {
let mut stats = DispatcherStats::new();
stats.update_dispatch_time(100);
assert_eq!(stats.avg_dispatch_time_us, 100);
stats.events_dispatched = 1;
stats.update_dispatch_time(200);
assert_eq!(stats.avg_dispatch_time_us, 150); }
}