crusty 0.1.0

Fast, scalable and stable Broad Web Crawler developed on top of crusty-core
#[allow(unused_imports)]
use crate::prelude::*;
use crate::{
    types::*,
    clickhouse_utils as chu,
};
use crusty_core::config as rc;

use std::{
    cell::RefCell,
};

use backoff::future::retry;
use backoff::ExponentialBackoff;
use clickhouse::Reflection;
use serde::{Deserialize};

use futures::stream::FuturesUnordered;
use futures::StreamExt;

/// Settings that drive [`JobReader`]: which shards it polls, how it queries the domain
/// table and how the produced crawling jobs are configured.
#[derive(Clone, Debug, Deserialize)]
pub struct JobReaderConfig {
    /// Age threshold, in days, bound into the selection query: a domain/tail pair is picked
    /// only when its newest `updated_at` is at least this many days in the past.
    pub re_after_days: usize,
    /// Minimum time that has to pass after a shard was reserved before the same shard may
    /// be reserved again.
    pub shard_min_last_read: rc::CDuration,
    /// First shard number (inclusive) of the shard pool handled by this reader.
    pub shard_min: usize,
    /// Last shard number (inclusive) of the shard pool handled by this reader.
    pub shard_max: usize,
    /// Shard count handed to `Domain::new` for every job that is created.
    pub shard_total: usize,
    /// `LIMIT` bound into the per-shard selection query.
    pub shard_select_limit: usize,
    /// A new shard is reserved for reading only while fewer than this many jobs are queued.
    pub job_buffer: usize,
    /// Argument of `groupArray(..)` in the selection query, i.e. the maximum number of
    /// `domain_tail` values collected per domain.
    pub domain_tail_top_n: usize,
    /// Table the selection query reads from; also reported as `table_name` in read metrics.
    pub domain_table_name: String,
    /// Crawling settings cloned into every job that is sent out.
    pub default_crawling_settings: rc::CrawlingSettings,
    /// Seed URLs that are queued as jobs once the shard they belong to gets reserved.
    pub seeds: Vec<String>,
}

impl Default for JobReaderConfig {
    /// Stock configuration: shards 1 through 25, the `domain_discovery` table, a three-day
    /// re-crawl threshold and a single seed URL.
    fn default() -> Self {
        // `shard_total` is derived from the bounds so the three values cannot drift apart.
        let (first_shard, last_shard) = (1, 25);

        Self {
            re_after_days: 3,
            shard_min_last_read: rc::CDuration::from_secs(1),
            shard_min: first_shard,
            shard_max: last_shard,
            shard_total: last_shard - first_shard + 1,
            shard_select_limit: 100_000,
            job_buffer: 100_000,
            domain_tail_top_n: 3,
            domain_table_name: "domain_discovery".to_string(),
            default_crawling_settings: rc::CrawlingSettings::default(),
            seeds: vec!["https://bash.im".to_string()],
        }
    }
}

/// Reads crawl candidates shard by shard from ClickHouse and feeds them, as jobs, into the
/// job channel (see `go`).
pub struct JobReader {
    // Configuration supplied at construction time.
    cfg: JobReaderConfig
}

/// Bookkeeping for shards and queued jobs. All fields use `RefCell` so the state can be
/// updated through a shared reference.
struct JobReaderState {
    // Minimum pause between two reservations of the same shard.
    shard_min_last_read: Duration,
    // Moment each shard was last reserved.
    shard_last_read: RefCell<HashMap<u16, Instant>>,
    // Shards that may be reserved; taken from the front, returned to the back.
    free_shards: RefCell<LinkedList<u16>>,
    // Reserved shards, each with the domain names of its jobs that are not finished yet.
    busy_shards: RefCell<HashMap<u16, HashSet<String>>>,
    // Jobs waiting to be sent, in FIFO order.
    jobs: RefCell<LinkedList<Domain>>,
}

impl JobReaderState {
    /// Creates the initial state: every shard in `shard_min..=shard_max` is free, no shard
    /// is busy and no job is queued.
    ///
    /// `shard_min_last_read` is the minimum pause between two reservations of one shard.
    fn new(shard_min: usize, shard_max: usize, shard_min_last_read: Duration) -> Self {
        Self {
            shard_min_last_read,
            shard_last_read: RefCell::new(HashMap::new()),
            // An inclusive range avoids the `shard_max as u16 + 1` overflow that an
            // exclusive upper bound hits when `shard_max` equals `u16::MAX`.
            // NOTE(review): the `as u16` casts still truncate bounds above `u16::MAX`.
            free_shards: RefCell::new((shard_min as u16..=shard_max as u16).collect()),
            busy_shards: RefCell::new(HashMap::new()),
            jobs: RefCell::new(LinkedList::new()),
        }
    }

    /// Returns the next shard to read (if one could be reserved) together with the next
    /// queued job (if any).
    ///
    /// A shard is looked for only while fewer than `task_buffer` jobs are queued. Each free
    /// shard is tried at most once per call; shards that cannot be reserved yet go back to
    /// the end of the free list.
    fn next_job_and_shard(&self, task_buffer: usize) -> (Option<u16>, Option<Domain>) {
        let mut queue = self.jobs.borrow_mut();

        let mut reserved = None;
        if queue.len() < task_buffer {
            let mut pool = self.free_shards.borrow_mut();

            // Fixed number of attempts so rejected shards pushed to the back are not retried.
            let attempts = pool.len();
            for _ in 0..attempts {
                match pool.pop_front() {
                    None => break,
                    Some(candidate) if self.reserve_busy_shard(candidate) => {
                        reserved = Some(candidate);
                        break;
                    }
                    Some(candidate) => pool.push_back(candidate),
                }
            }
        }

        (reserved, queue.pop_front())
    }

    /// Releases `shard` back to the free list once it is busy but has no unfinished jobs
    /// left. Does nothing when the shard is not busy or still has jobs.
    fn check_shard(&self, shard: u16) {
        let mut busy = self.busy_shards.borrow_mut();

        let drained = busy
            .get(&shard)
            .map_or(false, |pending| pending.is_empty());
        if !drained {
            return;
        }

        busy.remove(&shard);
        self.free_shards.borrow_mut().push_back(shard);
        info!("Busy shard {} evicted", shard);
    }

    /// Tries to mark `shard` as busy.
    ///
    /// Returns `false` (changing nothing) when the shard was reserved less than
    /// `shard_min_last_read` ago. Otherwise records the reservation time, registers the
    /// shard with an empty job set and returns `true`.
    ///
    /// # Panics
    ///
    /// Panics if the shard is already registered as busy.
    fn reserve_busy_shard(&self, shard: u16) -> bool {
        let mut last_reads = self.shard_last_read.borrow_mut();
        let mut busy = self.busy_shards.borrow_mut();

        let read_too_recently = last_reads
            .get(&shard)
            .map_or(false, |at| at.elapsed() < self.shard_min_last_read);
        if read_too_recently {
            return false;
        }
        last_reads.insert(shard, Instant::now());

        match busy.insert(shard, HashSet::new()) {
            None => {
                info!("Busy shard {} reserved", shard);
                true
            }
            Some(v) => panic!(
                "Shard {} is not supposed to be busy: {:?}",
                shard,
                v
            ),
        }
    }

    /// Registers `domain` as an unfinished job of the busy shard `shard` and appends it to
    /// the job queue.
    ///
    /// # Panics
    ///
    /// Panics if `shard` is not currently reserved, or if the shard already tracks a job
    /// for the same domain name.
    fn add_job(&self, shard: u16, domain: Domain) {
        {
            let mut busy_shards = self.busy_shards.borrow_mut();
            // Report the broken invariant explicitly instead of a bare `unwrap` failure.
            let busy_shard = busy_shards.get_mut(&shard).unwrap_or_else(|| {
                panic!("Cannot add a job for {}: shard {} is not reserved!", &domain.domain, shard)
            });
            if !busy_shard.insert(domain.domain.clone()) {
                panic!("Shard {} already contains a job for {}!", shard, &domain.domain)
            }
        }

        // Move the job into the queue and log through the queued entry, so the whole
        // `Domain` does not have to be cloned just to keep its name around.
        let mut jobs = self.jobs.borrow_mut();
        jobs.push_back(domain);
        if let Some(queued) = jobs.back() {
            info!("Job {}/{} added!", shard, &queued.domain);
        }
    }

    /// Removes the job for `domain` from the busy shard `domain.shard`.
    ///
    /// # Panics
    ///
    /// Panics if that shard is not busy, or if it does not track a job for this domain.
    fn finish_job(&self, domain: &Domain) {
        {
            let mut busy = self.busy_shards.borrow_mut();

            let removed = match busy.get_mut(&domain.shard) {
                Some(pending) => pending.remove(&domain.domain),
                None => panic!("Got notification about finished job '{domain}' but couldn't locate shard {shard}", shard = domain.shard, domain = &domain.domain),
            };
            if !removed {
                panic!("Got notification about finished job '{domain}' but couldn't locate it inside the shard {shard}", shard = domain.shard, domain = &domain.domain);
            }
        }

        info!("Job {}/{} is finished!", domain.shard, &domain.domain);
    }
}

/// One row of the job-selection query: a domain and the `domain_tail` values grouped
/// under it. Both fields borrow from the data the row was deserialized from.
#[derive(Reflection, Deserialize)]
struct JobReaderRow<'a> {
    // The `domain` column.
    domain: &'a str,
    // The `tails` column, i.e. `groupArray(..)(domain_tail)`; an empty tail stands for the
    // bare domain.
    tails: Vec<&'a str>
}

/// Outcome of one of the futures that `JobReader::go` drives concurrently.
#[derive(Debug)]
enum FutureResult {
    // Reading the jobs of a shard finished, with the domain names or the final error.
    JobsRead(Result<Vec<String>>),
    // The future that sends a job has completed.
    JobsSent,
    // The future that sends read metrics has completed.
    MetricsSent,
    // Result of receiving from the confirmation channel.
    Notify(core::result::Result<chu::Notification<Domain>, RecvError>)
}

impl JobReader {
    pub fn new(
        cfg: JobReaderConfig,
    ) -> Self {
        Self {
            cfg
        }
    }

    /// Builds a task that selects the crawl candidates of `shard` from the domain table.
    ///
    /// Every returned string is either the bare domain (for an empty tail) or
    /// `<tail>.<domain>`. The task fails when the cursor cannot be obtained or a row cannot
    /// be read.
    fn read_jobs<'a>(
        &'a self,
        client: &'a clickhouse::Client,
        shard: u16,
    ) -> TracingTask<'a, Vec<String>> {
        TracingTask::new_short_lived(span!(Level::INFO), async move {
            let cfg = &self.cfg;

            // Only the table name is spliced into the text; all values are bound below in
            // the order of the `?` placeholders.
            let sql = format!(
                    "SELECT domain, groupArray(?)(domain_tail) as tails FROM (
                    SELECT domain, domain_tail FROM {}
                    WHERE shard = ?
                    GROUP BY shard, domain, domain_tail
                    HAVING max(updated_at) <= date_sub(DAY, ?, NOW())
                )
                GROUP BY domain
                LIMIT ?", cfg.domain_table_name.as_str()
            );

            let mut cursor = client
                .query(sql.as_str())
                .bind(cfg.domain_tail_top_n as u32)
                .bind(shard)
                .bind(cfg.re_after_days as u32)
                .bind(cfg.shard_select_limit as u32)
                .fetch::<JobReaderRow<'_>>()
                .context("cannot get cursor for domain_discovery")?;

            let mut domains = Vec::new();
            while let Some(row) = cursor.next().await.context("cannot read from domain_discovery")? {
                domains.extend(row.tails.iter().map(|tail| {
                    if tail.is_empty() {
                        row.domain.to_string()
                    } else {
                        format!("{}.{}", tail, row.domain)
                    }
                }));
            }

            Ok(domains)
        })
    }

    /// Queues every domain read for `shard` as a job, then releases the shard right away
    /// if it ended up without any jobs.
    ///
    /// `queried_for` is only used for the trace message.
    fn handle_read_jobs(
        &self,
        state: &JobReaderState,
        shard: u16,
        jobs: Vec<String>,
        queried_for: Duration,
    ) {
        let shard_total = self.cfg.shard_total;

        // NOTE(review): jobs are filed under the shard that was queried, while finished jobs
        // are looked up by `Domain::shard` - presumably `Domain::new` yields the same shard;
        // verify.
        jobs.into_iter()
            .map(|name| Domain::new(name, shard_total, None, false))
            .for_each(|job| state.add_job(shard, job));

        state.check_shard(shard);
        trace!(
            "->jobs present: {}, query took: {}ms",
            state.jobs.borrow().len(),
            queried_for.as_millis()
        );
    }

    /// Turns `job` into a crawling `Job` and sends it through `tx`.
    ///
    /// The job's own URL is used when present; otherwise `http://<domain>` is parsed. When
    /// that fails a warning is logged and nothing is sent. The result of the send itself
    /// is ignored.
    async fn send_job(&self, tx: &Sender<Job>, job: Domain) {
        let url = match job.url.clone() {
            Some(known) => known,
            None => match Url::parse(format!("http://{}", &job.domain).as_str()) {
                Ok(parsed) => parsed,
                Err(err) => {
                    warn!("->cannot create task for {}: invalid url - {}", &job.domain, err);
                    return;
                }
            },
        };

        let job_obj = Job {
            url,
            settings: self.cfg.default_crawling_settings.clone(),
            rules: Box::new(CrawlingRules {}),
            job_state: JobState { selected_domain: job.clone() },
        };

        let _ = tx.send(job_obj).await;
        trace!("->sending task  for {}", &job.domain);
    }

    /// Logs that the task for `job` was sent, along with the current queue and shard counts.
    fn handle_sent_job(&self, state: &JobReaderState, job: Domain) {
        let jobs_left = state.jobs.borrow().len();
        let free = state.free_shards.borrow().len();
        let busy = state.busy_shards.borrow().len();

        info!(
            "->new task sent for {}, jobs left: {}, free shards: {}, busy shards: {}",
            job.domain, jobs_left, free, busy
        );
    }

    /// Marks every confirmed domain as finished and then releases each affected shard that
    /// has no unfinished jobs left.
    ///
    /// When no free shard remains afterwards, the busy shards are dumped to the log.
    fn handle_confirmation(&self, state: &JobReaderState, domains: Vec<Domain>) {
        // Finish the jobs first, remembering each touched shard once.
        let touched_shards: HashSet<_> = domains
            .iter()
            .map(|finished| {
                state.finish_job(finished);
                finished.shard
            })
            .collect();
        touched_shards
            .into_iter()
            .for_each(|shard| state.check_shard(shard));

        info!(
            "<-{} tasks completed, jobs left: {}, free shards: {}, busy shards: {}",
            domains.len(),
            state.jobs.borrow().len(),
            state.free_shards.borrow().len(),
            state.busy_shards.borrow().len()
        );
        if state.free_shards.borrow().is_empty() {
            info!("busy shards dump: {:?}", state.busy_shards.borrow())
        }
    }

    pub async fn go(
        self,
        client: clickhouse::Client,
        tx_jobs: Sender<Job>,
        tx_metrics_db: Sender<Vec<chu::GenericNotification>>,
        rx_confirmation: Receiver<chu::Notification<Domain>>,
    ) -> Result<()>{
        let state = JobReaderState::new(self.cfg.shard_min, self.cfg.shard_max, *self.cfg.shard_min_last_read);

        let mut seed_domains : Vec<Domain> = self.cfg.seeds.iter()
            .filter_map(|seed|Url::parse(seed).ok())
            .map(|seed|Domain::new(
                seed.domain().unwrap().into(), self.cfg.shard_total, Some(seed.clone()), false
            ))
            .collect();

        let mut last_read = Instant::now();
        while !tx_jobs.is_closed() {
            let (shard, job) = state.next_job_and_shard(self.cfg.job_buffer);

            // seed domains list is fairly short, so we do it dumb ;)
            if !seed_domains.is_empty() && shard.is_some() {
                let shard = shard.unwrap();
                for i in 0..seed_domains.len() {
                    let d = &seed_domains[i];
                    if shard == d.shard {
                        state.add_job(d.shard, d.clone());
                        seed_domains.remove(i);
                        break
                    }
                }
            }

            type BoxedFuture<'a> = Pin<Box<dyn Future<Output = FutureResult> + Send + 'a>>;
            let mut futures = FuturesUnordered::<BoxedFuture>::new();

            if shard.is_some() {
                let read_jobs = Box::pin(retry(ExponentialBackoff::default(), || async {
                    self.read_jobs(&client, shard.unwrap())
                        .instrument().await.map_err(backoff::Error::Transient)
                }));
                futures.push(Box::pin(async move {
                    FutureResult::JobsRead(read_jobs.await)
                }))
            }

            if job.is_some() {
                let send_job = Box::pin(self.send_job(&tx_jobs, job.as_ref().unwrap().clone()));
                futures.push(Box::pin(async move {
                    send_job.await;
                    FutureResult::JobsSent
                }))
            }

            let mut awaiting_notification = true;
            let read_notification = Box::pin(rx_confirmation.recv());
            futures.push(Box::pin(async move {
                FutureResult::Notify(read_notification.await)
            }));

            let t = Instant::now();
            while let Some(r) = futures.next().await {
                match r {
                    FutureResult::JobsRead(jobs) => {
                        let jobs = jobs.unwrap_or_default();

                        let queried_for = t.elapsed();
                        let notification = chu::GenericNotification{
                            table_name: self.cfg.domain_table_name.clone(),
                            label: String::from("read"),
                            since_last: last_read.elapsed(),
                            duration: queried_for,
                            items: jobs.len()
                        };
                        futures.push(Box::pin(async {
                            let _ = tx_metrics_db.send(vec![notification]).await;
                            FutureResult::MetricsSent
                        }));

                        last_read = Instant::now();
                        self.handle_read_jobs(&state, shard.unwrap(), jobs, queried_for);
                    },
                    FutureResult::JobsSent => {
                        self.handle_sent_job(&state, job.as_ref().unwrap().clone());
                    },
                    FutureResult::Notify(notification) => {
                        awaiting_notification = false;
                        self.handle_confirmation(&state, notification.unwrap().items);
                    },
                    _ => {}
                }

                if futures.len() <= 1 && awaiting_notification {
                    break
                }
            }
        }

        Ok(())
    }
}