use super::task_runner::TaskRunner;
use super::Crawler;
use async_mutex::Mutex;
use event_listener::Event;
use std::time::SystemTime;
#[cfg(feature = "tokio")]
use tokio::task;
#[cfg(feature = "async-std")]
use async_std::task;
/// Drives a [`Crawler`] and a [`TaskRunner`] concurrently, using the async
/// runtime selected by the `tokio` / `async-std` cargo feature.
pub struct CrawlerRunner {
    /// The crawler behind an async mutex, paired with an `Event`. Both are
    /// handed to `Crawler::run`; the event's exact signalling role is defined
    /// there (not visible here — confirm before relying on it).
    internal: (Mutex<Crawler>, Event),
    /// The task runner behind an async mutex, paired with an `Event`. Shared by
    /// the crawling tasks (passed to `Crawler::run`) and the processing tasks
    /// (passed to `TaskRunner::run`).
    task_runner: (Mutex<TaskRunner>, Event),
    /// Number of tasks `spawn` launches to execute `Crawler::run` (200 by default).
    crawling_concurrency: i32,
    /// Number of tasks `spawn` launches to execute `TaskRunner::run` (20 by default).
    processing_concurrency: i32,
    /// Copied from `crawler.items_count` at construction; `spawn` returns
    /// without launching any task when this is 0.
    item_count: usize,
}
impl CrawlerRunner {
    /// Default number of tasks spawned to execute [`Crawler::run`].
    const DEFAULT_CRAWLING_CONCURRENCY: i32 = 200;
    /// Default number of tasks spawned to execute [`TaskRunner::run`].
    const DEFAULT_PROCESSING_CONCURRENCY: i32 = 20;

    /// Wraps `crawler` in a runner with the default concurrency settings
    /// (200 crawling tasks, 20 processing tasks) and a fresh [`TaskRunner`].
    pub fn new(crawler: Crawler) -> Self {
        // Read the item count before `crawler` is moved into its mutex.
        let item_count = crawler.items_count;
        Self {
            item_count,
            internal: (Mutex::new(crawler), Event::new()),
            task_runner: (Mutex::new(TaskRunner::new()), Event::new()),
            crawling_concurrency: Self::DEFAULT_CRAWLING_CONCURRENCY,
            processing_concurrency: Self::DEFAULT_PROCESSING_CONCURRENCY,
        }
    }

    /// Sets how many tasks `spawn` launches to execute [`Crawler::run`].
    ///
    /// # Panics
    ///
    /// Panics if `num_tasks` is not strictly positive.
    pub fn set_crawling_concurrency(mut self, num_tasks: i32) -> Self {
        self.crawling_concurrency = Self::checked_concurrency(num_tasks);
        self
    }

    /// Sets how many tasks `spawn` launches to execute [`TaskRunner::run`].
    ///
    /// # Panics
    ///
    /// Panics if `num_tasks` is not strictly positive.
    pub fn set_processing_concurrency(mut self, num_tasks: i32) -> Self {
        self.processing_concurrency = Self::checked_concurrency(num_tasks);
        self
    }

    /// Returns `num_tasks` unchanged if it is strictly positive, panicking otherwise.
    ///
    /// `#[track_caller]` keeps the reported panic location at the setter call site.
    #[track_caller]
    fn checked_concurrency(num_tasks: i32) -> i32 {
        assert!(num_tasks > 0, "num_tasks must be higher than 0");
        num_tasks
    }

    /// Runs the crawl to completion.
    ///
    /// Spawns `crawling_concurrency` tasks executing [`Crawler::run`] and
    /// `processing_concurrency` tasks executing [`TaskRunner::run`], all sharing
    /// this runner through an `Arc`. It then awaits every spawned task. Returns
    /// immediately, without spawning anything, when the crawler reported zero
    /// items.
    #[cfg(any(feature = "tokio", feature = "async-std"))]
    pub async fn spawn(self) {
        // Nothing to crawl: skip building the shared state and task list entirely.
        if self.item_count == 0 {
            return;
        }
        let num_crawler_tasks = self.crawling_concurrency;
        let num_processing_tasks = self.processing_concurrency;
        let runner = std::sync::Arc::new(self);
        // Both counts are positive (setter-validated or positive defaults), so the
        // casts are lossless and the sum of two i32 values always fits in usize.
        let mut tasks =
            Vec::with_capacity(num_crawler_tasks as usize + num_processing_tasks as usize);
        let start = SystemTime::now();
        for _ in 0..num_crawler_tasks {
            let crawler_runner = runner.clone();
            tasks.push(task::spawn(async move {
                Crawler::run(&crawler_runner.internal, &crawler_runner.task_runner, start).await;
            }));
        }
        for _ in 0..num_processing_tasks {
            let crawler_runner = runner.clone();
            tasks.push(task::spawn(async move {
                TaskRunner::run(&crawler_runner.task_runner).await;
            }));
        }
        // Join results are deliberately discarded so one loop works for both runtimes.
        // NOTE(review): under tokio, awaiting a `JoinHandle` yields
        // `Result<_, JoinError>`, so a panicked task is silently ignored here.
        // Consider surfacing it.
        #[allow(unused_must_use)]
        {
            for t in tasks {
                t.await;
            }
        }
    }
}