Skip to main content

atomr_patterns/outbox/
journal_offset_store.rs

1//! [`JournalOffsetStore`] — durable [`OutboxOffsetStore`] that
2//! piggy-backs on any [`atomr_persistence::Journal`] backend.
3//!
//! Each outbox gets one dedicated persistence id
//! (`outbox::<outbox_name>::offsets`). The full offset map is
//! serialized as a single payload on every `save()` and restored by
//! replaying the highest-sequence record when the store is
//! constructed; `load()` then returns the in-memory copy.
9//!
//! Why one bucket per outbox instead of per source-pid: outbox
//! progress is a single coherent cursor; loading it on restart should
//! be a single round trip. Each save is one journal write of the whole
//! map rather than one write per source, which matters when an outbox
//! follows thousands of aggregate streams.
15
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use atomr_persistence::{Journal, PersistentRepr};
20use parking_lot::Mutex;
21use tokio::runtime::Handle;
22
23use crate::outbox::OutboxOffsetStore;
24
25/// Durable offset store backed by any `Journal` backend.
26///
27/// Plug it into [`super::OutboxBuilder::offset_store`] when you want
28/// outbox progress to survive process restarts. Pick the same backend
29/// you use for aggregate journals to keep the operational surface
30/// small (one connection pool, one schema migration).
31pub struct JournalOffsetStore<J: Journal> {
32    journal: Arc<J>,
33    pid: String,
34    cache: Mutex<HashMap<String, u64>>,
35    writer_uuid: String,
36}
37
38impl<J: Journal> JournalOffsetStore<J> {
39    /// Construct against `journal`, scoping offsets under
40    /// `outbox::<outbox_name>::offsets`. Eagerly hydrates the cache
41    /// from the journal — call from an async context.
42    pub async fn new(journal: Arc<J>, outbox_name: impl Into<String>) -> Self {
43        let outbox_name = outbox_name.into();
44        let pid = format!("outbox::{}::offsets", outbox_name);
45        let cache = match journal.highest_sequence_nr(&pid, 0).await {
46            Ok(highest) if highest > 0 => match journal.replay_messages(&pid, highest, highest, 1).await {
47                Ok(reprs) => reprs
48                    .into_iter()
49                    .last()
50                    .filter(|r| !r.deleted)
51                    .and_then(|r| decode(&r.payload))
52                    .unwrap_or_default(),
53                Err(_) => HashMap::new(),
54            },
55            _ => HashMap::new(),
56        };
57        Self { journal, pid, cache: Mutex::new(cache), writer_uuid: format!("outbox-{}", rand_id()) }
58    }
59}
60
61impl<J: Journal> OutboxOffsetStore for JournalOffsetStore<J> {
62    fn load(&self) -> HashMap<String, u64> {
63        self.cache.lock().clone()
64    }
65
66    fn save(&self, offsets: &HashMap<String, u64>) {
67        // Update cache, then persist. The cache is the source of
68        // truth for the publisher loop's *current* offsets; the
69        // journal write makes the cache durable across process
70        // restarts.
71        let mut merged = {
72            let mut guard = self.cache.lock();
73            for (k, v) in offsets {
74                guard.insert(k.clone(), *v);
75            }
76            guard.clone()
77        };
78        // Drop nothing — keep merged as the full snapshot to write.
79        let payload = encode(&merged);
80        merged.clear();
81        let _ = merged;
82
83        let journal = self.journal.clone();
84        let pid = self.pid.clone();
85        let writer_uuid = self.writer_uuid.clone();
86        // Fire-and-forget the async write. `OutboxOffsetStore::save`
87        // is sync, but Journal::write_messages is async — we hop
88        // onto the current tokio runtime if one is running.
89        let task = async move {
90            let next_seq = journal.highest_sequence_nr(&pid, 0).await.unwrap_or(0) + 1;
91            let _ = journal
92                .write_messages(vec![PersistentRepr {
93                    persistence_id: pid,
94                    sequence_nr: next_seq,
95                    payload,
96                    manifest: "outbox-offsets".into(),
97                    writer_uuid,
98                    deleted: false,
99                    tags: vec!["outbox-offsets".into()],
100                }])
101                .await;
102        };
103        if let Ok(handle) = Handle::try_current() {
104            handle.spawn(task);
105        } else {
106            // No tokio runtime — best we can do is drop the write.
107            // This path is intended for debug/test environments only.
108            tracing::warn!(
109                "JournalOffsetStore::save called outside a tokio runtime; offset not durably written"
110            );
111            std::mem::drop(task);
112        }
113    }
114}
115
/// Serializes the offset map into a little-endian framed byte buffer.
///
/// Layout: a `u32` entry count, then for each entry a `u32` key length,
/// the key's UTF-8 bytes and the `u64` offset. Entries are written in
/// the map's iteration order.
fn encode(map: &HashMap<String, u64>) -> Vec<u8> {
    let entry_count = map.len() as u32;
    // Rough size guess: header plus ~24 bytes per entry.
    let mut buf = Vec::with_capacity(4 + map.len() * 24);
    buf.extend_from_slice(&entry_count.to_le_bytes());
    map.iter().for_each(|(key, offset)| {
        let key_len = key.len() as u32;
        buf.extend_from_slice(&key_len.to_le_bytes());
        buf.extend_from_slice(key.as_bytes());
        buf.extend_from_slice(&offset.to_le_bytes());
    });
    buf
}
128
/// Parses a buffer in the framing `[u32 count]([u32 key_len][key bytes][u64 value])*`
/// (all integers little-endian) back into an offset map.
///
/// Returns `None` when the buffer is shorter than the header, when an
/// entry is truncated, or when a key is not valid UTF-8. Bytes after
/// the last declared entry are ignored, and a repeated key keeps the
/// value of its last occurrence.
fn decode(bytes: &[u8]) -> Option<HashMap<String, u64>> {
    /// Splits `n` bytes off the front of `input`, advancing it, or
    /// returns `None` when fewer than `n` bytes remain.
    fn take<'a>(input: &mut &'a [u8], n: usize) -> Option<&'a [u8]> {
        if input.len() < n {
            return None;
        }
        let (head, tail) = input.split_at(n);
        *input = tail;
        Some(head)
    }

    // Smallest possible entry: 4-byte key length, empty key, 8-byte value.
    const MIN_ENTRY_LEN: usize = 4 + 8;

    let mut rest = bytes;
    let mut word = [0u8; 4];
    word.copy_from_slice(take(&mut rest, 4)?);
    let count = u32::from_le_bytes(word) as usize;

    // The count comes from stored data and may be corrupt. Preallocate
    // only for as many entries as the remaining bytes could hold, so a
    // bogus header cannot trigger a huge allocation.
    let mut out = HashMap::with_capacity(count.min(rest.len() / MIN_ENTRY_LEN));
    for _ in 0..count {
        word.copy_from_slice(take(&mut rest, 4)?);
        let key_len = u32::from_le_bytes(word) as usize;
        let key = std::str::from_utf8(take(&mut rest, key_len)?).ok()?;
        let mut value = [0u8; 8];
        value.copy_from_slice(take(&mut rest, 8)?);
        out.insert(key.to_owned(), u64::from_le_bytes(value));
    }
    Some(out)
}
153
/// Builds an id from the current wall-clock time: nanoseconds since the
/// Unix epoch rendered as lowercase hex. Falls back to `"0"` when the
/// system clock reports a time before the epoch.
fn rand_id() -> String {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    let nanos = match since_epoch {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    format!("{:x}", nanos)
}