a3s_event/provider/mod.rs
1//! Event provider trait — the core abstraction for event backends
2//!
3//! All event backends (NATS, Redis, Kafka, in-memory, etc.) implement
4//! `EventProvider` to provide a uniform API for publish, subscribe, and query.
5
6use crate::error::Result;
7use crate::types::{Event, PublishOptions, ReceivedEvent, SubscribeOptions};
8use crate::types::BoxFuture;
9use async_trait::async_trait;
10
11pub mod memory;
12#[cfg(feature = "nats")]
13pub mod nats;
14
15/// Core trait for event backends
16///
17/// Implementations handle the transport-specific details of event
18/// publishing, subscription, and persistence. The `EventBus` uses
19/// a provider to perform all operations.
20#[async_trait]
21pub trait EventProvider: Send + Sync {
22 /// Publish an event, returning the provider-assigned sequence number
23 async fn publish(&self, event: &Event) -> Result<u64>;
24
25 /// Create a durable subscription (survives reconnects)
26 ///
27 /// Returns a `Subscription` handle for receiving events.
28 async fn subscribe_durable(
29 &self,
30 consumer_name: &str,
31 filter_subject: &str,
32 ) -> Result<Box<dyn Subscription>>;
33
34 /// Create an ephemeral subscription (cleaned up on disconnect)
35 async fn subscribe(
36 &self,
37 filter_subject: &str,
38 ) -> Result<Box<dyn Subscription>>;
39
40 /// Fetch historical events from the backend
41 async fn history(
42 &self,
43 filter_subject: Option<&str>,
44 limit: usize,
45 ) -> Result<Vec<Event>>;
46
47 /// Delete a durable subscription by consumer name
48 async fn unsubscribe(&self, consumer_name: &str) -> Result<()>;
49
50 /// Get provider info (message count, etc.)
51 async fn info(&self) -> Result<ProviderInfo>;
52
53 /// Build a full subject from category and topic
54 ///
55 /// Default: `"{prefix}.{category}.{topic}"` using `subject_prefix()`.
56 fn build_subject(&self, category: &str, topic: &str) -> String {
57 format!("{}.{}.{}", self.subject_prefix(), category, topic)
58 }
59
60 /// Build a wildcard subject for a category
61 ///
62 /// Default: `"{prefix}.{category}.>"` using `subject_prefix()`.
63 fn category_subject(&self, category: &str) -> String {
64 format!("{}.{}.>", self.subject_prefix(), category)
65 }
66
67 /// Subject prefix for this provider (e.g., "events")
68 ///
69 /// Used by the default `build_subject()` and `category_subject()` implementations.
70 fn subject_prefix(&self) -> &str;
71
72 /// Provider name (e.g., "nats", "memory", "redis")
73 fn name(&self) -> &str;
74
75 /// Publish an event with provider-specific options
76 ///
77 /// Default implementation ignores options and delegates to `publish()`.
78 /// Providers that support deduplication, expected sequence, or custom
79 /// timeouts should override this.
80 async fn publish_with_options(&self, event: &Event, _opts: &PublishOptions) -> Result<u64> {
81 self.publish(event).await
82 }
83
84 /// Create a durable subscription with provider-specific options
85 ///
86 /// Default implementation ignores options and delegates to `subscribe_durable()`.
87 /// Providers that support max_deliver, backoff, max_ack_pending, or
88 /// deliver_policy should override this.
89 async fn subscribe_durable_with_options(
90 &self,
91 consumer_name: &str,
92 filter_subject: &str,
93 _opts: &SubscribeOptions,
94 ) -> Result<Box<dyn Subscription>> {
95 self.subscribe_durable(consumer_name, filter_subject).await
96 }
97
98 /// Create an ephemeral subscription with provider-specific options
99 ///
100 /// Default implementation ignores options and delegates to `subscribe()`.
101 async fn subscribe_with_options(
102 &self,
103 filter_subject: &str,
104 _opts: &SubscribeOptions,
105 ) -> Result<Box<dyn Subscription>> {
106 self.subscribe(filter_subject).await
107 }
108
109 /// Health check — returns true if the provider is connected and operational
110 ///
111 /// Default implementation delegates to `info()` and returns true if it succeeds.
112 /// Providers may override for more specific health checks.
113 async fn health(&self) -> Result<bool> {
114 self.info().await.map(|_| true)
115 }
116}
117
118/// Async subscription handle for receiving events
119///
120/// Provider-agnostic interface for consuming events from any backend.
121#[async_trait]
122pub trait Subscription: Send + Sync {
123 /// Receive the next event (auto-ack)
124 async fn next(&mut self) -> Result<Option<ReceivedEvent>>;
125
126 /// Receive the next event with manual ack control
127 async fn next_manual_ack(&mut self) -> Result<Option<PendingEvent>>;
128}
129
130/// An event pending acknowledgement
131pub struct PendingEvent {
132 /// The received event
133 pub received: ReceivedEvent,
134
135 /// Ack callback — call to confirm processing
136 ack_fn: Box<dyn FnOnce() -> BoxFuture<'static, Result<()>> + Send>,
137
138 /// Nak callback — call to request redelivery
139 nak_fn: Box<dyn FnOnce() -> BoxFuture<'static, Result<()>> + Send>,
140}
141
142impl PendingEvent {
143 /// Create a new pending event with ack/nak callbacks
144 pub fn new(
145 received: ReceivedEvent,
146 ack_fn: impl FnOnce() -> BoxFuture<'static, Result<()>> + Send + 'static,
147 nak_fn: impl FnOnce() -> BoxFuture<'static, Result<()>> + Send + 'static,
148 ) -> Self {
149 Self {
150 received,
151 ack_fn: Box::new(ack_fn),
152 nak_fn: Box::new(nak_fn),
153 }
154 }
155
156 /// Acknowledge successful processing
157 pub async fn ack(self) -> Result<()> {
158 (self.ack_fn)().await
159 }
160
161 /// Negative-acknowledge (request redelivery)
162 pub async fn nak(self) -> Result<()> {
163 (self.nak_fn)().await
164 }
165}
166
/// Provider status information.
///
/// Plain data returned by `EventProvider::info()`. Besides `Debug` and
/// `Clone`, it derives `Default` (empty name, all counters zero) and
/// `PartialEq`/`Eq` so values can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProviderInfo {
    /// Provider name
    pub provider: String,
    /// Total messages stored
    pub messages: u64,
    /// Total bytes used
    pub bytes: u64,
    /// Number of active consumers/subscribers
    pub consumers: usize,
}