// fast_pull/cache/direct.rs
use crate::{ProgressEntry, Pusher};
use bytes::Bytes;
use std::collections::BTreeMap;

5#[derive(Debug)]
7pub struct CacheDirectPusher<P> {
8 inner: P,
9 cache: BTreeMap<u64, Bytes>,
10 cache_size: usize,
11 high_watermark: usize,
12 low_watermark: usize,
13}
14
15impl<P: Pusher> CacheDirectPusher<P> {
16 pub const fn new(inner: P, high_watermark: usize, low_watermark: usize) -> Self {
17 Self {
18 inner,
19 cache: BTreeMap::new(),
20 cache_size: 0,
21 high_watermark,
22 low_watermark,
23 }
24 }
25
26 fn evict_until(&mut self, target_size: usize) -> Result<(), P::Error> {
27 if self.cache_size <= target_size {
28 return Ok(());
29 }
30
31 let mut runs: Vec<(u64, usize)> = Vec::with_capacity(self.cache.len());
32 let mut curr_start = None;
33 let mut curr_len = 0;
34 let mut expected_next = 0;
35
36 for (&start, bytes) in &self.cache {
37 let len = bytes.len();
38 if let Some(c_start) = curr_start {
39 if start == expected_next {
40 curr_len += len;
41 expected_next += len as u64;
42 } else {
43 runs.push((c_start, curr_len));
44 curr_start = Some(start);
45 curr_len = len;
46 expected_next = start + len as u64;
47 }
48 } else {
49 curr_start = Some(start);
50 curr_len = len;
51 expected_next = start + len as u64;
52 }
53 }
54 if let Some(c_start) = curr_start {
55 runs.push((c_start, curr_len));
56 }
57 runs.sort_unstable_by_key(|&(_, len)| std::cmp::Reverse(len));
58
59 for (mut start, mut total_len) in runs {
60 while total_len > 0 {
61 let chunk = self.cache.remove(&start).unwrap();
62 let len = chunk.len();
63 self.cache_size -= len;
64 total_len -= len;
65 let range = start..start + len as u64;
66 if let Err((e, ret_bytes)) = self.inner.push(&range, chunk) {
67 if !ret_bytes.is_empty() {
68 self.cache_size += ret_bytes.len();
69 let retry_start = start + (len - ret_bytes.len()) as u64;
70 self.cache.insert(retry_start, ret_bytes);
71 }
72 return Err(e);
73 }
74 start += len as u64;
75 }
76 if self.cache_size <= target_size {
77 break;
78 }
79 }
80 Ok(())
81 }
82}
83
84impl<P: Pusher> Pusher for CacheDirectPusher<P> {
85 type Error = P::Error;
86
87 fn push(&mut self, range: &ProgressEntry, bytes: Bytes) -> Result<(), (Self::Error, Bytes)> {
88 if bytes.is_empty() {
89 return Ok(());
90 }
91
92 self.cache_size += bytes.len();
93 if let Some(old_bytes) = self.cache.insert(range.start, bytes) {
94 self.cache_size -= old_bytes.len();
95 }
96
97 if self.cache_size >= self.high_watermark
98 && let Err(e) = self.evict_until(self.low_watermark)
99 {
100 return Err((e, Bytes::new()));
101 }
102
103 Ok(())
104 }
105
106 fn flush(&mut self) -> Result<(), Self::Error> {
107 self.evict_until(0)?;
108 self.inner.flush()
109 }
110}