Skip to main content

hermes_store/
lib.rs

1mod error;
2mod redb_store;
3
4pub use error::StoreError;
5pub use redb_store::RedbMessageStore;
6
7use hermes_proto::EventEnvelope;
8
9/// A stored message with its delivery metadata.
10#[derive(Debug, Clone)]
11pub struct StoredMessage {
12    pub envelope: EventEnvelope,
13    pub attempt: u32,
14}
15
16/// Delivery state for a message per consumer.
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum DeliveryState {
19    /// Persisted but not yet sent to the consumer.
20    Pending,
21    /// Sent to the consumer, waiting for ack.
22    Delivered,
23    /// Consumer acknowledged processing.
24    Acked,
25    /// Moved to dead-letter after max attempts.
26    DeadLettered,
27}
28
29impl DeliveryState {
30    fn as_u8(self) -> u8 {
31        match self {
32            Self::Pending => 0,
33            Self::Delivered => 1,
34            Self::Acked => 2,
35            Self::DeadLettered => 3,
36        }
37    }
38
39    fn from_u8(v: u8) -> Option<Self> {
40        match v {
41            0 => Some(Self::Pending),
42            1 => Some(Self::Delivered),
43            2 => Some(Self::Acked),
44            3 => Some(Self::DeadLettered),
45            _ => None,
46        }
47    }
48}
49
50/// Trait abstracting message persistence for at-least-once delivery.
51pub trait MessageStore: Send + Sync + 'static {
52    /// Persist a message. Called before any dispatch.
53    fn persist(&self, envelope: &EventEnvelope) -> Result<(), StoreError>;
54
55    /// Register a durable consumer for a subject (bincode-encoded bytes).
56    fn register_consumer(
57        &self,
58        consumer_name: &str,
59        subject: &[u8],
60        queue_groups: &[String],
61    ) -> Result<(), StoreError>;
62
63    /// Fetch messages not yet delivered to this consumer, up to `limit`.
64    fn fetch_pending(
65        &self,
66        consumer_name: &str,
67        limit: u32,
68    ) -> Result<Vec<StoredMessage>, StoreError>;
69
70    /// Mark a message as delivered to a consumer with an ack deadline.
71    fn mark_delivered(
72        &self,
73        message_id: &str,
74        consumer_name: &str,
75        ack_deadline_ms: u64,
76    ) -> Result<(), StoreError>;
77
78    /// Acknowledge a message. It won't be redelivered.
79    fn ack(&self, message_id: &str, consumer_name: &str) -> Result<(), StoreError>;
80
81    /// Negative acknowledge. If `requeue` is true, reset to pending for
82    /// immediate redelivery. If false, move to dead-letter.
83    fn nack(&self, message_id: &str, consumer_name: &str, requeue: bool) -> Result<(), StoreError>;
84
85    /// Fetch messages delivered but not acked before `now_ms`.
86    fn fetch_expired(
87        &self,
88        consumer_name: &str,
89        now_ms: u64,
90        limit: u32,
91    ) -> Result<Vec<StoredMessage>, StoreError>;
92
93    /// Delete acked messages older than `older_than_ms`.
94    /// Returns the number of messages removed.
95    fn gc_acked(&self, older_than_ms: u64) -> Result<u64, StoreError>;
96
97    /// List all registered consumer names.
98    fn list_consumers(&self) -> Result<Vec<String>, StoreError>;
99}