use anyhow::Context as _;
use async_nats::jetstream::kv::{Operation, Store};
use serde::Serialize;
use serde::de::DeserializeOwned;
/// Upper bound on the number of read/write attempts `read_modify_write` makes
/// before it gives up and returns the last recorded error.
const MAX_CAS_ATTEMPTS: usize = 32;
/// Linear backoff step in milliseconds: retry number `n` (counting from 1)
/// sleeps `n * BACKOFF_STEP_MS` before re-reading, subject to `BACKOFF_CAP_MS`.
const BACKOFF_STEP_MS: u64 = 2;
/// Ceiling, in milliseconds, on the delay inserted before any single retry.
const BACKOFF_CAP_MS: u64 = 25;
pub async fn read_modify_write<T, F>(kv: &Store, key: &str, mut mutate: F) -> anyhow::Result<T>
where
T: Serialize + DeserializeOwned + Default,
F: FnMut(&mut T) -> bool,
{
let mut last_err = None;
for attempt in 0..MAX_CAS_ATTEMPTS {
if attempt > 0 {
let ms = (attempt as u64 * BACKOFF_STEP_MS).min(BACKOFF_CAP_MS);
tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
}
let entry = match kv.entry(key).await {
Ok(e) => e,
Err(e) => {
last_err = Some(anyhow::Error::from(e).context(format!("kv entry '{key}'")));
continue;
}
};
let (mut value, revision) = match entry {
Some(e) if e.operation == Operation::Put => {
let v: T = serde_json::from_slice(&e.value)
.with_context(|| format!("decode kv '{key}'"))?;
(v, Some(e.revision))
}
Some(e) => (T::default(), Some(e.revision)),
None => (T::default(), None),
};
if !mutate(&mut value) {
return Ok(value);
}
let bytes = serde_json::to_vec(&value).with_context(|| format!("encode kv '{key}'"))?;
let written = match revision {
Some(rev) => kv
.update(key, bytes.into(), rev)
.await
.map(|_| ())
.map_err(anyhow::Error::from),
None => kv
.create(key, bytes.into())
.await
.map(|_| ())
.map_err(anyhow::Error::from),
};
match written {
Ok(()) => return Ok(value),
Err(e) => last_err = Some(e),
}
}
Err(last_err.expect("loop ran at least once").context(format!(
"kv read-modify-write '{key}': failed after {MAX_CAS_ATTEMPTS} attempts"
)))
}