a3s_event/provider/mod.rs
1//! Event provider trait — the core abstraction for event backends
2//!
3//! All event backends (NATS, Redis, Kafka, in-memory, etc.) implement
4//! `EventProvider` to provide a uniform API for publish, subscribe, and query.
5
6use crate::error::Result;
7use crate::types::{Event, PublishOptions, ReceivedEvent, SubscribeOptions};
8use async_trait::async_trait;
9
10pub mod memory;
11pub mod nats;
12
13/// Core trait for event backends
14///
15/// Implementations handle the transport-specific details of event
16/// publishing, subscription, and persistence. The `EventBus` uses
17/// a provider to perform all operations.
18#[async_trait]
19pub trait EventProvider: Send + Sync {
20 /// Publish an event, returning the provider-assigned sequence number
21 async fn publish(&self, event: &Event) -> Result<u64>;
22
23 /// Create a durable subscription (survives reconnects)
24 ///
25 /// Returns a `Subscription` handle for receiving events.
26 async fn subscribe_durable(
27 &self,
28 consumer_name: &str,
29 filter_subject: &str,
30 ) -> Result<Box<dyn Subscription>>;
31
32 /// Create an ephemeral subscription (cleaned up on disconnect)
33 async fn subscribe(
34 &self,
35 filter_subject: &str,
36 ) -> Result<Box<dyn Subscription>>;
37
38 /// Fetch historical events from the backend
39 async fn history(
40 &self,
41 filter_subject: Option<&str>,
42 limit: usize,
43 ) -> Result<Vec<Event>>;
44
45 /// Delete a durable subscription by consumer name
46 async fn unsubscribe(&self, consumer_name: &str) -> Result<()>;
47
48 /// Get provider info (message count, etc.)
49 async fn info(&self) -> Result<ProviderInfo>;
50
51 /// Build a full subject from category and topic
52 fn build_subject(&self, category: &str, topic: &str) -> String;
53
54 /// Build a wildcard subject for a category
55 fn category_subject(&self, category: &str) -> String;
56
57 /// Provider name (e.g., "nats", "memory", "redis")
58 fn name(&self) -> &str;
59
60 /// Publish an event with provider-specific options
61 ///
62 /// Default implementation ignores options and delegates to `publish()`.
63 /// Providers that support deduplication, expected sequence, or custom
64 /// timeouts should override this.
65 async fn publish_with_options(&self, event: &Event, _opts: &PublishOptions) -> Result<u64> {
66 self.publish(event).await
67 }
68
69 /// Create a durable subscription with provider-specific options
70 ///
71 /// Default implementation ignores options and delegates to `subscribe_durable()`.
72 /// Providers that support max_deliver, backoff, max_ack_pending, or
73 /// deliver_policy should override this.
74 async fn subscribe_durable_with_options(
75 &self,
76 consumer_name: &str,
77 filter_subject: &str,
78 _opts: &SubscribeOptions,
79 ) -> Result<Box<dyn Subscription>> {
80 self.subscribe_durable(consumer_name, filter_subject).await
81 }
82
83 /// Create an ephemeral subscription with provider-specific options
84 ///
85 /// Default implementation ignores options and delegates to `subscribe()`.
86 async fn subscribe_with_options(
87 &self,
88 filter_subject: &str,
89 _opts: &SubscribeOptions,
90 ) -> Result<Box<dyn Subscription>> {
91 self.subscribe(filter_subject).await
92 }
93
94 /// Health check — returns true if the provider is connected and operational
95 ///
96 /// Default implementation delegates to `info()` and returns true if it succeeds.
97 /// Providers may override for more specific health checks.
98 async fn health(&self) -> Result<bool> {
99 self.info().await.map(|_| true)
100 }
101}
102
103/// Async subscription handle for receiving events
104///
105/// Provider-agnostic interface for consuming events from any backend.
106#[async_trait]
107pub trait Subscription: Send + Sync {
108 /// Receive the next event (auto-ack)
109 async fn next(&mut self) -> Result<Option<ReceivedEvent>>;
110
111 /// Receive the next event with manual ack control
112 async fn next_manual_ack(&mut self) -> Result<Option<PendingEvent>>;
113}
114
115/// An event pending acknowledgement
116pub struct PendingEvent {
117 /// The received event
118 pub received: ReceivedEvent,
119
120 /// Ack callback — call to confirm processing
121 ack_fn: Box<dyn FnOnce() -> futures::future::BoxFuture<'static, Result<()>> + Send>,
122
123 /// Nak callback — call to request redelivery
124 nak_fn: Box<dyn FnOnce() -> futures::future::BoxFuture<'static, Result<()>> + Send>,
125}
126
127impl PendingEvent {
128 /// Create a new pending event with ack/nak callbacks
129 pub fn new(
130 received: ReceivedEvent,
131 ack_fn: impl FnOnce() -> futures::future::BoxFuture<'static, Result<()>> + Send + 'static,
132 nak_fn: impl FnOnce() -> futures::future::BoxFuture<'static, Result<()>> + Send + 'static,
133 ) -> Self {
134 Self {
135 received,
136 ack_fn: Box::new(ack_fn),
137 nak_fn: Box::new(nak_fn),
138 }
139 }
140
141 /// Acknowledge successful processing
142 pub async fn ack(self) -> Result<()> {
143 (self.ack_fn)().await
144 }
145
146 /// Negative-acknowledge (request redelivery)
147 pub async fn nak(self) -> Result<()> {
148 (self.nak_fn)().await
149 }
150}
151
/// Snapshot of a provider's status.
///
/// Besides `Debug` and `Clone`, the type derives `Default`, `PartialEq`, and
/// `Eq` so snapshots can be compared directly (for example in tests) and an
/// all-zero / empty-name value is available as a starting point.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProviderInfo {
    /// Provider name
    pub provider: String,
    /// Total messages stored
    pub messages: u64,
    /// Total bytes used
    pub bytes: u64,
    /// Number of active consumers/subscribers
    pub consumers: usize,
}