mod bus;
mod emitter;
mod store;
mod webhook;
pub use bus::{EventBus, EventReceiver, EventSubscription};
pub use emitter::EventEmitter;
pub use store::InMemoryEventStore;
pub use webhook::{
Webhook, WebhookConfig, WebhookDelivery, WebhookManager, WebhookRegistrationError,
};
#[cfg(feature = "sqlite-events")]
pub use store::SqliteEventStore;
#[cfg(feature = "postgres")]
pub use store::PostgresEventStore;
use stateset_core::{CommerceEvent, EventStore};
use std::sync::Arc;
/// Settings consumed by [`EventSystem::with_config`] when wiring up the bus,
/// the optional event store and the optional webhook manager.
#[derive(Clone)]
pub struct EventConfig {
    /// Capacity passed to `EventBus::new`. `EventSystem::with_config` raises a
    /// value of 0 to 1.
    pub channel_capacity: usize,
    /// When `true`, the event system keeps an event store and appends every
    /// emitted event to it.
    pub persist_events: bool,
    /// Custom store to persist into. When `None` and `persist_events` is set,
    /// an `InMemoryEventStore` is created instead.
    pub event_store: Option<Arc<dyn EventStore + Send + Sync>>,
    /// Argument passed to `InMemoryEventStore::new` for the fallback store.
    /// `EventSystem::with_config` raises a value of 0 to 1.
    pub max_in_memory_events: usize,
    /// When `true`, a `WebhookManager` is created; otherwise webhook calls on
    /// the event system are no-ops or report that webhooks are disabled.
    pub enable_webhooks: bool,
    /// Forwarded to `WebhookConfig::max_retries`.
    pub webhook_max_retries: u32,
    /// Forwarded to `WebhookConfig::timeout_secs` (seconds, per the field name).
    /// `EventSystem::with_config` raises a value of 0 to 1.
    pub webhook_timeout_secs: u64,
    /// Forwarded to `WebhookConfig::max_in_flight`.
    /// `EventSystem::with_config` raises a value of 0 to 1.
    pub webhook_max_in_flight: usize,
    /// Forwarded to `WebhookConfig::retry_delay_ms` (milliseconds, per the
    /// field name). `EventSystem::with_config` raises a value of 0 to 1.
    pub webhook_retry_delay_ms: u64,
    /// Forwarded to `WebhookConfig::max_delivery_history`.
    pub webhook_max_delivery_history: usize,
    /// Forwarded to `WebhookConfig::outbound_allowlist`.
    pub webhook_outbound_allowlist: Vec<String>,
}
impl Default for EventConfig {
    /// Returns the stock configuration: persistence and webhooks both enabled,
    /// no custom event store, and an empty outbound allowlist.
    fn default() -> Self {
        Self {
            // Bus and persistence.
            channel_capacity: 1024,
            max_in_memory_events: 10_000,
            persist_events: true,
            event_store: None,

            // Webhook delivery.
            enable_webhooks: true,
            webhook_max_in_flight: 8,
            webhook_max_retries: 3,
            webhook_retry_delay_ms: 1000,
            webhook_timeout_secs: 30,
            webhook_max_delivery_history: 1_000,
            webhook_outbound_allowlist: Vec::new(),
        }
    }
}
impl std::fmt::Debug for EventConfig {
    /// Formats every field of the configuration. The event store is a trait
    /// object, so it is rendered as `Some("<custom>")` when present and `None`
    /// otherwise.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring: adding a field to the struct without
        // listing it here becomes a compile error.
        let Self {
            channel_capacity,
            persist_events,
            event_store,
            max_in_memory_events,
            enable_webhooks,
            webhook_max_retries,
            webhook_timeout_secs,
            webhook_max_in_flight,
            webhook_retry_delay_ms,
            webhook_max_delivery_history,
            webhook_outbound_allowlist,
        } = self;
        let store_marker = event_store.as_ref().map(|_| "<custom>");

        let mut out = f.debug_struct("EventConfig");
        out.field("channel_capacity", channel_capacity);
        out.field("persist_events", persist_events);
        out.field("event_store", &store_marker);
        out.field("max_in_memory_events", max_in_memory_events);
        out.field("enable_webhooks", enable_webhooks);
        out.field("webhook_max_retries", webhook_max_retries);
        out.field("webhook_timeout_secs", webhook_timeout_secs);
        out.field("webhook_max_in_flight", webhook_max_in_flight);
        out.field("webhook_retry_delay_ms", webhook_retry_delay_ms);
        out.field("webhook_max_delivery_history", webhook_max_delivery_history);
        out.field("webhook_outbound_allowlist", webhook_outbound_allowlist);
        out.finish()
    }
}
/// Bundles the event bus, its emitter, the optional webhook manager and the
/// optional event store behind a single handle.
pub struct EventSystem {
    /// Shared bus that subscriptions are opened on.
    bus: Arc<EventBus>,
    /// Emitter constructed from a clone of `bus`.
    emitter: EventEmitter,
    /// Present only when the configuration enabled webhooks.
    webhook_manager: Option<WebhookManager>,
    /// Present only when the configuration enabled persistence.
    event_store: Option<Arc<dyn EventStore + Send + Sync>>,
    /// The configuration after zero limits were raised to 1.
    config: EventConfig,
}
impl std::fmt::Debug for EventSystem {
    /// Prints only the configuration; the remaining fields are elided and the
    /// output is marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("EventSystem");
        out.field("config", &self.config);
        out.finish_non_exhaustive()
    }
}
impl EventSystem {
pub fn new() -> Self {
Self::with_config(EventConfig::default())
}
pub fn with_config(config: EventConfig) -> Self {
let config = EventConfig {
channel_capacity: config.channel_capacity.max(1),
max_in_memory_events: config.max_in_memory_events.max(1),
webhook_max_in_flight: config.webhook_max_in_flight.max(1),
webhook_timeout_secs: config.webhook_timeout_secs.max(1),
webhook_retry_delay_ms: config.webhook_retry_delay_ms.max(1),
..config
};
let bus = Arc::new(EventBus::new(config.channel_capacity));
let emitter = EventEmitter::new(bus.clone());
let webhook_manager = if config.enable_webhooks {
Some(WebhookManager::with_config(WebhookConfig {
max_retries: config.webhook_max_retries,
timeout_secs: config.webhook_timeout_secs,
max_in_flight: config.webhook_max_in_flight,
retry_delay_ms: config.webhook_retry_delay_ms,
max_delivery_history: config.webhook_max_delivery_history,
outbound_allowlist: config.webhook_outbound_allowlist.clone(),
}))
} else {
None
};
let event_store =
if config.persist_events {
Some(config.event_store.clone().unwrap_or_else(|| {
Arc::new(InMemoryEventStore::new(config.max_in_memory_events))
}))
} else {
None
};
Self { bus, emitter, webhook_manager, event_store, config }
}
#[cfg(test)]
const fn is_webhooks_enabled(&self) -> bool {
self.webhook_manager.is_some()
}
}
impl EventSystem {
    /// Returns the emitter that was constructed from this system's bus.
    pub const fn emitter(&self) -> &EventEmitter {
        &self.emitter
    }

    /// Opens a new subscription on the underlying bus.
    pub fn subscribe(&self) -> EventSubscription {
        self.bus.subscribe()
    }

    /// Opens a new bus subscription and pairs it with `filter`.
    ///
    /// The returned [`FilteredSubscription`] yields only the events for which
    /// `filter` returns `true`.
    pub fn subscribe_filtered<F>(&self, filter: F) -> FilteredSubscription<F>
    where
        F: Fn(&CommerceEvent) -> bool + Send + 'static,
    {
        FilteredSubscription { inner: self.bus.subscribe(), filter }
    }

    /// Registers `webhook` and returns its id.
    ///
    /// This is the lenient variant of [`EventSystem::register_webhook_strict`]:
    /// on any registration error (including webhooks being disabled) a warning
    /// is logged and the nil UUID is returned. Callers that need to tell
    /// success from failure should use the strict variant instead.
    pub fn register_webhook(&self, webhook: Webhook) -> uuid::Uuid {
        self.register_webhook_strict(webhook).unwrap_or_else(|error| {
            tracing::warn!(error = %error, "Webhook registration fallback returned nil");
            uuid::Uuid::nil()
        })
    }

    /// Registers `webhook` and returns its id.
    ///
    /// # Errors
    ///
    /// Returns [`WebhookRegistrationError::WebhooksDisabled`] when this system
    /// was built without a webhook manager, or whatever error the manager's
    /// `register_strict` reports.
    pub fn register_webhook_strict(
        &self,
        webhook: Webhook,
    ) -> Result<uuid::Uuid, WebhookRegistrationError> {
        self.webhook_manager
            .as_ref()
            .ok_or(WebhookRegistrationError::WebhooksDisabled)
            .and_then(|wm| wm.register_strict(webhook))
    }

    /// Attempts to register `webhook`.
    ///
    /// Returns `None` when webhooks are disabled or when the manager's
    /// `try_register` returns `None`.
    pub fn try_register_webhook(&self, webhook: Webhook) -> Option<uuid::Uuid> {
        self.webhook_manager.as_ref().and_then(|wm| wm.try_register(webhook))
    }

    /// Unregisters the webhook with the given `id`.
    ///
    /// Returns the manager's `unregister` result, or `false` when webhooks are
    /// disabled.
    pub fn unregister_webhook(&self, id: uuid::Uuid) -> bool {
        self.webhook_manager.as_ref().map_or(false, |wm| wm.unregister(id))
    }

    /// Returns the manager's webhook list, or an empty list when webhooks are
    /// disabled.
    pub fn list_webhooks(&self) -> Vec<Webhook> {
        self.webhook_manager.as_ref().map(|wm| wm.list()).unwrap_or_default()
    }

    /// Returns the manager's delivery records for `webhook_id`, or an empty
    /// list when webhooks are disabled.
    pub fn webhook_deliveries(&self, webhook_id: uuid::Uuid) -> Vec<WebhookDelivery> {
        self.webhook_manager.as_ref().map(|wm| wm.deliveries(webhook_id)).unwrap_or_default()
    }

    /// Returns the shared event bus.
    pub const fn bus(&self) -> &Arc<EventBus> {
        &self.bus
    }

    /// Returns the configuration this system was built with, after zero
    /// limits were raised to 1.
    pub const fn config(&self) -> &EventConfig {
        &self.config
    }

    /// Returns the event store, or `None` when persistence is disabled.
    pub fn event_store(&self) -> Option<&(dyn EventStore + Send + Sync)> {
        self.event_store.as_deref()
    }

    /// Persists, publishes and delivers `event`, in that order.
    ///
    /// 1. If an event store is configured the event is appended to it. A
    ///    failure is logged and does not stop the remaining steps.
    /// 2. The event is handed to the emitter.
    /// 3. If webhooks are enabled the event is handed to the webhook manager.
    pub fn emit(&self, event: CommerceEvent) {
        if let Some(store) = &self.event_store {
            if let Err(err) = store.append(&event) {
                tracing::error!(
                    error = %err,
                    event_type = event.event_type(),
                    "Failed to persist event"
                );
            }
        }

        // Clone only when a second consumer exists; with webhooks disabled the
        // event is moved straight into the emitter.
        match &self.webhook_manager {
            Some(wm) => {
                self.emitter.emit(event.clone());
                wm.deliver(event);
            }
            None => {
                self.emitter.emit(event);
            }
        }
    }

    /// Returns the bus's current receiver count.
    pub fn subscriber_count(&self) -> usize {
        self.bus.receiver_count()
    }

    /// Returns the emitter's running total of publish failures.
    pub fn bus_publish_failures(&self) -> u64 {
        self.emitter.total_publish_failures()
    }
}
impl Default for EventSystem {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Zero-valued limits are raised to 1, and disabling persistence and
    /// webhooks leaves both optional components absent.
    #[test]
    fn test_event_system_config_normalizes_zero_limits() {
        let zeroed = EventConfig {
            channel_capacity: 0,
            max_in_memory_events: 0,
            webhook_max_in_flight: 0,
            webhook_timeout_secs: 0,
            webhook_retry_delay_ms: 0,
            persist_events: false,
            enable_webhooks: false,
            ..EventConfig::default()
        };
        let event_system = EventSystem::with_config(zeroed);

        let normalized = event_system.config();
        assert_eq!(normalized.channel_capacity, 1);
        assert_eq!(normalized.max_in_memory_events, 1);
        assert_eq!(normalized.webhook_max_in_flight, 1);
        assert_eq!(normalized.webhook_timeout_secs, 1);
        assert_eq!(normalized.webhook_retry_delay_ms, 1);

        assert!(!event_system.is_webhooks_enabled());
        assert!(event_system.event_store().is_none());
    }

    /// With otherwise default settings, switching off webhooks and
    /// persistence yields no webhook manager and no event store.
    #[test]
    fn test_event_system_config_keeps_webhook_disabled() {
        let disabled = EventConfig {
            enable_webhooks: false,
            persist_events: false,
            ..EventConfig::default()
        };
        let event_system = EventSystem::with_config(disabled);

        assert!(!event_system.is_webhooks_enabled());
        assert!(event_system.event_store().is_none());
    }
}
/// A bus subscription paired with a predicate; `recv` yields only the events
/// the predicate accepts.
pub struct FilteredSubscription<F> {
    /// The underlying, unfiltered subscription.
    inner: EventSubscription,
    /// Predicate applied to every received event.
    filter: F,
}
impl<F> std::fmt::Debug for FilteredSubscription<F> {
    /// Prints only the type name, marked non-exhaustive; no bound is placed
    /// on `F`, so the filter itself is never formatted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("FilteredSubscription");
        out.finish_non_exhaustive()
    }
}
impl<F> FilteredSubscription<F>
where
    F: Fn(&CommerceEvent) -> bool,
{
    /// Waits for the next event accepted by the filter.
    ///
    /// Events rejected by the filter are dropped and the wait continues.
    /// Returns `None` as soon as the underlying subscription's `recv` returns
    /// `None`.
    pub async fn recv(&mut self) -> Option<CommerceEvent> {
        while let Some(candidate) = self.inner.recv().await {
            if (self.filter)(&candidate) {
                return Some(candidate);
            }
        }
        None
    }
}
/// Ready-made predicates for use with `EventSystem::subscribe_filtered`.
pub mod filters {
    use stateset_core::CommerceEvent;

    /// Accepts the `Order*` variants: created, status / payment status /
    /// fulfillment status changed, cancelled, item added and item removed.
    pub const fn orders_only(event: &CommerceEvent) -> bool {
        matches!(
            event,
            CommerceEvent::OrderCreated { .. }
                | CommerceEvent::OrderStatusChanged { .. }
                | CommerceEvent::OrderPaymentStatusChanged { .. }
                | CommerceEvent::OrderFulfillmentStatusChanged { .. }
                | CommerceEvent::OrderCancelled { .. }
                | CommerceEvent::OrderItemAdded { .. }
                | CommerceEvent::OrderItemRemoved { .. }
        )
    }

    /// Accepts inventory events: item created, adjusted, reserved,
    /// reservation released / confirmed, and low-stock alerts.
    pub const fn inventory_only(event: &CommerceEvent) -> bool {
        matches!(
            event,
            CommerceEvent::InventoryItemCreated { .. }
                | CommerceEvent::InventoryAdjusted { .. }
                | CommerceEvent::InventoryReserved { .. }
                | CommerceEvent::InventoryReservationReleased { .. }
                | CommerceEvent::InventoryReservationConfirmed { .. }
                | CommerceEvent::LowStockAlert { .. }
        )
    }

    /// Accepts customer events: created, updated, status changed and
    /// address added.
    pub const fn customers_only(event: &CommerceEvent) -> bool {
        matches!(
            event,
            CommerceEvent::CustomerCreated { .. }
                | CommerceEvent::CustomerUpdated { .. }
                | CommerceEvent::CustomerStatusChanged { .. }
                | CommerceEvent::CustomerAddressAdded { .. }
        )
    }

    /// Accepts product events: created, updated, status changed, variant
    /// added and variant updated.
    pub const fn products_only(event: &CommerceEvent) -> bool {
        matches!(
            event,
            CommerceEvent::ProductCreated { .. }
                | CommerceEvent::ProductUpdated { .. }
                | CommerceEvent::ProductStatusChanged { .. }
                | CommerceEvent::ProductVariantAdded { .. }
                | CommerceEvent::ProductVariantUpdated { .. }
        )
    }

    /// Accepts return events: requested, status changed, approved, rejected,
    /// completed, and refund issued.
    pub const fn returns_only(event: &CommerceEvent) -> bool {
        matches!(
            event,
            CommerceEvent::ReturnRequested { .. }
                | CommerceEvent::ReturnStatusChanged { .. }
                | CommerceEvent::ReturnApproved { .. }
                | CommerceEvent::ReturnRejected { .. }
                | CommerceEvent::ReturnCompleted { .. }
                | CommerceEvent::RefundIssued { .. }
        )
    }

    /// Accepts only `LowStockAlert` events.
    pub const fn low_stock_alerts(event: &CommerceEvent) -> bool {
        matches!(event, CommerceEvent::LowStockAlert { .. })
    }

    /// Builds a predicate that accepts an event when its `event_type()` is
    /// one of the names in `types`.
    pub fn event_types(types: &'static [&'static str]) -> impl Fn(&CommerceEvent) -> bool {
        move |event| {
            let kind = event.event_type();
            types.contains(&kind)
        }
    }
}