kanade_shared/kv_cas.rs
1//! Optimistic-concurrency read-modify-write over a JetStream KV key.
2//!
3//! A blind `kv.get` → mutate → `kv.put` loses updates when two
4//! writers race: both read the same snapshot, both put, and the
5//! second put silently erases the first writer's change. JetStream
6//! KV's `update(key, value, revision)` is a compare-and-swap — it
7//! fails unless the key is still at the revision the caller read —
8//! and `create(key, value)` is the matching CAS for "key must not
9//! exist yet". [`read_modify_write`] wraps both behind a re-read
10//! retry loop so callers only supply the mutation.
11
12use anyhow::Context as _;
13use async_nats::jetstream::kv::{Operation, Store};
14use serde::Serialize;
15use serde::de::DeserializeOwned;
16
/// Number of read-modify-write rounds attempted before the most recent
/// failure is handed back to the caller.
///
/// A lost round always means some other writer's CAS went through, so
/// the system as a whole keeps making progress. One particular writer,
/// however, can lose round after round when many writers hit the key in
/// lockstep — with N such writers the unluckiest may need N rounds. The
/// budget is therefore sized for an operator script fanning out over a
/// single key rather than for a couple of humans racing each other.
/// Running out of rounds signals that the key is being rewritten
/// continuously, which is worth reporting as an error.
const MAX_CAS_ATTEMPTS: usize = 32;
25
/// Linear backoff step, in milliseconds: the backoff before retry round
/// `n` is derived from `n * BACKOFF_STEP_MS`, bounded by
/// [`BACKOFF_CAP_MS`]. Growing with the attempt number makes a writer
/// that keeps losing back off further, while the 1-conflict common case
/// pays only a couple of milliseconds.
const BACKOFF_STEP_MS: u64 = 2;

/// Per-round backoff cap, in milliseconds: no single retry round sleeps
/// longer than this, so a long losing streak adds bounded latency per
/// round instead of growing without limit.
const BACKOFF_CAP_MS: u64 = 25;
32
33/// Read `key` (missing or deleted → `T::default()`), apply `mutate`,
34/// and write the result back guarded by the revision the read
35/// observed, retrying the whole round on a CAS conflict.
36///
37/// `mutate` returning `false` means "no change needed" — the current
38/// value is returned without a write (so no-op adds/removes don't
39/// bump the key's revision or wake watchers).
40///
41/// Returns the value as written (or as read, for a no-op).
42pub async fn read_modify_write<T, F>(kv: &Store, key: &str, mut mutate: F) -> anyhow::Result<T>
43where
44 T: Serialize + DeserializeOwned + Default,
45 F: FnMut(&mut T) -> bool,
46{
47 let mut last_err = None;
48 for attempt in 0..MAX_CAS_ATTEMPTS {
49 if attempt > 0 {
50 let ms = (attempt as u64 * BACKOFF_STEP_MS).min(BACKOFF_CAP_MS);
51 tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
52 }
53 // A transient read failure spends an attempt like a write
54 // conflict does — the loop is the retry policy for the
55 // whole round-trip, not just the CAS write.
56 let entry = match kv.entry(key).await {
57 Ok(e) => e,
58 Err(e) => {
59 last_err = Some(anyhow::Error::from(e).context(format!("kv entry '{key}'")));
60 continue;
61 }
62 };
63 let (mut value, revision) = match entry {
64 Some(e) if e.operation == Operation::Put => {
65 let v: T = serde_json::from_slice(&e.value)
66 .with_context(|| format!("decode kv '{key}'"))?;
67 (v, Some(e.revision))
68 }
69 // Delete / purge marker: the value is gone but the
70 // marker's revision still guards the CAS, so a writer
71 // racing a delete loses cleanly instead of resurrecting
72 // over it unguarded.
73 Some(e) => (T::default(), Some(e.revision)),
74 None => (T::default(), None),
75 };
76 if !mutate(&mut value) {
77 return Ok(value);
78 }
79 let bytes = serde_json::to_vec(&value).with_context(|| format!("encode kv '{key}'"))?;
80 let written = match revision {
81 Some(rev) => kv
82 .update(key, bytes.into(), rev)
83 .await
84 .map(|_| ())
85 .map_err(anyhow::Error::from),
86 None => kv
87 .create(key, bytes.into())
88 .await
89 .map(|_| ())
90 .map_err(anyhow::Error::from),
91 };
92 match written {
93 Ok(()) => return Ok(value),
94 // Revision conflict (or AlreadyExists for the create
95 // arm) — somebody won the race; re-read their value and
96 // re-apply the mutation on top. Transient broker errors
97 // take the same path; they just spend an attempt.
98 Err(e) => last_err = Some(e),
99 }
100 }
101 // Not necessarily a conflict — transient broker errors ride the
102 // same retry loop — so the context stays cause-neutral and the
103 // chained source error carries the specifics.
104 Err(last_err.expect("loop ran at least once").context(format!(
105 "kv read-modify-write '{key}': failed after {MAX_CAS_ATTEMPTS} attempts"
106 )))
107}