Skip to main content

fast_pull/cache/
seq.rs

1use crate::{ProgressEntry, Pusher};
2use bytes::Bytes;
3use std::collections::BTreeMap;
4
5#[derive(Debug)]
6pub struct CacheSeqPusher<P> {
7    inner: P,
8    cache: BTreeMap<u64, Bytes>,
9    cache_size: usize,
10    high_watermark: usize,
11    low_watermark: usize,
12}
13
14impl<P: Pusher> CacheSeqPusher<P> {
15    pub const fn new(inner: P, high_watermark: usize, low_watermark: usize) -> Self {
16        Self {
17            inner,
18            cache: BTreeMap::new(),
19            cache_size: 0,
20            high_watermark,
21            low_watermark,
22        }
23    }
24
25    fn evict_until(&mut self, target_size: usize) -> Result<(), P::Error> {
26        let mut expected = None;
27        while let Some(entry) = self.cache.first_entry() {
28            let start = *entry.key();
29            if self.cache_size <= target_size && Some(start) != expected {
30                break;
31            }
32            let chunk = entry.remove();
33            let chunk_len = chunk.len();
34            let next_pos = start + chunk_len as u64;
35            self.cache_size -= chunk_len;
36            if let Err((e, ret)) = self.inner.push(&(start..next_pos), chunk) {
37                if !ret.is_empty() {
38                    self.cache_size += ret.len();
39                    self.cache.insert(next_pos - ret.len() as u64, ret);
40                }
41                return Err(e);
42            }
43            expected = Some(next_pos);
44        }
45        Ok(())
46    }
47}
48
49impl<P: Pusher> Pusher for CacheSeqPusher<P> {
50    type Error = P::Error;
51
52    fn push(&mut self, range: &ProgressEntry, bytes: Bytes) -> Result<(), (Self::Error, Bytes)> {
53        if bytes.is_empty() {
54            return Ok(());
55        }
56
57        self.cache_size += bytes.len();
58        if let Some(old_bytes) = self.cache.insert(range.start, bytes) {
59            self.cache_size -= old_bytes.len();
60        }
61
62        if self.cache_size >= self.high_watermark
63            && let Err(e) = self.evict_until(self.low_watermark)
64        {
65            return Err((e, Bytes::new()));
66        }
67
68        Ok(())
69    }
70
71    fn flush(&mut self) -> Result<(), Self::Error> {
72        self.evict_until(0)?;
73        self.inner.flush()
74    }
75}