use std::collections::HashMap;
use std::sync::Arc;
use atomr_persistence::{Journal, PersistentRepr};
use parking_lot::Mutex;
use tokio::runtime::Handle;
use crate::outbox::OutboxOffsetStore;
pub struct JournalOffsetStore<J: Journal> {
journal: Arc<J>,
pid: String,
cache: Mutex<HashMap<String, u64>>,
writer_uuid: String,
}
impl<J: Journal> JournalOffsetStore<J> {
pub async fn new(journal: Arc<J>, outbox_name: impl Into<String>) -> Self {
let outbox_name = outbox_name.into();
let pid = format!("outbox::{}::offsets", outbox_name);
let cache = match journal.highest_sequence_nr(&pid, 0).await {
Ok(highest) if highest > 0 => match journal.replay_messages(&pid, highest, highest, 1).await {
Ok(reprs) => reprs
.into_iter()
.last()
.filter(|r| !r.deleted)
.and_then(|r| decode(&r.payload))
.unwrap_or_default(),
Err(_) => HashMap::new(),
},
_ => HashMap::new(),
};
Self { journal, pid, cache: Mutex::new(cache), writer_uuid: format!("outbox-{}", rand_id()) }
}
}
impl<J: Journal> OutboxOffsetStore for JournalOffsetStore<J> {
fn load(&self) -> HashMap<String, u64> {
self.cache.lock().clone()
}
fn save(&self, offsets: &HashMap<String, u64>) {
let mut merged = {
let mut guard = self.cache.lock();
for (k, v) in offsets {
guard.insert(k.clone(), *v);
}
guard.clone()
};
let payload = encode(&merged);
merged.clear();
let _ = merged;
let journal = self.journal.clone();
let pid = self.pid.clone();
let writer_uuid = self.writer_uuid.clone();
let task = async move {
let next_seq = journal.highest_sequence_nr(&pid, 0).await.unwrap_or(0) + 1;
let _ = journal
.write_messages(vec![PersistentRepr {
persistence_id: pid,
sequence_nr: next_seq,
payload,
manifest: "outbox-offsets".into(),
writer_uuid,
deleted: false,
tags: vec!["outbox-offsets".into()],
}])
.await;
};
if let Ok(handle) = Handle::try_current() {
handle.spawn(task);
} else {
tracing::warn!(
"JournalOffsetStore::save called outside a tokio runtime; offset not durably written"
);
std::mem::drop(task);
}
}
}
fn encode(map: &HashMap<String, u64>) -> Vec<u8> {
let mut out = Vec::with_capacity(4 + map.len() * 24);
out.extend_from_slice(&(map.len() as u32).to_le_bytes());
for (k, v) in map {
let kb = k.as_bytes();
out.extend_from_slice(&(kb.len() as u32).to_le_bytes());
out.extend_from_slice(kb);
out.extend_from_slice(&v.to_le_bytes());
}
out
}
fn decode(bytes: &[u8]) -> Option<HashMap<String, u64>> {
if bytes.len() < 4 {
return None;
}
let count = u32::from_le_bytes(bytes[..4].try_into().ok()?) as usize;
let mut p = 4usize;
let mut out = HashMap::with_capacity(count);
for _ in 0..count {
if bytes.len() < p + 4 {
return None;
}
let kl = u32::from_le_bytes(bytes[p..p + 4].try_into().ok()?) as usize;
p += 4;
if bytes.len() < p + kl + 8 {
return None;
}
let key = std::str::from_utf8(&bytes[p..p + kl]).ok()?.to_string();
p += kl;
let v = u64::from_le_bytes(bytes[p..p + 8].try_into().ok()?);
p += 8;
out.insert(key, v);
}
Some(out)
}
fn rand_id() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos()).unwrap_or(0);
format!("{nanos:x}")
}