//! azoth_bus/consumer.rs — persistent, independently-cursored event consumer.

1use crate::{
2    config::ConsumerMetadata,
3    error::{BusError, Result},
4    filter::{Event, EventFilter, EventFilterTrait},
5    notification::WakeStrategy,
6};
7use azoth::{typed_values::TypedValue, AsyncTransaction, AzothDb, Transaction};
8use azoth_core::{
9    traits::canonical::{CanonicalReadTxn, CanonicalStore},
10    EventId,
11};
12use std::sync::Arc;
13
/// A consumer of events from a stream
///
/// Each consumer persists its own cursor and metadata in the database under
/// keys derived from `(stream, name)`, so multiple consumers can read the
/// same stream independently and survive restarts.
pub struct Consumer {
    /// Shared handle to the backing database.
    db: Arc<AzothDb>,
    /// Stream this consumer reads; events are matched by the `"{stream}:"` prefix.
    stream: String,
    /// Consumer name; together with `stream` it identifies the persisted cursor.
    name: String,
    /// Combined filter: the stream-prefix filter, AND-ed with anything added
    /// via `with_filter`.
    filter: EventFilter,
    /// DB key storing the last acknowledged event ID (absent until first ack).
    cursor_key: Vec<u8>,
    /// DB key storing JSON-serialized `ConsumerMetadata`.
    meta_key: Vec<u8>,
    /// How `next_async` waits when no event is available.
    wake_strategy: WakeStrategy,
}
24
25impl Consumer {
26    /// Create a new consumer
27    pub fn new(
28        db: Arc<AzothDb>,
29        stream: String,
30        name: String,
31        wake_strategy: WakeStrategy,
32    ) -> Result<Self> {
33        let cursor_key = format!("bus:consumer:{}:{}:cursor", stream, name).into_bytes();
34        let meta_key = format!("bus:consumer:{}:{}:meta", stream, name).into_bytes();
35
36        // Initialize metadata if this is a new consumer
37        let meta = ConsumerMetadata::new(stream.clone(), name.clone());
38        let meta_bytes = serde_json::to_vec(&meta)?;
39
40        Transaction::new(&db)
41            .keys(vec![meta_key.clone()])
42            .execute(|ctx| {
43                // Only write if not exists
44                if ctx.get_opt(&meta_key)?.is_none() {
45                    ctx.set(&meta_key, &TypedValue::Bytes(meta_bytes))?;
46                }
47                Ok(())
48            })?;
49
50        // Auto-filter to stream prefix
51        let stream_filter = EventFilter::prefix(format!("{}:", stream));
52
53        Ok(Self {
54            db,
55            stream,
56            name,
57            filter: stream_filter,
58            cursor_key,
59            meta_key,
60            wake_strategy,
61        })
62    }
63
64    /// Add an additional filter on top of the stream filter
65    pub fn with_filter(mut self, filter: EventFilter) -> Self {
66        self.filter = self.filter.and(filter);
67        self
68    }
69
70    /// Get the current cursor position (last acknowledged event ID)
71    ///
72    /// Returns None if no events have been acknowledged yet.
73    pub fn position(&self) -> Result<Option<u64>> {
74        let txn = self.db.canonical().read_txn()?;
75        match txn.get_state(&self.cursor_key)? {
76            Some(bytes) => {
77                let value = TypedValue::from_bytes(&bytes)?;
78                Ok(Some(value.as_i64()? as u64))
79            }
80            None => Ok(None),
81        }
82    }
83
84    /// Seek to read from a specific event ID
85    ///
86    /// The next call to `next()` will return the event at `event_id`.
87    /// If `event_id` is 0, resets to the beginning.
88    pub fn seek(&mut self, event_id: u64) -> Result<()> {
89        if event_id == 0 {
90            // Reset to beginning - delete cursor
91            Transaction::new(&self.db)
92                .keys(vec![self.cursor_key.clone()])
93                .execute(|ctx| {
94                    ctx.delete(&self.cursor_key)?;
95                    Ok(())
96                })?;
97        } else {
98            // Set cursor to event_id - 1 so next() reads from event_id
99            Transaction::new(&self.db)
100                .keys(vec![self.cursor_key.clone()])
101                .execute(|ctx| {
102                    ctx.set(&self.cursor_key, &TypedValue::I64((event_id - 1) as i64))?;
103                    Ok(())
104                })?;
105        }
106        Ok(())
107    }
108
109    /// Read the next event (blocking poll)
110    ///
111    /// Returns the next unprocessed event that matches the filter.
112    /// Call `ack()` to advance the cursor after processing.
113    ///
114    /// Note: This method is intentionally not implementing Iterator
115    /// because it returns Result and requires error handling.
116    #[allow(clippy::should_implement_trait)]
117    pub fn next(&mut self) -> Result<Option<Event>> {
118        let cursor = self.position()?;
119        // Start from cursor + 1, or 0 if no cursor (never processed anything)
120        let start_id = match cursor {
121            None => 0,
122            Some(n) => n + 1,
123        };
124        let mut iter = self.db.canonical().iter_events(start_id, None)?;
125
126        while let Some((id, bytes)) = iter.next()? {
127            let event = Event::decode(id, &bytes)?;
128
129            if self.filter.matches(&event) {
130                return Ok(Some(event));
131            }
132        }
133
134        Ok(None)
135    }
136
137    /// Acknowledge processing of an event (update cursor)
138    pub fn ack(&mut self, event_id: EventId) -> Result<()> {
139        let meta_key = self.meta_key.clone();
140        let cursor_key = self.cursor_key.clone();
141
142        Transaction::new(&self.db)
143            .keys(vec![cursor_key.clone(), meta_key.clone()])
144            .execute(|ctx| {
145                // Update cursor
146                ctx.set(&cursor_key, &TypedValue::I64(event_id as i64))?;
147
148                // Update last ack timestamp in metadata
149                if let Some(meta_value) = ctx.get_opt(&meta_key)? {
150                    let meta_bytes = match meta_value {
151                        TypedValue::Bytes(b) => b,
152                        _ => {
153                            return Err(azoth_core::AzothError::InvalidState(
154                                "Consumer metadata must be bytes".into(),
155                            ))
156                        }
157                    };
158                    let mut meta: ConsumerMetadata = serde_json::from_slice(&meta_bytes)
159                        .map_err(|e| azoth_core::AzothError::Serialization(e.to_string()))?;
160                    meta.last_ack_at = Some(chrono::Utc::now());
161
162                    let updated_bytes = serde_json::to_vec(&meta)
163                        .map_err(|e| azoth_core::AzothError::Serialization(e.to_string()))?;
164                    ctx.set(&meta_key, &TypedValue::Bytes(updated_bytes))?;
165                }
166
167                Ok(())
168            })?;
169        Ok(())
170    }
171
172    /// Acknowledge processing of an event asynchronously (safe for async context)
173    ///
174    /// This is the async-safe version of `ack()`. Use this when calling from
175    /// async code to avoid blocking the runtime.
176    pub async fn ack_async(&mut self, event_id: EventId) -> Result<()> {
177        let meta_key = self.meta_key.clone();
178        let cursor_key = self.cursor_key.clone();
179
180        AsyncTransaction::new(self.db.clone())
181            .keys(vec![cursor_key.clone(), meta_key.clone()])
182            .execute(move |ctx| {
183                // Update cursor
184                ctx.set(&cursor_key, &TypedValue::I64(event_id as i64))?;
185
186                // Update last ack timestamp in metadata
187                if let Some(meta_value) = ctx.get_opt(&meta_key)? {
188                    let meta_bytes = match meta_value {
189                        TypedValue::Bytes(b) => b,
190                        _ => {
191                            return Err(azoth_core::AzothError::InvalidState(
192                                "Consumer metadata must be bytes".into(),
193                            ))
194                        }
195                    };
196                    let mut meta: ConsumerMetadata = serde_json::from_slice(&meta_bytes)
197                        .map_err(|e| azoth_core::AzothError::Serialization(e.to_string()))?;
198                    meta.last_ack_at = Some(chrono::Utc::now());
199
200                    let updated_bytes = serde_json::to_vec(&meta)
201                        .map_err(|e| azoth_core::AzothError::Serialization(e.to_string()))?;
202                    ctx.set(&meta_key, &TypedValue::Bytes(updated_bytes))?;
203                }
204
205                Ok(())
206            })
207            .await?;
208        Ok(())
209    }
210
211    /// Get consumer metadata
212    pub fn metadata(&self) -> Result<ConsumerMetadata> {
213        let txn = self.db.canonical().read_txn()?;
214        match txn.get_state(&self.meta_key)? {
215            Some(bytes) => {
216                let value = TypedValue::from_bytes(&bytes)?;
217                let meta_bytes = match value {
218                    TypedValue::Bytes(b) => b,
219                    _ => {
220                        return Err(BusError::InvalidState(
221                            "Consumer metadata must be bytes".into(),
222                        ))
223                    }
224                };
225                Ok(serde_json::from_slice(&meta_bytes)?)
226            }
227            None => Err(BusError::ConsumerNotFound(format!(
228                "{}:{}",
229                self.stream, self.name
230            ))),
231        }
232    }
233
234    /// Get the stream name
235    pub fn stream(&self) -> &str {
236        &self.stream
237    }
238
239    /// Get the consumer name
240    pub fn name(&self) -> &str {
241        &self.name
242    }
243
244    /// Read the next event asynchronously (non-blocking)
245    ///
246    /// This method will wait for new events using the configured wake strategy.
247    /// With polling (default), it checks periodically. With notifications, it
248    /// waits for explicit wake-up signals.
249    ///
250    /// Returns the next unprocessed event that matches the filter.
251    /// Call `ack()` to advance the cursor after processing.
252    pub async fn next_async(&mut self) -> Result<Option<Event>> {
253        loop {
254            // Try to get an event
255            if let Some(event) = self.next()? {
256                return Ok(Some(event));
257            }
258
259            // No event available, wait for wake signal
260            self.wake_strategy.wait(&self.stream).await;
261        }
262    }
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use azoth::Transaction;
    use tempfile::TempDir;

    /// Open a fresh database in a temporary directory.
    ///
    /// The `TempDir` guard is returned so the directory outlives the test.
    fn test_db() -> (Arc<AzothDb>, TempDir) {
        let dir = TempDir::new().unwrap();
        let database = AzothDb::open(dir.path()).unwrap();
        (Arc::new(database), dir)
    }

    /// Append a single event to the log inside its own transaction.
    fn publish_event(db: &AzothDb, event_type: &str, data: &str) -> Result<()> {
        Transaction::new(db).execute(|ctx| {
            ctx.log(event_type, &data)?;
            Ok(())
        })?;
        Ok(())
    }

    /// Build a consumer with the default wake strategy.
    fn test_consumer(db: Arc<AzothDb>, stream: &str, name: &str) -> Result<Consumer> {
        Consumer::new(
            db,
            stream.to_owned(),
            name.to_owned(),
            WakeStrategy::default(),
        )
    }

    #[test]
    fn test_consumer_creation() {
        let (db, _temp) = test_db();
        let c = test_consumer(db.clone(), "test", "c1").unwrap();

        assert_eq!(c.stream(), "test");
        assert_eq!(c.name(), "c1");
        // Nothing acknowledged yet, so there is no cursor.
        assert_eq!(c.position().unwrap(), None);
    }

    #[test]
    fn test_consumer_next_with_stream_filter() {
        let (db, _temp) = test_db();

        // Interleave events across two streams; only "test:" ones should surface.
        publish_event(&db, "test:event1", "data1").unwrap();
        publish_event(&db, "other:event2", "data2").unwrap();
        publish_event(&db, "test:event3", "data3").unwrap();

        let mut c = test_consumer(db.clone(), "test", "c1").unwrap();

        let first = c.next().unwrap().unwrap();
        assert_eq!(first.event_type, "test:event1");
        c.ack(first.id).unwrap();

        // The "other:" event is skipped entirely.
        let second = c.next().unwrap().unwrap();
        assert_eq!(second.event_type, "test:event3");
        c.ack(second.id).unwrap();

        assert!(c.next().unwrap().is_none());
    }

    #[test]
    fn test_consumer_with_additional_filter() {
        let (db, _temp) = test_db();

        publish_event(&db, "test:doc_updated", "data1").unwrap();
        publish_event(&db, "test:index_updated", "data2").unwrap();
        publish_event(&db, "test:doc_deleted", "data3").unwrap();

        // Narrow the stream filter further to "test:doc_" events only.
        let mut c = test_consumer(db.clone(), "test", "c1")
            .unwrap()
            .with_filter(EventFilter::prefix("test:doc_"));

        let first = c.next().unwrap().unwrap();
        assert_eq!(first.event_type, "test:doc_updated");
        c.ack(first.id).unwrap();

        let second = c.next().unwrap().unwrap();
        assert_eq!(second.event_type, "test:doc_deleted");
        c.ack(second.id).unwrap();

        assert!(c.next().unwrap().is_none());
    }

    #[test]
    fn test_consumer_ack() {
        let (db, _temp) = test_db();

        publish_event(&db, "test:event1", "data1").unwrap();
        publish_event(&db, "test:event2", "data2").unwrap();

        let mut c = test_consumer(db.clone(), "test", "c1").unwrap();

        let first = c.next().unwrap().unwrap();
        c.ack(first.id).unwrap();

        assert_eq!(c.position().unwrap(), Some(first.id));

        // A fresh consumer with the same name resumes after the acked event.
        let mut resumed = test_consumer(db.clone(), "test", "c1").unwrap();
        let next = resumed.next().unwrap().unwrap();
        assert_eq!(next.event_type, "test:event2");
    }

    #[test]
    fn test_consumer_seek() {
        let (db, _temp) = test_db();

        publish_event(&db, "test:event1", "data1").unwrap();
        publish_event(&db, "test:event2", "data2").unwrap();
        publish_event(&db, "test:event3", "data3").unwrap();

        let mut c = test_consumer(db.clone(), "test", "c1").unwrap();

        // Seek to event ID 1 — the second event published.
        c.seek(1).unwrap();

        let event = c.next().unwrap().unwrap();
        assert_eq!(event.event_type, "test:event2");
        assert_eq!(event.id, 1);
    }

    #[test]
    fn test_independent_consumers() {
        let (db, _temp) = test_db();

        publish_event(&db, "test:event1", "data1").unwrap();
        publish_event(&db, "test:event2", "data2").unwrap();
        publish_event(&db, "test:event3", "data3").unwrap();

        let mut first = test_consumer(db.clone(), "test", "c1").unwrap();
        let mut second = test_consumer(db.clone(), "test", "c2").unwrap();

        // first consumer reads and acks only event 0.
        let a = first.next().unwrap().unwrap();
        assert_eq!(a.id, 0);
        first.ack(a.id).unwrap();

        // second consumer independently processes events 0 and 1.
        let b = second.next().unwrap().unwrap();
        assert_eq!(b.id, 0);
        second.ack(b.id).unwrap();

        let c = second.next().unwrap().unwrap();
        assert_eq!(c.id, 1);
        second.ack(c.id).unwrap();

        // Cursors do not interfere with one another.
        assert_eq!(first.position().unwrap(), Some(0));
        assert_eq!(second.position().unwrap(), Some(1));
    }

    #[test]
    fn test_consumer_catches_new_events() {
        let (db, _temp) = test_db();

        // Consumer exists before any events do.
        let mut c = test_consumer(db.clone(), "test", "c1").unwrap();

        // Nothing to read yet.
        assert!(c.next().unwrap().is_none());

        // Events published only AFTER the consumer was created.
        publish_event(&db, "test:event1", "data1").unwrap();
        publish_event(&db, "test:event2", "data2").unwrap();

        let maybe_first = c.next().unwrap();
        assert!(
            maybe_first.is_some(),
            "Consumer should catch events published after creation"
        );
        let first = maybe_first.unwrap();
        assert_eq!(first.event_type, "test:event1");
        c.ack(first.id).unwrap();

        let second = c.next().unwrap().unwrap();
        assert_eq!(second.event_type, "test:event2");
        c.ack(second.id).unwrap();

        assert!(c.next().unwrap().is_none());
    }

    #[test]
    fn test_consumer_polling_loop_catches_events() {
        let (db, _temp) = test_db();

        // Consumer exists before any events do.
        let mut c = test_consumer(db.clone(), "test", "c1").unwrap();

        // Poll repeatedly, mimicking a projector's loop; inject the event
        // partway through so the loop has to pick it up mid-flight.
        let mut caught = false;
        for tick in 0..50 {
            if tick == 10 {
                publish_event(&db, "test:event1", "data1").unwrap();
            }

            if let Some(event) = c.next().unwrap() {
                assert_eq!(event.event_type, "test:event1");
                c.ack(event.id).unwrap();
                caught = true;
                break;
            }

            // Small sleep to simulate the polling interval.
            std::thread::sleep(std::time::Duration::from_millis(10));
        }

        assert!(
            caught,
            "Consumer polling loop should catch event published during polling"
        );
    }
}
481}