crusty-core 0.82.0

Library for creating blazing fast and configurable web crawlers
Documentation
use humansize::{file_size_opts, FileSize};
use thiserror::{self, Error};

use crate::{_prelude::*, config, load_filters, status_filters, task_expanders, task_filters};

pub trait ParsedDocument: 'static {}
pub type TaskFilters<JS, TS> = Vec<Box<dyn task_filters::Filter<JS, TS> + Send + Sync>>;
pub type StatusFilters<JS, TS> = Vec<Box<dyn status_filters::Filter<JS, TS> + Send + Sync>>;
pub type LoadFilters<JS, TS> = Vec<Box<dyn load_filters::Filter<JS, TS> + Send + Sync>>;
pub type TaskExpanders<JS, TS, P> = Vec<Box<dyn task_expanders::Expander<JS, TS, P> + Send + Sync>>;
pub type DocumentParser<P> = Box<dyn Fn(Box<dyn io::Read + Sync + Send>) -> Result<P> + Send + Sync + 'static>;

pub type BoxedFn<T> = Box<dyn Fn() -> Box<T> + Send + Sync>;
pub type BoxedTaskFilter<JS, TS> = BoxedFn<dyn task_filters::Filter<JS, TS> + Send + Sync>;
pub type BoxedStatusFilter<JS, TS> = BoxedFn<dyn status_filters::Filter<JS, TS> + Send + Sync>;
pub type BoxedLoadFilter<JS, TS> = BoxedFn<dyn load_filters::Filter<JS, TS> + Send + Sync>;
pub type BoxedTaskExpander<JS, TS, P> = BoxedFn<dyn task_expanders::Expander<JS, TS, P> + Send + Sync>;

pub struct Job<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument> {
	pub url:       url::Url,
	pub addrs:     Vec<SocketAddr>,
	pub settings:  Arc<config::CrawlingSettings>,
	pub rules:     Box<dyn JobRules<JS, TS, P>>,
	pub user_arg:  Option<u64>,
	pub job_state: JS,
}

impl<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument> Job<JS, TS, P> {
	pub fn new<R: JobRules<JS, TS, P>>(
		url: &str,
		settings: config::CrawlingSettings,
		rules: R,
		job_state: JS,
	) -> anyhow::Result<Job<JS, TS, P>> {
		Self::new_with_shared_settings(url, Arc::new(settings), rules, job_state)
	}

	pub fn new_with_shared_settings<R: JobRules<JS, TS, P>>(
		url: &str,
		settings: Arc<config::CrawlingSettings>,
		rules: R,
		job_state: JS,
	) -> anyhow::Result<Job<JS, TS, P>> {
		let url = Url::parse(url).context("cannot parse url")?;

		Ok(Self { url, addrs: vec![], settings, rules: Box::new(rules), job_state, user_arg: None })
	}

	pub fn with_addrs(mut self, addrs: Vec<SocketAddr>) -> Self {
		self.addrs = addrs;
		self
	}

	pub fn with_user_arg(mut self, user_arg: u64) -> Self {
		self.user_arg = Some(user_arg);
		self
	}
}

#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
pub struct ResolvedJob<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument> {
	pub url:      url::Url,
	pub addrs:    Vec<SocketAddr>,
	pub settings: Arc<config::CrawlingSettings>,
	pub rules:    Arc<Box<dyn JobRules<JS, TS, P>>>,
	pub ctx:      JobCtx<JS, TS>,
}

impl<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument> From<Job<JS, TS, P>> for ResolvedJob<JS, TS, P> {
	fn from(job: Job<JS, TS, P>) -> ResolvedJob<JS, TS, P> {
		ResolvedJob {
			url:      job.url.clone(),
			addrs:    job.addrs,
			settings: Arc::clone(&job.settings),
			rules:    Arc::new(job.rules),
			ctx:      JobCtx::new(job.url, job.settings, job.job_state, TS::default(), job.user_arg),
		}
	}
}

pub trait JobRules<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument>: Send + Sync + 'static {
	fn task_filters(&self) -> TaskFilters<JS, TS>;
	fn status_filters(&self) -> StatusFilters<JS, TS>;
	fn load_filters(&self) -> LoadFilters<JS, TS>;
	fn task_expanders(&self) -> TaskExpanders<JS, TS, P>;
	fn document_parser(&self) -> Arc<DocumentParser<P>>;
}

pub type BoxedJobRules<JS, TS, P> = Box<dyn JobRules<JS, TS, P>>;

#[derive(Error, Debug)]
pub enum Error {
	#[error(transparent)]
	Other(#[from] anyhow::Error),
	#[error("timeout while awaiting for status response")]
	StatusTimeout,
	#[error("timeout during loading")]
	LoadTimeout,
}

#[derive(Debug, Clone, IntoStaticStr)]
pub enum FilterKind {
	HeadStatusFilter,
	GetStatusFilter,
	LoadFilter,
	FollowFilter,
}

#[derive(Error, Debug, Clone)]
pub enum ExtStatusError {
	#[error("terminated by {kind:?} {name:?}")]
	Term { kind: FilterKind, name: &'static str, reason: &'static str },
	#[error("error in {kind:?} {name:?}")]
	Err {
		kind:   FilterKind,
		name:   &'static str,
		#[source]
		source: Arc<anyhow::Error>,
	},
	#[error("panic in {kind:?} {name:?}")]
	Panic {
		kind:   FilterKind,
		name:   &'static str,
		#[source]
		source: Arc<anyhow::Error>,
	},
}

pub type Result<T> = anyhow::Result<T, Error>;

#[derive(Error, Debug)]
pub enum ExtError {
	#[error(transparent)]
	Other(#[from] anyhow::Error),
	#[error("terminated because '{reason:?}'")]
	Term { reason: &'static str },
}
pub type ExtResult<T> = std::result::Result<T, ExtError>;

#[derive(Error, Debug)]
pub enum TaskError {
	#[error("timeout while waiting for loading({limit:?})")]
	Timeout { limit: Duration },
	#[error("http response is too big(>{limit:?})")]
	HttpTooBigResponse { limit: u32 },
	#[error("http response is malformed")]
	HttpMalformedResponse,
	#[error("http redirect limit reached({limit:?})")]
	HttpRedirectLimitReached { limit: u16 },
	#[error("http response error, code {code:?}")]
	HttpError { code: u16 },
}

#[derive(Error, Debug)]
pub enum JobError {
	#[error("job finished by soft timeout")]
	JobFinishedBySoftTimeout,
	#[error("job finished by hard timeout")]
	JobFinishedByHardTimeout,
	#[error("job finished by error in a root task")]
	RootTaskError(TaskError),
}

#[derive(Clone, PartialEq, Debug, Copy, IntoStaticStr)]
pub enum LinkTarget {
	JustResolveDNS = 0,
	Head           = 1,
	Load           = 2,
	HeadLoad       = 3,
	Follow         = 4,
	HeadFollow     = 5,
}

impl fmt::Display for LinkTarget {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		write!(f, "{:?}", self)
	}
}

pub type LinkIter<'a> = Box<dyn Iterator<Item = &'a Link> + Send + 'a>;

#[derive(Clone)]
pub struct Link {
	pub url:      Url,
	pub rel:      String,
	pub alt:      String,
	pub text:     String,
	pub redirect: usize,
	pub target:   LinkTarget,
	pub marker:   u8,
}

impl Link {
	pub fn host(&self) -> Option<String> {
		Some(self.url.host()?.to_string().trim().to_lowercase())
	}
}

#[derive(Clone, Default)]
pub struct ResolveMetrics {
	pub duration: Duration,
}

#[derive(Clone)]
pub struct ResolveData {
	pub addrs:   Vec<SocketAddr>,
	pub metrics: ResolveMetrics,
}

#[derive(Clone)]
pub struct HeaderMap(pub http::HeaderMap<http::HeaderValue>);

impl HeaderMap {
	pub fn get_str(&self, name: http::header::HeaderName) -> anyhow::Result<&str> {
		let name_str = String::from(name.as_str());
		self.0
			.get(name)
			.ok_or_else(|| anyhow!("{}: not found", name_str))?
			.to_str()
			.with_context(|| format!("cannot read {} value", name_str))
	}
}

impl Deref for HeaderMap {
	type Target = http::HeaderMap<http::HeaderValue>;

	fn deref(&self) -> &Self::Target {
		&self.0
	}
}

#[derive(Clone)]
pub struct HttpStatus {
	pub started_at: Instant,
	pub code:       u16,
	pub headers:    HeaderMap,
	pub metrics:    StatusMetrics,
	pub filter_err: Option<ExtStatusError>,
}

#[derive(Clone, Default)]
pub struct StatusMetrics {
	pub wait_duration: Duration,
	pub duration:      Duration,
}

#[derive(Clone, Default)]
pub struct LoadMetrics {
	pub wait_duration: Duration,
	pub duration:      Duration,
	pub read_size:     usize,
	pub write_size:    usize,
}

#[derive(Clone, Default)]
pub struct FollowMetrics {
	pub duration: Duration,
}

pub struct LoadData {
	pub metrics:    LoadMetrics,
	pub filter_err: Option<ExtStatusError>,
}

pub struct StatusResult(pub Option<Result<HttpStatus>>);

impl Deref for StatusResult {
	type Target = Option<Result<HttpStatus>>;

	fn deref(&self) -> &Self::Target {
		&self.0
	}
}

pub struct LoadResult(pub Option<Result<LoadData>>);

impl Deref for LoadResult {
	type Target = Option<Result<LoadData>>;

	fn deref(&self) -> &Self::Target {
		&self.0
	}
}

pub struct FollowResult(pub Option<Result<FollowData>>);

impl Deref for FollowResult {
	type Target = Option<Result<FollowData>>;

	fn deref(&self) -> &Self::Target {
		&self.0
	}
}

pub struct JobProcessing {
	pub resolve_data: ResolveData,
	pub head_status:  StatusResult,
	pub status:       StatusResult,
	pub load:         LoadResult,
	pub follow:       FollowResult,
	pub links:        Vec<Arc<Link>>,
}

impl JobProcessing {
	pub fn new(resolve_data: ResolveData) -> Self {
		Self {
			resolve_data,
			head_status: StatusResult(None),
			status: StatusResult(None),
			load: LoadResult(None),
			follow: FollowResult(None),
			links: vec![],
		}
	}
}

pub struct FollowData {
	pub metrics:    FollowMetrics,
	pub filter_err: Option<ExtStatusError>,
}

pub struct JobFinished {}

pub enum JobStatus {
	Processing(Result<Box<JobProcessing>>),
	Finished(std::result::Result<JobFinished, JobError>),
}

#[derive(Clone)]
pub struct Task {
	pub queued_at: Instant,
	pub link:      Arc<Link>,
	pub level:     usize,
}

pub struct JobUpdate<JS: JobStateValues, TS: TaskStateValues> {
	pub task:   Arc<Task>,
	pub status: JobStatus,
	pub ctx:    JobCtx<JS, TS>,
}

pub struct ParserTask {
	pub payload: Box<dyn FnOnce() -> Result<FollowData> + Send + 'static>,
	pub time:    Instant,
	pub res_tx:  Sender<ParserResponse>,
}

pub struct ParserResponse {
	pub payload:       Result<FollowData>,
	pub wait_duration: Duration,
	pub work_duration: Duration,
}

pub trait JobStateValues: Send + Sync + 'static {}

impl<T: Send + Sync + 'static> JobStateValues for T {}

pub trait TaskStateValues: Send + Sync + Clone + Default + 'static {}

impl<T: Send + Sync + Clone + Default + 'static> TaskStateValues for T {}

pub type JobSharedState = Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>;

#[derive(Derivative)]
#[derivative(Clone(bound = "TS: Clone"))]
pub struct JobCtx<JS, TS> {
	pub settings:   Arc<config::CrawlingSettings>,
	pub started_at: Instant,
	pub root_url:   Url,
	pub shared:     JobSharedState,
	pub job_state:  Arc<Mutex<JS>>,
	pub task_state: TS,
	pub user_arg:   Option<u64>,
	links:          Vec<Arc<Link>>,
}

impl<JS: JobStateValues, TS: TaskStateValues> JobCtx<JS, TS> {
	pub fn new(
		root_url: Url,
		settings: Arc<config::CrawlingSettings>,
		job_state: JS,
		task_state: TS,
		user_arg: Option<u64>,
	) -> Self {
		Self {
			started_at: Instant::now(),
			root_url,
			settings,
			shared: Arc::new(Mutex::new(HashMap::new())),
			job_state: Arc::new(Mutex::new(job_state)),
			task_state,
			links: vec![],
			user_arg,
		}
	}

	pub fn timeout_remaining(&self, t: Duration) -> PinnedFut<()> {
		let elapsed = self.started_at.elapsed();

		Box::pin(async move {
			if t <= elapsed {
				return
			}
			tokio::time::sleep(t - elapsed).await;
		})
	}

	pub fn push_link(&mut self, link: Link) {
		self.links.push(Arc::new(link))
	}

	pub fn push_links<I: IntoIterator<Item = Link>>(&mut self, links: I) {
		self.links.extend(links.into_iter().map(Arc::new))
	}

	pub fn push_shared_links<I: IntoIterator<Item = Arc<Link>>>(&mut self, links: I) {
		self.links.extend(links.into_iter())
	}

	pub(crate) fn consume_links(&mut self) -> Vec<Arc<Link>> {
		let mut links = vec![];
		mem::swap(&mut links, &mut self.links);
		links
	}
}

impl Link {
	pub fn new(
		href: &str,
		rel: &str,
		alt: &str,
		text: &str,
		redirect: usize,
		target: LinkTarget,
		parent: &Link,
	) -> Result<Self> {
		let mut url = Url::parse(href)
			.or_else(|_err| {
				parent.url.join(href).with_context(|| format!("cannot join relative href {} to {}", href, &parent.url))
			})
			.context("cannot parse url")?;
		url.set_fragment(None);

		Ok(Self {
			url,
			rel: String::from(rel),
			alt: alt.trim().to_string(),
			text: text.trim().to_string(),
			redirect,
			target,
			marker: 0,
		})
	}

	pub(crate) fn new_abs(href: Url, rel: &str, alt: &str, text: &str, redirect: usize, target: LinkTarget) -> Self {
		Self {
			url: href,
			rel: String::from(rel),
			alt: alt.trim().to_string(),
			text: text.trim().to_string(),
			redirect,
			target,
			marker: 0,
		}
	}
}

impl Task {
	pub(crate) fn new_root(url: &Url) -> Result<Task> {
		let link = Arc::new(Link::new_abs(url.clone(), "", "", "", 0, LinkTarget::Follow));

		Ok(Task { queued_at: Instant::now(), link, level: 0 })
	}

	pub(crate) fn new(link: Arc<Link>, parent: &Task) -> Result<Task> {
		let scheme = link.url.scheme();
		if scheme != "http" && scheme != "https" {
			return Err(anyhow!("invalid scheme {:#?} in {:#?}", scheme, link.url.as_str()).into())
		}

		Ok(Task {
			queued_at: Instant::now(),
			level: if link.redirect > 0 { parent.level } else { parent.level + 1 },
			link,
		})
	}

	pub fn is_root(&self) -> bool {
		self.level == 0
	}
}

impl fmt::Display for ResolveData {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let addrs_str = self.addrs.iter().map(|a| a.ip().to_string()).collect::<Vec<String>>().join(", ");
		write!(f, "[{}] resolve {}ms", addrs_str, self.metrics.duration.as_millis())
	}
}

impl fmt::Display for StatusResult {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			StatusResult(None) => {
				write!(f, "")
			}
			StatusResult(Some(Ok(ref r))) => {
				write!(
					f,
					"[{}] wait {}ms / status {}ms",
					r.code,
					r.metrics.wait_duration.as_millis(),
					r.metrics.duration.as_millis()
				)
			}
			StatusResult(Some(Err(ref err))) => {
				write!(f, "[err]: {:#}", err)
			}
		}
	}
}

impl fmt::Display for LoadResult {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			LoadResult(Some(Ok(ref r))) => {
				let m = &r.metrics;
				write!(
					f,
					"loaded {}ms / write {} / read {}",
					m.duration.as_millis(),
					m.write_size.file_size(file_size_opts::CONVENTIONAL).unwrap(),
					m.read_size.file_size(file_size_opts::CONVENTIONAL).unwrap()
				)
			}
			LoadResult(Some(Err(ref err))) => {
				write!(f, "[err loading]: {:#}", err)
			}
			LoadResult(None) => {
				write!(f, "none")
			}
		}
	}
}

impl fmt::Display for FollowResult {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self {
			FollowResult(Some(Ok(ref r))) => {
				let m = &r.metrics;
				write!(f, "parsed {}ms", m.duration.as_millis())
			}
			FollowResult(Some(Err(ref err))) => {
				write!(f, "[err following]: {:#}", err)
			}
			FollowResult(None) => {
				write!(f, "none")
			}
		}
	}
}

impl<JS: JobStateValues, TS: TaskStateValues> fmt::Display for JobUpdate<JS, TS> {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		match self.status {
			JobStatus::Processing(Ok(ref r)) => {
				write!(
					f,
					"{} {} {} (load: {}) | (follow: {}) {}",
					r.resolve_data, r.head_status, r.status, r.load, r.follow, self.task
				)
			}
			JobStatus::Processing(Err(ref err)) => {
				write!(f, "[dns error : {:#}] {}", err, self.task)
			}
			JobStatus::Finished(Ok(ref r)) => {
				write!(f, "[finished] {} {}", self.task, r)
			}
			JobStatus::Finished(Err(ref err)) => {
				write!(f, "[finished : {:?}] {}", err, self.task)
			}
		}
	}
}

impl fmt::Display for Link {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		write!(f, "{} {:?}", &self.url, self.target)
	}
}

impl fmt::Display for JobFinished {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		write!(f, "")
	}
}

impl fmt::Display for Task {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		if self.level < 1 {
			write!(f, "{} (root)", &self.link.url)
		} else {
			write!(f, "{} ({}) {}", &self.link.url, self.level, self.link.text.chars().take(32).collect::<String>())
		}
	}
}