azoth_bus/stream.rs
1//! Stream-based event consumption for async/await workflows
2//!
3//! Provides `futures::Stream` implementations for consuming events.
4
5use crate::{consumer::Consumer, error::Result, filter::Event};
6use futures::Stream;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10/// Type alias for the pending event future to reduce type complexity
11type PendingEventFuture = Pin<Box<dyn std::future::Future<Output = Result<Option<Event>>> + Send>>;
12
13/// A stream wrapper for a Consumer that yields events
14///
15/// This provides an ergonomic way to consume events using async/await:
16///
17/// ```ignore
18/// use futures::StreamExt;
19///
20/// let consumer = bus.subscribe("tasks", "worker")?;
21/// let mut stream = consumer.into_stream();
22///
23/// while let Some(result) = stream.next().await {
24/// let event = result?;
25/// println!("Got event: {}", event.id);
26/// stream.ack(event.id)?;
27/// }
28/// ```
29#[pin_project::pin_project]
30pub struct ConsumerStream {
31 consumer: Consumer,
32 #[pin]
33 pending_future: Option<PendingEventFuture>,
34}
35
36impl ConsumerStream {
37 /// Create a new ConsumerStream from a Consumer
38 pub fn new(consumer: Consumer) -> Self {
39 Self {
40 consumer,
41 pending_future: None,
42 }
43 }
44
45 /// Acknowledge an event after processing
46 ///
47 /// Call this after successfully processing an event to advance the cursor.
48 pub fn ack(&mut self, event_id: u64) -> Result<()> {
49 self.consumer.ack(event_id)
50 }
51
52 /// Get the consumer's stream name
53 pub fn stream_name(&self) -> &str {
54 self.consumer.stream()
55 }
56
57 /// Get the consumer's name
58 pub fn consumer_name(&self) -> &str {
59 self.consumer.name()
60 }
61
62 /// Get the current cursor position
63 pub fn position(&self) -> Result<Option<u64>> {
64 self.consumer.position()
65 }
66}
67
68// Note: Implementing Stream properly with async operations requires careful handling.
69// For now, we provide a simpler synchronous stream that yields available events.
70// For full async support with wake notifications, use Consumer.next_async() directly.
71
72impl Stream for ConsumerStream {
73 type Item = Result<Event>;
74
75 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76 let this = self.project();
77
78 // Try to get the next event synchronously
79 match this.consumer.next() {
80 Ok(Some(event)) => Poll::Ready(Some(Ok(event))),
81 Ok(None) => Poll::Pending, // No events available, will need to be woken
82 Err(e) => Poll::Ready(Some(Err(e))),
83 }
84 }
85}
86
87impl Consumer {
88 /// Convert this consumer into an async Stream
89 ///
90 /// The returned stream yields `Result<Event>` items.
91 /// After processing each event, call `ack()` on the stream.
92 ///
93 /// Note: For true async wake-up behavior with notifications,
94 /// use `next_async()` directly in a loop instead.
95 ///
96 /// # Example
97 ///
98 /// ```ignore
99 /// use futures::StreamExt;
100 ///
101 /// let consumer = bus.subscribe("orders", "processor")?;
102 /// let mut stream = consumer.into_stream();
103 ///
104 /// while let Some(result) = stream.next().await {
105 /// let event = result?;
106 /// process(&event).await?;
107 /// stream.ack(event.id)?;
108 /// }
109 /// ```
110 pub fn into_stream(self) -> ConsumerStream {
111 ConsumerStream::new(self)
112 }
113}
114
115/// Create an async event stream using `futures::stream::unfold`
116///
117/// This provides a properly async stream that integrates with the wake strategy.
118/// The stream will yield events as they become available, waiting asynchronously
119/// when no events are present.
120///
121/// Note: This stream does not auto-acknowledge events. You need to manually
122/// acknowledge events using the consumer after the stream is dropped or by
123/// tracking event IDs. For most use cases, prefer `auto_ack_stream()`.
124///
125/// # Example
126///
127/// ```ignore
128/// use futures::StreamExt;
129/// use azoth_bus::stream::event_stream;
130///
131/// let consumer = bus.subscribe("tasks", "worker")?;
132/// let mut stream = event_stream(consumer);
133///
134/// while let Some(result) = stream.next().await {
135/// match result {
136/// Ok(event) => {
137/// println!("Processing: {}", event.id);
138/// // Note: events are NOT auto-acknowledged
139/// }
140/// Err(e) => eprintln!("Error: {}", e),
141/// }
142/// }
143/// ```
144pub fn event_stream(consumer: Consumer) -> impl Stream<Item = Result<Event>> {
145 futures::stream::unfold(consumer, |mut consumer| async move {
146 match consumer.next_async().await {
147 Ok(Some(event)) => Some((Ok(event), consumer)),
148 Ok(None) => None, // Stream ended (shouldn't happen with async wait)
149 Err(e) => Some((Err(e), consumer)),
150 }
151 })
152}
153
154/// Create an auto-acknowledging async event stream
155///
156/// Events are automatically acknowledged after being yielded.
157/// Use this when you want simple fire-and-forget processing.
158///
159/// # Example
160///
161/// ```ignore
162/// use futures::StreamExt;
163/// use azoth_bus::stream::auto_ack_stream;
164///
165/// let consumer = bus.subscribe("logs", "processor")?;
166/// let mut stream = auto_ack_stream(consumer);
167///
168/// while let Some(result) = stream.next().await {
169/// let event = result?;
170/// println!("Logged: {:?}", event.payload);
171/// // Event is automatically acknowledged
172/// }
173/// ```
174pub fn auto_ack_stream(consumer: Consumer) -> impl Stream<Item = Result<Event>> {
175 futures::stream::unfold(consumer, |mut consumer| async move {
176 match consumer.next_async().await {
177 Ok(Some(event)) => {
178 let event_id = event.id;
179 // Auto-acknowledge after receiving
180 if let Err(e) = consumer.ack(event_id) {
181 return Some((Err(e), consumer));
182 }
183 Some((Ok(event), consumer))
184 }
185 Ok(None) => None,
186 Err(e) => Some((Err(e), consumer)),
187 }
188 })
189}
190
191#[cfg(test)]
192mod tests {
193 use super::*;
194 use crate::notification::WakeStrategy;
195 use azoth::{AzothDb, Transaction};
196 use std::sync::Arc;
197 use tempfile::TempDir;
198
199 fn test_db() -> (Arc<AzothDb>, TempDir) {
200 let temp = TempDir::new().unwrap();
201 let db = AzothDb::open(temp.path()).unwrap();
202 (Arc::new(db), temp)
203 }
204
205 fn publish_event(db: &AzothDb, event_type: &str, data: &str) -> Result<()> {
206 Transaction::new(db).execute(|ctx| {
207 ctx.log(event_type, &data)?;
208 Ok(())
209 })?;
210 Ok(())
211 }
212
213 #[test]
214 fn test_consumer_stream_creation() {
215 let (db, _temp) = test_db();
216
217 let consumer = Consumer::new(
218 db,
219 "test".to_string(),
220 "c1".to_string(),
221 WakeStrategy::default(),
222 )
223 .unwrap();
224
225 let stream = consumer.into_stream();
226 assert_eq!(stream.stream_name(), "test");
227 assert_eq!(stream.consumer_name(), "c1");
228 }
229
230 // Note: Async stream tests are tricky because Transaction::execute()
231 // panics in async context. We test the synchronous parts here and
232 // rely on the notification tests for async behavior.
233
234 #[test]
235 fn test_event_stream_sync() {
236 let (db, _temp) = test_db();
237
238 // Publish events first
239 publish_event(&db, "test:event1", "data1").unwrap();
240 publish_event(&db, "test:event2", "data2").unwrap();
241
242 let consumer = Consumer::new(
243 db,
244 "test".to_string(),
245 "c1".to_string(),
246 WakeStrategy::default(),
247 )
248 .unwrap();
249
250 // Use into_stream which provides a sync-compatible stream
251 let stream = consumer.into_stream();
252
253 // Stream name should be preserved
254 assert_eq!(stream.stream_name(), "test");
255 assert_eq!(stream.consumer_name(), "c1");
256
257 // We can ack through the stream
258 // Note: actual async streaming is tested in integration tests
259 }
260
261 #[test]
262 fn test_consumer_into_stream_preserves_state() {
263 let (db, _temp) = test_db();
264
265 publish_event(&db, "test:event1", "data1").unwrap();
266
267 let consumer = Consumer::new(
268 db,
269 "test".to_string(),
270 "c1".to_string(),
271 WakeStrategy::default(),
272 )
273 .unwrap();
274
275 let stream = consumer.into_stream();
276
277 // Position should be None initially (no events processed)
278 assert_eq!(stream.position().unwrap(), None);
279 }
280}