1use std::collections::HashMap;
37use std::time::{Duration, Instant};
38
39use parking_lot::Mutex;
40use thiserror::Error;
41
42#[derive(Debug, Error, Eq, PartialEq)]
44pub enum HintStoreError {
45 #[error("hint store over capacity ({max_bytes} bytes)")]
51 OverCapacity {
52 max_bytes: u64,
54 },
55 #[error("hint TTL must be greater than zero")]
59 ZeroTtl,
60 #[error("hint payload is empty")]
64 EmptyPayload,
65}
66
67#[derive(Clone, Debug, Eq, PartialEq)]
69pub struct Hint {
70 pub peer_idx: u32,
73 pub payload: Vec<u8>,
75 pub deadline: Instant,
77}
78
79impl Hint {
80 #[must_use]
84 fn weight(&self) -> u64 {
85 u64::try_from(self.payload.len()).unwrap_or(u64::MAX)
86 }
87}
88
89#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
91pub struct HintStoreStats {
92 pub hint_count: usize,
94 pub bytes: u64,
96 pub max_bytes: u64,
98 pub expired_total: u64,
101 pub rejected_over_capacity_total: u64,
104}
105
106#[derive(Debug)]
113pub struct HintStore {
114 inner: Mutex<Inner>,
115}
116
117#[derive(Debug)]
118struct Inner {
119 by_peer: HashMap<u32, Vec<Hint>>,
124 bytes: u64,
125 max_bytes: u64,
126 expired_total: u64,
127 rejected_over_capacity_total: u64,
128}
129
130impl HintStore {
131 #[must_use]
137 pub fn new(max_bytes: u64) -> Self {
138 Self {
139 inner: Mutex::new(Inner {
140 by_peer: HashMap::new(),
141 bytes: 0,
142 max_bytes,
143 expired_total: 0,
144 rejected_over_capacity_total: 0,
145 }),
146 }
147 }
148
149 pub fn enqueue(
161 &self,
162 peer_idx: u32,
163 payload: Vec<u8>,
164 ttl: Duration,
165 ) -> Result<(), HintStoreError> {
166 if ttl.is_zero() {
167 return Err(HintStoreError::ZeroTtl);
168 }
169 if payload.is_empty() {
170 return Err(HintStoreError::EmptyPayload);
171 }
172 let weight = u64::try_from(payload.len()).unwrap_or(u64::MAX);
173 let mut inner = self.inner.lock();
174 if inner.max_bytes > 0 && inner.bytes.saturating_add(weight) > inner.max_bytes {
175 inner.rejected_over_capacity_total =
176 inner.rejected_over_capacity_total.saturating_add(1);
177 return Err(HintStoreError::OverCapacity {
178 max_bytes: inner.max_bytes,
179 });
180 }
181 let deadline = Instant::now() + ttl;
182 inner.by_peer.entry(peer_idx).or_default().push(Hint {
183 peer_idx,
184 payload,
185 deadline,
186 });
187 inner.bytes = inner.bytes.saturating_add(weight);
188 Ok(())
189 }
190
191 pub fn take_for(&self, peer_idx: u32) -> Vec<Hint> {
197 let now = Instant::now();
198 let mut inner = self.inner.lock();
199 let Some(queue) = inner.by_peer.remove(&peer_idx) else {
200 return Vec::new();
201 };
202 let mut out = Vec::with_capacity(queue.len());
203 for h in queue {
204 if h.deadline <= now {
205 let w = h.weight();
206 inner.bytes = inner.bytes.saturating_sub(w);
207 inner.expired_total = inner.expired_total.saturating_add(1);
208 continue;
209 }
210 inner.bytes = inner.bytes.saturating_sub(h.weight());
211 out.push(h);
212 }
213 out
214 }
215
216 pub fn expire_now(&self, now: Instant) -> usize {
220 let mut inner = self.inner.lock();
221 let mut dropped = 0usize;
222 let mut empty_keys: Vec<u32> = Vec::new();
223 for (k, queue) in &mut inner.by_peer {
224 let before = queue.len();
225 queue.retain(|h| h.deadline > now);
226 let after = queue.len();
227 let removed = before - after;
228 if removed > 0 {
229 dropped += removed;
230 if after == 0 {
236 empty_keys.push(*k);
237 }
238 }
239 }
240 let mut new_bytes: u64 = 0;
246 for queue in inner.by_peer.values() {
247 for h in queue {
248 new_bytes = new_bytes.saturating_add(h.weight());
249 }
250 }
251 inner.bytes = new_bytes;
252 inner.expired_total = inner.expired_total.saturating_add(dropped as u64);
253 for k in empty_keys {
254 inner.by_peer.remove(&k);
255 }
256 dropped
257 }
258
259 #[must_use]
261 pub fn total_len(&self) -> usize {
262 let inner = self.inner.lock();
263 inner.by_peer.values().map(Vec::len).sum()
264 }
265
266 #[must_use]
268 pub fn len_for(&self, peer_idx: u32) -> usize {
269 let inner = self.inner.lock();
270 inner.by_peer.get(&peer_idx).map_or(0, Vec::len)
271 }
272
273 #[must_use]
275 pub fn stats(&self) -> HintStoreStats {
276 let inner = self.inner.lock();
277 HintStoreStats {
278 hint_count: inner.by_peer.values().map(Vec::len).sum(),
279 bytes: inner.bytes,
280 max_bytes: inner.max_bytes,
281 expired_total: inner.expired_total,
282 rejected_over_capacity_total: inner.rejected_over_capacity_total,
283 }
284 }
285
286 #[must_use]
291 pub fn peers_with_hints(&self) -> Vec<u32> {
292 let inner = self.inner.lock();
293 inner
294 .by_peer
295 .iter()
296 .filter_map(|(k, v)| if v.is_empty() { None } else { Some(*k) })
297 .collect()
298 }
299}
300
301#[cfg(test)]
302mod tests {
303 use super::*;
304
305 fn payload(b: u8, n: usize) -> Vec<u8> {
306 vec![b; n]
307 }
308
309 #[test]
310 fn enqueue_and_take_round_trip() {
311 let store = HintStore::new(1024);
312 store
313 .enqueue(3, payload(b'a', 4), Duration::from_secs(60))
314 .unwrap();
315 store
316 .enqueue(3, payload(b'b', 4), Duration::from_secs(60))
317 .unwrap();
318 store
319 .enqueue(7, payload(b'c', 4), Duration::from_secs(60))
320 .unwrap();
321 assert_eq!(store.total_len(), 3);
322 let drained = store.take_for(3);
323 assert_eq!(drained.len(), 2);
324 assert_eq!(drained[0].payload, payload(b'a', 4));
325 assert_eq!(drained[1].payload, payload(b'b', 4));
326 assert_eq!(store.len_for(3), 0);
327 assert_eq!(store.len_for(7), 1);
328 assert_eq!(store.total_len(), 1);
329 }
330
331 #[test]
332 fn enqueue_rejects_over_capacity() {
333 let store = HintStore::new(8);
334 store
335 .enqueue(0, payload(b'x', 6), Duration::from_secs(60))
336 .unwrap();
337 let err = store
338 .enqueue(0, payload(b'y', 4), Duration::from_secs(60))
339 .unwrap_err();
340 assert_eq!(err, HintStoreError::OverCapacity { max_bytes: 8 });
341 assert_eq!(store.stats().bytes, 6);
343 assert_eq!(store.stats().rejected_over_capacity_total, 1);
344 let drained = store.take_for(0);
346 assert_eq!(drained.len(), 1);
347 store
349 .enqueue(0, payload(b'y', 4), Duration::from_secs(60))
350 .unwrap();
351 }
352
353 #[test]
354 fn expire_now_drops_old_hints() {
355 let store = HintStore::new(64);
356 store
357 .enqueue(1, payload(b'a', 3), Duration::from_millis(1))
358 .unwrap();
359 store
360 .enqueue(1, payload(b'b', 3), Duration::from_secs(60))
361 .unwrap();
362 std::thread::sleep(Duration::from_millis(5));
364 let now = Instant::now();
365 let dropped = store.expire_now(now);
366 assert_eq!(dropped, 1);
367 assert_eq!(store.len_for(1), 1);
368 let stats = store.stats();
369 assert_eq!(stats.expired_total, 1);
370 assert_eq!(stats.bytes, 3);
371 let drained = store.take_for(1);
373 assert_eq!(drained[0].payload, payload(b'b', 3));
374 }
375
376 #[test]
377 fn take_for_skips_already_expired() {
378 let store = HintStore::new(64);
379 store
380 .enqueue(2, payload(b'a', 3), Duration::from_millis(1))
381 .unwrap();
382 store
383 .enqueue(2, payload(b'b', 3), Duration::from_secs(60))
384 .unwrap();
385 std::thread::sleep(Duration::from_millis(5));
386 let drained = store.take_for(2);
387 assert_eq!(drained.len(), 1);
388 assert_eq!(drained[0].payload, payload(b'b', 3));
389 assert_eq!(store.stats().expired_total, 1);
390 }
391
392 #[test]
393 fn enqueue_rejects_zero_ttl_and_empty_payload() {
394 let store = HintStore::new(64);
395 let err = store
396 .enqueue(0, payload(b'x', 1), Duration::from_secs(0))
397 .unwrap_err();
398 assert_eq!(err, HintStoreError::ZeroTtl);
399 let err = store
400 .enqueue(0, Vec::new(), Duration::from_secs(60))
401 .unwrap_err();
402 assert_eq!(err, HintStoreError::EmptyPayload);
403 assert_eq!(store.total_len(), 0);
404 }
405
406 #[test]
407 fn mixed_peer_queues_are_independent() {
408 let store = HintStore::new(0); store
410 .enqueue(0, payload(b'a', 1), Duration::from_secs(60))
411 .unwrap();
412 store
413 .enqueue(1, payload(b'b', 1), Duration::from_secs(60))
414 .unwrap();
415 store
416 .enqueue(2, payload(b'c', 1), Duration::from_secs(60))
417 .unwrap();
418 assert_eq!(store.total_len(), 3);
419 let mut peers = store.peers_with_hints();
420 peers.sort_unstable();
421 assert_eq!(peers, vec![0, 1, 2]);
422 let drained = store.take_for(1);
423 assert_eq!(drained.len(), 1);
424 assert_eq!(drained[0].payload, payload(b'b', 1));
425 assert_eq!(store.len_for(0), 1);
426 assert_eq!(store.len_for(1), 0);
427 assert_eq!(store.len_for(2), 1);
428 }
429
430 #[test]
431 fn empty_max_bytes_means_unbounded() {
432 let store = HintStore::new(0);
433 for _ in 0..1024 {
434 store
435 .enqueue(0, payload(b'x', 1024), Duration::from_secs(60))
436 .unwrap();
437 }
438 assert_eq!(store.total_len(), 1024);
439 }
440
441 #[test]
442 fn expire_now_no_op_when_nothing_old() {
443 let store = HintStore::new(64);
444 store
445 .enqueue(0, payload(b'x', 3), Duration::from_secs(60))
446 .unwrap();
447 let dropped = store.expire_now(Instant::now());
448 assert_eq!(dropped, 0);
449 assert_eq!(store.total_len(), 1);
450 }
451
452 #[test]
453 fn stats_track_capacity_and_bytes() {
454 let store = HintStore::new(1024);
455 store
456 .enqueue(0, payload(b'x', 100), Duration::from_secs(60))
457 .unwrap();
458 let s = store.stats();
459 assert_eq!(s.hint_count, 1);
460 assert_eq!(s.bytes, 100);
461 assert_eq!(s.max_bytes, 1024);
462 }
463}