kevy_resp/argv_pool.rs
1//! A recycling pool of owned [`Argv`]s for the cross-shard forward path.
2//!
3//! Forwarding a command to its owning shard materialises an owned `Argv`
4//! (2 mallocs) on the origin core, which the owning core then frees after
5//! dispatch — a cross-thread alloc/free pair per forwarded command. The
6//! pool breaks that cycle: the origin fills a recycled `Argv` (capacity
7//! retained → no malloc), the owning shard executes it borrowed, and the
8//! spent husk rides back to the origin with the reply batch, where it
9//! re-enters the origin's pool. Returning to the *origin* (rather than
10//! pooling at the owner) keeps each shard's pool level matched to its own
11//! conn demand by construction — recycle-at-the-owner measurably starved
12//! conn-heavy shards (accept skew) while overfilling quiet ones, leaving
13//! the malloc rate unchanged.
14
15use crate::argv::Argv;
16use crate::argv_view::ArgvView;
17
/// Upper bound on the number of argvs a pool keeps. Because husks return
/// to their origin, the steady-state pool level equals the shard's own
/// forwarded in-flight count (conns × pipeline depth — ≈1600 at the
/// 50-conn × P256 bench corner), so this cap is headroom rather than a
/// working limit: it only bounds memory once a conn burst has come and
/// gone. A full pool is ≈1 MiB per shard at typical small-argv buffer
/// sizes; the hard ceiling on retained arg bytes, given the per-argv
/// check in `ArgvPool::put`, is `MAX_POOLED × MAX_POOLED_BYTES`
/// (8192 × 4096 B = 32 MiB).
const MAX_POOLED: usize = 8 * 1024;
25
/// Capacity ceiling (in bytes) for an arg-bytes buffer that is still
/// worth keeping. Without it, a single `SET k <1 MB>` would leave a
/// megabyte parked in the pool indefinitely; a husk whose buffer
/// capacity exceeds this is dropped rather than pooled.
const MAX_POOLED_BYTES: usize = 4 * 1024;
30
/// A recycling pool of owned [`Argv`]s. See the module docs for the
/// cross-shard ownership cycle it serves.
///
/// The derived `Default` yields an empty pool.
#[derive(Default)]
pub struct ArgvPool {
    /// Spent argvs awaiting reuse. `take_filled` pops from the back and
    /// `put` pushes onto the back, so the most recently returned husk is
    /// handed out first. `put` never lets this grow past `MAX_POOLED`.
    free: Vec<Argv>,
}
37
38impl ArgvPool {
39 /// An empty pool.
40 pub fn new() -> Self {
41 Self::default()
42 }
43
44 /// An owned `Argv` filled with `view`'s arguments — a recycled one
45 /// (no malloc in steady state) when the pool has a husk, else fresh.
46 pub fn take_filled<A: ArgvView + ?Sized>(&mut self, view: &A) -> Argv {
47 let mut argv = self.free.pop().unwrap_or_default();
48 view.copy_into(&mut argv);
49 argv
50 }
51
52 /// Recycle a spent `Argv`. Dropped instead of pooled when the pool is
53 /// full or the argv's buffer is oversized (retention policy — see
54 /// `MAX_POOLED` / `MAX_POOLED_BYTES`).
55 pub fn put(&mut self, argv: Argv) {
56 if self.free.len() < MAX_POOLED && argv.buf_capacity() <= MAX_POOLED_BYTES {
57 self.free.push(argv);
58 }
59 }
60}
61
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned `Argv` by pushing each element of `args` in order.
    fn argv_of(args: &[&[u8]]) -> Argv {
        let mut built = Argv::default();
        args.iter().for_each(|part| {
            built.push(part);
        });
        built
    }

    /// Filling from an empty pool yields the same argv as `to_argv`.
    #[test]
    fn take_filled_matches_to_argv() {
        let source = argv_of(&[b"SET", b"k", b"v"]);
        let mut pool = ArgvPool::new();
        let filled = pool.take_filled(&source);
        assert_eq!(filled, source.to_argv());
    }

    /// A recycled husk must carry none of its previous contents.
    #[test]
    fn recycled_argv_is_refilled_clean() {
        let mut pool = ArgvPool::new();
        let stale = argv_of(&[b"SET", b"stale-key", b"stale-value"]);
        pool.put(stale);

        let wanted = argv_of(&[b"GET", b"k"]);
        let refilled = pool.take_filled(&wanted);

        assert_eq!(refilled.len(), 2);
        assert_eq!(refilled.get(0), Some(&b"GET"[..]));
        assert_eq!(refilled.get(1), Some(&b"k"[..]));
    }

    /// Puts beyond `MAX_POOLED` are dropped rather than retained.
    #[test]
    fn pool_count_is_capped() {
        let mut pool = ArgvPool::new();
        let attempts = MAX_POOLED + 100;
        for _ in 0..attempts {
            let husk = argv_of(&[b"GET", b"k"]);
            pool.put(husk);
        }
        assert_eq!(pool.free.len(), MAX_POOLED);
    }

    /// An argv whose buffer outgrew `MAX_POOLED_BYTES` is not pooled.
    #[test]
    fn oversized_buffers_are_not_retained() {
        let oversized_value = vec![b'x'; MAX_POOLED_BYTES + 1];
        let husk = argv_of(&[b"SET", b"k", &oversized_value]);

        let mut pool = ArgvPool::new();
        pool.put(husk);

        assert_eq!(pool.free.len(), 0);
    }

    /// Refilling with same-size content must leave the buffer capacity
    /// unchanged (the whole point of recycling).
    #[test]
    fn copy_into_reuses_capacity() {
        let source = argv_of(&[b"SET", b"key", b"value"]);
        let mut target = Argv::default();

        source.copy_into(&mut target);
        let capacity_after_first_fill = target.buf_capacity();

        source.copy_into(&mut target);
        assert_eq!(target.buf_capacity(), capacity_after_first_fill);
        assert_eq!(target, source);
    }
}
122}