Skip to main content

a3s_event/provider/
mod.rs

//! Event provider trait — the core abstraction for event backends
//!
//! All event backends (NATS, Redis, Kafka, in-memory, etc.) implement
//! `EventProvider` to provide a uniform API for publish, subscribe, and query.
5
6use crate::error::Result;
7use crate::types::{Event, PublishOptions, ReceivedEvent, SubscribeOptions};
8use async_trait::async_trait;
9
10pub mod memory;
11pub mod nats;
12
13/// Core trait for event backends
14///
15/// Implementations handle the transport-specific details of event
16/// publishing, subscription, and persistence. The `EventBus` uses
17/// a provider to perform all operations.
18#[async_trait]
19pub trait EventProvider: Send + Sync {
20    /// Publish an event, returning the provider-assigned sequence number
21    async fn publish(&self, event: &Event) -> Result<u64>;
22
23    /// Create a durable subscription (survives reconnects)
24    ///
25    /// Returns a `Subscription` handle for receiving events.
26    async fn subscribe_durable(
27        &self,
28        consumer_name: &str,
29        filter_subject: &str,
30    ) -> Result<Box<dyn Subscription>>;
31
32    /// Create an ephemeral subscription (cleaned up on disconnect)
33    async fn subscribe(
34        &self,
35        filter_subject: &str,
36    ) -> Result<Box<dyn Subscription>>;
37
38    /// Fetch historical events from the backend
39    async fn history(
40        &self,
41        filter_subject: Option<&str>,
42        limit: usize,
43    ) -> Result<Vec<Event>>;
44
45    /// Delete a durable subscription by consumer name
46    async fn unsubscribe(&self, consumer_name: &str) -> Result<()>;
47
48    /// Get provider info (message count, etc.)
49    async fn info(&self) -> Result<ProviderInfo>;
50
51    /// Build a full subject from category and topic
52    fn build_subject(&self, category: &str, topic: &str) -> String;
53
54    /// Build a wildcard subject for a category
55    fn category_subject(&self, category: &str) -> String;
56
57    /// Provider name (e.g., "nats", "memory", "redis")
58    fn name(&self) -> &str;
59
60    /// Publish an event with provider-specific options
61    ///
62    /// Default implementation ignores options and delegates to `publish()`.
63    /// Providers that support deduplication, expected sequence, or custom
64    /// timeouts should override this.
65    async fn publish_with_options(&self, event: &Event, _opts: &PublishOptions) -> Result<u64> {
66        self.publish(event).await
67    }
68
69    /// Create a durable subscription with provider-specific options
70    ///
71    /// Default implementation ignores options and delegates to `subscribe_durable()`.
72    /// Providers that support max_deliver, backoff, max_ack_pending, or
73    /// deliver_policy should override this.
74    async fn subscribe_durable_with_options(
75        &self,
76        consumer_name: &str,
77        filter_subject: &str,
78        _opts: &SubscribeOptions,
79    ) -> Result<Box<dyn Subscription>> {
80        self.subscribe_durable(consumer_name, filter_subject).await
81    }
82
83    /// Create an ephemeral subscription with provider-specific options
84    ///
85    /// Default implementation ignores options and delegates to `subscribe()`.
86    async fn subscribe_with_options(
87        &self,
88        filter_subject: &str,
89        _opts: &SubscribeOptions,
90    ) -> Result<Box<dyn Subscription>> {
91        self.subscribe(filter_subject).await
92    }
93
94    /// Health check — returns true if the provider is connected and operational
95    ///
96    /// Default implementation delegates to `info()` and returns true if it succeeds.
97    /// Providers may override for more specific health checks.
98    async fn health(&self) -> Result<bool> {
99        self.info().await.map(|_| true)
100    }
101}
102
103/// Async subscription handle for receiving events
104///
105/// Provider-agnostic interface for consuming events from any backend.
106#[async_trait]
107pub trait Subscription: Send + Sync {
108    /// Receive the next event (auto-ack)
109    async fn next(&mut self) -> Result<Option<ReceivedEvent>>;
110
111    /// Receive the next event with manual ack control
112    async fn next_manual_ack(&mut self) -> Result<Option<PendingEvent>>;
113}
114
115/// An event pending acknowledgement
116pub struct PendingEvent {
117    /// The received event
118    pub received: ReceivedEvent,
119
120    /// Ack callback — call to confirm processing
121    ack_fn: Box<dyn FnOnce() -> futures::future::BoxFuture<'static, Result<()>> + Send>,
122
123    /// Nak callback — call to request redelivery
124    nak_fn: Box<dyn FnOnce() -> futures::future::BoxFuture<'static, Result<()>> + Send>,
125}
126
127impl PendingEvent {
128    /// Create a new pending event with ack/nak callbacks
129    pub fn new(
130        received: ReceivedEvent,
131        ack_fn: impl FnOnce() -> futures::future::BoxFuture<'static, Result<()>> + Send + 'static,
132        nak_fn: impl FnOnce() -> futures::future::BoxFuture<'static, Result<()>> + Send + 'static,
133    ) -> Self {
134        Self {
135            received,
136            ack_fn: Box::new(ack_fn),
137            nak_fn: Box::new(nak_fn),
138        }
139    }
140
141    /// Acknowledge successful processing
142    pub async fn ack(self) -> Result<()> {
143        (self.ack_fn)().await
144    }
145
146    /// Negative-acknowledge (request redelivery)
147    pub async fn nak(self) -> Result<()> {
148        (self.nak_fn)().await
149    }
150}
151
/// Snapshot of a provider's status, as returned by `EventProvider::info`
///
/// Derives `PartialEq`/`Eq` in addition to `Debug`/`Clone` so that two
/// snapshots can be compared directly (e.g. with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProviderInfo {
    /// Provider name
    pub provider: String,
    /// Total number of messages stored
    pub messages: u64,
    /// Total number of bytes used
    pub bytes: u64,
    /// Number of active consumers/subscribers
    pub consumers: usize,
}