use std::time::Duration;
use anyhow::{Result, bail};
use async_nats::jetstream::object_store::ObjectStore;
use sha2::{Digest, Sha256};
use tokio::io::{self, AsyncReadExt};
use tracing::warn;
/// Upper bound on read-back attempts made by the verification loops in this file
/// before they give up and return an error.
const MAX_ATTEMPTS: u32 = 5;
pub async fn verify_readback(
store: &ObjectStore,
key: &str,
expected_digest: Option<&str>,
expected_size: usize,
) -> Result<()> {
let Some(expected_digest) = expected_digest else {
return verify_size_only(store, key, expected_size).await;
};
let mut delay = Duration::from_millis(200);
for attempt in 1..=MAX_ATTEMPTS {
match read_and_hash(store, key).await {
Ok((got_digest, got_size)) => {
if got_digest == expected_digest && got_size == expected_size {
return Ok(());
}
warn!(
attempt,
expected_digest,
got_digest = %got_digest,
expected_size,
got_size,
"publish read-back mismatch — JetStream object_store not yet consistent (#277)"
);
}
Err(e) => {
warn!(attempt, error = %e, "publish read-back: get failed (transient?)");
}
}
if attempt < MAX_ATTEMPTS {
tokio::time::sleep(delay).await;
delay = (delay * 2).min(Duration::from_secs(3));
}
}
bail!(
"publish read-back: object_store key {key:?} still inconsistent after {MAX_ATTEMPTS} attempts \
— JetStream race (#277). Retry the publish in a few seconds; if it persists, check broker health."
);
}
/// Streams the object stored under `key` and returns its digest string together
/// with the number of bytes read.
///
/// The object is consumed through a 64 KiB scratch buffer, feeding every chunk
/// into a SHA-256 hasher. The digest is rendered as `SHA-256=` followed by the
/// hash encoded with the base64 `URL_SAFE` engine.
///
/// # Errors
///
/// Propagates any error from fetching the object or from reading its contents.
async fn read_and_hash(store: &ObjectStore, key: &str) -> Result<(String, usize)> {
    use base64::Engine;

    let mut object = store.get(key).await?;
    let mut sha = Sha256::new();
    let mut bytes_seen: usize = 0;
    let mut chunk = vec![0u8; 64 * 1024];

    loop {
        match object.read(&mut chunk).await? {
            // A zero-length read signals end of stream.
            0 => break,
            len => {
                sha.update(&chunk[..len]);
                bytes_seen += len;
            }
        }
    }

    let encoded = base64::engine::general_purpose::URL_SAFE.encode(sha.finalize());
    Ok((format!("SHA-256={encoded}"), bytes_seen))
}
async fn verify_size_only(store: &ObjectStore, key: &str, expected_size: usize) -> Result<()> {
let mut delay = Duration::from_millis(200);
for attempt in 1..=MAX_ATTEMPTS {
let res = async {
let mut obj = store.get(key).await?;
let n = io::copy(&mut obj, &mut io::sink()).await?;
Ok::<u64, anyhow::Error>(n)
}
.await;
match res {
Ok(got) if got as usize == expected_size => return Ok(()),
Ok(got) => warn!(
attempt,
expected_size, got, "publish read-back size mismatch (#277, no digest)"
),
Err(e) => warn!(attempt, error = %e, "publish read-back: get failed (transient?)"),
}
if attempt < MAX_ATTEMPTS {
tokio::time::sleep(delay).await;
delay = (delay * 2).min(Duration::from_secs(3));
}
}
bail!(
"publish read-back: object_store key {key:?} size still wrong after {MAX_ATTEMPTS} attempts"
);
}