use std::sync::Arc;
use async_trait::async_trait;
use crate::{
error::KumoError,
events::{CrawlEvent, EventEmitter},
};
/// Strategy applied when a registered [`CrawlHook`] returns an error.
///
/// The default is [`HookErrorPolicy::LogAndContinue`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum HookErrorPolicy {
    /// Report the failure as a warning and keep invoking the remaining hooks.
    #[default]
    LogAndContinue,
    /// Stop invoking hooks and hand the failure back to the caller as an error.
    AbortCrawl,
}
#[async_trait]
pub trait CrawlHook: Send + Sync {
async fn on_event(&self, event: &CrawlEvent) -> Result<(), KumoError> {
match event {
CrawlEvent::CrawlStarted { .. } => self.on_crawl_started(event).await,
CrawlEvent::RequestScheduled { .. } => self.on_request_scheduled(event).await,
CrawlEvent::RequestSkipped { .. } => self.on_request_skipped(event).await,
CrawlEvent::RequestStarted { .. } => self.on_request_started(event).await,
CrawlEvent::RequestCompleted { .. } => self.on_request_completed(event).await,
CrawlEvent::RequestRetried { .. } => self.on_request_retried(event).await,
CrawlEvent::RequestFailed { .. } => self.on_request_failed(event).await,
CrawlEvent::TaskPanicked { .. } => self.on_task_panicked(event).await,
CrawlEvent::ItemScraped { .. } => self.on_item_scraped(event).await,
CrawlEvent::ItemDropped { .. } => self.on_item_dropped(event).await,
CrawlEvent::CrawlFinished { .. } => self.on_crawl_finished(event).await,
}
}
async fn on_crawl_started(&self, _event: &CrawlEvent) -> Result<(), KumoError> {
Ok(())
}
async fn on_request_scheduled(&self, _event: &CrawlEvent) -> Result<(), KumoError> {
Ok(())
}
async fn on_request_skipped(&self, _event: &CrawlEvent) -> Result<(), KumoError> {
Ok(())
}
async fn on_request_started(&self, _event: &CrawlEvent) -> Result<(), KumoError> {
Ok(())
}
async fn on_request_completed(&self, _event: &CrawlEvent) -> Result<(), KumoError> {
Ok(())
}
async fn on_request_retried(&self, _event: &CrawlEvent) -> Result<(), KumoError> {
Ok(())
}
async fn on_request_failed(&self, _event: &CrawlEvent) -> Result<(), KumoError> {
Ok(())
}
async fn on_task_panicked(&self, _event: &CrawlEvent) -> Result<(), KumoError> {
Ok(())
}
async fn on_item_scraped(&self, _event: &CrawlEvent) -> Result<(), KumoError> {
Ok(())
}
async fn on_item_dropped(&self, _event: &CrawlEvent) -> Result<(), KumoError> {
Ok(())
}
async fn on_crawl_finished(&self, _event: &CrawlEvent) -> Result<(), KumoError> {
Ok(())
}
}
/// Runs a fixed list of [`CrawlHook`]s against an event and applies a
/// [`HookErrorPolicy`] to any failure.
///
/// The hook list sits behind an `Arc`, so clones of the dispatcher share the
/// same list instead of copying it.
#[derive(Clone)]
pub(crate) struct HookDispatcher {
    /// Hooks to invoke, in the order they were supplied to [`HookDispatcher::new`].
    hooks: Arc<Vec<Arc<dyn CrawlHook>>>,
    /// What to do when a hook returns an error.
    policy: HookErrorPolicy,
}

impl HookDispatcher {
    /// Builds a dispatcher over `hooks` that handles hook errors per `policy`.
    pub(crate) fn new(hooks: Vec<Arc<dyn CrawlHook>>, policy: HookErrorPolicy) -> Self {
        let hooks = Arc::new(hooks);
        Self { hooks, policy }
    }

    /// Passes `event` to every hook, one after another, in list order.
    ///
    /// # Errors
    ///
    /// With [`HookErrorPolicy::AbortCrawl`], the first hook failure stops the
    /// loop and is returned wrapped via `KumoError::hook`; later hooks are not
    /// invoked. With [`HookErrorPolicy::LogAndContinue`], failures are logged
    /// as warnings and this method returns `Ok(())`.
    pub(crate) async fn dispatch(&self, event: &CrawlEvent) -> Result<(), KumoError> {
        for hook in self.hooks.iter() {
            // Successful hooks need no further handling; move on to the next one.
            let err = match hook.on_event(event).await {
                Ok(()) => continue,
                Err(err) => err,
            };

            match self.policy {
                HookErrorPolicy::AbortCrawl => {
                    let message = format!(
                        "crawl hook failed while handling {}: {err}",
                        event.name()
                    );
                    return Err(KumoError::hook(message));
                }
                HookErrorPolicy::LogAndContinue => {
                    tracing::warn!(
                        event = event.name(),
                        error = %err,
                        policy = "log_and_continue",
                        "crawl hook failed"
                    );
                }
            }
        }

        Ok(())
    }
}
/// Single notification point that fans an event out to an optional
/// [`EventEmitter`] and to the registered hooks.
#[derive(Clone)]
pub(crate) struct CrawlObserver {
    /// Emitter that receives a copy of each event, if one is configured.
    events: Option<EventEmitter>,
    /// Dispatcher that runs the crawl hooks for each event.
    hooks: HookDispatcher,
}

impl CrawlObserver {
    /// Creates an observer from an optional emitter and a hook dispatcher.
    pub(crate) fn new(events: Option<EventEmitter>, hooks: HookDispatcher) -> Self {
        CrawlObserver { events, hooks }
    }

    /// Publishes `event` to the emitter (when present) and then runs the hooks.
    ///
    /// The emitter is called first and receives a clone, because the original
    /// event is still needed for hook dispatch afterwards.
    ///
    /// # Errors
    ///
    /// Returns whatever [`HookDispatcher::dispatch`] returns for this event.
    pub(crate) async fn notify(&self, event: CrawlEvent) -> Result<(), KumoError> {
        if let Some(emitter) = self.events.as_ref() {
            emitter.emit(event.clone());
        }

        self.hooks.dispatch(&event).await
    }
}