duma 0.2.0

A minimal file downloader
Documentation
use std::cell::RefCell;
use std::fmt;
use std::io::Read;
use std::sync::mpsc;
use std::time::Duration;

use failure::{format_err, Fallible};
use reqwest::blocking::{Client, Request};
use reqwest::header::{self, HeaderMap, HeaderValue};
use url::Url;

use threadpool::ThreadPool;

use ftp::FtpStream;

#[derive(Debug, Clone)]
pub struct Config {
    pub user_agent: String,
    pub resume: bool,
    pub headers: HeaderMap,
    pub file: String,
    pub timeout: u64,
    pub concurrent: bool,
    pub max_retries: i32,
    pub num_workers: usize,
    pub bytes_on_disk: Option<u64>,
    pub chunk_offsets: Option<Vec<(u64, u64)>>,
    pub chunk_size: u64,
}

#[allow(unused_variables)]
pub trait EventsHandler {
    fn on_resume_download(&mut self, bytes_on_disk: u64) {}

    fn on_headers(&mut self, headers: HeaderMap) {}

    fn on_content(&mut self, content: &[u8]) -> Fallible<()> {
        Ok(())
    }

    fn on_concurrent_content(&mut self, content: (u64, u64, &[u8])) -> Fallible<()> {
        Ok(())
    }

    fn on_content_length(&mut self, ct_len: u64) {}

    fn on_ftp_content_length(&mut self, ct_len: Option<u64>) {}

    fn on_success_status(&self) {}

    fn on_failure_status(&self, status_code: i32) {}

    fn on_finish(&mut self) {}

    fn on_max_retries(&mut self) {}

    fn on_server_supports_resume(&mut self) {}
}

pub struct FtpDownload {
    url: Url,
    hooks: Vec<RefCell<Box<dyn EventsHandler>>>,
}

impl FtpDownload {
    pub fn new(url: Url) -> Self {
        Self {
            url,
            hooks: Vec::new(),
        }
    }

    pub fn download(&mut self) -> Fallible<()> {
        let ftp_server = format!(
            "{}:{}",
            self.url
                .host_str()
                .ok_or_else(|| format_err!("failed to parse hostname from url: {}", self.url))?,
            self.url
                .port_or_known_default()
                .ok_or_else(|| format_err!("failed to parse port from url: {}", self.url))?,
        );
        let username = if self.url.username().is_empty() {
            "anonymous"
        } else {
            self.url.username()
        };
        let password = self.url.password().unwrap_or("anonymous");

        let mut path_segments: Vec<&str> = self
            .url
            .path_segments()
            .ok_or_else(|| format_err!("failed to get url path segments: {}", self.url))?
            .collect();
        let ftp_fname = path_segments
            .pop()
            .ok_or_else(|| format_err!("got empty path segments from url: {}", self.url))?;

        let mut conn = FtpStream::connect(ftp_server)?;
        conn.login(username, password)?;
        for path in &path_segments {
            conn.cwd(path)?;
        }
        let ct_len = conn.size(ftp_fname)?;
        let mut reader = conn.get(ftp_fname)?;

        for hook in &self.hooks {
            let ct_len = ct_len.map(|x| x as u64);
            hook.borrow_mut().on_ftp_content_length(ct_len);
        }

        loop {
            let mut buffer = vec![0; 2048usize];
            let bcount = reader.read(&mut buffer[..])?;
            buffer.truncate(bcount);
            if !buffer.is_empty() {
                self.send_content(buffer.as_slice())?;
            } else {
                break;
            }
        }

        for hook in &self.hooks {
            hook.borrow_mut().on_finish();
        }

        Ok(())
    }

    fn send_content(&self, contents: &[u8]) -> Fallible<()> {
        for hk in &self.hooks {
            hk.borrow_mut().on_content(contents)?;
        }
        Ok(())
    }
    pub fn events_hook<E: EventsHandler + 'static>(&mut self, hk: E) -> &mut FtpDownload {
        self.hooks.push(RefCell::new(Box::new(hk)));
        self
    }
}

pub struct HttpDownload {
    url: Url,
    hooks: Vec<RefCell<Box<dyn EventsHandler>>>,
    conf: Config,
    retries: i32,
    client: Client,
}

impl fmt::Debug for HttpDownload {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "HttpDownload url: {}", self.url)
    }
}

impl HttpDownload {
    pub fn new(url: Url, conf: Config) -> HttpDownload {
        HttpDownload {
            url,
            hooks: Vec::new(),
            conf,
            retries: 0,
            client: Client::new(),
        }
    }

    pub fn download(&mut self) -> Fallible<()> {
        let resp = self
            .client
            .get(self.url.as_ref())
            .timeout(Duration::from_secs(self.conf.timeout))
            .headers(self.conf.headers.clone())
            .header(
                header::USER_AGENT,
                HeaderValue::from_str(&self.conf.user_agent)?,
            )
            .send()?;
        let headers = resp.headers();

        let server_supports_bytes = match headers.get(header::ACCEPT_RANGES) {
            Some(val) => val == "bytes",
            None => false,
        };

        if server_supports_bytes && self.conf.headers.contains_key(header::RANGE) {
            if self.conf.concurrent {
                self.conf.headers.remove(header::RANGE);
            }
            for hook in &self.hooks {
                hook.borrow_mut().on_server_supports_resume();
            }
        }

        let req = self
            .client
            .get(self.url.as_ref())
            .timeout(Duration::from_secs(self.conf.timeout))
            .headers(self.conf.headers.clone())
            .build()?;

        for hk in &self.hooks {
            hk.borrow_mut().on_headers(headers.clone());
        }
        if server_supports_bytes
            && self.conf.concurrent
            && headers.contains_key(header::CONTENT_LENGTH)
        {
            self.concurrent_download(req, headers.get(header::CONTENT_LENGTH).unwrap())?;
        } else {
            self.singlethread_download(req)?;
        }

        for hook in &self.hooks {
            hook.borrow_mut().on_finish();
        }

        Ok(())
    }

    pub fn events_hook<E: EventsHandler + 'static>(&mut self, hk: E) -> &mut HttpDownload {
        self.hooks.push(RefCell::new(Box::new(hk)));
        self
    }

    fn singlethread_download(&mut self, req: Request) -> Fallible<()> {
        let mut resp = self.client.execute(req)?;
        let ct_len = if let Some(val) = resp.headers().get(header::CONTENT_LENGTH) {
            Some(val.to_str()?.parse::<usize>()?)
        } else {
            None
        };
        let mut cnt = 0;
        loop {
            let mut buffer = vec![0; self.conf.chunk_size as usize];
            let bcount = resp.read(&mut buffer[..])?;
            cnt += bcount;
            buffer.truncate(bcount);
            if !buffer.is_empty() {
                self.send_content(buffer.as_slice())?;
            } else {
                break;
            }
            if Some(cnt) == ct_len {
                break;
            }
        }
        Ok(())
    }

    pub fn concurrent_download(&mut self, req: Request, ct_val: &HeaderValue) -> Fallible<()> {
        let (data_tx, data_rx) = mpsc::channel();
        let (errors_tx, errors_rx) = mpsc::channel();
        let ct_len = ct_val.to_str()?.parse::<u64>()?;
        let chunk_offsets = self
            .conf
            .chunk_offsets
            .clone()
            .unwrap_or_else(|| self.get_chunk_offsets(ct_len, self.conf.chunk_size));
        let worker_pool = ThreadPool::new(self.conf.num_workers);
        for offsets in chunk_offsets {
            let data_tx = data_tx.clone();
            let errors_tx = errors_tx.clone();
            let req = req.try_clone().unwrap();
            worker_pool.execute(move || download_chunk(req, offsets, data_tx.clone(), errors_tx))
        }

        let mut count = self.conf.bytes_on_disk.unwrap_or(0);
        loop {
            if count == ct_len {
                break;
            }
            let (byte_count, offset, buf) = data_rx.recv()?;
            count += byte_count;
            for hk in &self.hooks {
                hk.borrow_mut()
                    .on_concurrent_content((byte_count, offset, &buf))?;
            }
            match errors_rx.recv_timeout(Duration::from_micros(1)) {
                Err(_) => {}
                Ok(offsets) => {
                    if self.retries > self.conf.max_retries {
                        for hk in &self.hooks {
                            hk.borrow_mut().on_max_retries();
                        }
                    }
                    self.retries += 1;
                    let data_tx = data_tx.clone();
                    let errors_tx = errors_tx.clone();
                    let req = req.try_clone().unwrap();
                    worker_pool.execute(move || download_chunk(req, offsets, data_tx, errors_tx))
                }
            }
        }
        Ok(())
    }

    fn get_chunk_offsets(&self, ct_len: u64, chunk_size: u64) -> Vec<(u64, u64)> {
        let no_of_chunks = ct_len / chunk_size;
        let mut sizes = Vec::new();

        for chunk in 0..no_of_chunks {
            let bound = if chunk == no_of_chunks - 1 {
                ct_len
            } else {
                ((chunk + 1) * chunk_size) - 1
            };
            sizes.push((chunk * chunk_size, bound));
        }
        if sizes.is_empty() {
            sizes.push((0, ct_len));
        }

        sizes
    }

    fn send_content(&mut self, contents: &[u8]) -> Fallible<()> {
        for hk in &self.hooks {
            hk.borrow_mut().on_content(contents)?;
        }

        Ok(())
    }
}

fn download_chunk(
    req: Request,
    offsets: (u64, u64),
    sender: mpsc::Sender<(u64, u64, Vec<u8>)>,
    errors: mpsc::Sender<(u64, u64)>,
) {
    fn inner(
        mut req: Request,
        offsets: (u64, u64),
        sender: mpsc::Sender<(u64, u64, Vec<u8>)>,
        start_offset: &mut u64,
    ) -> Fallible<()> {
        let byte_range = format!("bytes={}-{}", offsets.0, offsets.1);
        let headers = req.headers_mut();
        headers.insert(header::RANGE, HeaderValue::from_str(&byte_range)?);
        headers.insert(header::ACCEPT, HeaderValue::from_str("*/*")?);
        headers.insert(header::CONNECTION, HeaderValue::from_str("keep-alive")?);
        let mut resp = Client::new().execute(req)?;
        let chunk_sz = offsets.1 - offsets.0;
        let mut cnt = 0u64;
        loop {
            let mut buf = vec![0; chunk_sz as usize];
            let byte_count = resp.read(&mut buf[..])?;
            cnt += byte_count as u64;
            buf.truncate(byte_count);
            if !buf.is_empty() {
                sender.send((byte_count as u64, *start_offset, buf.clone()))?;
                *start_offset += byte_count as u64;
            } else {
                break;
            }
            if cnt == (chunk_sz + 1) {
                break;
            }
        }

        Ok(())
    }
    let mut start_offset = offsets.0;
    let end_offset = offsets.1;
    match inner(req, offsets, sender, &mut start_offset) {
        Ok(_) => {}
        Err(_) => match errors.send((start_offset, end_offset)) {
            _ => {}
        },
    }
}