fast-pull 4.0.2

Pull everything fast
Documentation
use crate::{ProgressEntry, Pusher};
use bytes::Bytes;
use std::collections::BTreeMap;

#[derive(Debug)]
pub struct CacheSeqPusher<P> {
    inner: P,
    cache: BTreeMap<u64, Bytes>,
    cache_size: usize,
    high_watermark: usize,
    low_watermark: usize,
}

impl<P: Pusher> CacheSeqPusher<P> {
    pub const fn new(inner: P, high_watermark: usize, low_watermark: usize) -> Self {
        Self {
            inner,
            cache: BTreeMap::new(),
            cache_size: 0,
            high_watermark,
            low_watermark,
        }
    }

    fn evict_until(&mut self, target_size: usize) -> Result<(), P::Error> {
        let mut expected = None;
        while let Some(entry) = self.cache.first_entry() {
            let start = *entry.key();
            if self.cache_size <= target_size && Some(start) != expected {
                break;
            }
            let chunk = entry.remove();
            let chunk_len = chunk.len();
            let next_pos = start + chunk_len as u64;
            self.cache_size -= chunk_len;
            if let Err((e, ret)) = self.inner.push(&(start..next_pos), chunk) {
                if !ret.is_empty() {
                    self.cache_size += ret.len();
                    self.cache.insert(next_pos - ret.len() as u64, ret);
                }
                return Err(e);
            }
            expected = Some(next_pos);
        }
        Ok(())
    }
}

impl<P: Pusher> Pusher for CacheSeqPusher<P> {
    type Error = P::Error;

    fn push(&mut self, range: &ProgressEntry, bytes: Bytes) -> Result<(), (Self::Error, Bytes)> {
        if bytes.is_empty() {
            return Ok(());
        }

        self.cache_size += bytes.len();
        if let Some(old_bytes) = self.cache.insert(range.start, bytes) {
            self.cache_size -= old_bytes.len();
        }

        if self.cache_size >= self.high_watermark
            && let Err(e) = self.evict_until(self.low_watermark)
        {
            return Err((e, Bytes::new()));
        }

        Ok(())
    }

    fn flush(&mut self) -> Result<(), Self::Error> {
        self.evict_until(0)?;
        self.inner.flush()
    }
}