Skip to main content

kevy_resp/
argv_pool.rs

1//! A recycling pool of owned [`Argv`]s for the cross-shard forward path.
2//!
3//! Forwarding a command to its owning shard materialises an owned `Argv`
4//! (2 mallocs) on the origin core, which the owning core then frees after
5//! dispatch — a cross-thread alloc/free pair per forwarded command. The
6//! pool breaks that cycle: the origin fills a recycled `Argv` (capacity
7//! retained → no malloc), the owning shard executes it borrowed, and the
8//! spent husk rides back to the origin with the reply batch, where it
9//! re-enters the origin's pool. Returning to the *origin* (rather than
10//! pooling at the owner) keeps each shard's pool level matched to its own
11//! conn demand by construction — recycle-at-the-owner measurably starved
12//! conn-heavy shards (accept skew) while overfilling quiet ones, leaving
13//! the malloc rate unchanged.
14
15use crate::argv::Argv;
16use crate::argv_view::ArgvView;
17
/// Most argvs a pool retains. With husks returning to their origin the
/// steady-state pool level equals the shard's own forwarded in-flight
/// (conns × pipeline depth — ≈1600 at the 50-conn × P256 bench corner),
/// so the cap is headroom, not a working limit; it only bounds memory
/// when a conn burst comes and goes. A full pool holds roughly 1 MiB per
/// shard at typical small-argv buffer sizes. The hard ceiling is higher:
/// `ArgvPool::put` admits any husk whose `buf_capacity()` is at most
/// `MAX_POOLED_BYTES`, so a full pool can pin up to
/// `MAX_POOLED * MAX_POOLED_BYTES` (8192 × 4096 B = 32 MiB) of buffer
/// capacity.
const MAX_POOLED: usize = 8192;
25
/// Largest arg-bytes buffer worth retaining, in bytes. A one-off
/// `SET k <1 MB>` must not park a megabyte in the pool forever; oversized
/// husks are dropped instead of pooled. `ArgvPool::put` compares this
/// against the husk's `buf_capacity()`, and the bound is inclusive: a
/// capacity of exactly `MAX_POOLED_BYTES` is still pooled.
const MAX_POOLED_BYTES: usize = 4096;
30
/// A recycling pool of owned [`Argv`]s. See the module docs for the
/// cross-shard ownership cycle it serves.
///
/// `Default` yields an empty pool, the same as [`ArgvPool::new`].
#[derive(Default)]
pub struct ArgvPool {
    /// Spent argvs awaiting reuse. Used as a stack: `put` pushes onto the
    /// end and `take_filled` pops from the end, so the most recently
    /// returned husk is the next one handed out.
    free: Vec<Argv>,
}
37
38impl ArgvPool {
39    /// An empty pool.
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    /// An owned `Argv` filled with `view`'s arguments — a recycled one
45    /// (no malloc in steady state) when the pool has a husk, else fresh.
46    pub fn take_filled<A: ArgvView + ?Sized>(&mut self, view: &A) -> Argv {
47        let mut argv = self.free.pop().unwrap_or_default();
48        view.copy_into(&mut argv);
49        argv
50    }
51
52    /// Recycle a spent `Argv`. Dropped instead of pooled when the pool is
53    /// full or the argv's buffer is oversized (retention policy — see
54    /// `MAX_POOLED` / `MAX_POOLED_BYTES`).
55    pub fn put(&mut self, argv: Argv) {
56        if self.free.len() < MAX_POOLED && argv.buf_capacity() <= MAX_POOLED_BYTES {
57            self.free.push(argv);
58        }
59    }
60}
61
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned `Argv` holding `args` in order.
    fn argv_of(args: &[&[u8]]) -> Argv {
        let mut a = Argv::default();
        for arg in args {
            a.push(arg);
        }
        a
    }

    /// With an empty pool, `take_filled` must produce the same owned argv
    /// as `to_argv` on the same view.
    #[test]
    fn take_filled_matches_to_argv() {
        let mut pool = ArgvPool::new();
        let src = argv_of(&[b"SET", b"k", b"v"]);
        let got = pool.take_filled(&src);
        assert_eq!(got, src.to_argv());
    }

    /// A recycled husk must come back holding only the new arguments.
    #[test]
    fn recycled_argv_is_refilled_clean() {
        let mut pool = ArgvPool::new();
        pool.put(argv_of(&[b"SET", b"stale-key", b"stale-value"]));
        // The husk must actually be pooled; otherwise the rest of this
        // test would only exercise the fresh-`Argv` path and pass
        // vacuously.
        assert_eq!(pool.free.len(), 1);
        let src = argv_of(&[b"GET", b"k"]);
        let got = pool.take_filled(&src);
        // ...and the take must have consumed it rather than building anew.
        assert!(pool.free.is_empty());
        assert_eq!(got.len(), 2);
        assert_eq!(got.get(0), Some(b"GET" as &[u8]));
        assert_eq!(got.get(1), Some(b"k" as &[u8]));
    }

    /// Puts beyond `MAX_POOLED` are dropped, not retained.
    #[test]
    fn pool_count_is_capped() {
        let mut pool = ArgvPool::new();
        for _ in 0..(MAX_POOLED + 100) {
            pool.put(argv_of(&[b"GET", b"k"]));
        }
        assert_eq!(pool.free.len(), MAX_POOLED);
    }

    /// A husk whose buffer exceeds `MAX_POOLED_BYTES` is not pooled.
    #[test]
    fn oversized_buffers_are_not_retained() {
        let mut pool = ArgvPool::new();
        let big = vec![b'x'; MAX_POOLED_BYTES + 1];
        pool.put(argv_of(&[b"SET", b"k", &big]));
        assert!(pool.free.is_empty());
    }

    /// Refilling an `Argv` with same-size content must leave its buffer
    /// capacity unchanged.
    #[test]
    fn copy_into_reuses_capacity() {
        let src = argv_of(&[b"SET", b"key", b"value"]);
        let mut out = Argv::default();
        src.copy_into(&mut out);
        let cap = out.buf_capacity();
        // Refill with same-size content: capacity must not have shrunk
        // (the whole point of recycling).
        src.copy_into(&mut out);
        assert_eq!(out.buf_capacity(), cap);
        assert_eq!(out, src);
    }
}