atomr_patterns/outbox/
journal_offset_store.rs1use std::collections::HashMap;
17use std::sync::Arc;
18
19use atomr_persistence::{Journal, PersistentRepr};
20use parking_lot::Mutex;
21use tokio::runtime::Handle;
22
23use crate::outbox::OutboxOffsetStore;
24
25pub struct JournalOffsetStore<J: Journal> {
32 journal: Arc<J>,
33 pid: String,
34 cache: Mutex<HashMap<String, u64>>,
35 writer_uuid: String,
36}
37
38impl<J: Journal> JournalOffsetStore<J> {
39 pub async fn new(journal: Arc<J>, outbox_name: impl Into<String>) -> Self {
43 let outbox_name = outbox_name.into();
44 let pid = format!("outbox::{}::offsets", outbox_name);
45 let cache = match journal.highest_sequence_nr(&pid, 0).await {
46 Ok(highest) if highest > 0 => match journal.replay_messages(&pid, highest, highest, 1).await {
47 Ok(reprs) => reprs
48 .into_iter()
49 .last()
50 .filter(|r| !r.deleted)
51 .and_then(|r| decode(&r.payload))
52 .unwrap_or_default(),
53 Err(_) => HashMap::new(),
54 },
55 _ => HashMap::new(),
56 };
57 Self { journal, pid, cache: Mutex::new(cache), writer_uuid: format!("outbox-{}", rand_id()) }
58 }
59}
60
61impl<J: Journal> OutboxOffsetStore for JournalOffsetStore<J> {
62 fn load(&self) -> HashMap<String, u64> {
63 self.cache.lock().clone()
64 }
65
66 fn save(&self, offsets: &HashMap<String, u64>) {
67 let mut merged = {
72 let mut guard = self.cache.lock();
73 for (k, v) in offsets {
74 guard.insert(k.clone(), *v);
75 }
76 guard.clone()
77 };
78 let payload = encode(&merged);
80 merged.clear();
81 let _ = merged;
82
83 let journal = self.journal.clone();
84 let pid = self.pid.clone();
85 let writer_uuid = self.writer_uuid.clone();
86 let task = async move {
90 let next_seq = journal.highest_sequence_nr(&pid, 0).await.unwrap_or(0) + 1;
91 let _ = journal
92 .write_messages(vec![PersistentRepr {
93 persistence_id: pid,
94 sequence_nr: next_seq,
95 payload,
96 manifest: "outbox-offsets".into(),
97 writer_uuid,
98 deleted: false,
99 tags: vec!["outbox-offsets".into()],
100 }])
101 .await;
102 };
103 if let Ok(handle) = Handle::try_current() {
104 handle.spawn(task);
105 } else {
106 tracing::warn!(
109 "JournalOffsetStore::save called outside a tokio runtime; offset not durably written"
110 );
111 std::mem::drop(task);
112 }
113 }
114}
115
116fn encode(map: &HashMap<String, u64>) -> Vec<u8> {
117 let mut out = Vec::with_capacity(4 + map.len() * 24);
119 out.extend_from_slice(&(map.len() as u32).to_le_bytes());
120 for (k, v) in map {
121 let kb = k.as_bytes();
122 out.extend_from_slice(&(kb.len() as u32).to_le_bytes());
123 out.extend_from_slice(kb);
124 out.extend_from_slice(&v.to_le_bytes());
125 }
126 out
127}
128
129fn decode(bytes: &[u8]) -> Option<HashMap<String, u64>> {
130 if bytes.len() < 4 {
131 return None;
132 }
133 let count = u32::from_le_bytes(bytes[..4].try_into().ok()?) as usize;
134 let mut p = 4usize;
135 let mut out = HashMap::with_capacity(count);
136 for _ in 0..count {
137 if bytes.len() < p + 4 {
138 return None;
139 }
140 let kl = u32::from_le_bytes(bytes[p..p + 4].try_into().ok()?) as usize;
141 p += 4;
142 if bytes.len() < p + kl + 8 {
143 return None;
144 }
145 let key = std::str::from_utf8(&bytes[p..p + kl]).ok()?.to_string();
146 p += kl;
147 let v = u64::from_le_bytes(bytes[p..p + 8].try_into().ok()?);
148 p += 8;
149 out.insert(key, v);
150 }
151 Some(out)
152}
153
154fn rand_id() -> String {
155 use std::time::{SystemTime, UNIX_EPOCH};
156 let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0);
157 format!("{nanos:x}")
158}