medina 0.0.2-alpha.5

An asynchronous web crawling engine
Documentation
use async_mutex::Mutex;
use event_listener::Event;
use futures::future::FutureExt;
use futures::select;
use futures_timer::Delay;
use plum::StandardBloomFilter;
use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::time::{Duration, SystemTime};
use url::Url;

use super::host_priority_queue::{CrawlerScheduler, HostPriorityQueue};
use super::page::{Page, PageResult};
use super::task_runner::TaskRunner;

/// Per-host crawl state: the pages waiting to be crawled on one host and the
/// time recorded for that host's next crawl.
struct CrawlerQueue {
    /// Pages pending for this host, consumed front-to-back (FIFO).
    urls: VecDeque<Box<dyn Page>>,
    /// Starts at `UNIX_EPOCH` for a new host and is moved to
    /// `now + delay` every time a page of this host is popped.
    next_crawl_time: SystemTime,
}

/// Host-aware crawl scheduler: de-duplicates URLs, keeps one FIFO queue per
/// host and throttles each host with a configurable delay between requests.
pub struct Crawler {
    /// Bloom filter of every URL string that was ever offered for enqueueing;
    /// a URL reported as present is ignored.
    seen: StandardBloomFilter<String>,
    /// Pending pages and next crawl time, keyed by host name.
    queue_by_host: HashMap<String, CrawlerQueue>,
    /// Number of pages currently queued (pushed and not yet popped).
    pub items_count: usize,
    /// One entry per host that currently has pending pages.
    // NOTE(review): ordering is defined in `host_priority_queue`; presumably
    // the host with the earliest `next_crawl_time` is popped first — confirm.
    queue_priorities: HostPriorityQueue,
    /// Set by `run` once no worker is busy and no page is queued; makes every
    /// `run` loop exit.
    done: bool,
    /// Number of `run` loops that are currently processing a page.
    working: i64,
    /// Per-host override of the delay between two requests.
    delay_by_host: HashMap<String, Duration>,
    /// Delay used for hosts without an entry in `delay_by_host`.
    default_delay: Duration,
}

impl Crawler {
    pub fn new() -> Self {
        Self {
            seen: plum::StandardBloomFilter::new(10_000_000, 0.000001),
            queue_by_host: HashMap::new(),
            items_count: 0,
            queue_priorities: BinaryHeap::new(),
            done: false,
            working: 0,
            delay_by_host: HashMap::new(),
            default_delay: Duration::from_millis(200),
        }
    }

    pub fn with_capacity(items_count: usize, fp_rate: f64) -> Self {
        Self {
            seen: plum::StandardBloomFilter::new(items_count, fp_rate),
            queue_by_host: HashMap::new(),
            items_count: 0,
            queue_priorities: BinaryHeap::new(),
            done: false,
            working: 0,
            delay_by_host: HashMap::new(),
            default_delay: Duration::from_millis(200),
        }
    }

    /// Builder-style enqueue: offers `page` to the scheduler and hands the
    /// crawler back so calls can be chained.
    pub fn push_item(mut self, page: Box<dyn Page>) -> Self {
        // The "crawlable right now" count is only of interest to `run`.
        let _ = self.push_item_internal(page);
        self
    }

    /// Enqueues `page` unless its URL was already offered before.
    ///
    /// Returns `1` when the page was enqueued and its host may be crawled
    /// right away, and `0` otherwise (duplicate URL, URL without a usable
    /// host, or host still throttled). `run` sums these values to decide
    /// whether waiting workers need to be woken.
    ///
    /// URLs that cannot be parsed or that carry no host are logged and
    /// skipped instead of panicking: links found on crawled pages are
    /// untrusted input.
    fn push_item_internal(&mut self, page: Box<dyn Page>) -> i32 {
        let url_string = page.get_url();

        // If this item has already been added in the past, ignore it
        if self.seen.contains(url_string) {
            return 0;
        }
        self.seen.insert(&url_string);

        // Extract the host from the url, because our scheduler
        // is host based. We want to load balance our crawler
        // based on each host.
        let host = match Url::parse(url_string.as_str())
            .ok()
            .and_then(|url| url.host_str().map(String::from))
        {
            Some(host) => host,
            None => {
                // The url is already recorded in `seen`, so a link that
                // shows up repeatedly is only reported once.
                println!("Skipping url without a valid host: '{}'", url_string);
                return 0;
            }
        };

        let next_crawl_time = match self.queue_by_host.get_mut(&host) {
            Some(crawler_queue) => {
                // We have already seen this host, so the next crawl time
                // is the time it was last scheduled for.
                let scheduled_at = crawler_queue.next_crawl_time;

                if crawler_queue.urls.is_empty() {
                    // An empty item list means the host is not in the
                    // binary heap anymore, so we need to re-add it so it
                    // can get crawled again.
                    self.queue_priorities.push(CrawlerScheduler {
                        host,
                        next_crawl_time: scheduled_at,
                    });
                }
                // Finally add it to the items queue
                crawler_queue.urls.push_back(page);
                scheduled_at
            }
            None => {
                // New host. Enqueuing is as easy as adding the host to the
                // priority queue and creating its item list. A new host has
                // never been crawled, so it is eligible immediately.
                let first_crawl = SystemTime::UNIX_EPOCH;
                self.queue_priorities.push(CrawlerScheduler {
                    host: host.clone(),
                    next_crawl_time: first_crawl,
                });

                let mut urls = VecDeque::new();
                urls.push_back(page);
                self.queue_by_host.insert(
                    host,
                    CrawlerQueue {
                        urls,
                        next_crawl_time: first_crawl,
                    },
                );
                first_crawl
            }
        };
        self.items_count += 1;

        // If the next crawl time is lower than or equal to the current
        // timestamp, the item we just enqueued can be crawled right now.
        if next_crawl_time <= SystemTime::now() {
            1
        } else {
            0
        }
    }

    /// Removes and returns the next page to crawl, or `None` when nothing is
    /// queued.
    ///
    /// The page is taken from the host at the top of the priority queue. That
    /// host's next crawl time is pushed `delay_for_host` into the future, and
    /// the host is re-scheduled only if it still has pages left.
    ///
    /// # Panics
    ///
    /// Panics if `items_count` is non-zero while the priority queue or the
    /// selected host queue is empty, i.e. if the bookkeeping is inconsistent.
    fn pop_item(&mut self) -> Option<Box<dyn Page>> {
        // Nothing queued, nothing to hand out.
        if self.items_count == 0 {
            return None;
        }

        // Host whose turn it is, and the throttle configured for it.
        let scheduled = self.queue_priorities.pop().unwrap();
        let throttle = self.delay_for_host(scheduled.host.as_str());

        // Oldest pending page of that host.
        let host_queue = self.queue_by_host.get_mut(&scheduled.host).unwrap();
        let page = host_queue.urls.pop_front().unwrap();

        // Book-keeping: one item less, and the host may not be hit again
        // before the throttle has elapsed.
        self.items_count -= 1;
        host_queue.next_crawl_time = SystemTime::now() + throttle;

        // Hosts with remaining work go back into the priority queue.
        if !host_queue.urls.is_empty() {
            self.queue_priorities.push(CrawlerScheduler {
                host: scheduled.host,
                next_crawl_time: host_queue.next_crawl_time,
            });
        }

        Some(page)
    }

    /// Returns the delay enforced between two requests to `host`: the
    /// host-specific value when one was configured, the default otherwise.
    pub fn delay_for_host(&self, host: &str) -> Duration {
        self.delay_by_host
            .get(host)
            .copied()
            .unwrap_or(self.default_delay)
    }

    /// Builder-style setter limiting `host` to `requests_per_second`.
    ///
    /// The rate is stored as the delay between two requests, i.e.
    /// `1s / requests_per_second` rounded down to whole nanoseconds.
    /// The rate is not validated; callers should pass a positive, finite value.
    pub fn set_host_request_rate(mut self, host: &str, requests_per_second: f64) -> Self {
        let nanos_between_requests = (1_000_000_000.0f64 / requests_per_second).floor();
        self.delay_by_host.insert(
            host.to_owned(),
            Duration::from_nanos(nanos_between_requests as u64),
        );
        self
    }

    /// Builder-style setter for the request rate applied to every host that
    /// has no host-specific rate.
    ///
    /// The rate is stored as the delay between two requests, i.e.
    /// `1s / requests_per_second` rounded down to whole nanoseconds.
    /// The rate is not validated; callers should pass a positive, finite value.
    pub fn set_default_request_rate(mut self, requests_per_second: f64) -> Self {
        let nanos_between_requests = (1_000_000_000.0f64 / requests_per_second).floor();
        self.default_delay = Duration::from_nanos(nanos_between_requests as u64);
        self
    }

    /// Processes `page`, retrying failed attempts.
    ///
    /// * `page` - the page to process.
    /// * `max_retries` - total number of attempts; values `<= 0` mean the
    ///   page is never processed.
    /// * `start_time` - reference point used only for the `time_start`
    ///   column of the log output.
    ///
    /// Returns the output of the first successful attempt: the newly
    /// discovered pages and the results to hand to the task runner. When
    /// every attempt fails, two empty vectors are returned. Failed attempts
    /// are logged and followed by a one second pause, except after the last
    /// one.
    async fn process_request_with_retry(
        page: &Box<dyn Page>,
        max_retries: i32,
        start_time: SystemTime,
    ) -> (Vec<Box<dyn Page>>, Vec<Box<dyn PageResult>>) {
        /// Milliseconds (with microsecond resolution) from `earlier` to
        /// `later`. `SystemTime` is not monotonic, so a clock adjustment can
        /// make `later` precede `earlier`; that is reported as 0 instead of
        /// panicking inside what is only logging code.
        fn millis_between(earlier: SystemTime, later: SystemTime) -> f64 {
            later
                .duration_since(earlier)
                .unwrap_or_default()
                .as_micros() as f64
                / 1000.0
        }

        // retry up to 'max_retries' times
        for attempt in 0..max_retries {
            let crawl_start_time = SystemTime::now();
            let outcome = page.process_request().await;
            let crawl_end_time = SystemTime::now();

            let time_start = millis_between(start_time, crawl_start_time);
            let duration = millis_between(crawl_start_time, crawl_end_time);

            match outcome {
                Ok(res) => {
                    println!(
                        "Crawled url: '{}', time_start: {}, duration: {}",
                        page.get_url(),
                        time_start,
                        duration
                    );

                    return res;
                }
                Err(err) => {
                    println!(
                        "Error processing: '{}', time_start: {}, duration: {}, retry: {}/{}, err: '{:?}'",
                        page.get_url(),
                        time_start,
                        duration,
                        attempt + 1,
                        max_retries,
                        err
                    );
                    if attempt < max_retries - 1 {
                        // Sleep for 1 sec before retrying
                        Delay::new(Duration::from_millis(1000)).await;
                    }
                }
            }
        }

        // Every attempt failed (or none was allowed): nothing to report.
        (Vec::new(), Vec::new())
    }

    /// Worker loop: repeatedly takes the next crawlable page, processes it,
    /// enqueues the pages it discovered and forwards its results to the task
    /// runner. Returns once the crawl is finished.
    ///
    /// * `crawler_mutex` - the shared crawler plus the event used to wake
    ///   workers when new work arrives or the crawl is done.
    /// * `tasks_mutex` - the shared task runner plus the event used to wake
    ///   it when results arrive or the crawl is done.
    /// * `start` - reference time forwarded to the request logging.
    ///
    /// The crawl is considered finished when no page is queued and no worker
    /// is processing one. The crawler lock is never held while a page is
    /// being processed or while the task-runner lock is being acquired.
    pub async fn run(
        crawler_mutex: &(Mutex<Crawler>, Event),
        tasks_mutex: &(Mutex<TaskRunner>, Event),
        start: SystemTime,
    ) {
        let (mutex, event) = (&crawler_mutex.0, &crawler_mutex.1);

        loop {
            let mut crawler = mutex.lock().await;
            if crawler.done {
                break;
            }

            if crawler.items_count == 0 {
                if crawler.working == 0 {
                    // Nothing is queued and nobody is processing a page, so
                    // no new item can ever show up (only busy workers enqueue
                    // pages). Finish instead of waiting forever, e.g. when
                    // the crawler was started without any page.
                    crawler.done = true;
                    drop(crawler);
                    event.notify(usize::MAX);
                    notify_task_runner(tasks_mutex, Vec::new(), true).await;
                    break;
                }

                // queue is empty, lets wait until we have a new item added
                let listener = event.listen();
                drop(crawler);
                listener.await;
                continue;
            }

            let next_crawl = crawler
                .queue_priorities
                .peek()
                .expect("a non-zero items_count implies a scheduled host")
                .next_crawl_time;
            let now = SystemTime::now();

            if now < next_crawl {
                // The most urgent host is still throttled: sleep until it is
                // due, or until another worker signals a change.
                let listener = event.listen();
                drop(crawler);

                // `now < next_crawl` was checked against this very `now`, so
                // the subtraction cannot fail; never panic on clock trouble.
                let wait = next_crawl.duration_since(now).unwrap_or_default();
                select! {
                    _timeout = Delay::new(wait).fuse() => {},
                    _woken = listener.fuse() => {}
                }
                continue;
            }

            crawler.working += 1;

            let page = crawler.pop_item().unwrap();
            // Release the lock while the (slow) request is in flight.
            drop(crawler);

            let (new_pages, results) = Crawler::process_request_with_retry(&page, 3, start).await;

            let mut crawler = mutex.lock().await;
            let mut items_added = 0;
            for new_page in new_pages {
                items_added += crawler.push_item_internal(new_page);
            }
            crawler.working -= 1;

            let is_done = crawler.working == 0 && crawler.items_count == 0;
            if is_done {
                crawler.done = true;
            }

            // Decide while holding the lock, but always release it before
            // notifying anyone or touching the task runner.
            let wake_workers = items_added > 0 || crawler.done;
            drop(crawler);
            if wake_workers {
                event.notify(usize::MAX);
            }

            notify_task_runner(tasks_mutex, results, is_done).await;
        }

        /// Appends `results` to the task runner queue, flags the crawler as
        /// done when `is_done` is set, and wakes the task runner. Does
        /// nothing when there are no results and the crawl is not done.
        async fn notify_task_runner(
            tasks_mutex: &(Mutex<TaskRunner>, Event),
            results: Vec<Box<dyn PageResult>>,
            is_done: bool,
        ) {
            if !results.is_empty() || is_done {
                let mut task_runner = tasks_mutex.0.lock().await;
                for task in results {
                    task_runner.queue.push_back(task);
                }
                if is_done {
                    task_runner.crawler_done = true;
                }

                drop(task_runner);
                tasks_mutex.1.notify(usize::MAX);
            }
        }
    }
}