1use crate::{
2 config::ConsumerMetadata,
3 error::{BusError, Result},
4 filter::{Event, EventFilter, EventFilterTrait},
5 notification::WakeStrategy,
6};
7use azoth::{typed_values::TypedValue, AsyncTransaction, AzothDb, Transaction};
8use azoth_core::{
9 traits::canonical::{CanonicalReadTxn, CanonicalStore},
10 EventId,
11};
12use std::sync::Arc;
13
/// A named, durable cursor over one event stream in an [`AzothDb`].
///
/// The consumer persists its read position and metadata under well-known
/// state keys, so progress survives process restarts, and differently named
/// consumers track the same stream independently.
pub struct Consumer {
    /// Shared handle to the underlying database.
    db: Arc<AzothDb>,
    /// Stream name; events are matched by the `"{stream}:"` type prefix.
    stream: String,
    /// Consumer name; with `stream`, identifies the persisted cursor/meta.
    name: String,
    /// Filter applied to candidate events; starts as the stream prefix filter
    /// and may be narrowed via `with_filter`.
    filter: EventFilter,
    /// State key `bus:consumer:{stream}:{name}:cursor` — last acked event id.
    cursor_key: Vec<u8>,
    /// State key `bus:consumer:{stream}:{name}:meta` — JSON [`ConsumerMetadata`].
    meta_key: Vec<u8>,
    /// Strategy used by `next_async` to wait for new events.
    wake_strategy: WakeStrategy,
}
24
25impl Consumer {
    /// Creates a consumer for `stream` named `name`, persisting initial
    /// metadata if this (stream, name) pair has never been seen before.
    ///
    /// The cursor itself is NOT initialized here: a missing cursor key means
    /// "start from the beginning" (see `next`).
    ///
    /// # Errors
    /// Returns an error if metadata serialization or the initializing
    /// transaction fails.
    pub fn new(
        db: Arc<AzothDb>,
        stream: String,
        name: String,
        wake_strategy: WakeStrategy,
    ) -> Result<Self> {
        // Well-known key formats; must stay stable across versions because
        // existing cursors and metadata live under these exact keys.
        let cursor_key = format!("bus:consumer:{}:{}:cursor", stream, name).into_bytes();
        let meta_key = format!("bus:consumer:{}:{}:meta", stream, name).into_bytes();

        let meta = ConsumerMetadata::new(stream.clone(), name.clone());
        let meta_bytes = serde_json::to_vec(&meta)?;

        // Write metadata only if absent, so re-opening an existing consumer
        // does not clobber its recorded history (e.g. last_ack_at).
        Transaction::new(&db)
            .keys(vec![meta_key.clone()])
            .execute(|ctx| {
                if ctx.get_opt(&meta_key)?.is_none() {
                    ctx.set(&meta_key, &TypedValue::Bytes(meta_bytes))?;
                }
                Ok(())
            })?;

        // Base filter: only events whose type starts with "{stream}:".
        let stream_filter = EventFilter::prefix(format!("{}:", stream));

        Ok(Self {
            db,
            stream,
            name,
            filter: stream_filter,
            cursor_key,
            meta_key,
            wake_strategy,
        })
    }
63
64 pub fn with_filter(mut self, filter: EventFilter) -> Self {
66 self.filter = self.filter.and(filter);
67 self
68 }
69
70 pub fn position(&self) -> Result<Option<u64>> {
74 let txn = self.db.canonical().read_txn()?;
75 match txn.get_state(&self.cursor_key)? {
76 Some(bytes) => {
77 let value = TypedValue::from_bytes(&bytes)?;
78 Ok(Some(value.as_i64()? as u64))
79 }
80 None => Ok(None),
81 }
82 }
83
84 pub fn seek(&mut self, event_id: u64) -> Result<()> {
89 if event_id == 0 {
90 Transaction::new(&self.db)
92 .keys(vec![self.cursor_key.clone()])
93 .execute(|ctx| {
94 ctx.delete(&self.cursor_key)?;
95 Ok(())
96 })?;
97 } else {
98 Transaction::new(&self.db)
100 .keys(vec![self.cursor_key.clone()])
101 .execute(|ctx| {
102 ctx.set(&self.cursor_key, &TypedValue::I64((event_id - 1) as i64))?;
103 Ok(())
104 })?;
105 }
106 Ok(())
107 }
108
109 #[allow(clippy::should_implement_trait)]
117 pub fn next(&mut self) -> Result<Option<Event>> {
118 let cursor = self.position()?;
119 let start_id = match cursor {
121 None => 0,
122 Some(n) => n + 1,
123 };
124 let mut iter = self.db.canonical().iter_events(start_id, None)?;
125
126 while let Some((id, bytes)) = iter.next()? {
127 let event = Event::decode(id, &bytes)?;
128
129 if self.filter.matches(&event) {
130 return Ok(Some(event));
131 }
132 }
133
134 Ok(None)
135 }
136
    /// Durably records `event_id` as processed: advances the cursor and
    /// stamps `last_ack_at` in the consumer metadata, in one transaction.
    ///
    /// There is no monotonicity check — acking an older id simply rewinds
    /// the cursor.
    ///
    /// # Errors
    /// Returns an error if the transaction fails, the stored metadata value
    /// is not bytes, or (de)serializing the metadata fails.
    pub fn ack(&mut self, event_id: EventId) -> Result<()> {
        // Clone keys so the closure does not borrow `self`.
        let meta_key = self.meta_key.clone();
        let cursor_key = self.cursor_key.clone();

        Transaction::new(&self.db)
            .keys(vec![cursor_key.clone(), meta_key.clone()])
            .execute(|ctx| {
                // Cursor holds the last acked id as I64 (see `position`).
                ctx.set(&cursor_key, &TypedValue::I64(event_id as i64))?;

                // Metadata touch is best-effort: silently skipped if the
                // meta record is missing (e.g. removed out of band).
                if let Some(meta_value) = ctx.get_opt(&meta_key)? {
                    let meta_bytes = match meta_value {
                        TypedValue::Bytes(b) => b,
                        _ => {
                            return Err(azoth_core::AzothError::InvalidState(
                                "Consumer metadata must be bytes".into(),
                            ))
                        }
                    };
                    // serde errors are mapped to the azoth-core error type the
                    // transaction closure must return.
                    let mut meta: ConsumerMetadata = serde_json::from_slice(&meta_bytes)
                        .map_err(|e| azoth_core::AzothError::Serialization(e.to_string()))?;
                    meta.last_ack_at = Some(chrono::Utc::now());

                    let updated_bytes = serde_json::to_vec(&meta)
                        .map_err(|e| azoth_core::AzothError::Serialization(e.to_string()))?;
                    ctx.set(&meta_key, &TypedValue::Bytes(updated_bytes))?;
                }

                Ok(())
            })?;
        Ok(())
    }
171
    /// Async twin of [`Consumer::ack`]: advances the cursor and updates
    /// `last_ack_at` in one async transaction.
    ///
    /// NOTE(review): the closure body intentionally mirrors `ack`; the sync
    /// and async transaction contexts differ, which is why the logic is
    /// duplicated rather than shared.
    ///
    /// # Errors
    /// Same failure modes as `ack`.
    pub async fn ack_async(&mut self, event_id: EventId) -> Result<()> {
        // Clone keys so the `move` closure owns everything it captures.
        let meta_key = self.meta_key.clone();
        let cursor_key = self.cursor_key.clone();

        AsyncTransaction::new(self.db.clone())
            .keys(vec![cursor_key.clone(), meta_key.clone()])
            .execute(move |ctx| {
                // Cursor holds the last acked id as I64 (see `position`).
                ctx.set(&cursor_key, &TypedValue::I64(event_id as i64))?;

                // Metadata touch is best-effort: skipped if the record is gone.
                if let Some(meta_value) = ctx.get_opt(&meta_key)? {
                    let meta_bytes = match meta_value {
                        TypedValue::Bytes(b) => b,
                        _ => {
                            return Err(azoth_core::AzothError::InvalidState(
                                "Consumer metadata must be bytes".into(),
                            ))
                        }
                    };
                    let mut meta: ConsumerMetadata = serde_json::from_slice(&meta_bytes)
                        .map_err(|e| azoth_core::AzothError::Serialization(e.to_string()))?;
                    meta.last_ack_at = Some(chrono::Utc::now());

                    let updated_bytes = serde_json::to_vec(&meta)
                        .map_err(|e| azoth_core::AzothError::Serialization(e.to_string()))?;
                    ctx.set(&meta_key, &TypedValue::Bytes(updated_bytes))?;
                }

                Ok(())
            })
            .await?;
        Ok(())
    }
210
211 pub fn metadata(&self) -> Result<ConsumerMetadata> {
213 let txn = self.db.canonical().read_txn()?;
214 match txn.get_state(&self.meta_key)? {
215 Some(bytes) => {
216 let value = TypedValue::from_bytes(&bytes)?;
217 let meta_bytes = match value {
218 TypedValue::Bytes(b) => b,
219 _ => {
220 return Err(BusError::InvalidState(
221 "Consumer metadata must be bytes".into(),
222 ))
223 }
224 };
225 Ok(serde_json::from_slice(&meta_bytes)?)
226 }
227 None => Err(BusError::ConsumerNotFound(format!(
228 "{}:{}",
229 self.stream, self.name
230 ))),
231 }
232 }
233
    /// The stream this consumer reads from.
    pub fn stream(&self) -> &str {
        &self.stream
    }
238
    /// This consumer's name (unique per stream).
    pub fn name(&self) -> &str {
        &self.name
    }
243
    /// Waits until a matching event is available and returns it.
    ///
    /// Drains via [`Consumer::next`]; when the log is exhausted, parks on
    /// the configured [`WakeStrategy`] before retrying.
    ///
    /// NOTE(review): an event published between the `next()` miss and the
    /// `wait()` call relies on the wake strategy to still deliver a wakeup
    /// (or to poll periodically) — confirm `WakeStrategy` guarantees this,
    /// otherwise a wakeup could be lost until the next publish.
    pub async fn next_async(&mut self) -> Result<Option<Event>> {
        loop {
            if let Some(event) = self.next()? {
                return Ok(Some(event));
            }

            self.wake_strategy.wait(&self.stream).await;
        }
    }
263}
264
265#[cfg(test)]
266mod tests {
267 use super::*;
268 use azoth::Transaction;
269 use tempfile::TempDir;
270
271 fn test_db() -> (Arc<AzothDb>, TempDir) {
272 let temp = TempDir::new().unwrap();
273 let db = AzothDb::open(temp.path()).unwrap();
274 (Arc::new(db), temp)
275 }
276
277 fn publish_event(db: &AzothDb, event_type: &str, data: &str) -> Result<()> {
278 Transaction::new(db).execute(|ctx| {
279 ctx.log(event_type, &data)?;
280 Ok(())
281 })?;
282 Ok(())
283 }
284
285 fn test_consumer(db: Arc<AzothDb>, stream: &str, name: &str) -> Result<Consumer> {
286 Consumer::new(
287 db,
288 stream.to_string(),
289 name.to_string(),
290 WakeStrategy::default(),
291 )
292 }
293
294 #[test]
295 fn test_consumer_creation() {
296 let (db, _temp) = test_db();
297 let consumer = test_consumer(db.clone(), "test", "c1").unwrap();
298
299 assert_eq!(consumer.stream(), "test");
300 assert_eq!(consumer.name(), "c1");
301 assert_eq!(consumer.position().unwrap(), None); }
303
304 #[test]
305 fn test_consumer_next_with_stream_filter() {
306 let (db, _temp) = test_db();
307
308 publish_event(&db, "test:event1", "data1").unwrap();
310 publish_event(&db, "other:event2", "data2").unwrap();
311 publish_event(&db, "test:event3", "data3").unwrap();
312
313 let mut consumer = test_consumer(db.clone(), "test", "c1").unwrap();
315
316 let event1 = consumer.next().unwrap().unwrap();
317 assert_eq!(event1.event_type, "test:event1");
318 consumer.ack(event1.id).unwrap();
319
320 let event2 = consumer.next().unwrap().unwrap();
321 assert_eq!(event2.event_type, "test:event3");
322 consumer.ack(event2.id).unwrap();
323
324 assert!(consumer.next().unwrap().is_none());
325 }
326
327 #[test]
328 fn test_consumer_with_additional_filter() {
329 let (db, _temp) = test_db();
330
331 publish_event(&db, "test:doc_updated", "data1").unwrap();
332 publish_event(&db, "test:index_updated", "data2").unwrap();
333 publish_event(&db, "test:doc_deleted", "data3").unwrap();
334
335 let mut consumer = test_consumer(db.clone(), "test", "c1")
337 .unwrap()
338 .with_filter(EventFilter::prefix("test:doc_"));
339
340 let event1 = consumer.next().unwrap().unwrap();
341 assert_eq!(event1.event_type, "test:doc_updated");
342 consumer.ack(event1.id).unwrap();
343
344 let event2 = consumer.next().unwrap().unwrap();
345 assert_eq!(event2.event_type, "test:doc_deleted");
346 consumer.ack(event2.id).unwrap();
347
348 assert!(consumer.next().unwrap().is_none());
349 }
350
351 #[test]
352 fn test_consumer_ack() {
353 let (db, _temp) = test_db();
354
355 publish_event(&db, "test:event1", "data1").unwrap();
356 publish_event(&db, "test:event2", "data2").unwrap();
357
358 let mut consumer = test_consumer(db.clone(), "test", "c1").unwrap();
359
360 let event1 = consumer.next().unwrap().unwrap();
361 consumer.ack(event1.id).unwrap();
362
363 assert_eq!(consumer.position().unwrap(), Some(event1.id));
364
365 let mut consumer2 = test_consumer(db.clone(), "test", "c1").unwrap();
367 let event2 = consumer2.next().unwrap().unwrap();
368 assert_eq!(event2.event_type, "test:event2");
369 }
370
371 #[test]
372 fn test_consumer_seek() {
373 let (db, _temp) = test_db();
374
375 publish_event(&db, "test:event1", "data1").unwrap();
376 publish_event(&db, "test:event2", "data2").unwrap();
377 publish_event(&db, "test:event3", "data3").unwrap();
378
379 let mut consumer = test_consumer(db.clone(), "test", "c1").unwrap();
380
381 consumer.seek(1).unwrap();
383
384 let event = consumer.next().unwrap().unwrap();
385 assert_eq!(event.event_type, "test:event2");
386 assert_eq!(event.id, 1);
387 }
388
389 #[test]
390 fn test_independent_consumers() {
391 let (db, _temp) = test_db();
392
393 publish_event(&db, "test:event1", "data1").unwrap();
394 publish_event(&db, "test:event2", "data2").unwrap();
395 publish_event(&db, "test:event3", "data3").unwrap();
396
397 let mut c1 = test_consumer(db.clone(), "test", "c1").unwrap();
398 let mut c2 = test_consumer(db.clone(), "test", "c2").unwrap();
399
400 let e1 = c1.next().unwrap().unwrap();
402 assert_eq!(e1.id, 0);
403 c1.ack(e1.id).unwrap();
404
405 let e1 = c2.next().unwrap().unwrap();
407 assert_eq!(e1.id, 0);
408 c2.ack(e1.id).unwrap();
409
410 let e2 = c2.next().unwrap().unwrap();
411 assert_eq!(e2.id, 1);
412 c2.ack(e2.id).unwrap();
413
414 assert_eq!(c1.position().unwrap(), Some(0));
416 assert_eq!(c2.position().unwrap(), Some(1));
417 }
418
419 #[test]
420 fn test_consumer_catches_new_events() {
421 let (db, _temp) = test_db();
422
423 let mut consumer = test_consumer(db.clone(), "test", "c1").unwrap();
425
426 assert!(consumer.next().unwrap().is_none());
428
429 publish_event(&db, "test:event1", "data1").unwrap();
431 publish_event(&db, "test:event2", "data2").unwrap();
432
433 let event1 = consumer.next().unwrap();
435 assert!(
436 event1.is_some(),
437 "Consumer should catch events published after creation"
438 );
439 let event1 = event1.unwrap();
440 assert_eq!(event1.event_type, "test:event1");
441 consumer.ack(event1.id).unwrap();
442
443 let event2 = consumer.next().unwrap().unwrap();
444 assert_eq!(event2.event_type, "test:event2");
445 consumer.ack(event2.id).unwrap();
446
447 assert!(consumer.next().unwrap().is_none());
448 }
449
450 #[test]
451 fn test_consumer_polling_loop_catches_events() {
452 let (db, _temp) = test_db();
453
454 let mut consumer = test_consumer(db.clone(), "test", "c1").unwrap();
456
457 let mut found_event = false;
459 for iteration in 0..50 {
460 if iteration == 10 {
461 publish_event(&db, "test:event1", "data1").unwrap();
463 }
464
465 if let Some(event) = consumer.next().unwrap() {
466 assert_eq!(event.event_type, "test:event1");
467 consumer.ack(event.id).unwrap();
468 found_event = true;
469 break;
470 }
471
472 std::thread::sleep(std::time::Duration::from_millis(10));
474 }
475
476 assert!(
477 found_event,
478 "Consumer polling loop should catch event published during polling"
479 );
480 }
481}