Skip to main content

kanade_shared/
kv_cas.rs

1//! Optimistic-concurrency read-modify-write over a JetStream KV key.
2//!
3//! A blind `kv.get` → mutate → `kv.put` loses updates when two
4//! writers race: both read the same snapshot, both put, and the
5//! second put silently erases the first writer's change. JetStream
6//! KV's `update(key, value, revision)` is a compare-and-swap — it
7//! fails unless the key is still at the revision the caller read —
8//! and `create(key, value)` is the matching CAS for "key must not
9//! exist yet". [`read_modify_write`] wraps both behind a re-read
10//! retry loop so callers only supply the mutation.
11
12use anyhow::Context as _;
13use async_nats::jetstream::kv::{Operation, Store};
14use serde::Serialize;
15use serde::de::DeserializeOwned;
16
/// Maximum number of read → mutate → write rounds a single call will
/// try before reporting failure.
///
/// Every conflict means some other writer succeeded, so the system as
/// a whole always makes progress; an individual writer, however, can
/// lose repeatedly during a burst (with N writers in lockstep the
/// unluckiest one may need N rounds). The limit is therefore sized
/// for an operator script fanning out over one key rather than for
/// two humans racing each other. Exhausting it indicates the key is
/// being rewritten continuously, and surfacing an error is warranted.
const MAX_CAS_ATTEMPTS: usize = 32;
25
/// Milliseconds of backoff added per retry round. The pause before a
/// retry grows linearly with the attempt number — enough to pull
/// lockstep writers apart (their differing broker round-trip times
/// do the rest) without adding meaningful latency to the common
/// single-conflict case.
const BACKOFF_STEP_MS: u64 = 2;
/// Ceiling, in milliseconds, on the pause before any one retry round.
const BACKOFF_CAP_MS: u64 = 25;
32
33/// Read `key` (missing or deleted → `T::default()`), apply `mutate`,
34/// and write the result back guarded by the revision the read
35/// observed, retrying the whole round on a CAS conflict.
36///
37/// `mutate` returning `false` means "no change needed" — the current
38/// value is returned without a write (so no-op adds/removes don't
39/// bump the key's revision or wake watchers).
40///
41/// Returns the value as written (or as read, for a no-op).
42pub async fn read_modify_write<T, F>(kv: &Store, key: &str, mut mutate: F) -> anyhow::Result<T>
43where
44    T: Serialize + DeserializeOwned + Default,
45    F: FnMut(&mut T) -> bool,
46{
47    let mut last_err = None;
48    for attempt in 0..MAX_CAS_ATTEMPTS {
49        if attempt > 0 {
50            let ms = (attempt as u64 * BACKOFF_STEP_MS).min(BACKOFF_CAP_MS);
51            tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
52        }
53        // A transient read failure spends an attempt like a write
54        // conflict does — the loop is the retry policy for the
55        // whole round-trip, not just the CAS write.
56        let entry = match kv.entry(key).await {
57            Ok(e) => e,
58            Err(e) => {
59                last_err = Some(anyhow::Error::from(e).context(format!("kv entry '{key}'")));
60                continue;
61            }
62        };
63        let (mut value, revision) = match entry {
64            Some(e) if e.operation == Operation::Put => {
65                let v: T = serde_json::from_slice(&e.value)
66                    .with_context(|| format!("decode kv '{key}'"))?;
67                (v, Some(e.revision))
68            }
69            // Delete / purge marker: the value is gone but the
70            // marker's revision still guards the CAS, so a writer
71            // racing a delete loses cleanly instead of resurrecting
72            // over it unguarded.
73            Some(e) => (T::default(), Some(e.revision)),
74            None => (T::default(), None),
75        };
76        if !mutate(&mut value) {
77            return Ok(value);
78        }
79        let bytes = serde_json::to_vec(&value).with_context(|| format!("encode kv '{key}'"))?;
80        let written = match revision {
81            Some(rev) => kv
82                .update(key, bytes.into(), rev)
83                .await
84                .map(|_| ())
85                .map_err(anyhow::Error::from),
86            None => kv
87                .create(key, bytes.into())
88                .await
89                .map(|_| ())
90                .map_err(anyhow::Error::from),
91        };
92        match written {
93            Ok(()) => return Ok(value),
94            // Revision conflict (or AlreadyExists for the create
95            // arm) — somebody won the race; re-read their value and
96            // re-apply the mutation on top. Transient broker errors
97            // take the same path; they just spend an attempt.
98            Err(e) => last_err = Some(e),
99        }
100    }
101    // Not necessarily a conflict — transient broker errors ride the
102    // same retry loop — so the context stays cause-neutral and the
103    // chained source error carries the specifics.
104    Err(last_err.expect("loop ran at least once").context(format!(
105        "kv read-modify-write '{key}': failed after {MAX_CAS_ATTEMPTS} attempts"
106    )))
107}