Skip to main content

hermes_store/
redb_store.rs

1use std::path::Path;
2
3use hermes_proto::EventEnvelope;
4use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
5use tracing::{debug, trace};
6
7use crate::error::StoreError;
8use crate::{DeliveryState, MessageStore, StoredMessage};
9
// --- Table definitions ---
//
// redb is a typed key-value store. We define our tables with their key/value types.
//
// MESSAGES: message_id (str) -> serialized MessageRecord (bytes)
// DELIVERIES: "consumer_name\0message_id" (str, NUL-separated) -> serialized DeliveryRecord (bytes)
// CONSUMERS: consumer_name (str) -> serialized ConsumerRecord (bytes)
// SUBJECT_INDEX: [subject_len (4 bytes, LE) + subject_bytes + message_id] (bytes) -> ()
// META: metadata key (str) -> value (bytes); "schema_version" holds a little-endian u32
18
/// Table `messages`: message id -> serialized `MessageRecord`.
const MESSAGES: TableDefinition<&str, &[u8]> = TableDefinition::new("messages");
/// Table `deliveries`: key built by `delivery_key` -> serialized `DeliveryRecord`.
const DELIVERIES: TableDefinition<&str, &[u8]> = TableDefinition::new("deliveries");
/// Table `consumers`: consumer name -> serialized `ConsumerRecord`.
const CONSUMERS: TableDefinition<&str, &[u8]> = TableDefinition::new("consumers");
/// Table `subject_index_v2`: key built by `subject_index_key` -> unit value.
/// Only the keys carry information; they allow scanning messages by subject.
const SUBJECT_INDEX: TableDefinition<&[u8], ()> = TableDefinition::new("subject_index_v2");
/// Table `meta`: store-level metadata. `init_tables` keeps the `"schema_version"` entry here.
const META: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");

/// Current schema version. Bump when adding tables or changing formats.
const SCHEMA_VERSION: u32 = 2;

/// Record format version byte prepended to serialized records.
const RECORD_VERSION: u8 = 2;
30
/// A message as stored on disk.
/// We use a simple hand-rolled serialization to avoid pulling in extra deps.
///
/// Built from an `EventEnvelope` by `from_envelope` and turned back into one by
/// `to_envelope`; `created_at_ms` exists only in the stored form.
#[derive(Debug, Clone)]
struct MessageRecord {
    /// Message identifier; `persist` also uses it as the key in the `messages` table.
    id: String,
    /// Subject as raw bytes (legacy record formats stored it as a UTF-8 string).
    subject: Vec<u8>,
    /// Message body, copied verbatim from the envelope.
    payload: Vec<u8>,
    /// Header key/value pairs copied from the envelope.
    headers: Vec<(String, String)>,
    /// Copied verbatim from `EventEnvelope::timestamp_nanos`.
    timestamp_nanos: i64,
    /// Timestamp supplied when the record was created (`persist` passes `now_ms()`);
    /// `gc_acked` compares it against its `older_than_ms` cutoff.
    created_at_ms: u64,
}
42
43impl MessageRecord {
44    fn from_envelope(envelope: &EventEnvelope, created_at_ms: u64) -> Self {
45        Self {
46            id: envelope.id.clone(),
47            subject: envelope.subject.clone(),
48            payload: envelope.payload.clone(),
49            headers: envelope
50                .headers
51                .iter()
52                .map(|(k, v)| (k.clone(), v.clone()))
53                .collect(),
54            timestamp_nanos: envelope.timestamp_nanos,
55            created_at_ms,
56        }
57    }
58
59    fn to_envelope(&self) -> EventEnvelope {
60        EventEnvelope {
61            id: self.id.clone(),
62            subject: self.subject.clone(),
63            payload: self.payload.clone(),
64            headers: self.headers.iter().cloned().collect(),
65            timestamp_nanos: self.timestamp_nanos,
66        }
67    }
68
69    fn serialize(&self) -> Vec<u8> {
70        let mut buf = Vec::new();
71        buf.push(RECORD_VERSION);
72        write_str(&mut buf, &self.id);
73        write_bytes(&mut buf, &self.subject);
74        write_bytes(&mut buf, &self.payload);
75        write_u32(&mut buf, self.headers.len() as u32);
76        for (k, v) in &self.headers {
77            write_str(&mut buf, k);
78            write_str(&mut buf, v);
79        }
80        write_i64(&mut buf, self.timestamp_nanos);
81        write_u64(&mut buf, self.created_at_ms);
82        buf
83    }
84
85    fn deserialize(data: &[u8]) -> Option<Self> {
86        if data.is_empty() {
87            return None;
88        }
89        let mut pos = 0;
90        let version = data[0];
91        pos += 1;
92
93        if version == RECORD_VERSION {
94            // v2: subject is bytes
95            let id = read_str(data, &mut pos)?;
96            let subject = read_bytes(data, &mut pos)?;
97            let payload = read_bytes(data, &mut pos)?;
98            let header_count = read_u32(data, &mut pos)? as usize;
99            let mut headers = Vec::with_capacity(header_count);
100            for _ in 0..header_count {
101                let k = read_str(data, &mut pos)?;
102                let v = read_str(data, &mut pos)?;
103                headers.push((k, v));
104            }
105            let timestamp_nanos = read_i64(data, &mut pos)?;
106            let created_at_ms = read_u64(data, &mut pos)?;
107            Some(Self {
108                id,
109                subject,
110                payload,
111                headers,
112                timestamp_nanos,
113                created_at_ms,
114            })
115        } else {
116            // Legacy v0/v1: subject was a string, re-parse from start
117            pos = if version == 1 { 1 } else { 0 };
118            let id = read_str(data, &mut pos)?;
119            let subject_str = read_str(data, &mut pos)?;
120            let payload = read_bytes(data, &mut pos)?;
121            let header_count = read_u32(data, &mut pos)? as usize;
122            let mut headers = Vec::with_capacity(header_count);
123            for _ in 0..header_count {
124                let k = read_str(data, &mut pos)?;
125                let v = read_str(data, &mut pos)?;
126                headers.push((k, v));
127            }
128            let timestamp_nanos = read_i64(data, &mut pos)?;
129            let created_at_ms = read_u64(data, &mut pos)?;
130            Some(Self {
131                id,
132                subject: subject_str.into_bytes(),
133                payload,
134                headers,
135                timestamp_nanos,
136                created_at_ms,
137            })
138        }
139    }
140}
141
/// Per-(consumer, message) delivery bookkeeping stored in the `deliveries` table.
#[derive(Debug, Clone)]
struct DeliveryRecord {
    /// Current delivery state (delivered, acked, dead-lettered, ...).
    state: DeliveryState,
    /// Delivery attempt number recorded for this entry.
    attempt: u32,
    /// Ack deadline in milliseconds; `fetch_expired` treats a delivered entry as
    /// expired once this is <= the `now_ms` it is given.
    ack_deadline_ms: u64,
}
148
149impl DeliveryRecord {
150    fn serialize(&self) -> Vec<u8> {
151        let mut buf = Vec::with_capacity(14);
152        buf.push(RECORD_VERSION);
153        buf.push(self.state.as_u8());
154        write_u32(&mut buf, self.attempt);
155        write_u64(&mut buf, self.ack_deadline_ms);
156        buf
157    }
158
159    fn deserialize(data: &[u8]) -> Option<Self> {
160        if data.is_empty() {
161            return None;
162        }
163        let version = data[0];
164        if version == RECORD_VERSION || version == 1 {
165            // v1/v2: [version, state, attempt(4), deadline(8)] = 14 bytes
166            if data.len() < 14 {
167                return None;
168            }
169            let state = DeliveryState::from_u8(data[1])?;
170            let mut pos = 2;
171            let attempt = read_u32(data, &mut pos)?;
172            let ack_deadline_ms = read_u64(data, &mut pos)?;
173            Some(Self {
174                state,
175                attempt,
176                ack_deadline_ms,
177            })
178        } else {
179            // Legacy format: [state, attempt(4), deadline(8)] = 13 bytes
180            if data.len() < 13 {
181                return None;
182            }
183            let state = DeliveryState::from_u8(data[0])?;
184            let mut pos = 1;
185            let attempt = read_u32(data, &mut pos)?;
186            let ack_deadline_ms = read_u64(data, &mut pos)?;
187            Some(Self {
188                state,
189                attempt,
190                ack_deadline_ms,
191            })
192        }
193    }
194}
195
/// A registered consumer as stored in the `consumers` table.
#[derive(Debug, Clone)]
struct ConsumerRecord {
    /// Subject the consumer is registered for, as raw bytes; `fetch_pending` uses it
    /// to build the subject-index scan prefix.
    subject: Vec<u8>,
    /// Queue groups supplied at registration (the oldest on-disk format held at most one).
    queue_groups: Vec<String>,
}
201
202impl ConsumerRecord {
203    fn serialize(&self) -> Vec<u8> {
204        let mut buf = Vec::new();
205        buf.push(RECORD_VERSION);
206        write_bytes(&mut buf, &self.subject);
207        write_u32(&mut buf, self.queue_groups.len() as u32);
208        for qg in &self.queue_groups {
209            write_str(&mut buf, qg);
210        }
211        buf
212    }
213
214    fn deserialize(data: &[u8]) -> Option<Self> {
215        if data.is_empty() {
216            return None;
217        }
218        let version = data[0];
219
220        if version == RECORD_VERSION {
221            let mut pos = 1;
222            let subject = read_bytes(data, &mut pos)?;
223            let count = read_u32(data, &mut pos)?;
224            let mut queue_groups = Vec::with_capacity(count as usize);
225            for _ in 0..count {
226                queue_groups.push(read_str(data, &mut pos)?);
227            }
228            Some(Self {
229                subject,
230                queue_groups,
231            })
232        } else if version == 1 {
233            let mut pos = 1;
234            let subject_str = read_str(data, &mut pos)?;
235            let count = read_u32(data, &mut pos)?;
236            let mut queue_groups = Vec::with_capacity(count as usize);
237            for _ in 0..count {
238                queue_groups.push(read_str(data, &mut pos)?);
239            }
240            Some(Self {
241                subject: subject_str.into_bytes(),
242                queue_groups,
243            })
244        } else {
245            let mut pos = 0;
246            let subject_str = read_str(data, &mut pos)?;
247            let has_qg = *data.get(pos)?;
248            pos += 1;
249            let queue_groups = if has_qg == 1 {
250                vec![read_str(data, &mut pos)?]
251            } else {
252                vec![]
253            };
254            Some(Self {
255                subject: subject_str.into_bytes(),
256                queue_groups,
257            })
258        }
259    }
260}
261
262// --- Serialization helpers ---
263
/// Appends `v` to `buf` as 4 little-endian bytes.
fn write_u32(buf: &mut Vec<u8>, v: u32) {
    let encoded = v.to_le_bytes();
    buf.extend(encoded);
}
267
/// Appends `v` to `buf` as 8 little-endian bytes.
fn write_u64(buf: &mut Vec<u8>, v: u64) {
    let encoded = v.to_le_bytes();
    buf.extend(encoded);
}
271
/// Appends `v` to `buf` as 8 little-endian (two's complement) bytes.
fn write_i64(buf: &mut Vec<u8>, v: i64) {
    let encoded = v.to_le_bytes();
    buf.extend(encoded);
}
275
276fn write_str(buf: &mut Vec<u8>, s: &str) {
277    write_u32(buf, s.len() as u32);
278    buf.extend_from_slice(s.as_bytes());
279}
280
281fn write_bytes(buf: &mut Vec<u8>, b: &[u8]) {
282    write_u32(buf, b.len() as u32);
283    buf.extend_from_slice(b);
284}
285
/// Reads a little-endian `u32` at `*pos` and advances `*pos` past it.
///
/// Returns `None`, leaving `*pos` untouched, when fewer than 4 bytes remain or
/// the cursor is out of range. The end offset is computed with checked
/// arithmetic so an absurd cursor yields `None` instead of an overflow.
fn read_u32(data: &[u8], pos: &mut usize) -> Option<u32> {
    let end = pos.checked_add(4)?;
    let bytes: [u8; 4] = data.get(*pos..end)?.try_into().ok()?;
    *pos = end;
    Some(u32::from_le_bytes(bytes))
}
291
/// Reads a little-endian `u64` at `*pos` and advances `*pos` past it.
///
/// Returns `None`, leaving `*pos` untouched, when fewer than 8 bytes remain or
/// the cursor is out of range. The end offset is computed with checked
/// arithmetic so an absurd cursor yields `None` instead of an overflow.
fn read_u64(data: &[u8], pos: &mut usize) -> Option<u64> {
    let end = pos.checked_add(8)?;
    let bytes: [u8; 8] = data.get(*pos..end)?.try_into().ok()?;
    *pos = end;
    Some(u64::from_le_bytes(bytes))
}
297
/// Reads a little-endian `i64` at `*pos` and advances `*pos` past it.
///
/// Returns `None`, leaving `*pos` untouched, when fewer than 8 bytes remain or
/// the cursor is out of range. The end offset is computed with checked
/// arithmetic so an absurd cursor yields `None` instead of an overflow.
fn read_i64(data: &[u8], pos: &mut usize) -> Option<i64> {
    let end = pos.checked_add(8)?;
    let bytes: [u8; 8] = data.get(*pos..end)?.try_into().ok()?;
    *pos = end;
    Some(i64::from_le_bytes(bytes))
}
303
304fn read_str(data: &[u8], pos: &mut usize) -> Option<String> {
305    let len = read_u32(data, pos)? as usize;
306    let s = std::str::from_utf8(data.get(*pos..*pos + len)?).ok()?;
307    *pos += len;
308    Some(s.to_string())
309}
310
311fn read_bytes(data: &[u8], pos: &mut usize) -> Option<Vec<u8>> {
312    let len = read_u32(data, pos)? as usize;
313    let b = data.get(*pos..*pos + len)?;
314    *pos += len;
315    Some(b.to_vec())
316}
317
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
324
/// Build a composite key for the subject index: [subject_len(4 LE)][subject][message_id].
///
/// The length prefix keeps keys of different subjects from sharing a prefix even
/// when one subject's bytes are a prefix of another's.
fn subject_index_key(subject: &[u8], message_id: &str) -> Vec<u8> {
    let len_prefix = (subject.len() as u32).to_le_bytes();
    let id_bytes = message_id.as_bytes();

    let mut key = Vec::with_capacity(len_prefix.len() + subject.len() + id_bytes.len());
    for part in [&len_prefix[..], subject, id_bytes] {
        key.extend_from_slice(part);
    }
    key
}
333
/// Build the prefix for range-scanning the subject index.
///
/// This is the `[subject_len(4 LE)][subject]` head shared by every
/// `subject_index_key` for the same subject.
fn subject_index_prefix(subject: &[u8]) -> Vec<u8> {
    let len_prefix = (subject.len() as u32).to_le_bytes();

    let mut out = Vec::with_capacity(len_prefix.len() + subject.len());
    out.extend_from_slice(&len_prefix);
    out.extend_from_slice(subject);
    out
}
341
/// Separator for composite delivery keys.
///
/// NUL is used on the expectation that it never occurs in consumer names or
/// message IDs. Note that NUL is a valid character in a Rust `&str` and nothing
/// here rejects it, so a name or ID containing NUL would make keys ambiguous.
const KEY_SEP: char = '\0';

/// Builds the `deliveries` table key: `consumer_name`, `KEY_SEP`, then `message_id`.
fn delivery_key(consumer_name: &str, message_id: &str) -> String {
    let mut key =
        String::with_capacity(consumer_name.len() + KEY_SEP.len_utf8() + message_id.len());
    key.push_str(consumer_name);
    key.push(KEY_SEP);
    key.push_str(message_id);
    key
}
349
350// --- RedbMessageStore ---
351
/// `MessageStore` implementation backed by a redb `Database`.
///
/// Create one with `RedbMessageStore::open`; every operation runs in its own
/// redb read or write transaction.
pub struct RedbMessageStore {
    /// Underlying redb database handle.
    db: Database,
}
355
356impl RedbMessageStore {
357    /// Ensure all required tables exist and verify schema version.
358    fn init_tables(db: &Database) -> Result<(), StoreError> {
359        let txn = db.begin_write()?;
360        {
361            let _ = txn.open_table(MESSAGES)?;
362            let _ = txn.open_table(DELIVERIES)?;
363            let _ = txn.open_table(CONSUMERS)?;
364            let _ = txn.open_table(SUBJECT_INDEX)?;
365            let mut meta = txn.open_table(META)?;
366
367            // Check or set schema version.
368            let needs_init = {
369                let existing = meta.get("schema_version")?;
370                match existing {
371                    Some(guard) => {
372                        let bytes: &[u8] = guard.value();
373                        if bytes.len() >= 4 {
374                            let version =
375                                u32::from_le_bytes(bytes[..4].try_into().unwrap_or([0; 4]));
376                            if version > SCHEMA_VERSION {
377                                return Err(StoreError::InvalidState(version as u8));
378                            }
379                        }
380                        false
381                    }
382                    None => true,
383                }
384            };
385            if needs_init {
386                meta.insert("schema_version", SCHEMA_VERSION.to_le_bytes().as_slice())?;
387            }
388        }
389        txn.commit()?;
390        Ok(())
391    }
392
393    /// Open or create a store at the given path.
394    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
395        let db = Database::create(path.as_ref())?;
396        Self::init_tables(&db)?;
397        debug!(path = %path.as_ref().display(), "store opened");
398        Ok(Self { db })
399    }
400
401    /// Open an in-memory store (for tests).
402    #[cfg(test)]
403    pub fn open_temporary() -> Result<Self, StoreError> {
404        let db = Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?;
405        Self::init_tables(&db)?;
406        Ok(Self { db })
407    }
408}
409
410impl MessageStore for RedbMessageStore {
411    fn persist(&self, envelope: &EventEnvelope) -> Result<(), StoreError> {
412        let record = MessageRecord::from_envelope(envelope, now_ms());
413        let serialized = record.serialize();
414
415        let txn = self.db.begin_write()?;
416        {
417            let mut messages = txn.open_table(MESSAGES)?;
418            messages.insert(envelope.id.as_str(), serialized.as_slice())?;
419
420            let mut idx = txn.open_table(SUBJECT_INDEX)?;
421            let key = subject_index_key(&envelope.subject, &envelope.id);
422            idx.insert(key.as_slice(), ())?;
423        }
424        txn.commit()?;
425
426        debug!(id = envelope.id, "message persisted");
427        Ok(())
428    }
429
430    fn register_consumer(
431        &self,
432        consumer_name: &str,
433        subject: &[u8],
434        queue_groups: &[String],
435    ) -> Result<(), StoreError> {
436        let record = ConsumerRecord {
437            subject: subject.to_vec(),
438            queue_groups: queue_groups.to_vec(),
439        };
440
441        let txn = self.db.begin_write()?;
442        {
443            let mut consumers = txn.open_table(CONSUMERS)?;
444            consumers.insert(consumer_name, record.serialize().as_slice())?;
445        }
446        txn.commit()?;
447
448        debug!(consumer_name, "consumer registered");
449        Ok(())
450    }
451
452    fn fetch_pending(
453        &self,
454        consumer_name: &str,
455        limit: u32,
456    ) -> Result<Vec<StoredMessage>, StoreError> {
457        let txn = self.db.begin_read()?;
458
459        // Find the consumer's subject.
460        let consumers = txn.open_table(CONSUMERS)?;
461        let consumer_data = consumers
462            .get(consumer_name)?
463            .ok_or_else(|| StoreError::ConsumerNotFound(consumer_name.to_string()))?;
464        let consumer = ConsumerRecord::deserialize(consumer_data.value())
465            .ok_or(StoreError::InvalidState(255))?;
466
467        let messages = txn.open_table(MESSAGES)?;
468        let deliveries = txn.open_table(DELIVERIES)?;
469        let idx = txn.open_table(SUBJECT_INDEX)?;
470
471        let prefix = subject_index_prefix(&consumer.subject);
472        let mut result = Vec::new();
473
474        let range = idx.range(prefix.as_slice()..)?;
475        for entry in range {
476            let entry = entry?;
477            let key: &[u8] = entry.0.value();
478
479            // Stop if we've left the prefix range.
480            if !key.starts_with(&prefix) {
481                break;
482            }
483
484            if result.len() >= limit as usize {
485                break;
486            }
487
488            // Extract message_id from the key (after prefix).
489            let message_id = match std::str::from_utf8(&key[prefix.len()..]) {
490                Ok(s) => s,
491                Err(_) => continue,
492            };
493            let dk = delivery_key(consumer_name, message_id);
494
495            // Skip if already has a delivery record (delivered, acked, or dead-lettered).
496            if deliveries.get(dk.as_str())?.is_some() {
497                continue;
498            }
499
500            // Fetch the message.
501            if let Some(msg_data) = messages.get(message_id)? {
502                let bytes: &[u8] = msg_data.value();
503                if let Some(record) = MessageRecord::deserialize(bytes) {
504                    result.push(StoredMessage {
505                        envelope: record.to_envelope(),
506                        attempt: 1,
507                    });
508                }
509            }
510        }
511
512        debug!(
513            consumer_name,
514            count = result.len(),
515            "fetched pending messages"
516        );
517        Ok(result)
518    }
519
520    fn mark_delivered(
521        &self,
522        message_id: &str,
523        consumer_name: &str,
524        ack_deadline_ms: u64,
525    ) -> Result<(), StoreError> {
526        let dk = delivery_key(consumer_name, message_id);
527        let record = DeliveryRecord {
528            state: DeliveryState::Delivered,
529            attempt: 1,
530            ack_deadline_ms,
531        };
532
533        let txn = self.db.begin_write()?;
534        {
535            let mut deliveries = txn.open_table(DELIVERIES)?;
536            deliveries.insert(dk.as_str(), record.serialize().as_slice())?;
537        }
538        txn.commit()?;
539
540        trace!(message_id, consumer_name, "delivery recorded");
541        Ok(())
542    }
543
544    fn ack(&self, message_id: &str, consumer_name: &str) -> Result<(), StoreError> {
545        let dk = delivery_key(consumer_name, message_id);
546
547        let txn = self.db.begin_write()?;
548        {
549            let mut deliveries = txn.open_table(DELIVERIES)?;
550            let existing = deliveries
551                .get(dk.as_str())?
552                .ok_or_else(|| StoreError::MessageNotFound(message_id.to_string()))?;
553            let mut record = DeliveryRecord::deserialize(existing.value())
554                .ok_or(StoreError::InvalidState(255))?;
555            drop(existing);
556
557            record.state = DeliveryState::Acked;
558            deliveries.insert(dk.as_str(), record.serialize().as_slice())?;
559        }
560        txn.commit()?;
561
562        debug!(message_id, consumer_name, "message acked");
563        Ok(())
564    }
565
566    fn nack(&self, message_id: &str, consumer_name: &str, requeue: bool) -> Result<(), StoreError> {
567        let dk = delivery_key(consumer_name, message_id);
568
569        let txn = self.db.begin_write()?;
570        {
571            let mut deliveries = txn.open_table(DELIVERIES)?;
572            let existing = deliveries
573                .get(dk.as_str())?
574                .ok_or_else(|| StoreError::MessageNotFound(message_id.to_string()))?;
575            let record = DeliveryRecord::deserialize(existing.value())
576                .ok_or(StoreError::InvalidState(255))?;
577            drop(existing);
578
579            if requeue {
580                // Remove the delivery record entirely so fetch_pending picks it up again.
581                deliveries.remove(dk.as_str())?;
582                debug!(message_id, consumer_name, "message nacked, requeued");
583            } else {
584                // Dead-letter.
585                let dead = DeliveryRecord {
586                    state: DeliveryState::DeadLettered,
587                    attempt: record.attempt,
588                    ack_deadline_ms: 0,
589                };
590                deliveries.insert(dk.as_str(), dead.serialize().as_slice())?;
591                debug!(message_id, consumer_name, "message nacked, dead-lettered");
592            }
593        }
594        txn.commit()?;
595
596        Ok(())
597    }
598
599    fn fetch_expired(
600        &self,
601        consumer_name: &str,
602        now_ms: u64,
603        limit: u32,
604    ) -> Result<Vec<StoredMessage>, StoreError> {
605        let txn = self.db.begin_read()?;
606        let deliveries = txn.open_table(DELIVERIES)?;
607        let messages = txn.open_table(MESSAGES)?;
608
609        let prefix = format!("{consumer_name}{KEY_SEP}");
610        let mut result = Vec::new();
611
612        let range = deliveries.range(prefix.as_str()..)?;
613        for entry in range {
614            let entry = entry?;
615            let key = entry.0.value();
616
617            if !key.starts_with(&prefix) {
618                break;
619            }
620
621            if result.len() >= limit as usize {
622                break;
623            }
624
625            let record = match DeliveryRecord::deserialize(entry.1.value()) {
626                Some(r) => r,
627                None => continue,
628            };
629
630            // Only pick up delivered (not acked, not dead-lettered) with expired deadline.
631            if record.state != DeliveryState::Delivered || record.ack_deadline_ms > now_ms {
632                continue;
633            }
634
635            let message_id = &key[prefix.len()..];
636            if let Some(msg_data) = messages.get(message_id)?
637                && let Some(msg) = MessageRecord::deserialize(msg_data.value())
638            {
639                result.push(StoredMessage {
640                    envelope: msg.to_envelope(),
641                    attempt: record.attempt.saturating_add(1),
642                });
643            }
644        }
645
646        debug!(
647            consumer_name,
648            count = result.len(),
649            "fetched expired messages"
650        );
651        Ok(result)
652    }
653
654    fn gc_acked(&self, older_than_ms: u64) -> Result<u64, StoreError> {
655        let txn = self.db.begin_write()?;
656        let mut removed: u64 = 0;
657
658        {
659            let mut messages = txn.open_table(MESSAGES)?;
660            let mut idx = txn.open_table(SUBJECT_INDEX)?;
661            let mut deliveries = txn.open_table(DELIVERIES)?;
662
663            // Phase 1: Single pass over deliveries → build per-message status.
664            let mut msg_status: std::collections::HashMap<String, bool> =
665                std::collections::HashMap::new();
666            let mut del_keys_by_msg: std::collections::HashMap<String, Vec<String>> =
667                std::collections::HashMap::new();
668
669            for entry in deliveries.range::<&str>(..)? {
670                let entry = entry?;
671                let key = entry.0.value().to_string();
672                let message_id = match key.rsplit_once(KEY_SEP) {
673                    Some((_, mid)) => mid.to_string(),
674                    None => continue,
675                };
676
677                let terminal = DeliveryRecord::deserialize(entry.1.value())
678                    .map(|dr| {
679                        dr.state == DeliveryState::Acked || dr.state == DeliveryState::DeadLettered
680                    })
681                    .unwrap_or(false);
682
683                let all_terminal = msg_status.entry(message_id.clone()).or_insert(true);
684                if !terminal {
685                    *all_terminal = false;
686                }
687
688                del_keys_by_msg.entry(message_id).or_default().push(key);
689            }
690
691            // Phase 2: Scan messages, check age + delivery status.
692            let msg_range: Vec<_> = {
693                messages
694                    .range::<&str>(..)?
695                    .filter_map(|e| {
696                        let e = e.ok()?;
697                        let id = e.0.value().to_string();
698                        let record = MessageRecord::deserialize(e.1.value())?;
699                        Some((id, record))
700                    })
701                    .collect()
702            };
703
704            for (id, record) in &msg_range {
705                if record.created_at_ms > older_than_ms {
706                    continue;
707                }
708
709                let all_terminal = msg_status.get(id).copied().unwrap_or(false);
710                if !all_terminal {
711                    continue;
712                }
713
714                messages.remove(id.as_str())?;
715                let idx_key = subject_index_key(&record.subject, id);
716                idx.remove(idx_key.as_slice())?;
717
718                if let Some(del_keys) = del_keys_by_msg.remove(id) {
719                    for dk in del_keys {
720                        deliveries.remove(dk.as_str())?;
721                    }
722                }
723
724                removed = removed.saturating_add(1);
725            }
726        }
727
728        txn.commit()?;
729
730        if removed > 0 {
731            debug!(removed, "gc completed");
732        }
733
734        Ok(removed)
735    }
736
737    fn list_consumers(&self) -> Result<Vec<String>, StoreError> {
738        let txn = self.db.begin_read()?;
739        let consumers = txn.open_table(CONSUMERS)?;
740
741        let mut names = Vec::new();
742        for entry in consumers.range::<&str>(..)? {
743            let entry = entry?;
744            names.push(entry.0.value().to_string());
745        }
746
747        trace!(count = names.len(), "listed consumers");
748        Ok(names)
749    }
750}
751
#[cfg(test)]
mod tests {
    use super::*;

    /// Consumer name shared by the single-consumer tests.
    const WORKER: &str = "worker-1";

    /// Minimal envelope with a fixed payload and timestamp.
    fn test_envelope(id: &str, subject: &[u8]) -> EventEnvelope {
        EventEnvelope {
            id: id.to_string(),
            subject: subject.to_vec(),
            payload: vec![1, 2, 3],
            headers: Default::default(),
            timestamp_nanos: 42,
        }
    }

    /// Encodes a subject string the way the rest of the system does.
    fn test_subject(s: &str) -> Vec<u8> {
        use hermes_core::Subject;
        Subject::from(s).to_bytes()
    }

    /// Fresh in-memory store with `WORKER` registered on "orders.Created".
    /// Returns the store together with the encoded subject.
    fn store_with_worker() -> (RedbMessageStore, Vec<u8>) {
        let store = RedbMessageStore::open_temporary().unwrap();
        let subject = test_subject("orders.Created");
        store.register_consumer(WORKER, &subject, &[]).unwrap();
        (store, subject)
    }

    /// Same as `store_with_worker`, plus "msg-1" persisted and marked delivered
    /// to `WORKER` with the given ack deadline.
    fn store_with_delivered_msg(ack_deadline_ms: u64) -> RedbMessageStore {
        let (store, subject) = store_with_worker();
        store.persist(&test_envelope("msg-1", &subject)).unwrap();
        store
            .mark_delivered("msg-1", WORKER, ack_deadline_ms)
            .unwrap();
        store
    }

    #[test]
    fn test_persist_and_fetch_pending() {
        let (store, subject) = store_with_worker();
        for id in ["msg-1", "msg-2"] {
            store.persist(&test_envelope(id, &subject)).unwrap();
        }

        let pending = store.fetch_pending(WORKER, 10).unwrap();
        assert_eq!(pending.len(), 2);
        assert_eq!(pending[0].envelope.id, "msg-1");
        assert_eq!(pending[1].envelope.id, "msg-2");
    }

    #[test]
    fn test_mark_delivered_excludes_from_pending() {
        let store = store_with_delivered_msg(u64::MAX);

        let pending = store.fetch_pending(WORKER, 10).unwrap();
        assert!(pending.is_empty());
    }

    #[test]
    fn test_ack() {
        let store = store_with_delivered_msg(u64::MAX);
        store.ack("msg-1", WORKER).unwrap();

        let pending = store.fetch_pending(WORKER, 10).unwrap();
        assert!(pending.is_empty());
        let expired = store.fetch_expired(WORKER, u64::MAX, 10).unwrap();
        assert!(expired.is_empty());
    }

    #[test]
    fn test_nack_requeue() {
        let store = store_with_delivered_msg(u64::MAX);

        let pending = store.fetch_pending(WORKER, 10).unwrap();
        assert_eq!(pending.len(), 0);
        store.nack("msg-1", WORKER, true).unwrap();

        let pending = store.fetch_pending(WORKER, 10).unwrap();
        assert_eq!(pending.len(), 1);
    }

    #[test]
    fn test_nack_dead_letter() {
        let store = store_with_delivered_msg(u64::MAX);
        store.nack("msg-1", WORKER, false).unwrap();

        let pending = store.fetch_pending(WORKER, 10).unwrap();
        assert!(pending.is_empty());
        let expired = store.fetch_expired(WORKER, u64::MAX, 10).unwrap();
        assert!(expired.is_empty());
    }

    #[test]
    fn test_fetch_expired() {
        let store = store_with_delivered_msg(1000);

        // Not expired yet.
        let expired = store.fetch_expired(WORKER, 999, 10).unwrap();
        assert!(expired.is_empty());

        // Now expired.
        let expired = store.fetch_expired(WORKER, 1001, 10).unwrap();
        assert_eq!(expired.len(), 1);
        assert_eq!(expired[0].attempt, 2);
    }

    #[test]
    fn test_gc_acked() {
        let store = store_with_delivered_msg(u64::MAX);
        store.ack("msg-1", WORKER).unwrap();

        let removed = store.gc_acked(u64::MAX).unwrap();
        assert_eq!(removed, 1);

        let pending = store.fetch_pending(WORKER, 10).unwrap();
        assert!(pending.is_empty());
    }

    #[test]
    fn test_list_consumers() {
        let (store, _) = store_with_worker();
        let other_subject = test_subject("orders.Shipped");
        store
            .register_consumer("worker-2", &other_subject, &[])
            .unwrap();

        let mut consumers = store.list_consumers().unwrap();
        consumers.sort();
        assert_eq!(consumers, vec!["worker-1", "worker-2"]);
    }

    #[test]
    fn test_different_subjects_isolated() {
        let (store, created) = store_with_worker();
        let shipped = test_subject("orders.Shipped");

        store.persist(&test_envelope("msg-1", &created)).unwrap();
        store.persist(&test_envelope("msg-2", &shipped)).unwrap();

        // worker-1 only sees orders.Created.
        let pending = store.fetch_pending(WORKER, 10).unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].envelope.subject, created);
    }

    #[test]
    fn test_message_record_roundtrip() {
        let subject = test_subject("test.Subject");
        let envelope = EventEnvelope {
            id: "test-id".into(),
            subject: subject.clone(),
            payload: vec![1, 2, 3, 4, 5],
            headers: [("key".to_string(), "value".to_string())].into(),
            timestamp_nanos: 123456789,
        };

        let encoded = MessageRecord::from_envelope(&envelope, 999).serialize();
        let decoded = MessageRecord::deserialize(&encoded).unwrap();

        assert_eq!(decoded.id, "test-id");
        assert_eq!(decoded.subject, subject);
        assert_eq!(decoded.payload, vec![1, 2, 3, 4, 5]);
        assert_eq!(decoded.headers, vec![("key".into(), "value".into())]);
        assert_eq!(decoded.timestamp_nanos, 123456789);
        assert_eq!(decoded.created_at_ms, 999);
    }
}