1use std::path::Path;
2
3use hermes_proto::EventEnvelope;
4use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
5use tracing::{debug, trace};
6
7use crate::error::StoreError;
8use crate::{DeliveryState, MessageStore, StoredMessage};
9
10const MESSAGES: TableDefinition<&str, &[u8]> = TableDefinition::new("messages");
20const DELIVERIES: TableDefinition<&str, &[u8]> = TableDefinition::new("deliveries");
21const CONSUMERS: TableDefinition<&str, &[u8]> = TableDefinition::new("consumers");
22const SUBJECT_INDEX: TableDefinition<&[u8], ()> = TableDefinition::new("subject_index_v2");
23const META: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
24
25const SCHEMA_VERSION: u32 = 2;
27
28const RECORD_VERSION: u8 = 2;
30
31#[derive(Debug, Clone)]
34struct MessageRecord {
35 id: String,
36 subject: Vec<u8>,
37 payload: Vec<u8>,
38 headers: Vec<(String, String)>,
39 timestamp_nanos: i64,
40 created_at_ms: u64,
41}
42
43impl MessageRecord {
44 fn from_envelope(envelope: &EventEnvelope, created_at_ms: u64) -> Self {
45 Self {
46 id: envelope.id.clone(),
47 subject: envelope.subject.clone(),
48 payload: envelope.payload.clone(),
49 headers: envelope
50 .headers
51 .iter()
52 .map(|(k, v)| (k.clone(), v.clone()))
53 .collect(),
54 timestamp_nanos: envelope.timestamp_nanos,
55 created_at_ms,
56 }
57 }
58
59 fn to_envelope(&self) -> EventEnvelope {
60 EventEnvelope {
61 id: self.id.clone(),
62 subject: self.subject.clone(),
63 payload: self.payload.clone(),
64 headers: self.headers.iter().cloned().collect(),
65 timestamp_nanos: self.timestamp_nanos,
66 }
67 }
68
69 fn serialize(&self) -> Vec<u8> {
70 let mut buf = Vec::new();
71 buf.push(RECORD_VERSION);
72 write_str(&mut buf, &self.id);
73 write_bytes(&mut buf, &self.subject);
74 write_bytes(&mut buf, &self.payload);
75 write_u32(&mut buf, self.headers.len() as u32);
76 for (k, v) in &self.headers {
77 write_str(&mut buf, k);
78 write_str(&mut buf, v);
79 }
80 write_i64(&mut buf, self.timestamp_nanos);
81 write_u64(&mut buf, self.created_at_ms);
82 buf
83 }
84
85 fn deserialize(data: &[u8]) -> Option<Self> {
86 if data.is_empty() {
87 return None;
88 }
89 let mut pos = 0;
90 let version = data[0];
91 pos += 1;
92
93 if version == RECORD_VERSION {
94 let id = read_str(data, &mut pos)?;
96 let subject = read_bytes(data, &mut pos)?;
97 let payload = read_bytes(data, &mut pos)?;
98 let header_count = read_u32(data, &mut pos)? as usize;
99 let mut headers = Vec::with_capacity(header_count);
100 for _ in 0..header_count {
101 let k = read_str(data, &mut pos)?;
102 let v = read_str(data, &mut pos)?;
103 headers.push((k, v));
104 }
105 let timestamp_nanos = read_i64(data, &mut pos)?;
106 let created_at_ms = read_u64(data, &mut pos)?;
107 Some(Self {
108 id,
109 subject,
110 payload,
111 headers,
112 timestamp_nanos,
113 created_at_ms,
114 })
115 } else {
116 pos = if version == 1 { 1 } else { 0 };
118 let id = read_str(data, &mut pos)?;
119 let subject_str = read_str(data, &mut pos)?;
120 let payload = read_bytes(data, &mut pos)?;
121 let header_count = read_u32(data, &mut pos)? as usize;
122 let mut headers = Vec::with_capacity(header_count);
123 for _ in 0..header_count {
124 let k = read_str(data, &mut pos)?;
125 let v = read_str(data, &mut pos)?;
126 headers.push((k, v));
127 }
128 let timestamp_nanos = read_i64(data, &mut pos)?;
129 let created_at_ms = read_u64(data, &mut pos)?;
130 Some(Self {
131 id,
132 subject: subject_str.into_bytes(),
133 payload,
134 headers,
135 timestamp_nanos,
136 created_at_ms,
137 })
138 }
139 }
140}
141
142#[derive(Debug, Clone)]
143struct DeliveryRecord {
144 state: DeliveryState,
145 attempt: u32,
146 ack_deadline_ms: u64,
147}
148
149impl DeliveryRecord {
150 fn serialize(&self) -> Vec<u8> {
151 let mut buf = Vec::with_capacity(14);
152 buf.push(RECORD_VERSION);
153 buf.push(self.state.as_u8());
154 write_u32(&mut buf, self.attempt);
155 write_u64(&mut buf, self.ack_deadline_ms);
156 buf
157 }
158
159 fn deserialize(data: &[u8]) -> Option<Self> {
160 if data.is_empty() {
161 return None;
162 }
163 let version = data[0];
164 if version == RECORD_VERSION || version == 1 {
165 if data.len() < 14 {
167 return None;
168 }
169 let state = DeliveryState::from_u8(data[1])?;
170 let mut pos = 2;
171 let attempt = read_u32(data, &mut pos)?;
172 let ack_deadline_ms = read_u64(data, &mut pos)?;
173 Some(Self {
174 state,
175 attempt,
176 ack_deadline_ms,
177 })
178 } else {
179 if data.len() < 13 {
181 return None;
182 }
183 let state = DeliveryState::from_u8(data[0])?;
184 let mut pos = 1;
185 let attempt = read_u32(data, &mut pos)?;
186 let ack_deadline_ms = read_u64(data, &mut pos)?;
187 Some(Self {
188 state,
189 attempt,
190 ack_deadline_ms,
191 })
192 }
193 }
194}
195
196#[derive(Debug, Clone)]
197struct ConsumerRecord {
198 subject: Vec<u8>,
199 queue_groups: Vec<String>,
200}
201
202impl ConsumerRecord {
203 fn serialize(&self) -> Vec<u8> {
204 let mut buf = Vec::new();
205 buf.push(RECORD_VERSION);
206 write_bytes(&mut buf, &self.subject);
207 write_u32(&mut buf, self.queue_groups.len() as u32);
208 for qg in &self.queue_groups {
209 write_str(&mut buf, qg);
210 }
211 buf
212 }
213
214 fn deserialize(data: &[u8]) -> Option<Self> {
215 if data.is_empty() {
216 return None;
217 }
218 let version = data[0];
219
220 if version == RECORD_VERSION {
221 let mut pos = 1;
222 let subject = read_bytes(data, &mut pos)?;
223 let count = read_u32(data, &mut pos)?;
224 let mut queue_groups = Vec::with_capacity(count as usize);
225 for _ in 0..count {
226 queue_groups.push(read_str(data, &mut pos)?);
227 }
228 Some(Self {
229 subject,
230 queue_groups,
231 })
232 } else if version == 1 {
233 let mut pos = 1;
234 let subject_str = read_str(data, &mut pos)?;
235 let count = read_u32(data, &mut pos)?;
236 let mut queue_groups = Vec::with_capacity(count as usize);
237 for _ in 0..count {
238 queue_groups.push(read_str(data, &mut pos)?);
239 }
240 Some(Self {
241 subject: subject_str.into_bytes(),
242 queue_groups,
243 })
244 } else {
245 let mut pos = 0;
246 let subject_str = read_str(data, &mut pos)?;
247 let has_qg = *data.get(pos)?;
248 pos += 1;
249 let queue_groups = if has_qg == 1 {
250 vec![read_str(data, &mut pos)?]
251 } else {
252 vec![]
253 };
254 Some(Self {
255 subject: subject_str.into_bytes(),
256 queue_groups,
257 })
258 }
259 }
260}
261
262fn write_u32(buf: &mut Vec<u8>, v: u32) {
265 buf.extend_from_slice(&v.to_le_bytes());
266}
267
268fn write_u64(buf: &mut Vec<u8>, v: u64) {
269 buf.extend_from_slice(&v.to_le_bytes());
270}
271
272fn write_i64(buf: &mut Vec<u8>, v: i64) {
273 buf.extend_from_slice(&v.to_le_bytes());
274}
275
276fn write_str(buf: &mut Vec<u8>, s: &str) {
277 write_u32(buf, s.len() as u32);
278 buf.extend_from_slice(s.as_bytes());
279}
280
281fn write_bytes(buf: &mut Vec<u8>, b: &[u8]) {
282 write_u32(buf, b.len() as u32);
283 buf.extend_from_slice(b);
284}
285
286fn read_u32(data: &[u8], pos: &mut usize) -> Option<u32> {
287 let bytes: [u8; 4] = data.get(*pos..*pos + 4)?.try_into().ok()?;
288 *pos += 4;
289 Some(u32::from_le_bytes(bytes))
290}
291
292fn read_u64(data: &[u8], pos: &mut usize) -> Option<u64> {
293 let bytes: [u8; 8] = data.get(*pos..*pos + 8)?.try_into().ok()?;
294 *pos += 8;
295 Some(u64::from_le_bytes(bytes))
296}
297
298fn read_i64(data: &[u8], pos: &mut usize) -> Option<i64> {
299 let bytes: [u8; 8] = data.get(*pos..*pos + 8)?.try_into().ok()?;
300 *pos += 8;
301 Some(i64::from_le_bytes(bytes))
302}
303
304fn read_str(data: &[u8], pos: &mut usize) -> Option<String> {
305 let len = read_u32(data, pos)? as usize;
306 let s = std::str::from_utf8(data.get(*pos..*pos + len)?).ok()?;
307 *pos += len;
308 Some(s.to_string())
309}
310
311fn read_bytes(data: &[u8], pos: &mut usize) -> Option<Vec<u8>> {
312 let len = read_u32(data, pos)? as usize;
313 let b = data.get(*pos..*pos + len)?;
314 *pos += len;
315 Some(b.to_vec())
316}
317
318fn now_ms() -> u64 {
319 std::time::SystemTime::now()
320 .duration_since(std::time::UNIX_EPOCH)
321 .unwrap_or_default()
322 .as_millis() as u64
323}
324
325fn subject_index_key(subject: &[u8], message_id: &str) -> Vec<u8> {
327 let mut key = Vec::with_capacity(4 + subject.len() + message_id.len());
328 key.extend_from_slice(&(subject.len() as u32).to_le_bytes());
329 key.extend_from_slice(subject);
330 key.extend_from_slice(message_id.as_bytes());
331 key
332}
333
334fn subject_index_prefix(subject: &[u8]) -> Vec<u8> {
336 let mut prefix = Vec::with_capacity(4 + subject.len());
337 prefix.extend_from_slice(&(subject.len() as u32).to_le_bytes());
338 prefix.extend_from_slice(subject);
339 prefix
340}
341
342const KEY_SEP: char = '\0';
345
346fn delivery_key(consumer_name: &str, message_id: &str) -> String {
347 format!("{consumer_name}{KEY_SEP}{message_id}")
348}
349
350pub struct RedbMessageStore {
353 db: Database,
354}
355
356impl RedbMessageStore {
357 fn init_tables(db: &Database) -> Result<(), StoreError> {
359 let txn = db.begin_write()?;
360 {
361 let _ = txn.open_table(MESSAGES)?;
362 let _ = txn.open_table(DELIVERIES)?;
363 let _ = txn.open_table(CONSUMERS)?;
364 let _ = txn.open_table(SUBJECT_INDEX)?;
365 let mut meta = txn.open_table(META)?;
366
367 let needs_init = {
369 let existing = meta.get("schema_version")?;
370 match existing {
371 Some(guard) => {
372 let bytes: &[u8] = guard.value();
373 if bytes.len() >= 4 {
374 let version =
375 u32::from_le_bytes(bytes[..4].try_into().unwrap_or([0; 4]));
376 if version > SCHEMA_VERSION {
377 return Err(StoreError::InvalidState(version as u8));
378 }
379 }
380 false
381 }
382 None => true,
383 }
384 };
385 if needs_init {
386 meta.insert("schema_version", SCHEMA_VERSION.to_le_bytes().as_slice())?;
387 }
388 }
389 txn.commit()?;
390 Ok(())
391 }
392
393 pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
395 let db = Database::create(path.as_ref())?;
396 Self::init_tables(&db)?;
397 debug!(path = %path.as_ref().display(), "store opened");
398 Ok(Self { db })
399 }
400
401 #[cfg(test)]
403 pub fn open_temporary() -> Result<Self, StoreError> {
404 let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
405 Self::init_tables(&db)?;
406 Ok(Self { db })
407 }
408}
409
410impl MessageStore for RedbMessageStore {
411 fn persist(&self, envelope: &EventEnvelope) -> Result<(), StoreError> {
412 let record = MessageRecord::from_envelope(envelope, now_ms());
413 let serialized = record.serialize();
414
415 let txn = self.db.begin_write()?;
416 {
417 let mut messages = txn.open_table(MESSAGES)?;
418 messages.insert(envelope.id.as_str(), serialized.as_slice())?;
419
420 let mut idx = txn.open_table(SUBJECT_INDEX)?;
421 let key = subject_index_key(&envelope.subject, &envelope.id);
422 idx.insert(key.as_slice(), ())?;
423 }
424 txn.commit()?;
425
426 debug!(id = envelope.id, "message persisted");
427 Ok(())
428 }
429
430 fn register_consumer(
431 &self,
432 consumer_name: &str,
433 subject: &[u8],
434 queue_groups: &[String],
435 ) -> Result<(), StoreError> {
436 let record = ConsumerRecord {
437 subject: subject.to_vec(),
438 queue_groups: queue_groups.to_vec(),
439 };
440
441 let txn = self.db.begin_write()?;
442 {
443 let mut consumers = txn.open_table(CONSUMERS)?;
444 consumers.insert(consumer_name, record.serialize().as_slice())?;
445 }
446 txn.commit()?;
447
448 debug!(consumer_name, "consumer registered");
449 Ok(())
450 }
451
452 fn fetch_pending(
453 &self,
454 consumer_name: &str,
455 limit: u32,
456 ) -> Result<Vec<StoredMessage>, StoreError> {
457 let txn = self.db.begin_read()?;
458
459 let consumers = txn.open_table(CONSUMERS)?;
461 let consumer_data = consumers
462 .get(consumer_name)?
463 .ok_or_else(|| StoreError::ConsumerNotFound(consumer_name.to_string()))?;
464 let consumer = ConsumerRecord::deserialize(consumer_data.value())
465 .ok_or(StoreError::InvalidState(255))?;
466
467 let messages = txn.open_table(MESSAGES)?;
468 let deliveries = txn.open_table(DELIVERIES)?;
469 let idx = txn.open_table(SUBJECT_INDEX)?;
470
471 let prefix = subject_index_prefix(&consumer.subject);
472 let mut result = Vec::new();
473
474 let range = idx.range(prefix.as_slice()..)?;
475 for entry in range {
476 let entry = entry?;
477 let key: &[u8] = entry.0.value();
478
479 if !key.starts_with(&prefix) {
481 break;
482 }
483
484 if result.len() >= limit as usize {
485 break;
486 }
487
488 let message_id = match std::str::from_utf8(&key[prefix.len()..]) {
490 Ok(s) => s,
491 Err(_) => continue,
492 };
493 let dk = delivery_key(consumer_name, message_id);
494
495 if deliveries.get(dk.as_str())?.is_some() {
497 continue;
498 }
499
500 if let Some(msg_data) = messages.get(message_id)? {
502 let bytes: &[u8] = msg_data.value();
503 if let Some(record) = MessageRecord::deserialize(bytes) {
504 result.push(StoredMessage {
505 envelope: record.to_envelope(),
506 attempt: 1,
507 });
508 }
509 }
510 }
511
512 debug!(
513 consumer_name,
514 count = result.len(),
515 "fetched pending messages"
516 );
517 Ok(result)
518 }
519
520 fn mark_delivered(
521 &self,
522 message_id: &str,
523 consumer_name: &str,
524 ack_deadline_ms: u64,
525 ) -> Result<(), StoreError> {
526 let dk = delivery_key(consumer_name, message_id);
527 let record = DeliveryRecord {
528 state: DeliveryState::Delivered,
529 attempt: 1,
530 ack_deadline_ms,
531 };
532
533 let txn = self.db.begin_write()?;
534 {
535 let mut deliveries = txn.open_table(DELIVERIES)?;
536 deliveries.insert(dk.as_str(), record.serialize().as_slice())?;
537 }
538 txn.commit()?;
539
540 trace!(message_id, consumer_name, "delivery recorded");
541 Ok(())
542 }
543
544 fn ack(&self, message_id: &str, consumer_name: &str) -> Result<(), StoreError> {
545 let dk = delivery_key(consumer_name, message_id);
546
547 let txn = self.db.begin_write()?;
548 {
549 let mut deliveries = txn.open_table(DELIVERIES)?;
550 let existing = deliveries
551 .get(dk.as_str())?
552 .ok_or_else(|| StoreError::MessageNotFound(message_id.to_string()))?;
553 let mut record = DeliveryRecord::deserialize(existing.value())
554 .ok_or(StoreError::InvalidState(255))?;
555 drop(existing);
556
557 record.state = DeliveryState::Acked;
558 deliveries.insert(dk.as_str(), record.serialize().as_slice())?;
559 }
560 txn.commit()?;
561
562 debug!(message_id, consumer_name, "message acked");
563 Ok(())
564 }
565
566 fn nack(&self, message_id: &str, consumer_name: &str, requeue: bool) -> Result<(), StoreError> {
567 let dk = delivery_key(consumer_name, message_id);
568
569 let txn = self.db.begin_write()?;
570 {
571 let mut deliveries = txn.open_table(DELIVERIES)?;
572 let existing = deliveries
573 .get(dk.as_str())?
574 .ok_or_else(|| StoreError::MessageNotFound(message_id.to_string()))?;
575 let record = DeliveryRecord::deserialize(existing.value())
576 .ok_or(StoreError::InvalidState(255))?;
577 drop(existing);
578
579 if requeue {
580 deliveries.remove(dk.as_str())?;
582 debug!(message_id, consumer_name, "message nacked, requeued");
583 } else {
584 let dead = DeliveryRecord {
586 state: DeliveryState::DeadLettered,
587 attempt: record.attempt,
588 ack_deadline_ms: 0,
589 };
590 deliveries.insert(dk.as_str(), dead.serialize().as_slice())?;
591 debug!(message_id, consumer_name, "message nacked, dead-lettered");
592 }
593 }
594 txn.commit()?;
595
596 Ok(())
597 }
598
599 fn fetch_expired(
600 &self,
601 consumer_name: &str,
602 now_ms: u64,
603 limit: u32,
604 ) -> Result<Vec<StoredMessage>, StoreError> {
605 let txn = self.db.begin_read()?;
606 let deliveries = txn.open_table(DELIVERIES)?;
607 let messages = txn.open_table(MESSAGES)?;
608
609 let prefix = format!("{consumer_name}{KEY_SEP}");
610 let mut result = Vec::new();
611
612 let range = deliveries.range(prefix.as_str()..)?;
613 for entry in range {
614 let entry = entry?;
615 let key = entry.0.value();
616
617 if !key.starts_with(&prefix) {
618 break;
619 }
620
621 if result.len() >= limit as usize {
622 break;
623 }
624
625 let record = match DeliveryRecord::deserialize(entry.1.value()) {
626 Some(r) => r,
627 None => continue,
628 };
629
630 if record.state != DeliveryState::Delivered || record.ack_deadline_ms > now_ms {
632 continue;
633 }
634
635 let message_id = &key[prefix.len()..];
636 if let Some(msg_data) = messages.get(message_id)?
637 && let Some(msg) = MessageRecord::deserialize(msg_data.value())
638 {
639 result.push(StoredMessage {
640 envelope: msg.to_envelope(),
641 attempt: record.attempt.saturating_add(1),
642 });
643 }
644 }
645
646 debug!(
647 consumer_name,
648 count = result.len(),
649 "fetched expired messages"
650 );
651 Ok(result)
652 }
653
654 fn gc_acked(&self, older_than_ms: u64) -> Result<u64, StoreError> {
655 let txn = self.db.begin_write()?;
656 let mut removed: u64 = 0;
657
658 {
659 let mut messages = txn.open_table(MESSAGES)?;
660 let mut idx = txn.open_table(SUBJECT_INDEX)?;
661 let mut deliveries = txn.open_table(DELIVERIES)?;
662
663 let mut msg_status: std::collections::HashMap<String, bool> =
665 std::collections::HashMap::new();
666 let mut del_keys_by_msg: std::collections::HashMap<String, Vec<String>> =
667 std::collections::HashMap::new();
668
669 for entry in deliveries.range::<&str>(..)? {
670 let entry = entry?;
671 let key = entry.0.value().to_string();
672 let message_id = match key.rsplit_once(KEY_SEP) {
673 Some((_, mid)) => mid.to_string(),
674 None => continue,
675 };
676
677 let terminal = DeliveryRecord::deserialize(entry.1.value())
678 .map(|dr| {
679 dr.state == DeliveryState::Acked || dr.state == DeliveryState::DeadLettered
680 })
681 .unwrap_or(false);
682
683 let all_terminal = msg_status.entry(message_id.clone()).or_insert(true);
684 if !terminal {
685 *all_terminal = false;
686 }
687
688 del_keys_by_msg.entry(message_id).or_default().push(key);
689 }
690
691 let msg_range: Vec<_> = {
693 messages
694 .range::<&str>(..)?
695 .filter_map(|e| {
696 let e = e.ok()?;
697 let id = e.0.value().to_string();
698 let record = MessageRecord::deserialize(e.1.value())?;
699 Some((id, record))
700 })
701 .collect()
702 };
703
704 for (id, record) in &msg_range {
705 if record.created_at_ms > older_than_ms {
706 continue;
707 }
708
709 let all_terminal = msg_status.get(id).copied().unwrap_or(false);
710 if !all_terminal {
711 continue;
712 }
713
714 messages.remove(id.as_str())?;
715 let idx_key = subject_index_key(&record.subject, id);
716 idx.remove(idx_key.as_slice())?;
717
718 if let Some(del_keys) = del_keys_by_msg.remove(id) {
719 for dk in del_keys {
720 deliveries.remove(dk.as_str())?;
721 }
722 }
723
724 removed = removed.saturating_add(1);
725 }
726 }
727
728 txn.commit()?;
729
730 if removed > 0 {
731 debug!(removed, "gc completed");
732 }
733
734 Ok(removed)
735 }
736
737 fn list_consumers(&self) -> Result<Vec<String>, StoreError> {
738 let txn = self.db.begin_read()?;
739 let consumers = txn.open_table(CONSUMERS)?;
740
741 let mut names = Vec::new();
742 for entry in consumers.range::<&str>(..)? {
743 let entry = entry?;
744 names.push(entry.0.value().to_string());
745 }
746
747 trace!(count = names.len(), "listed consumers");
748 Ok(names)
749 }
750}
751
752#[cfg(test)]
753mod tests {
754 use super::*;
755
756 fn test_envelope(id: &str, subject: &[u8]) -> EventEnvelope {
757 EventEnvelope {
758 id: id.to_string(),
759 subject: subject.to_vec(),
760 payload: vec![1, 2, 3],
761 headers: Default::default(),
762 timestamp_nanos: 42,
763 }
764 }
765
766 fn test_subject(s: &str) -> Vec<u8> {
767 use hermes_core::Subject;
768 Subject::from(s).to_bytes()
769 }
770
771 #[test]
772 fn test_persist_and_fetch_pending() {
773 let store = RedbMessageStore::open_temporary().unwrap();
774 let subject = test_subject("orders.Created");
775
776 store.register_consumer("worker-1", &subject, &[]).unwrap();
777 store.persist(&test_envelope("msg-1", &subject)).unwrap();
778 store.persist(&test_envelope("msg-2", &subject)).unwrap();
779
780 let pending = store.fetch_pending("worker-1", 10).unwrap();
781 assert_eq!(pending.len(), 2);
782 assert_eq!(pending[0].envelope.id, "msg-1");
783 assert_eq!(pending[1].envelope.id, "msg-2");
784 }
785
786 #[test]
787 fn test_mark_delivered_excludes_from_pending() {
788 let store = RedbMessageStore::open_temporary().unwrap();
789 let subject = test_subject("orders.Created");
790
791 store.register_consumer("worker-1", &subject, &[]).unwrap();
792 store.persist(&test_envelope("msg-1", &subject)).unwrap();
793 store.mark_delivered("msg-1", "worker-1", u64::MAX).unwrap();
794
795 let pending = store.fetch_pending("worker-1", 10).unwrap();
796 assert!(pending.is_empty());
797 }
798
799 #[test]
800 fn test_ack() {
801 let store = RedbMessageStore::open_temporary().unwrap();
802 let subject = test_subject("orders.Created");
803
804 store.register_consumer("worker-1", &subject, &[]).unwrap();
805 store.persist(&test_envelope("msg-1", &subject)).unwrap();
806 store.mark_delivered("msg-1", "worker-1", u64::MAX).unwrap();
807 store.ack("msg-1", "worker-1").unwrap();
808
809 let pending = store.fetch_pending("worker-1", 10).unwrap();
810 assert!(pending.is_empty());
811 let expired = store.fetch_expired("worker-1", u64::MAX, 10).unwrap();
812 assert!(expired.is_empty());
813 }
814
815 #[test]
816 fn test_nack_requeue() {
817 let store = RedbMessageStore::open_temporary().unwrap();
818 let subject = test_subject("orders.Created");
819
820 store.register_consumer("worker-1", &subject, &[]).unwrap();
821 store.persist(&test_envelope("msg-1", &subject)).unwrap();
822 store.mark_delivered("msg-1", "worker-1", u64::MAX).unwrap();
823 let pending = store.fetch_pending("worker-1", 10).unwrap();
824 assert_eq!(pending.len(), 0);
825 store.nack("msg-1", "worker-1", true).unwrap();
826
827 let pending = store.fetch_pending("worker-1", 10).unwrap();
828 assert_eq!(pending.len(), 1);
829 }
830
831 #[test]
832 fn test_nack_dead_letter() {
833 let store = RedbMessageStore::open_temporary().unwrap();
834 let subject = test_subject("orders.Created");
835
836 store.register_consumer("worker-1", &subject, &[]).unwrap();
837 store.persist(&test_envelope("msg-1", &subject)).unwrap();
838 store.mark_delivered("msg-1", "worker-1", u64::MAX).unwrap();
839 store.nack("msg-1", "worker-1", false).unwrap();
840
841 let pending = store.fetch_pending("worker-1", 10).unwrap();
842 assert!(pending.is_empty());
843 let expired = store.fetch_expired("worker-1", u64::MAX, 10).unwrap();
844 assert!(expired.is_empty());
845 }
846
847 #[test]
848 fn test_fetch_expired() {
849 let store = RedbMessageStore::open_temporary().unwrap();
850 let subject = test_subject("orders.Created");
851
852 store.register_consumer("worker-1", &subject, &[]).unwrap();
853 store.persist(&test_envelope("msg-1", &subject)).unwrap();
854 store.mark_delivered("msg-1", "worker-1", 1000).unwrap();
855
856 let expired = store.fetch_expired("worker-1", 999, 10).unwrap();
858 assert!(expired.is_empty());
859
860 let expired = store.fetch_expired("worker-1", 1001, 10).unwrap();
862 assert_eq!(expired.len(), 1);
863 assert_eq!(expired[0].attempt, 2);
864 }
865
866 #[test]
867 fn test_gc_acked() {
868 let store = RedbMessageStore::open_temporary().unwrap();
869 let subject = test_subject("orders.Created");
870
871 store.register_consumer("worker-1", &subject, &[]).unwrap();
872 store.persist(&test_envelope("msg-1", &subject)).unwrap();
873 store.mark_delivered("msg-1", "worker-1", u64::MAX).unwrap();
874 store.ack("msg-1", "worker-1").unwrap();
875
876 let removed = store.gc_acked(u64::MAX).unwrap();
877 assert_eq!(removed, 1);
878
879 let pending = store.fetch_pending("worker-1", 10).unwrap();
880 assert!(pending.is_empty());
881 }
882
883 #[test]
884 fn test_list_consumers() {
885 let store = RedbMessageStore::open_temporary().unwrap();
886 let subject = test_subject("orders.Created");
887 let subject2 = test_subject("orders.Shipped");
888
889 store.register_consumer("worker-1", &subject, &[]).unwrap();
890 store.register_consumer("worker-2", &subject2, &[]).unwrap();
891
892 let mut consumers = store.list_consumers().unwrap();
893 consumers.sort();
894 assert_eq!(consumers, vec!["worker-1", "worker-2"]);
895 }
896
897 #[test]
898 fn test_different_subjects_isolated() {
899 let store = RedbMessageStore::open_temporary().unwrap();
900 let subject1 = test_subject("orders.Created");
901 let subject2 = test_subject("orders.Shipped");
902
903 store.register_consumer("worker-1", &subject1, &[]).unwrap();
904 store.persist(&test_envelope("msg-1", &subject1)).unwrap();
905 store.persist(&test_envelope("msg-2", &subject2)).unwrap();
906
907 let pending = store.fetch_pending("worker-1", 10).unwrap();
909 assert_eq!(pending.len(), 1);
910 assert_eq!(pending[0].envelope.subject, subject1);
911 }
912
913 #[test]
914 fn test_message_record_roundtrip() {
915 let subject = test_subject("test.Subject");
916 let envelope = EventEnvelope {
917 id: "test-id".into(),
918 subject: subject.clone(),
919 payload: vec![1, 2, 3, 4, 5],
920 headers: [("key".to_string(), "value".to_string())].into(),
921 timestamp_nanos: 123456789,
922 };
923 let record = MessageRecord::from_envelope(&envelope, 999);
924 let serialized = record.serialize();
925 let deserialized = MessageRecord::deserialize(&serialized).unwrap();
926
927 assert_eq!(deserialized.id, "test-id");
928 assert_eq!(deserialized.subject, subject);
929 assert_eq!(deserialized.payload, vec![1, 2, 3, 4, 5]);
930 assert_eq!(deserialized.headers, vec![("key".into(), "value".into())]);
931 assert_eq!(deserialized.timestamp_nanos, 123456789);
932 assert_eq!(deserialized.created_at_ms, 999);
933 }
934}