Skip to main content

dynomite/cluster/
hints.rs

1//! Node-local hinted-handoff store.
2//!
3//! When a write request fans out to a peer in
4//! [`crate::cluster::peer::PeerState::Down`] or to a peer whose
5//! outbound channel is closed, the dispatcher records a hint:
6//! the on-the-wire request bytes, the index of the intended
7//! peer, and an absolute expiry deadline. A background task
8//! periodically:
9//!
10//! * drains hints destined for any peer that has returned to
11//!   [`crate::cluster::peer::PeerState::Normal`] and ships them
12//!   over the same per-peer outbound channel the dispatcher
13//!   would have used;
14//! * drops hints that have aged past their `hint_ttl_seconds`
15//!   so the in-memory store stays bounded.
16//!
17//! The v1 store is RAM-only. The natural follow-up is an
18//! on-disk variant (one segment file per peer, replayed at
19//! startup); see `docs/journal/2026-05-23-hinted-handoff.md`
20//! for the deferral note.
21//!
22//! # Examples
23//!
24//! ```
25//! use std::time::{Duration, Instant};
26//! use dynomite::cluster::hints::HintStore;
27//!
28//! let store = HintStore::new(1024);
29//! store.enqueue(7, b"*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n".to_vec(), Duration::from_secs(60))
30//!     .expect("under capacity");
31//! let drained = store.take_for(7);
32//! assert_eq!(drained.len(), 1);
33//! assert_eq!(store.expire_now(Instant::now()), 0);
34//! ```
35
36use std::collections::HashMap;
37use std::time::{Duration, Instant};
38
39use parking_lot::Mutex;
40use thiserror::Error;
41
42/// Errors produced by [`HintStore::enqueue`].
43#[derive(Debug, Error, Eq, PartialEq)]
44pub enum HintStoreError {
45    /// The store has reached `max_bytes`. The caller is
46    /// expected to fall back to its non-handoff error path
47    /// (typically, return `DynomiteNoQuorumAchieved` to the
48    /// client) and the next drainer sweep will reclaim space
49    /// when peers come back online.
50    #[error("hint store over capacity ({max_bytes} bytes)")]
51    OverCapacity {
52        /// Configured upper bound, in bytes.
53        max_bytes: u64,
54    },
55    /// The supplied TTL is zero. A zero TTL would be expired
56    /// immediately by the next sweep, so the store rejects it
57    /// up front to surface a configuration error.
58    #[error("hint TTL must be greater than zero")]
59    ZeroTtl,
60    /// The hint payload is empty. The wire-replay path requires
61    /// at least one byte; an empty payload is rejected so the
62    /// drainer never produces a no-op outbound write.
63    #[error("hint payload is empty")]
64    EmptyPayload,
65}
66
/// A single buffered write awaiting delivery to one peer.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Hint {
    /// Position of the target peer within
    /// [`crate::cluster::pool::ServerPool::peers`].
    pub peer_idx: u32,
    /// Raw request bytes, exactly as they would go on the wire.
    pub payload: Vec<u8>,
    /// Absolute point in time; once it is reached the hint is
    /// treated as expired and discarded instead of delivered.
    pub deadline: Instant,
}

impl Hint {
    /// Bytes this hint is charged against the store's capacity.
    ///
    /// Only the payload length counts; the fixed-size `peer_idx`
    /// and `deadline` fields are ignored. A length that does not
    /// fit in `u64` saturates to `u64::MAX`.
    #[must_use]
    fn weight(&self) -> u64 {
        let len = self.payload.len();
        u64::try_from(len).unwrap_or(u64::MAX)
    }
}
88
/// Point-in-time copy of a [`HintStore`]'s size and counters.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct HintStoreStats {
    /// How many hints are queued right now, across all peers.
    pub hint_count: usize,
    /// Payload bytes held by the queued hints.
    pub bytes: u64,
    /// Byte budget the store enforces on `bytes` (zero means
    /// unbounded).
    pub max_bytes: u64,
    /// Running count, since construction, of hints discarded
    /// because their TTL elapsed.
    pub expired_total: u64,
    /// Running count, since construction, of enqueues refused
    /// for lack of capacity.
    pub rejected_over_capacity_total: u64,
}
105
106/// Node-local hint store.
107///
108/// The store is internally synchronised so [`std::sync::Arc`]
109/// clones share the same per-peer queues. Operations are O(1)
110/// with respect to the number of pending hints for the queried
111/// peer and O(N) for [`HintStore::expire_now`].
112#[derive(Debug)]
113pub struct HintStore {
114    inner: Mutex<Inner>,
115}
116
117#[derive(Debug)]
118struct Inner {
119    /// Per-peer FIFO queue. Insertion appends; drain pops the
120    /// whole queue at once (the dispatcher never wants to
121    /// trickle-deliver because hints are buffered against a
122    /// down peer that has already returned).
123    by_peer: HashMap<u32, Vec<Hint>>,
124    bytes: u64,
125    max_bytes: u64,
126    expired_total: u64,
127    rejected_over_capacity_total: u64,
128}
129
130impl HintStore {
131    /// Build a new store with the supplied byte cap.
132    ///
133    /// `max_bytes` of zero means "no cap"; this is intended for
134    /// tests that drive enqueue/take patterns and never want to
135    /// exercise the back-pressure branch.
136    #[must_use]
137    pub fn new(max_bytes: u64) -> Self {
138        Self {
139            inner: Mutex::new(Inner {
140                by_peer: HashMap::new(),
141                bytes: 0,
142                max_bytes,
143                expired_total: 0,
144                rejected_over_capacity_total: 0,
145            }),
146        }
147    }
148
149    /// Append a hint for `peer_idx`. The hint expires at
150    /// `Instant::now() + ttl`.
151    ///
152    /// # Errors
153    ///
154    /// * [`HintStoreError::ZeroTtl`] when `ttl` is zero.
155    /// * [`HintStoreError::EmptyPayload`] when `payload` is
156    ///   empty.
157    /// * [`HintStoreError::OverCapacity`] when accepting the
158    ///   hint would push the cumulative payload bytes over
159    ///   `max_bytes`.
160    pub fn enqueue(
161        &self,
162        peer_idx: u32,
163        payload: Vec<u8>,
164        ttl: Duration,
165    ) -> Result<(), HintStoreError> {
166        if ttl.is_zero() {
167            return Err(HintStoreError::ZeroTtl);
168        }
169        if payload.is_empty() {
170            return Err(HintStoreError::EmptyPayload);
171        }
172        let weight = u64::try_from(payload.len()).unwrap_or(u64::MAX);
173        let mut inner = self.inner.lock();
174        if inner.max_bytes > 0 && inner.bytes.saturating_add(weight) > inner.max_bytes {
175            inner.rejected_over_capacity_total =
176                inner.rejected_over_capacity_total.saturating_add(1);
177            return Err(HintStoreError::OverCapacity {
178                max_bytes: inner.max_bytes,
179            });
180        }
181        let deadline = Instant::now() + ttl;
182        inner.by_peer.entry(peer_idx).or_default().push(Hint {
183            peer_idx,
184            payload,
185            deadline,
186        });
187        inner.bytes = inner.bytes.saturating_add(weight);
188        Ok(())
189    }
190
191    /// Drain every pending hint for `peer_idx`. Hints that have
192    /// expired are dropped on the floor (and counted toward
193    /// [`HintStoreStats::expired_total`]).
194    ///
195    /// Returned hints are ordered by enqueue time, oldest first.
196    pub fn take_for(&self, peer_idx: u32) -> Vec<Hint> {
197        let now = Instant::now();
198        let mut inner = self.inner.lock();
199        let Some(queue) = inner.by_peer.remove(&peer_idx) else {
200            return Vec::new();
201        };
202        let mut out = Vec::with_capacity(queue.len());
203        for h in queue {
204            if h.deadline <= now {
205                let w = h.weight();
206                inner.bytes = inner.bytes.saturating_sub(w);
207                inner.expired_total = inner.expired_total.saturating_add(1);
208                continue;
209            }
210            inner.bytes = inner.bytes.saturating_sub(h.weight());
211            out.push(h);
212        }
213        out
214    }
215
216    /// Drop every hint whose deadline has passed at `now`.
217    /// Returns the number of hints dropped. Walks the entire
218    /// store; intended for the periodic drainer task.
219    pub fn expire_now(&self, now: Instant) -> usize {
220        let mut inner = self.inner.lock();
221        let mut dropped = 0usize;
222        let mut empty_keys: Vec<u32> = Vec::new();
223        for (k, queue) in &mut inner.by_peer {
224            let before = queue.len();
225            queue.retain(|h| h.deadline > now);
226            let after = queue.len();
227            let removed = before - after;
228            if removed > 0 {
229                dropped += removed;
230                // Recompute the bytes lost. The queue does not
231                // remember which entries it dropped, so we walk
232                // the original-vs-retained delta below using a
233                // single pass (cheaper than the alternative of
234                // collecting weights up front).
235                if after == 0 {
236                    empty_keys.push(*k);
237                }
238            }
239        }
240        // Recompute total bytes from scratch: the per-peer
241        // retained weights are now consistent with `bytes` only
242        // after we subtract the dropped weights. We trade a
243        // second pass for a clean invariant rather than tracking
244        // dropped weights inline above.
245        let mut new_bytes: u64 = 0;
246        for queue in inner.by_peer.values() {
247            for h in queue {
248                new_bytes = new_bytes.saturating_add(h.weight());
249            }
250        }
251        inner.bytes = new_bytes;
252        inner.expired_total = inner.expired_total.saturating_add(dropped as u64);
253        for k in empty_keys {
254            inner.by_peer.remove(&k);
255        }
256        dropped
257    }
258
259    /// Number of hints across every peer.
260    #[must_use]
261    pub fn total_len(&self) -> usize {
262        let inner = self.inner.lock();
263        inner.by_peer.values().map(Vec::len).sum()
264    }
265
266    /// Pending hint count for `peer_idx`. Useful for tests.
267    #[must_use]
268    pub fn len_for(&self, peer_idx: u32) -> usize {
269        let inner = self.inner.lock();
270        inner.by_peer.get(&peer_idx).map_or(0, Vec::len)
271    }
272
273    /// Snapshot the store's accounting fields.
274    #[must_use]
275    pub fn stats(&self) -> HintStoreStats {
276        let inner = self.inner.lock();
277        HintStoreStats {
278            hint_count: inner.by_peer.values().map(Vec::len).sum(),
279            bytes: inner.bytes,
280            max_bytes: inner.max_bytes,
281            expired_total: inner.expired_total,
282            rejected_over_capacity_total: inner.rejected_over_capacity_total,
283        }
284    }
285
286    /// Iterate the peer indices that currently have pending
287    /// hints. Used by the drainer to decide which peers to ship
288    /// to without holding the inner lock across the network
289    /// send.
290    #[must_use]
291    pub fn peers_with_hints(&self) -> Vec<u32> {
292        let inner = self.inner.lock();
293        inner
294            .by_peer
295            .iter()
296            .filter_map(|(k, v)| if v.is_empty() { None } else { Some(*k) })
297            .collect()
298    }
299}
300
301#[cfg(test)]
302mod tests {
303    use super::*;
304
305    fn payload(b: u8, n: usize) -> Vec<u8> {
306        vec![b; n]
307    }
308
309    #[test]
310    fn enqueue_and_take_round_trip() {
311        let store = HintStore::new(1024);
312        store
313            .enqueue(3, payload(b'a', 4), Duration::from_secs(60))
314            .unwrap();
315        store
316            .enqueue(3, payload(b'b', 4), Duration::from_secs(60))
317            .unwrap();
318        store
319            .enqueue(7, payload(b'c', 4), Duration::from_secs(60))
320            .unwrap();
321        assert_eq!(store.total_len(), 3);
322        let drained = store.take_for(3);
323        assert_eq!(drained.len(), 2);
324        assert_eq!(drained[0].payload, payload(b'a', 4));
325        assert_eq!(drained[1].payload, payload(b'b', 4));
326        assert_eq!(store.len_for(3), 0);
327        assert_eq!(store.len_for(7), 1);
328        assert_eq!(store.total_len(), 1);
329    }
330
331    #[test]
332    fn enqueue_rejects_over_capacity() {
333        let store = HintStore::new(8);
334        store
335            .enqueue(0, payload(b'x', 6), Duration::from_secs(60))
336            .unwrap();
337        let err = store
338            .enqueue(0, payload(b'y', 4), Duration::from_secs(60))
339            .unwrap_err();
340        assert_eq!(err, HintStoreError::OverCapacity { max_bytes: 8 });
341        // Bytes accounting unaffected by the rejected enqueue.
342        assert_eq!(store.stats().bytes, 6);
343        assert_eq!(store.stats().rejected_over_capacity_total, 1);
344        // Drain reclaims space.
345        let drained = store.take_for(0);
346        assert_eq!(drained.len(), 1);
347        // Now the previously-rejected payload fits.
348        store
349            .enqueue(0, payload(b'y', 4), Duration::from_secs(60))
350            .unwrap();
351    }
352
353    #[test]
354    fn expire_now_drops_old_hints() {
355        let store = HintStore::new(64);
356        store
357            .enqueue(1, payload(b'a', 3), Duration::from_millis(1))
358            .unwrap();
359        store
360            .enqueue(1, payload(b'b', 3), Duration::from_secs(60))
361            .unwrap();
362        // Sleep a moment so the first hint expires.
363        std::thread::sleep(Duration::from_millis(5));
364        let now = Instant::now();
365        let dropped = store.expire_now(now);
366        assert_eq!(dropped, 1);
367        assert_eq!(store.len_for(1), 1);
368        let stats = store.stats();
369        assert_eq!(stats.expired_total, 1);
370        assert_eq!(stats.bytes, 3);
371        // Surviving hint is the one with the long TTL.
372        let drained = store.take_for(1);
373        assert_eq!(drained[0].payload, payload(b'b', 3));
374    }
375
376    #[test]
377    fn take_for_skips_already_expired() {
378        let store = HintStore::new(64);
379        store
380            .enqueue(2, payload(b'a', 3), Duration::from_millis(1))
381            .unwrap();
382        store
383            .enqueue(2, payload(b'b', 3), Duration::from_secs(60))
384            .unwrap();
385        std::thread::sleep(Duration::from_millis(5));
386        let drained = store.take_for(2);
387        assert_eq!(drained.len(), 1);
388        assert_eq!(drained[0].payload, payload(b'b', 3));
389        assert_eq!(store.stats().expired_total, 1);
390    }
391
392    #[test]
393    fn enqueue_rejects_zero_ttl_and_empty_payload() {
394        let store = HintStore::new(64);
395        let err = store
396            .enqueue(0, payload(b'x', 1), Duration::from_secs(0))
397            .unwrap_err();
398        assert_eq!(err, HintStoreError::ZeroTtl);
399        let err = store
400            .enqueue(0, Vec::new(), Duration::from_secs(60))
401            .unwrap_err();
402        assert_eq!(err, HintStoreError::EmptyPayload);
403        assert_eq!(store.total_len(), 0);
404    }
405
406    #[test]
407    fn mixed_peer_queues_are_independent() {
408        let store = HintStore::new(0); // unbounded
409        store
410            .enqueue(0, payload(b'a', 1), Duration::from_secs(60))
411            .unwrap();
412        store
413            .enqueue(1, payload(b'b', 1), Duration::from_secs(60))
414            .unwrap();
415        store
416            .enqueue(2, payload(b'c', 1), Duration::from_secs(60))
417            .unwrap();
418        assert_eq!(store.total_len(), 3);
419        let mut peers = store.peers_with_hints();
420        peers.sort_unstable();
421        assert_eq!(peers, vec![0, 1, 2]);
422        let drained = store.take_for(1);
423        assert_eq!(drained.len(), 1);
424        assert_eq!(drained[0].payload, payload(b'b', 1));
425        assert_eq!(store.len_for(0), 1);
426        assert_eq!(store.len_for(1), 0);
427        assert_eq!(store.len_for(2), 1);
428    }
429
430    #[test]
431    fn empty_max_bytes_means_unbounded() {
432        let store = HintStore::new(0);
433        for _ in 0..1024 {
434            store
435                .enqueue(0, payload(b'x', 1024), Duration::from_secs(60))
436                .unwrap();
437        }
438        assert_eq!(store.total_len(), 1024);
439    }
440
441    #[test]
442    fn expire_now_no_op_when_nothing_old() {
443        let store = HintStore::new(64);
444        store
445            .enqueue(0, payload(b'x', 3), Duration::from_secs(60))
446            .unwrap();
447        let dropped = store.expire_now(Instant::now());
448        assert_eq!(dropped, 0);
449        assert_eq!(store.total_len(), 1);
450    }
451
452    #[test]
453    fn stats_track_capacity_and_bytes() {
454        let store = HintStore::new(1024);
455        store
456            .enqueue(0, payload(b'x', 100), Duration::from_secs(60))
457            .unwrap();
458        let s = store.stats();
459        assert_eq!(s.hint_count, 1);
460        assert_eq!(s.bytes, 100);
461        assert_eq!(s.max_bytes, 1024);
462    }
463}