use async_mutex::Mutex;
use event_listener::Event;
use futures::future::FutureExt;
use futures::select;
use futures_timer::Delay;
use plum::StandardBloomFilter;
use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::time::{Duration, SystemTime};
use url::Url;
use super::host_priority_queue::{CrawlerScheduler, HostPriorityQueue};
use super::page::{Page, PageResult};
use super::task_runner::TaskRunner;
/// Backlog of pages waiting to be crawled for a single host.
struct CrawlerQueue {
/// Pages queued for the host; new pages are appended at the back and crawled from the front.
urls: VecDeque<Box<dyn Page>>,
/// Earliest time at which the next page of this host may be crawled. It is moved forward by
/// the host's delay every time a page is taken from `urls`.
next_crawl_time: SystemTime,
}
/// Shared crawl state: pending pages grouped by host, per-host rate limits, and the counters
/// the worker loop in `run` uses to decide when the crawl is finished.
pub struct Crawler {
/// Bloom filter of URLs that were already submitted; a URL reported as present is not
/// queued again.
seen: StandardBloomFilter<String>,
/// Pending pages, keyed by the host part of their URL.
queue_by_host: HashMap<String, CrawlerQueue>,
/// Number of pages currently queued across all hosts.
pub items_count: usize,
/// One scheduling entry (host + next crawl time) per host that currently has queued pages.
// NOTE(review): presumably ordered so the earliest `next_crawl_time` is on top — confirm
// against the `Ord` impl in `host_priority_queue`.
queue_priorities: HostPriorityQueue,
/// Set once no worker is busy and nothing is queued; makes every `run` loop exit.
done: bool,
/// Number of `run` loops that are currently processing a page.
working: i64,
/// Minimum delay between two crawls of the same host, for hosts with an explicit rate.
delay_by_host: HashMap<String, Duration>,
/// Delay applied to hosts that have no entry in `delay_by_host`.
default_delay: Duration,
}
impl Crawler {
pub fn new() -> Self {
Self {
seen: plum::StandardBloomFilter::new(10_000_000, 0.000001),
queue_by_host: HashMap::new(),
items_count: 0,
queue_priorities: BinaryHeap::new(),
done: false,
working: 0,
delay_by_host: HashMap::new(),
default_delay: Duration::from_millis(200),
}
}
pub fn with_capacity(items_count: usize, fp_rate: f64) -> Self {
Self {
seen: plum::StandardBloomFilter::new(items_count, fp_rate),
queue_by_host: HashMap::new(),
items_count: 0,
queue_priorities: BinaryHeap::new(),
done: false,
working: 0,
delay_by_host: HashMap::new(),
default_delay: Duration::from_millis(200),
}
}
/// Builder-style helper that queues `page` and hands the crawler back for chaining.
///
/// Pages whose URL was already submitted are ignored.
pub fn push_item(mut self, page: Box<dyn Page>) -> Self {
    // The "ready right now" count only matters to the worker loop, not to the builder.
    let _ready_now = self.push_item_internal(page);
    self
}
/// Queues `page` under its URL host unless the URL was already submitted.
///
/// Returns `1` when the page was queued and its host is allowed to be crawled right away,
/// and `0` otherwise (duplicate URL, URL without a usable host, or host still inside its
/// delay window). Callers use the sum to decide whether idle workers need a wake-up.
///
/// URLs that cannot be parsed or that carry no host (e.g. `mailto:` links) are logged and
/// skipped instead of aborting the crawl.
fn push_item_internal(&mut self, page: Box<dyn Page>) -> i32 {
    let url_string = page.get_url();
    if self.seen.contains(url_string) {
        return 0;
    }
    // Mark the URL as seen before validating it so a bad link is reported only once.
    self.seen.insert(&url_string);

    let parsed_host = Url::parse(url_string.as_str())
        .ok()
        .and_then(|url| url.host_str().map(String::from));
    let host = match parsed_host {
        Some(host) => host,
        None => {
            println!("Skipping url without a valid host: '{}'", url_string);
            return 0;
        }
    };

    let next_crawl_time = match self.queue_by_host.get_mut(&host) {
        Some(crawler_queue) => {
            let next_crawl_time = crawler_queue.next_crawl_time;
            // A host whose backlog ran dry has no scheduling entry any more; re-register it.
            if crawler_queue.urls.is_empty() {
                self.queue_priorities.push(CrawlerScheduler {
                    host,
                    next_crawl_time,
                });
            }
            crawler_queue.urls.push_back(page);
            next_crawl_time
        }
        None => {
            // First page for this host: it may be crawled immediately.
            let next_crawl_time = SystemTime::UNIX_EPOCH;
            self.queue_priorities.push(CrawlerScheduler {
                host: host.clone(),
                next_crawl_time,
            });
            let mut urls = VecDeque::<Box<dyn Page>>::new();
            urls.push_back(page);
            self.queue_by_host.insert(
                host,
                CrawlerQueue {
                    urls,
                    next_crawl_time,
                },
            );
            next_crawl_time
        }
    };

    self.items_count += 1;
    if next_crawl_time <= SystemTime::now() {
        1
    } else {
        0
    }
}
/// Removes and returns the next page to crawl, or `None` when nothing is queued.
///
/// The page comes from the host on top of the priority queue. That host's next crawl time
/// is pushed forward by its configured delay, and the host is re-scheduled if it still has
/// pages waiting.
///
/// # Panics
///
/// Panics if the scheduling heap, the per-host map and `items_count` have fallen out of
/// sync with each other.
fn pop_item(&mut self) -> Option<Box<dyn Page>> {
    if self.items_count == 0 {
        return None;
    }

    let scheduled = self.queue_priorities.pop().unwrap();
    let delay = self.delay_for_host(scheduled.host.as_str());
    let host_queue = self.queue_by_host.get_mut(&scheduled.host).unwrap();
    let page = host_queue.urls.pop_front().unwrap();
    self.items_count -= 1;

    let next_crawl_time = SystemTime::now() + delay;
    host_queue.next_crawl_time = next_crawl_time;
    if !host_queue.urls.is_empty() {
        self.queue_priorities.push(CrawlerScheduler {
            host: scheduled.host,
            next_crawl_time,
        });
    }

    Some(page)
}
/// Returns the minimum pause between two requests to `host`: the host-specific delay when
/// one was configured, otherwise the crawler-wide default.
pub fn delay_for_host(&self, host: &str) -> Duration {
    self.delay_by_host
        .get(host)
        .copied()
        .unwrap_or(self.default_delay)
}
/// Builder-style setter that limits `host` to `requests_per_second` requests per second.
///
/// The rate is stored as a delay of `1e9 / requests_per_second` nanoseconds, rounded down,
/// and replaces any delay configured earlier for the same host.
pub fn set_host_request_rate(mut self, host: &str, requests_per_second: f64) -> Self {
    let nanos_per_request = (1_000_000_000.0f64 / requests_per_second).floor();
    let delay = Duration::from_nanos(nanos_per_request as u64);
    self.delay_by_host.insert(String::from(host), delay);
    self
}
/// Builder-style setter for the request rate of hosts without a host-specific rate.
///
/// The rate is stored as a delay of `1e9 / requests_per_second` nanoseconds, rounded down.
pub fn set_default_request_rate(mut self, requests_per_second: f64) -> Self {
    let nanos_per_request = (1_000_000_000.0f64 / requests_per_second).floor();
    self.default_delay = Duration::from_nanos(nanos_per_request as u64);
    self
}
/// Processes `page`, retrying on failure, and logs the timing of every attempt.
///
/// Up to `max_retries` attempts are made, with a one second pause between a failed attempt
/// and the next one. On success the pages and results produced by the request are returned.
/// When every attempt fails (or `max_retries` is not positive) two empty vectors are
/// returned.
///
/// `start_time` is only used for logging: each log line reports the attempt's start as an
/// offset from it, in milliseconds.
async fn process_request_with_retry(
    page: &Box<dyn Page>,
    max_retries: i32,
    start_time: SystemTime,
) -> (Vec<Box<dyn Page>>, Vec<Box<dyn PageResult>>) {
    /// Milliseconds elapsed from `earlier` to `later`. The wall clock may be adjusted
    /// backwards between the two readings, so a negative span is reported as zero instead
    /// of panicking inside a log statement.
    fn millis_between(earlier: SystemTime, later: SystemTime) -> f64 {
        later
            .duration_since(earlier)
            .unwrap_or_default()
            .as_micros() as f64
            / 1000.0
    }

    for attempt in 1..=max_retries {
        let crawl_start_time = SystemTime::now();
        let outcome = page.process_request().await;
        let crawl_end_time = SystemTime::now();
        let started_at = millis_between(start_time, crawl_start_time);
        let took = millis_between(crawl_start_time, crawl_end_time);

        match outcome {
            Ok(res) => {
                println!(
                    "Crawled url: '{}', time_start: {}, duration: {}",
                    page.get_url(),
                    started_at,
                    took
                );
                return res;
            }
            Err(err) => {
                println!(
                    "Error processing: '{}', time_start: {}, duration: {}, retry: {}/{}, err: '{:?}'",
                    page.get_url(),
                    started_at,
                    took,
                    attempt,
                    max_retries,
                    err
                );
                // No point in pausing after the final attempt.
                if attempt < max_retries {
                    Delay::new(Duration::from_millis(1000)).await;
                }
            }
        }
    }

    (Vec::new(), Vec::new())
}
/// Worker loop of the crawler; several instances may run concurrently on the same state.
///
/// Each iteration locks the shared crawler and then does one of the following:
/// * exits when the crawl is marked done;
/// * sleeps until notified when nothing is queued;
/// * sleeps until the next host becomes due (or a notification arrives) when pages are
///   queued but none may be crawled yet;
/// * otherwise takes the next page, processes it with the lock released (up to 3
///   attempts), queues the pages it discovered and forwards its results to the task runner.
///
/// The crawl is marked done by the worker that finishes a page while no other worker is
/// busy and nothing is queued; that worker also tells the task runner.
///
/// `crawler_mutex` / `tasks_mutex` pair each shared state with the event used to wake its
/// waiters. `start` is the reference time for the timing log lines.
///
/// # Panics
///
/// Panics if `items_count` claims queued pages while the scheduling heap is empty.
pub async fn run(
    crawler_mutex: &(Mutex<Crawler>, Event),
    tasks_mutex: &(Mutex<TaskRunner>, Event),
    start: SystemTime,
) {
    let (mutex, event) = (&crawler_mutex.0, &crawler_mutex.1);
    loop {
        let mut crawler = mutex.lock().await;
        if crawler.done {
            break;
        }

        if crawler.items_count == 0 {
            // Register the listener *before* releasing the lock so a notification sent
            // right after the unlock cannot be missed.
            let listener = event.listen();
            drop(crawler);
            listener.await;
            continue;
        }

        let next_crawl = crawler.queue_priorities.peek().unwrap().next_crawl_time;
        // Read the clock once: the comparison and the wait duration must agree.
        let now = SystemTime::now();
        if now < next_crawl {
            let listener = event.listen();
            drop(crawler);
            let diff = next_crawl.duration_since(now).unwrap_or_default();
            // Wake up when the host becomes due or when another worker queued new pages.
            select! {
                _timer = Delay::new(diff).fuse() => {},
                _woken = listener.fuse() => {}
            }
            continue;
        }

        // Count this worker as busy before unlocking so no other worker can conclude that
        // the crawl is finished while this page is still in flight.
        crawler.working += 1;
        let page = crawler.pop_item().unwrap();
        drop(crawler);

        let (new_pages, results) = Crawler::process_request_with_retry(&page, 3, start).await;

        let mut crawler = mutex.lock().await;
        let mut items_added = 0;
        for discovered in new_pages {
            items_added += crawler.push_item_internal(discovered);
        }
        crawler.working -= 1;
        let is_done = crawler.working == 0 && crawler.items_count == 0;
        if is_done {
            crawler.done = true;
        }
        // NOTE(review): pages queued for a host that is still inside its delay window do
        // not count towards `items_added`, so idle workers are not woken for them.
        let wake_workers = items_added > 0 || crawler.done;
        // Always release the crawler lock before touching the task runner, so other workers
        // are never blocked on this one waiting for the task-runner lock.
        drop(crawler);
        if wake_workers {
            event.notify(usize::MAX);
        }
        notify_task_runner(tasks_mutex, results, is_done).await;
    }

    /// Appends `results` to the task runner's queue, records crawler completion when
    /// `is_done` is set, and wakes everything waiting on the task-runner event. Does
    /// nothing when there are no results and the crawl is not done.
    async fn notify_task_runner(
        tasks_mutex: &(Mutex<TaskRunner>, Event),
        results: Vec<Box<dyn PageResult>>,
        is_done: bool,
    ) {
        if results.is_empty() && !is_done {
            return;
        }
        let (runner_lock, runner_event) = (&tasks_mutex.0, &tasks_mutex.1);
        let mut task_runner = runner_lock.lock().await;
        for result in results {
            task_runner.queue.push_back(result);
        }
        if is_done {
            task_runner.crawler_done = true;
        }
        // Unlock first so woken tasks can take the lock immediately.
        drop(task_runner);
        runner_event.notify(usize::MAX);
    }
}
}