tur-rs 0.8.2

A relentless, high-concurrency download manager built for speed and efficiency. Tur uses dynamic work-stealing and aligned storage to saturate your bandwidth while maintaining a minuscule memory footprint. Inspired by the legends, built for the modern Rust ecosystem.
Documentation
use super::*;

impl ConnectionWorker {
    /// Sends the bytes buffered in `pending` over `write_tx` and installs a
    /// fresh buffer for the next batch.
    ///
    /// Does nothing when `pending.data` is empty. Otherwise the buffer is moved
    /// out of `pending` and sent together with `pending.start_offset`. The
    /// replacement buffer is taken from `recycle_rx` when one is immediately
    /// available (it is cleared first), or newly allocated with capacity
    /// `max(pending.target_bytes, WRITE_BUFFER_MIN_BYTES)`.
    ///
    /// The elapsed wall-clock time of the send plus buffer replacement is added
    /// to `attempt_timing.write_ms`, and samples between 1 and 500 ms inclusive
    /// are folded into the shared write-latency moving average.
    ///
    /// # Errors
    ///
    /// Returns a "writer task died" error when the send on `write_tx` fails.
    /// The buffered bytes have already been moved out of `pending` by then, so
    /// `pending.data` is left empty.
    pub(super) async fn flush_pending_write(
        &self,
        write_tx: &tokio::sync::mpsc::Sender<(u64, Vec<u8>)>,
        recycle_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
        pending: &mut PendingWrite,
        attempt_timing: &mut AttemptTiming,
    ) -> Result<()> {
        if pending.data.is_empty() {
            return Ok(());
        }

        let write_started = Instant::now();
        // Move the buffer out without copying; `pending.data` is left as an
        // empty `Vec` until a replacement is installed below.
        let data_to_write = std::mem::take(&mut pending.data);
        write_tx
            .send((pending.start_offset, data_to_write))
            .await
            .map_err(|_| anyhow::anyhow!("writer task died"))?;

        // Prefer reusing a returned buffer over allocating a new one.
        if let Ok(mut recycled) = recycle_rx.try_recv() {
            recycled.clear();
            pending.data = recycled;
        } else {
            pending.data = Vec::with_capacity(pending.target_bytes.max(WRITE_BUFFER_MIN_BYTES));
        }

        let write_ms = write_started.elapsed().as_millis() as u64;
        attempt_timing.write_ms = attempt_timing.write_ms.saturating_add(write_ms);
        // Only samples in 1..=500 ms update the moving average.
        // NOTE(review): sub-millisecond sends truncate to 0 and are skipped, so
        // fast writes never pull the average back down — confirm this is intended.
        if (1..=500).contains(&write_ms) {
            let sample = write_ms as f64;
            let prev = self.shared_write_latency_ms.get();
            // Exponentially weighted moving average: 20% new sample, 80% history.
            let updated = 0.2 * sample + 0.8 * prev;
            self.shared_write_latency_ms.set(updated);
            // The metric stores the highest average seen, scaled by 10 and rounded.
            let max_x10 = self.metrics.max_ewma_write_latency_x10.get();
            let updated_x10 = (updated * 10.0).round() as u64;
            if updated_x10 > max_x10 {
                self.metrics.max_ewma_write_latency_x10.set(updated_x10);
            }
        }

        self.trim_pending_write(pending);
        Ok(())
    }

    /// Appends `data` to the pending buffer.
    ///
    /// When the buffer is empty, `offset` is recorded as `pending.start_offset`
    /// first. For a non-empty buffer `offset` is not inspected.
    // NOTE(review): presumably callers guarantee `offset` continues directly
    // after the bytes already buffered — not checked here.
    pub(super) fn append_pending_write(&self, pending: &mut PendingWrite, offset: u64, data: &[u8]) {
        if pending.data.is_empty() {
            pending.start_offset = offset;
        }
        pending.data.extend_from_slice(data);
    }

    /// Computes the desired write-buffer size in bytes from the recent
    /// download speed and the shared write-latency average.
    ///
    /// The result is the larger of a speed-tier size and a latency-scaled size,
    /// clamped to `WRITE_BUFFER_MIN_BYTES..=cap`, where `cap` is
    /// `self.write_buffer_cap_bytes` raised to at least `WRITE_BUFFER_LARGE_BYTES`.
    fn target_write_buffer_bytes(&self, recent_speed_bps: f64) -> usize {
        // Negative speeds are treated as 0; the float-to-int cast saturates.
        let speed_bps = recent_speed_bps.max(0.0) as u64;
        let speed_target = if speed_bps >= WRITE_BUFFER_MAX_SPEED_BPS {
            WRITE_BUFFER_MAX_BYTES
        } else if speed_bps >= WRITE_BUFFER_LARGE_SPEED_BPS {
            WRITE_BUFFER_LARGE_BYTES
        } else if speed_bps >= WRITE_BUFFER_MEDIUM_SPEED_BPS {
            WRITE_BUFFER_MEDIUM_BYTES
        } else {
            WRITE_BUFFER_MIN_BYTES
        };
        // Scale the large buffer size by latency / 10, never scaling below 1x.
        // NOTE(review): with the ratio floored at 1.0, `latency_target` is never
        // below `WRITE_BUFFER_LARGE_BYTES`, so speed tiers smaller than that
        // never win the `max` below — confirm this is intended.
        let latency_ratio = (self.shared_write_latency_ms.get() / 10.0).max(1.0);
        let latency_target = ((WRITE_BUFFER_LARGE_BYTES as f64) * latency_ratio).round() as usize;
        let cap = self.write_buffer_cap_bytes.get().max(WRITE_BUFFER_LARGE_BYTES);
        // `clamp` panics if its lower bound exceeds its upper bound; `cap` is at
        // least `WRITE_BUFFER_LARGE_BYTES`, so this relies on
        // `WRITE_BUFFER_MIN_BYTES <= WRITE_BUFFER_LARGE_BYTES` (defined elsewhere).
        speed_target
            .max(latency_target)
            .clamp(WRITE_BUFFER_MIN_BYTES, cap)
    }

    /// Raises the `max_write_buffer_target_bytes` metric to `target` when
    /// `target` exceeds the currently stored value.
    fn record_write_buffer_target_metric(&self, target: usize) {
        let target_u64 = target as u64;
        if target_u64 > self.metrics.max_write_buffer_target_bytes.get() {
            self.metrics.max_write_buffer_target_bytes.set(target_u64);
        }
    }

    /// Recomputes `pending.target_bytes` from `recent_speed_bps` and resizes
    /// the buffer's capacity toward it.
    ///
    /// The new target is also recorded in the metrics. If the buffer's capacity
    /// is below the target it is grown to at least the target; otherwise, if
    /// the buffer is empty, it is offered to `trim_pending_write` for shrinking.
    pub(super) fn update_pending_write_target(&self, pending: &mut PendingWrite, recent_speed_bps: f64) {
        let target = self.target_write_buffer_bytes(recent_speed_bps);
        self.record_write_buffer_target_metric(target);
        pending.target_bytes = target;

        if pending.data.capacity() < target {
            // `Vec::reserve(additional)` guarantees `capacity >= len + additional`,
            // i.e. it counts from the current length, not the current capacity.
            // Requesting `target - len` therefore brings capacity up to `target`.
            let additional = target.saturating_sub(pending.data.len());
            pending.data.reserve(additional);
        } else if pending.data.is_empty() {
            self.trim_pending_write(pending);
        }
    }

    /// Shrinks an empty buffer whose capacity is more than twice the target.
    ///
    /// The target is `pending.target_bytes`, raised to at least
    /// `WRITE_BUFFER_MIN_BYTES`. Non-empty buffers are left untouched.
    fn trim_pending_write(&self, pending: &mut PendingWrite) {
        let target = pending.target_bytes.max(WRITE_BUFFER_MIN_BYTES);
        if pending.data.is_empty() && pending.data.capacity() > target.saturating_mul(2) {
            pending.data.shrink_to(target);
        }
    }

    /// Resets the buffer target to `WRITE_BUFFER_MIN_BYTES` and trims the
    /// buffer accordingly (only an empty, oversized buffer is shrunk).
    pub(super) fn reset_pending_write_target(&self, pending: &mut PendingWrite) {
        pending.target_bytes = WRITE_BUFFER_MIN_BYTES;
        self.trim_pending_write(pending);
    }
}