1use std::collections::HashMap;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use parking_lot::Mutex;
6
7#[derive(Clone, Debug, Hash, PartialEq, Eq)]
8pub struct BodyKey {
9 pub bucket: String,
10 pub key: String,
11 pub version: Option<String>,
12}
13
14impl BodyKey {
15 pub fn new(bucket: impl Into<String>, key: impl Into<String>, version: Option<String>) -> Self {
16 Self {
17 bucket: bucket.into(),
18 key: key.into(),
19 version,
20 }
21 }
22}
23
24const PER_ENTRY_OVERHEAD: u64 = 128;
29
30#[derive(Debug)]
31struct Node {
32 key: BodyKey,
33 bytes: Bytes,
34 charged: u64,
35 prev: Option<usize>,
36 next: Option<usize>,
37}
38
39struct Inner {
40 capacity_bytes: u64,
41 single_object_cap: u64,
42 used_bytes: u64,
43 nodes: Vec<Option<Node>>,
44 free: Vec<usize>,
45 index: HashMap<BodyKey, usize>,
46 head: Option<usize>, tail: Option<usize>, }
49
50impl Inner {
51 fn new(capacity_bytes: u64) -> Self {
52 Self {
53 capacity_bytes,
54 single_object_cap: capacity_bytes / 2,
55 used_bytes: 0,
56 nodes: Vec::new(),
57 free: Vec::new(),
58 index: HashMap::new(),
59 head: None,
60 tail: None,
61 }
62 }
63
64 fn alloc_node(&mut self, node: Node) -> usize {
65 if let Some(idx) = self.free.pop() {
66 self.nodes[idx] = Some(node);
67 idx
68 } else {
69 self.nodes.push(Some(node));
70 self.nodes.len() - 1
71 }
72 }
73
74 fn detach(&mut self, idx: usize) {
75 let (prev, next) = {
76 let n = self.nodes[idx].as_ref().unwrap();
77 (n.prev, n.next)
78 };
79 match prev {
80 Some(p) => self.nodes[p].as_mut().unwrap().next = next,
81 None => self.head = next,
82 }
83 match next {
84 Some(n) => self.nodes[n].as_mut().unwrap().prev = prev,
85 None => self.tail = prev,
86 }
87 let n = self.nodes[idx].as_mut().unwrap();
88 n.prev = None;
89 n.next = None;
90 }
91
92 fn push_front(&mut self, idx: usize) {
93 let old_head = self.head;
94 {
95 let n = self.nodes[idx].as_mut().unwrap();
96 n.prev = None;
97 n.next = old_head;
98 }
99 if let Some(h) = old_head {
100 self.nodes[h].as_mut().unwrap().prev = Some(idx);
101 }
102 self.head = Some(idx);
103 if self.tail.is_none() {
104 self.tail = Some(idx);
105 }
106 }
107
108 fn remove(&mut self, idx: usize) -> Node {
109 self.detach(idx);
110 let node = self.nodes[idx].take().unwrap();
111 self.free.push(idx);
112 self.used_bytes = self.used_bytes.saturating_sub(node.charged);
113 self.index.remove(&node.key);
114 node
115 }
116
117 fn evict_until_fits(&mut self, incoming: u64) {
118 while self.used_bytes + incoming > self.capacity_bytes {
119 let Some(tail) = self.tail else {
120 break;
121 };
122 self.remove(tail);
123 }
124 }
125
126 fn get(&mut self, key: &BodyKey) -> Option<Bytes> {
127 let idx = *self.index.get(key)?;
128 self.detach(idx);
129 self.push_front(idx);
130 Some(self.nodes[idx].as_ref().unwrap().bytes.clone())
131 }
132
133 fn insert(&mut self, key: BodyKey, bytes: Bytes) {
134 if let Some(&idx) = self.index.get(&key) {
138 self.remove(idx);
139 }
140 let size = bytes.len() as u64;
141 let charged = size.saturating_add(PER_ENTRY_OVERHEAD);
142 if size > self.single_object_cap || charged > self.capacity_bytes {
143 return;
144 }
145 self.evict_until_fits(charged);
146 self.used_bytes += charged;
147 let node = Node {
148 key: key.clone(),
149 bytes,
150 charged,
151 prev: None,
152 next: None,
153 };
154 let idx = self.alloc_node(node);
155 self.index.insert(key, idx);
156 self.push_front(idx);
157 }
158
159 fn invalidate(&mut self, key: &BodyKey) {
160 if let Some(&idx) = self.index.get(key) {
161 self.remove(idx);
162 }
163 }
164}
165
166#[derive(Clone)]
167pub struct BodyCache {
168 inner: Arc<Mutex<Inner>>,
169 capacity_bytes: u64,
170}
171
172impl BodyCache {
173 pub fn new(capacity_bytes: u64) -> Self {
174 Self {
175 inner: Arc::new(Mutex::new(Inner::new(capacity_bytes))),
176 capacity_bytes,
177 }
178 }
179
180 pub fn capacity_bytes(&self) -> u64 {
181 self.capacity_bytes
182 }
183
184 pub fn single_object_cap(&self) -> u64 {
185 self.capacity_bytes / 2
186 }
187
188 pub fn get(&self, key: &BodyKey) -> Option<Bytes> {
189 self.inner.lock().get(key)
190 }
191
192 pub fn insert(&self, key: BodyKey, bytes: Bytes) {
193 self.inner.lock().insert(key, bytes);
194 }
195
196 pub fn invalidate(&self, key: &BodyKey) {
197 self.inner.lock().invalidate(key);
198 }
199
200 #[cfg(test)]
201 fn used_bytes(&self) -> u64 {
202 self.inner.lock().used_bytes
203 }
204
205 #[cfg(test)]
206 fn len(&self) -> usize {
207 self.inner.lock().index.len()
208 }
209}
210
211#[cfg(test)]
212mod tests {
213 use super::*;
214
215 fn k(name: &str) -> BodyKey {
216 BodyKey::new("b", name, None)
217 }
218
219 fn mk(n: usize) -> Bytes {
220 Bytes::from(vec![0u8; n])
221 }
222
223 #[test]
224 fn insert_and_get() {
225 let c = BodyCache::new(1024);
226 c.insert(k("a"), mk(100));
227 assert_eq!(c.get(&k("a")).unwrap().len(), 100);
228 assert_eq!(c.used_bytes(), 100 + PER_ENTRY_OVERHEAD);
229 }
230
231 #[test]
232 fn byte_accounting_on_overwrite() {
233 let c = BodyCache::new(1024);
234 c.insert(k("a"), mk(100));
235 c.insert(k("a"), mk(50));
236 assert_eq!(c.used_bytes(), 50 + PER_ENTRY_OVERHEAD);
237 assert_eq!(c.len(), 1);
238 }
239
240 #[test]
241 fn lru_eviction_on_capacity_pressure() {
242 let c = BodyCache::new(3 * (100 + PER_ENTRY_OVERHEAD));
244 c.insert(k("a"), mk(100));
245 c.insert(k("b"), mk(100));
246 c.insert(k("c"), mk(100));
247 let _ = c.get(&k("a"));
249 c.insert(k("d"), mk(100));
250 assert!(c.get(&k("b")).is_none());
251 assert!(c.get(&k("a")).is_some());
252 assert!(c.get(&k("c")).is_some());
253 assert!(c.get(&k("d")).is_some());
254 }
255
256 #[test]
257 fn empty_bodies_still_evict_under_entry_overhead() {
258 let c = BodyCache::new(1024 * 1024);
262 for i in 0..10_000 {
263 c.insert(k(&format!("empty-{i}")), Bytes::new());
264 }
265 let max_entries = (1024 * 1024 / PER_ENTRY_OVERHEAD) as usize;
266 assert!(
267 c.len() <= max_entries,
268 "cache must evict empty bodies: len={} max={}",
269 c.len(),
270 max_entries
271 );
272 assert!(c.used_bytes() <= 1024 * 1024);
273 }
274
275 #[test]
276 fn single_object_cap_bypass_leaves_cache_untouched() {
277 let cap = 64 * 1024 * 1024;
279 let c = BodyCache::new(cap);
280 c.insert(k("a"), mk(1024));
281 let before_used = c.used_bytes();
282 let before_len = c.len();
283 c.insert(k("big"), mk(40 * 1024 * 1024));
284 assert_eq!(c.used_bytes(), before_used);
285 assert_eq!(c.len(), before_len);
286 assert!(c.get(&k("big")).is_none());
287 assert!(c.get(&k("a")).is_some());
288 }
289
290 #[test]
291 fn get_promotes_to_mru() {
292 let c = BodyCache::new(3 * (100 + PER_ENTRY_OVERHEAD));
293 c.insert(k("a"), mk(100));
294 c.insert(k("b"), mk(100));
295 c.insert(k("c"), mk(100));
296 let _ = c.get(&k("a"));
297 c.insert(k("d"), mk(100));
298 assert!(c.get(&k("a")).is_some());
300 assert!(c.get(&k("b")).is_none());
301 }
302
303 #[test]
304 fn oversized_overwrite_invalidates_previous_entry() {
305 let cap = 64 * 1024 * 1024;
307 let c = BodyCache::new(cap);
308 c.insert(k("a"), mk(1024));
309 assert!(c.get(&k("a")).is_some());
310 c.insert(k("a"), mk(40 * 1024 * 1024));
313 assert!(c.get(&k("a")).is_none());
314 assert_eq!(c.used_bytes(), 0);
315 assert_eq!(c.len(), 0);
316 }
317
318 #[test]
319 fn charged_size_cannot_exceed_total_capacity() {
320 let c = BodyCache::new(200);
324 c.insert(k("a"), mk(100));
325 assert!(c.get(&k("a")).is_none());
326 assert_eq!(c.used_bytes(), 0);
327 assert_eq!(c.len(), 0);
328
329 let c = BodyCache::new(600);
332 c.insert(k("b"), mk(300));
333 assert!(c.get(&k("b")).is_some());
334 assert_eq!(c.used_bytes(), 300 + PER_ENTRY_OVERHEAD);
335
336 let c = BodyCache::new(600);
339 c.insert(k("c"), mk(500));
340 assert!(c.get(&k("c")).is_none());
341 }
342
343 #[test]
344 fn invalidate_removes_entry() {
345 let c = BodyCache::new(1024);
346 c.insert(k("a"), mk(100));
347 c.invalidate(&k("a"));
348 assert!(c.get(&k("a")).is_none());
349 assert_eq!(c.used_bytes(), 0);
350 }
351}