1use crate::{ProgressEntry, Pusher};
2use bytes::Bytes;
3use std::collections::BTreeMap;
4
5#[derive(Debug)]
6pub struct CacheSeqPusher<P> {
7 inner: P,
8 cache: BTreeMap<u64, Bytes>,
9 cache_size: usize,
10 high_watermark: usize,
11 low_watermark: usize,
12}
13
14impl<P: Pusher> CacheSeqPusher<P> {
15 pub const fn new(inner: P, high_watermark: usize, low_watermark: usize) -> Self {
16 Self {
17 inner,
18 cache: BTreeMap::new(),
19 cache_size: 0,
20 high_watermark,
21 low_watermark,
22 }
23 }
24
25 fn evict_until(&mut self, target_size: usize) -> Result<(), P::Error> {
26 let mut expected = None;
27 while let Some(entry) = self.cache.first_entry() {
28 let start = *entry.key();
29 if self.cache_size <= target_size && Some(start) != expected {
30 break;
31 }
32 let chunk = entry.remove();
33 let chunk_len = chunk.len();
34 let next_pos = start + chunk_len as u64;
35 self.cache_size -= chunk_len;
36 if let Err((e, ret)) = self.inner.push(&(start..next_pos), chunk) {
37 if !ret.is_empty() {
38 self.cache_size += ret.len();
39 self.cache.insert(next_pos - ret.len() as u64, ret);
40 }
41 return Err(e);
42 }
43 expected = Some(next_pos);
44 }
45 Ok(())
46 }
47}
48
49impl<P: Pusher> Pusher for CacheSeqPusher<P> {
50 type Error = P::Error;
51
52 fn push(&mut self, range: &ProgressEntry, bytes: Bytes) -> Result<(), (Self::Error, Bytes)> {
53 if bytes.is_empty() {
54 return Ok(());
55 }
56
57 self.cache_size += bytes.len();
58 if let Some(old_bytes) = self.cache.insert(range.start, bytes) {
59 self.cache_size -= old_bytes.len();
60 }
61
62 if self.cache_size >= self.high_watermark
63 && let Err(e) = self.evict_until(self.low_watermark)
64 {
65 return Err((e, Bytes::new()));
66 }
67
68 Ok(())
69 }
70
71 fn flush(&mut self) -> Result<(), Self::Error> {
72 self.evict_until(0)?;
73 self.inner.flush()
74 }
75}