#[allow(unused_imports)]
use crate::prelude::*;
use crate::{
types::*,
clickhouse_utils as chu,
};
use crusty_core::config as rc;
use std::{
cell::RefCell,
};
use backoff::future::retry;
use backoff::ExponentialBackoff;
use clickhouse::Reflection;
use serde::{Deserialize};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
/// Settings of the [`JobReader`]: which shards it serves, how it queries the
/// domain table and which crawling settings are attached to produced jobs.
#[derive(Clone, Debug, Deserialize)]
pub struct JobReaderConfig {
/// Number of days bound into the `date_sub(DAY, ?, NOW())` clause of the
/// selection query; only rows whose latest `updated_at` is at or before
/// that point are selected.
pub re_after_days: usize,
/// Minimal time that has to pass between two reservations of the same shard.
pub shard_min_last_read: rc::CDuration,
/// First shard served by this reader (inclusive).
pub shard_min: usize,
/// Last shard served by this reader (inclusive).
pub shard_max: usize,
/// Shard count handed to `Domain::new` for every created domain.
pub shard_total: usize,
/// Value bound to the `LIMIT` clause of the selection query.
pub shard_select_limit: usize,
/// A new shard is only reserved while fewer than this many jobs are queued.
pub job_buffer: usize,
/// Value bound to the `groupArray(?)` parameter of the selection query.
pub domain_tail_top_n: usize,
/// Name of the table the selection query reads from.
pub domain_table_name: String,
/// Crawling settings cloned into every job that is sent out.
pub default_crawling_settings: rc::CrawlingSettings,
/// Seed URLs that are turned into jobs when their shard gets reserved.
pub seeds: Vec<String>,
}
impl Default for JobReaderConfig {
    /// Builds the stock configuration: shards 1 through 25, the
    /// `domain_discovery` table and a single seed URL.
    fn default() -> Self {
        // Inclusive shard range; `shard_total` is derived from it below.
        let (first_shard, last_shard): (usize, usize) = (1, 25);

        Self {
            re_after_days: 3,
            shard_min_last_read: rc::CDuration::from_secs(1),
            shard_min: first_shard,
            shard_max: last_shard,
            shard_total: last_shard - first_shard + 1,
            shard_select_limit: 100_000,
            job_buffer: 100_000,
            domain_tail_top_n: 3,
            domain_table_name: String::from("domain_discovery"),
            default_crawling_settings: rc::CrawlingSettings::default(),
            seeds: vec![String::from("https://bash.im")],
        }
    }
}
/// Reads crawl jobs shard by shard from the domain table and sends them out;
/// see `JobReader::go` for the main loop.
pub struct JobReader {
// Configuration the reader was created with.
cfg: JobReaderConfig
}
/// Bookkeeping of the reader loop: which shards are free or busy and which
/// jobs are queued. Every collection sits in a `RefCell`, so the methods can
/// mutate it through `&self`.
struct JobReaderState {
// Minimal time between two reservations of the same shard.
shard_min_last_read: Duration,
// Moment each shard was last reserved.
shard_last_read: RefCell<HashMap<u16, Instant>>,
// Queue of shards that are currently not reserved.
free_shards: RefCell<LinkedList<u16>>,
// Reserved shards, each with the domain names of its unfinished jobs.
busy_shards: RefCell<HashMap<u16, HashSet<String>>>,
// Jobs that were read but not handed out yet.
jobs: RefCell<LinkedList<Domain>>,
}
impl JobReaderState {
    /// Creates the state with every shard of the inclusive range
    /// `shard_min..=shard_max` marked as free and no jobs queued.
    fn new(shard_min: usize, shard_max: usize, shard_min_last_read: Duration) -> Self {
        Self {
            shard_min_last_read,
            shard_last_read: RefCell::new(HashMap::new()),
            // An inclusive range avoids the `shard_max as u16 + 1` overflow
            // that would occur when `shard_max` equals `u16::MAX`.
            free_shards: RefCell::new((shard_min as u16..=shard_max as u16).collect()),
            busy_shards: RefCell::new(HashMap::new()),
            jobs: RefCell::new(LinkedList::new()),
        }
    }

    /// Picks the work for one iteration of the reader loop.
    ///
    /// Returns the shard to read next (only when fewer than `task_buffer`
    /// jobs are queued and some free shard could be reserved) together with
    /// the next queued job, if any. The returned shard is already reserved
    /// as busy.
    fn next_job_and_shard(&self, task_buffer: usize) -> (Option<u16>, Option<Domain>) {
        let mut jobs = self.jobs.borrow_mut();
        let mut shard = None;
        if jobs.len() < task_buffer {
            let mut free_shards = self.free_shards.borrow_mut();
            // Try every free shard at most once; shards that cannot be
            // reserved yet are rotated to the back of the queue.
            for _ in 0..free_shards.len() {
                match free_shards.pop_front() {
                    Some(candidate) if self.reserve_busy_shard(candidate) => {
                        shard = Some(candidate);
                        break;
                    }
                    Some(candidate) => free_shards.push_back(candidate),
                    None => break,
                }
            }
        }
        (shard, jobs.pop_front())
    }

    /// Returns `shard` to the free queue if it is busy but has no
    /// unfinished jobs left; does nothing otherwise.
    fn check_shard(&self, shard: u16) {
        let mut busy_shards = self.busy_shards.borrow_mut();
        let drained = busy_shards
            .get(&shard)
            .map_or(false, |unfinished| unfinished.is_empty());
        if drained {
            busy_shards.remove(&shard);
            self.free_shards.borrow_mut().push_back(shard);
            info!("Busy shard {} evicted", shard);
        }
    }

    /// Tries to mark `shard` as busy.
    ///
    /// Returns `false` (and changes nothing) when the shard was reserved less
    /// than `shard_min_last_read` ago; otherwise records the reservation time,
    /// registers the shard as busy with no jobs and returns `true`.
    ///
    /// # Panics
    ///
    /// Panics if the shard is already registered as busy.
    fn reserve_busy_shard(&self, shard: u16) -> bool {
        let mut busy_shards = self.busy_shards.borrow_mut();
        let mut shard_last_read = self.shard_last_read.borrow_mut();

        let read_too_recently = shard_last_read
            .get(&shard)
            .map_or(false, |last_read| last_read.elapsed() < self.shard_min_last_read);
        if read_too_recently {
            return false;
        }

        shard_last_read.insert(shard, Instant::now());
        if let Some(v) = busy_shards.insert(shard, HashSet::new()) {
            panic!(
                "Shard {} is not supposed to be busy: {:?}",
                shard,
                v
            )
        }
        info!("Busy shard {} reserved", shard);
        true
    }

    /// Registers `domain` as an unfinished job of the busy `shard` and
    /// appends it to the job queue.
    ///
    /// # Panics
    ///
    /// Panics if `shard` is not reserved as busy or already contains a job
    /// for the same domain name.
    fn add_job(&self, shard: u16, domain: Domain) {
        let mut busy_shards = self.busy_shards.borrow_mut();
        let busy_shard = busy_shards.get_mut(&shard).unwrap_or_else(|| {
            panic!("Shard {} has to be reserved before jobs are added to it", shard)
        });
        if !busy_shard.insert(domain.domain.clone()) {
            panic!("Shard {} already contains a job for {}!", shard, &domain.domain)
        }
        // Log first so the domain can be moved into the queue without a clone.
        info!("Job {}/{} added!", shard, &domain.domain);
        self.jobs.borrow_mut().push_back(domain);
    }

    /// Removes `domain` from the unfinished jobs of its shard.
    ///
    /// # Panics
    ///
    /// Panics if the shard is not busy or does not contain the domain.
    fn finish_job(&self, domain: &Domain) {
        let mut busy_shards = self.busy_shards.borrow_mut();
        let busy_shard = match busy_shards.get_mut(&domain.shard) {
            Some(busy_shard) => busy_shard,
            None => panic!("Got notification about finished job '{domain}' but couldn't locate shard {shard}", shard = domain.shard, domain = &domain.domain),
        };
        if !busy_shard.remove(&domain.domain) {
            panic!("Got notification about finished job '{domain}' but couldn't locate it inside the shard {shard}", shard = domain.shard, domain = &domain.domain);
        }
        info!("Job {}/{} is finished!", domain.shard, &domain.domain);
    }
}
/// One row produced by the selection query in `JobReader::read_jobs`.
// NOTE(review): the field order presumably has to match the SELECT column
// order (`domain`, `tails`) — confirm against the clickhouse `Reflection` docs.
#[derive(Reflection, Deserialize)]
struct JobReaderRow<'a> {
// The `domain` column.
domain: &'a str,
// The `tails` column: the grouped `domain_tail` values of that domain.
tails: Vec<&'a str>
}
/// Outcome of one of the futures driven concurrently by `JobReader::go`.
#[derive(Debug)]
enum FutureResult {
// The shard read (including its retries) finished with these host names or an error.
JobsRead(Result<Vec<String>>),
// `send_job` returned for the current job.
JobsSent,
// The read metrics notification was handed to the metrics channel.
MetricsSent,
// A receive on the confirmation channel completed.
Notify(core::result::Result<chu::Notification<Domain>, RecvError>)
}
impl JobReader {
    /// Creates a reader driven by `cfg`.
    pub fn new(cfg: JobReaderConfig) -> Self {
        Self { cfg }
    }

    /// Builds a task that selects the host names to crawl for `shard`.
    ///
    /// The query groups the configured table by domain and collects the
    /// `domain_tail` values of each domain. Every tail yields one entry in the
    /// result: the bare domain for an empty tail, `"<tail>.<domain>"`
    /// otherwise.
    ///
    /// # Errors
    ///
    /// The task fails when the cursor cannot be obtained or a row cannot be
    /// read.
    fn read_jobs<'a>(
        &'a self,
        client: &'a clickhouse::Client,
        shard: u16,
    ) -> TracingTask<'a, Vec<String>> {
        TracingTask::new_short_lived(span!(Level::INFO), async move {
            // The SQL text is kept exactly as it was, including its line breaks.
            let r = client
                .query(format!(
                    "SELECT domain, groupArray(?)(domain_tail) as tails FROM (
SELECT domain, domain_tail FROM {}
WHERE shard = ?
GROUP BY shard, domain, domain_tail
HAVING max(updated_at) <= date_sub(DAY, ?, NOW())
)
GROUP BY domain
LIMIT ?", self.cfg.domain_table_name.as_str()
                ).as_str())
                .bind(self.cfg.domain_tail_top_n as u32)
                .bind(shard)
                .bind(self.cfg.re_after_days as u32)
                .bind(self.cfg.shard_select_limit as u32)
                .fetch::<JobReaderRow<'_>>();

            let mut cursor = r.context("cannot get cursor for domain_discovery")?;
            let mut domains = Vec::new();
            while let Some(row) = cursor.next().await.context("cannot read from domain_discovery")? {
                domains.extend(row.tails.iter().map(|tail| {
                    if tail.is_empty() {
                        String::from(row.domain)
                    } else {
                        format!("{}.{}", tail, row.domain)
                    }
                }));
            }
            Ok(domains)
        })
    }

    /// Queues every host name read for `shard` as a job and frees the shard
    /// again if the read produced nothing to wait for.
    fn handle_read_jobs(
        &self,
        state: &JobReaderState,
        shard: u16,
        jobs: Vec<String>,
        queried_for: Duration,
    ) {
        for domain in jobs {
            state.add_job(shard, Domain::new(domain, self.cfg.shard_total, None, false));
        }
        state.check_shard(shard);
        trace!(
            "->jobs present: {}, query took: {}ms",
            state.jobs.borrow().len(),
            queried_for.as_millis()
        );
    }

    /// Turns `job` into a `Job` and sends it through `tx`.
    ///
    /// The job's own URL is used when present; otherwise `http://<domain>` is
    /// parsed. When that URL is invalid a warning is logged and nothing is
    /// sent.
    async fn send_job(&self, tx: &Sender<Job>, job: Domain) {
        let url = match job.url.clone() {
            Some(url) => url,
            None => match Url::parse(format!("http://{}", &job.domain).as_str()) {
                Ok(url) => url,
                Err(err) => {
                    // NOTE(review): the domain stays registered in its busy
                    // shard; unless a confirmation still arrives for it the
                    // shard is never freed — confirm and handle in the caller.
                    warn!("->cannot create task for {}: invalid url - {}", &job.domain, err);
                    return;
                }
            },
        };

        // Logged before the send so that `job` can be moved into the task.
        trace!("->sending task for {}", &job.domain);
        let job_obj = Job {
            url,
            settings: self.cfg.default_crawling_settings.clone(),
            rules: Box::new(CrawlingRules {}),
            job_state: JobState { selected_domain: job },
        };
        // A failed send is ignored on purpose: `go` re-checks
        // `tx_jobs.is_closed()` before it schedules any further work.
        let _ = tx.send(job_obj).await;
    }

    /// Logs the queue and shard counters after a job was handed out.
    fn handle_sent_job(&self, state: &JobReaderState, job: Domain) {
        info!(
            "->new task sent for {}, jobs left: {}, free shards: {}, busy shards: {}",
            job.domain,
            state.jobs.borrow().len(),
            state.free_shards.borrow().len(),
            state.busy_shards.borrow().len()
        );
    }

    /// Marks every confirmed domain as finished and frees the shards that
    /// have no unfinished jobs left.
    fn handle_confirmation(&self, state: &JobReaderState, domains: Vec<Domain>) {
        let mut shards = HashSet::new();
        for domain in &domains {
            state.finish_job(domain);
            shards.insert(domain.shard);
        }
        for shard in shards {
            state.check_shard(shard);
        }
        info!(
            "<-{} tasks completed, jobs left: {}, free shards: {}, busy shards: {}",
            domains.len(),
            state.jobs.borrow().len(),
            state.free_shards.borrow().len(),
            state.busy_shards.borrow().len()
        );
        if state.free_shards.borrow().is_empty() {
            info!("busy shards dump: {:?}", state.busy_shards.borrow())
        }
    }

    /// Runs the reader loop until `tx_jobs` is closed.
    ///
    /// Every iteration reserves at most one free shard and takes at most one
    /// queued job, then concurrently
    /// * reads the jobs of the reserved shard (retried with exponential
    ///   backoff) and reports the read to `tx_metrics_db`,
    /// * sends the job to `tx_jobs`,
    /// * waits for a confirmation on `rx_confirmation`.
    ///
    /// The iteration ends once the read and the send are done; a confirmation
    /// that has not arrived by then is awaited again in the next iteration.
    /// Seed URLs without a domain name (or that fail to parse) are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error when receiving from `rx_confirmation` fails.
    // NOTE(review): with no shard to read and no job to send, an iteration
    // waits for a confirmation only; if no jobs are in flight this looks like
    // it can wait indefinitely — confirm whether a timer is needed.
    pub async fn go(
        self,
        client: clickhouse::Client,
        tx_jobs: Sender<Job>,
        tx_metrics_db: Sender<Vec<chu::GenericNotification>>,
        rx_confirmation: Receiver<chu::Notification<Domain>>,
    ) -> Result<()> {
        let state = JobReaderState::new(self.cfg.shard_min, self.cfg.shard_max, *self.cfg.shard_min_last_read);

        let mut seed_domains: Vec<Domain> = self.cfg.seeds.iter()
            .filter_map(|seed| {
                let url = Url::parse(seed).ok()?;
                // A URL may have no domain name (e.g. an IP address host);
                // such a seed is skipped instead of aborting the reader.
                let domain = String::from(url.domain()?);
                Some(Domain::new(domain, self.cfg.shard_total, Some(url), false))
            })
            .collect();

        let mut last_read = Instant::now();
        while !tx_jobs.is_closed() {
            let (shard, job) = state.next_job_and_shard(self.cfg.job_buffer);

            // At most one pending seed is queued per reservation of its shard.
            if let Some(reserved) = shard {
                if let Some(i) = seed_domains.iter().position(|d| d.shard == reserved) {
                    let seed = seed_domains.remove(i);
                    state.add_job(reserved, seed);
                }
            }

            type BoxedFuture<'a> = Pin<Box<dyn Future<Output = FutureResult> + Send + 'a>>;
            let mut futures = FuturesUnordered::<BoxedFuture>::new();

            if shard.is_some() {
                // `shard` is read through the loop-scoped variable because the
                // retry future is stored in `futures` and outlives this `if`.
                let read_jobs = Box::pin(retry(ExponentialBackoff::default(), || async {
                    self.read_jobs(&client, shard.unwrap())
                        .instrument().await.map_err(backoff::Error::Transient)
                }));
                futures.push(Box::pin(async move {
                    FutureResult::JobsRead(read_jobs.await)
                }))
            }

            if let Some(next_job) = job.as_ref() {
                let send_job = Box::pin(self.send_job(&tx_jobs, next_job.clone()));
                futures.push(Box::pin(async move {
                    send_job.await;
                    FutureResult::JobsSent
                }))
            }

            let mut awaiting_notification = true;
            let read_notification = Box::pin(rx_confirmation.recv());
            futures.push(Box::pin(async move {
                FutureResult::Notify(read_notification.await)
            }));

            let t = Instant::now();
            while let Some(r) = futures.next().await {
                match r {
                    FutureResult::JobsRead(jobs) => {
                        // A failed read counts as an empty one so the shard is
                        // released again, but the error is no longer dropped
                        // silently.
                        let jobs = jobs.unwrap_or_else(|err| {
                            warn!("->cannot read jobs for shard {:?}: {:?}", shard, err);
                            Vec::new()
                        });
                        let queried_for = t.elapsed();
                        let notification = chu::GenericNotification{
                            table_name: self.cfg.domain_table_name.clone(),
                            label: String::from("read"),
                            since_last: last_read.elapsed(),
                            duration: queried_for,
                            items: jobs.len()
                        };
                        futures.push(Box::pin(async {
                            let _ = tx_metrics_db.send(vec![notification]).await;
                            FutureResult::MetricsSent
                        }));
                        last_read = Instant::now();
                        let read_shard = shard.expect("a read is only started for a reserved shard");
                        self.handle_read_jobs(&state, read_shard, jobs, queried_for);
                    },
                    FutureResult::JobsSent => {
                        if let Some(sent) = job.as_ref() {
                            self.handle_sent_job(&state, sent.clone());
                        }
                    },
                    FutureResult::Notify(notification) => {
                        awaiting_notification = false;
                        let notification = notification
                            .context("cannot receive job confirmations")?;
                        self.handle_confirmation(&state, notification.items);
                    },
                    FutureResult::MetricsSent => {}
                }
                // Only the pending confirmation is left: start the next
                // iteration instead of blocking on it.
                if futures.len() <= 1 && awaiting_notification {
                    break
                }
            }
        }
        Ok(())
    }
}