Skip to main content

azoth_bus/
stream.rs

1//! Stream-based event consumption for async/await workflows
2//!
3//! Provides `futures::Stream` implementations for consuming events.
4
5use crate::{consumer::Consumer, error::Result, filter::Event};
6use futures::Stream;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10/// Type alias for the pending event future to reduce type complexity
11type PendingEventFuture = Pin<Box<dyn std::future::Future<Output = Result<Option<Event>>> + Send>>;
12
13/// A stream wrapper for a Consumer that yields events
14///
15/// This provides an ergonomic way to consume events using async/await:
16///
17/// ```ignore
18/// use futures::StreamExt;
19///
20/// let consumer = bus.subscribe("tasks", "worker")?;
21/// let mut stream = consumer.into_stream();
22///
23/// while let Some(result) = stream.next().await {
24///     let event = result?;
25///     println!("Got event: {}", event.id);
26///     stream.ack(event.id)?;
27/// }
28/// ```
29#[pin_project::pin_project]
30pub struct ConsumerStream {
31    consumer: Consumer,
32    #[pin]
33    pending_future: Option<PendingEventFuture>,
34}
35
36impl ConsumerStream {
37    /// Create a new ConsumerStream from a Consumer
38    pub fn new(consumer: Consumer) -> Self {
39        Self {
40            consumer,
41            pending_future: None,
42        }
43    }
44
45    /// Acknowledge an event after processing
46    ///
47    /// Call this after successfully processing an event to advance the cursor.
48    pub fn ack(&mut self, event_id: u64) -> Result<()> {
49        self.consumer.ack(event_id)
50    }
51
52    /// Get the consumer's stream name
53    pub fn stream_name(&self) -> &str {
54        self.consumer.stream()
55    }
56
57    /// Get the consumer's name
58    pub fn consumer_name(&self) -> &str {
59        self.consumer.name()
60    }
61
62    /// Get the current cursor position
63    pub fn position(&self) -> Result<Option<u64>> {
64        self.consumer.position()
65    }
66}
67
68// Note: Implementing Stream properly with async operations requires careful handling.
69// For now, we provide a simpler synchronous stream that yields available events.
70// For full async support with wake notifications, use Consumer.next_async() directly.
71
72impl Stream for ConsumerStream {
73    type Item = Result<Event>;
74
75    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76        let this = self.project();
77
78        // Try to get the next event synchronously
79        match this.consumer.next() {
80            Ok(Some(event)) => Poll::Ready(Some(Ok(event))),
81            Ok(None) => Poll::Pending, // No events available, will need to be woken
82            Err(e) => Poll::Ready(Some(Err(e))),
83        }
84    }
85}
86
87impl Consumer {
88    /// Convert this consumer into an async Stream
89    ///
90    /// The returned stream yields `Result<Event>` items.
91    /// After processing each event, call `ack()` on the stream.
92    ///
93    /// Note: For true async wake-up behavior with notifications,
94    /// use `next_async()` directly in a loop instead.
95    ///
96    /// # Example
97    ///
98    /// ```ignore
99    /// use futures::StreamExt;
100    ///
101    /// let consumer = bus.subscribe("orders", "processor")?;
102    /// let mut stream = consumer.into_stream();
103    ///
104    /// while let Some(result) = stream.next().await {
105    ///     let event = result?;
106    ///     process(&event).await?;
107    ///     stream.ack(event.id)?;
108    /// }
109    /// ```
110    pub fn into_stream(self) -> ConsumerStream {
111        ConsumerStream::new(self)
112    }
113}
114
115/// Create an async event stream using `futures::stream::unfold`
116///
117/// This provides a properly async stream that integrates with the wake strategy.
118/// The stream will yield events as they become available, waiting asynchronously
119/// when no events are present.
120///
121/// Note: This stream does not auto-acknowledge events. You need to manually
122/// acknowledge events using the consumer after the stream is dropped or by
123/// tracking event IDs. For most use cases, prefer `auto_ack_stream()`.
124///
125/// # Example
126///
127/// ```ignore
128/// use futures::StreamExt;
129/// use azoth_bus::stream::event_stream;
130///
131/// let consumer = bus.subscribe("tasks", "worker")?;
132/// let mut stream = event_stream(consumer);
133///
134/// while let Some(result) = stream.next().await {
135///     match result {
136///         Ok(event) => {
137///             println!("Processing: {}", event.id);
138///             // Note: events are NOT auto-acknowledged
139///         }
140///         Err(e) => eprintln!("Error: {}", e),
141///     }
142/// }
143/// ```
144pub fn event_stream(consumer: Consumer) -> impl Stream<Item = Result<Event>> {
145    futures::stream::unfold(consumer, |mut consumer| async move {
146        match consumer.next_async().await {
147            Ok(Some(event)) => Some((Ok(event), consumer)),
148            Ok(None) => None, // Stream ended (shouldn't happen with async wait)
149            Err(e) => Some((Err(e), consumer)),
150        }
151    })
152}
153
154/// Create an auto-acknowledging async event stream
155///
156/// Events are automatically acknowledged after being yielded.
157/// Use this when you want simple fire-and-forget processing.
158///
159/// # Example
160///
161/// ```ignore
162/// use futures::StreamExt;
163/// use azoth_bus::stream::auto_ack_stream;
164///
165/// let consumer = bus.subscribe("logs", "processor")?;
166/// let mut stream = auto_ack_stream(consumer);
167///
168/// while let Some(result) = stream.next().await {
169///     let event = result?;
170///     println!("Logged: {:?}", event.payload);
171///     // Event is automatically acknowledged
172/// }
173/// ```
174pub fn auto_ack_stream(consumer: Consumer) -> impl Stream<Item = Result<Event>> {
175    futures::stream::unfold(consumer, |mut consumer| async move {
176        match consumer.next_async().await {
177            Ok(Some(event)) => {
178                let event_id = event.id;
179                // Auto-acknowledge after receiving
180                if let Err(e) = consumer.ack(event_id) {
181                    return Some((Err(e), consumer));
182                }
183                Some((Ok(event), consumer))
184            }
185            Ok(None) => None,
186            Err(e) => Some((Err(e), consumer)),
187        }
188    })
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::notification::WakeStrategy;
    use azoth::{AzothDb, Transaction};
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Stream name shared by every consumer built in these tests.
    const STREAM: &str = "test";
    /// Consumer name shared by every consumer built in these tests.
    const CONSUMER: &str = "c1";

    /// Open a database in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the database so the caller keeps
    /// it alive for the duration of the test.
    fn test_db() -> (Arc<AzothDb>, TempDir) {
        let dir = TempDir::new().unwrap();
        let db = Arc::new(AzothDb::open(dir.path()).unwrap());
        (db, dir)
    }

    /// Log one event of `event_type` carrying `data` inside a transaction.
    fn publish_event(db: &AzothDb, event_type: &str, data: &str) -> Result<()> {
        Transaction::new(db).execute(|ctx| {
            ctx.log(event_type, &data)?;
            Ok(())
        })?;
        Ok(())
    }

    /// Build the consumer used by every test: `STREAM` / `CONSUMER` with the
    /// default wake strategy.
    fn test_consumer(db: Arc<AzothDb>) -> Consumer {
        Consumer::new(
            db,
            STREAM.to_string(),
            CONSUMER.to_string(),
            WakeStrategy::default(),
        )
        .unwrap()
    }

    #[test]
    fn test_consumer_stream_creation() {
        let (db, _temp) = test_db();

        let stream = test_consumer(db).into_stream();

        assert_eq!(stream.stream_name(), "test");
        assert_eq!(stream.consumer_name(), "c1");
    }

    // Note: Async stream tests are tricky because Transaction::execute()
    // panics in async context. Only the synchronous parts are tested here;
    // async behavior is left to the notification tests.

    #[test]
    fn test_event_stream_sync() {
        let (db, _temp) = test_db();

        // Events are published before the consumer exists.
        for (event_type, data) in [("test:event1", "data1"), ("test:event2", "data2")] {
            publish_event(&db, event_type, data).unwrap();
        }

        // `into_stream` gives a stream whose accessors work without a runtime.
        let stream = test_consumer(db).into_stream();

        // The wrapped consumer's identity must survive the conversion.
        assert_eq!(stream.stream_name(), "test");
        assert_eq!(stream.consumer_name(), "c1");

        // Note: actual async streaming is tested in integration tests.
    }

    #[test]
    fn test_consumer_into_stream_preserves_state() {
        let (db, _temp) = test_db();

        publish_event(&db, "test:event1", "data1").unwrap();

        let stream = test_consumer(db).into_stream();

        // Nothing has been processed yet, so there is no cursor position.
        assert_eq!(stream.position().unwrap(), None);
    }
}
280}