Skip to main content

fast_pull/cache/
direct.rs

1use crate::{ProgressEntry, Pusher};
2use bytes::Bytes;
3use std::collections::BTreeMap;
4
5/// 优先选择大块调用 push 但是不含合并过程
6#[derive(Debug)]
7pub struct CacheDirectPusher<P> {
8    inner: P,
9    cache: BTreeMap<u64, Bytes>,
10    cache_size: usize,
11    high_watermark: usize,
12    low_watermark: usize,
13}
14
15impl<P: Pusher> CacheDirectPusher<P> {
16    pub const fn new(inner: P, high_watermark: usize, low_watermark: usize) -> Self {
17        Self {
18            inner,
19            cache: BTreeMap::new(),
20            cache_size: 0,
21            high_watermark,
22            low_watermark,
23        }
24    }
25
26    fn evict_until(&mut self, target_size: usize) -> Result<(), P::Error> {
27        if self.cache_size <= target_size {
28            return Ok(());
29        }
30
31        let mut runs: Vec<(u64, usize)> = Vec::with_capacity(self.cache.len());
32        let mut curr_start = None;
33        let mut curr_len = 0;
34        let mut expected_next = 0;
35
36        for (&start, bytes) in &self.cache {
37            let len = bytes.len();
38            if let Some(c_start) = curr_start {
39                if start == expected_next {
40                    curr_len += len;
41                    expected_next += len as u64;
42                } else {
43                    runs.push((c_start, curr_len));
44                    curr_start = Some(start);
45                    curr_len = len;
46                    expected_next = start + len as u64;
47                }
48            } else {
49                curr_start = Some(start);
50                curr_len = len;
51                expected_next = start + len as u64;
52            }
53        }
54        if let Some(c_start) = curr_start {
55            runs.push((c_start, curr_len));
56        }
57        runs.sort_unstable_by_key(|&(_, len)| std::cmp::Reverse(len));
58
59        for (mut start, mut total_len) in runs {
60            while total_len > 0 {
61                let chunk = self.cache.remove(&start).unwrap();
62                let len = chunk.len();
63                self.cache_size -= len;
64                total_len -= len;
65                let range = start..start + len as u64;
66                if let Err((e, ret_bytes)) = self.inner.push(&range, chunk) {
67                    if !ret_bytes.is_empty() {
68                        self.cache_size += ret_bytes.len();
69                        let retry_start = start + (len - ret_bytes.len()) as u64;
70                        self.cache.insert(retry_start, ret_bytes);
71                    }
72                    return Err(e);
73                }
74                start += len as u64;
75            }
76            if self.cache_size <= target_size {
77                break;
78            }
79        }
80        Ok(())
81    }
82}
83
84impl<P: Pusher> Pusher for CacheDirectPusher<P> {
85    type Error = P::Error;
86
87    fn push(&mut self, range: &ProgressEntry, bytes: Bytes) -> Result<(), (Self::Error, Bytes)> {
88        if bytes.is_empty() {
89            return Ok(());
90        }
91
92        self.cache_size += bytes.len();
93        if let Some(old_bytes) = self.cache.insert(range.start, bytes) {
94            self.cache_size -= old_bytes.len();
95        }
96
97        if self.cache_size >= self.high_watermark
98            && let Err(e) = self.evict_until(self.low_watermark)
99        {
100            return Err((e, Bytes::new()));
101        }
102
103        Ok(())
104    }
105
106    fn flush(&mut self) -> Result<(), Self::Error> {
107        self.evict_until(0)?;
108        self.inner.flush()
109    }
110}