Skip to main content

a3s_event/provider/
mod.rs

//! Event provider trait — the core abstraction for event backends
//!
//! All event backends (NATS, Redis, Kafka, in-memory, etc.) implement
//! `EventProvider` to provide a uniform API for publish, subscribe, and query.

use crate::error::Result;
use crate::types::{Event, PublishOptions, ReceivedEvent, SubscribeOptions};
use crate::types::BoxFuture;
use async_trait::async_trait;

pub mod memory;
#[cfg(feature = "nats")]
pub mod nats;

15/// Core trait for event backends
16///
17/// Implementations handle the transport-specific details of event
18/// publishing, subscription, and persistence. The `EventBus` uses
19/// a provider to perform all operations.
20#[async_trait]
21pub trait EventProvider: Send + Sync {
22    /// Publish an event, returning the provider-assigned sequence number
23    async fn publish(&self, event: &Event) -> Result<u64>;
24
25    /// Create a durable subscription (survives reconnects)
26    ///
27    /// Returns a `Subscription` handle for receiving events.
28    async fn subscribe_durable(
29        &self,
30        consumer_name: &str,
31        filter_subject: &str,
32    ) -> Result<Box<dyn Subscription>>;
33
34    /// Create an ephemeral subscription (cleaned up on disconnect)
35    async fn subscribe(
36        &self,
37        filter_subject: &str,
38    ) -> Result<Box<dyn Subscription>>;
39
40    /// Fetch historical events from the backend
41    async fn history(
42        &self,
43        filter_subject: Option<&str>,
44        limit: usize,
45    ) -> Result<Vec<Event>>;
46
47    /// Delete a durable subscription by consumer name
48    async fn unsubscribe(&self, consumer_name: &str) -> Result<()>;
49
50    /// Get provider info (message count, etc.)
51    async fn info(&self) -> Result<ProviderInfo>;
52
53    /// Build a full subject from category and topic
54    ///
55    /// Default: `"{prefix}.{category}.{topic}"` using `subject_prefix()`.
56    fn build_subject(&self, category: &str, topic: &str) -> String {
57        format!("{}.{}.{}", self.subject_prefix(), category, topic)
58    }
59
60    /// Build a wildcard subject for a category
61    ///
62    /// Default: `"{prefix}.{category}.>"` using `subject_prefix()`.
63    fn category_subject(&self, category: &str) -> String {
64        format!("{}.{}.>", self.subject_prefix(), category)
65    }
66
67    /// Subject prefix for this provider (e.g., "events")
68    ///
69    /// Used by the default `build_subject()` and `category_subject()` implementations.
70    fn subject_prefix(&self) -> &str;
71
72    /// Provider name (e.g., "nats", "memory", "redis")
73    fn name(&self) -> &str;
74
75    /// Publish an event with provider-specific options
76    ///
77    /// Default implementation ignores options and delegates to `publish()`.
78    /// Providers that support deduplication, expected sequence, or custom
79    /// timeouts should override this.
80    async fn publish_with_options(&self, event: &Event, _opts: &PublishOptions) -> Result<u64> {
81        self.publish(event).await
82    }
83
84    /// Create a durable subscription with provider-specific options
85    ///
86    /// Default implementation ignores options and delegates to `subscribe_durable()`.
87    /// Providers that support max_deliver, backoff, max_ack_pending, or
88    /// deliver_policy should override this.
89    async fn subscribe_durable_with_options(
90        &self,
91        consumer_name: &str,
92        filter_subject: &str,
93        _opts: &SubscribeOptions,
94    ) -> Result<Box<dyn Subscription>> {
95        self.subscribe_durable(consumer_name, filter_subject).await
96    }
97
98    /// Create an ephemeral subscription with provider-specific options
99    ///
100    /// Default implementation ignores options and delegates to `subscribe()`.
101    async fn subscribe_with_options(
102        &self,
103        filter_subject: &str,
104        _opts: &SubscribeOptions,
105    ) -> Result<Box<dyn Subscription>> {
106        self.subscribe(filter_subject).await
107    }
108
109    /// Health check — returns true if the provider is connected and operational
110    ///
111    /// Default implementation delegates to `info()` and returns true if it succeeds.
112    /// Providers may override for more specific health checks.
113    async fn health(&self) -> Result<bool> {
114        self.info().await.map(|_| true)
115    }
116}
117
118/// Async subscription handle for receiving events
119///
120/// Provider-agnostic interface for consuming events from any backend.
121#[async_trait]
122pub trait Subscription: Send + Sync {
123    /// Receive the next event (auto-ack)
124    async fn next(&mut self) -> Result<Option<ReceivedEvent>>;
125
126    /// Receive the next event with manual ack control
127    async fn next_manual_ack(&mut self) -> Result<Option<PendingEvent>>;
128}
129
130/// An event pending acknowledgement
131pub struct PendingEvent {
132    /// The received event
133    pub received: ReceivedEvent,
134
135    /// Ack callback — call to confirm processing
136    ack_fn: Box<dyn FnOnce() -> BoxFuture<'static, Result<()>> + Send>,
137
138    /// Nak callback — call to request redelivery
139    nak_fn: Box<dyn FnOnce() -> BoxFuture<'static, Result<()>> + Send>,
140}
141
142impl PendingEvent {
143    /// Create a new pending event with ack/nak callbacks
144    pub fn new(
145        received: ReceivedEvent,
146        ack_fn: impl FnOnce() -> BoxFuture<'static, Result<()>> + Send + 'static,
147        nak_fn: impl FnOnce() -> BoxFuture<'static, Result<()>> + Send + 'static,
148    ) -> Self {
149        Self {
150            received,
151            ack_fn: Box::new(ack_fn),
152            nak_fn: Box::new(nak_fn),
153        }
154    }
155
156    /// Acknowledge successful processing
157    pub async fn ack(self) -> Result<()> {
158        (self.ack_fn)().await
159    }
160
161    /// Negative-acknowledge (request redelivery)
162    pub async fn nak(self) -> Result<()> {
163        (self.nak_fn)().await
164    }
165}
166
/// Provider status information
///
/// Plain data snapshot returned by `EventProvider::info()`.
#[derive(Debug, Clone)]
pub struct ProviderInfo {
    /// Provider name
    pub provider: String,
    /// Total messages stored
    pub messages: u64,
    /// Total bytes used
    pub bytes: u64,
    /// Number of active consumers/subscribers
    pub consumers: usize,
}