crusty_core/
types.rs

1use humansize::{file_size_opts, FileSize};
2use thiserror::{self, Error};
3
4use crate::{_prelude::*, config, load_filters, status_filters, task_expanders, task_filters};
5
/// Marker trait for the document type `P` that a job's document parser yields.
///
/// It declares no methods; implementors only have to be `'static`.
pub trait ParsedDocument: 'static {}
7pub type TaskFilters<JS, TS> = Vec<Box<dyn task_filters::Filter<JS, TS> + Send + Sync>>;
8pub type StatusFilters<JS, TS> = Vec<Box<dyn status_filters::Filter<JS, TS> + Send + Sync>>;
9pub type LoadFilters<JS, TS> = Vec<Box<dyn load_filters::Filter<JS, TS> + Send + Sync>>;
10pub type TaskExpanders<JS, TS, P> = Vec<Box<dyn task_expanders::Expander<JS, TS, P> + Send + Sync>>;
11pub type DocumentParser<P> = Box<dyn Fn(Box<dyn io::Read + Sync + Send>) -> Result<P> + Send + Sync + 'static>;
12
13pub type BoxedFn<T> = Box<dyn Fn() -> Box<T> + Send + Sync>;
14pub type BoxedTaskFilter<JS, TS> = BoxedFn<dyn task_filters::Filter<JS, TS> + Send + Sync>;
15pub type BoxedStatusFilter<JS, TS> = BoxedFn<dyn status_filters::Filter<JS, TS> + Send + Sync>;
16pub type BoxedLoadFilter<JS, TS> = BoxedFn<dyn load_filters::Filter<JS, TS> + Send + Sync>;
17pub type BoxedTaskExpander<JS, TS, P> = BoxedFn<dyn task_expanders::Expander<JS, TS, P> + Send + Sync>;
18
19pub struct Job<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument> {
20	pub url:       url::Url,
21	pub addrs:     Vec<SocketAddr>,
22	pub settings:  Arc<config::CrawlingSettings>,
23	pub rules:     Box<dyn JobRules<JS, TS, P>>,
24	pub user_arg:  Option<u64>,
25	pub job_state: JS,
26}
27
28impl<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument> Job<JS, TS, P> {
29	pub fn new<R: JobRules<JS, TS, P>>(
30		url: &str,
31		settings: config::CrawlingSettings,
32		rules: R,
33		job_state: JS,
34	) -> anyhow::Result<Job<JS, TS, P>> {
35		Self::new_with_shared_settings(url, Arc::new(settings), rules, job_state)
36	}
37
38	pub fn new_with_shared_settings<R: JobRules<JS, TS, P>>(
39		url: &str,
40		settings: Arc<config::CrawlingSettings>,
41		rules: R,
42		job_state: JS,
43	) -> anyhow::Result<Job<JS, TS, P>> {
44		let url = Url::parse(url).context("cannot parse url")?;
45
46		Ok(Self { url, addrs: vec![], settings, rules: Box::new(rules), job_state, user_arg: None })
47	}
48
49	pub fn with_addrs(mut self, addrs: Vec<SocketAddr>) -> Self {
50		self.addrs = addrs;
51		self
52	}
53
54	pub fn with_user_arg(mut self, user_arg: u64) -> Self {
55		self.user_arg = Some(user_arg);
56		self
57	}
58}
59
60#[derive(Derivative)]
61#[derivative(Clone(bound = ""))]
62pub struct ResolvedJob<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument> {
63	pub url:      url::Url,
64	pub addrs:    Vec<SocketAddr>,
65	pub settings: Arc<config::CrawlingSettings>,
66	pub rules:    Arc<Box<dyn JobRules<JS, TS, P>>>,
67	pub ctx:      JobCtx<JS, TS>,
68}
69
70impl<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument> From<Job<JS, TS, P>> for ResolvedJob<JS, TS, P> {
71	fn from(job: Job<JS, TS, P>) -> ResolvedJob<JS, TS, P> {
72		ResolvedJob {
73			url:      job.url.clone(),
74			addrs:    job.addrs,
75			settings: Arc::clone(&job.settings),
76			rules:    Arc::new(job.rules),
77			ctx:      JobCtx::new(job.url, job.settings, job.job_state, TS::default(), job.user_arg),
78		}
79	}
80}
81
82pub trait JobRules<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument>: Send + Sync + 'static {
83	fn task_filters(&self) -> TaskFilters<JS, TS>;
84	fn status_filters(&self) -> StatusFilters<JS, TS>;
85	fn load_filters(&self) -> LoadFilters<JS, TS>;
86	fn task_expanders(&self) -> TaskExpanders<JS, TS, P>;
87	fn document_parser(&self) -> Arc<DocumentParser<P>>;
88}
89
90pub type BoxedJobRules<JS, TS, P> = Box<dyn JobRules<JS, TS, P>>;
91
92#[derive(Error, Debug)]
93pub enum Error {
94	#[error(transparent)]
95	Other(#[from] anyhow::Error),
96	#[error("timeout while awaiting for status response")]
97	StatusTimeout,
98	#[error("timeout during loading")]
99	LoadTimeout,
100}
101
102#[derive(Debug, Clone, IntoStaticStr)]
103pub enum FilterKind {
104	HeadStatusFilter,
105	GetStatusFilter,
106	LoadFilter,
107	FollowFilter,
108}
109
110#[derive(Error, Debug, Clone)]
111pub enum ExtStatusError {
112	#[error("terminated by {kind:?} {name:?}")]
113	Term { kind: FilterKind, name: &'static str, reason: &'static str },
114	#[error("error in {kind:?} {name:?}")]
115	Err {
116		kind:   FilterKind,
117		name:   &'static str,
118		#[source]
119		source: Arc<anyhow::Error>,
120	},
121	#[error("panic in {kind:?} {name:?}")]
122	Panic {
123		kind:   FilterKind,
124		name:   &'static str,
125		#[source]
126		source: Arc<anyhow::Error>,
127	},
128}
129
130pub type Result<T> = anyhow::Result<T, Error>;
131
132#[derive(Error, Debug)]
133pub enum ExtError {
134	#[error(transparent)]
135	Other(#[from] anyhow::Error),
136	#[error("terminated because '{reason:?}'")]
137	Term { reason: &'static str },
138}
139pub type ExtResult<T> = std::result::Result<T, ExtError>;
140
141#[derive(Error, Debug)]
142pub enum TaskError {
143	#[error("timeout while waiting for loading({limit:?})")]
144	Timeout { limit: Duration },
145	#[error("http response is too big(>{limit:?})")]
146	HttpTooBigResponse { limit: u32 },
147	#[error("http response is malformed")]
148	HttpMalformedResponse,
149	#[error("http redirect limit reached({limit:?})")]
150	HttpRedirectLimitReached { limit: u16 },
151	#[error("http response error, code {code:?}")]
152	HttpError { code: u16 },
153}
154
155#[derive(Error, Debug)]
156pub enum JobError {
157	#[error("job finished by soft timeout")]
158	JobFinishedBySoftTimeout,
159	#[error("job finished by hard timeout")]
160	JobFinishedByHardTimeout,
161	#[error("job finished by error in a root task")]
162	RootTaskError(TaskError),
163}
164
165#[derive(Clone, PartialEq, Debug, Copy, IntoStaticStr)]
166pub enum LinkTarget {
167	JustResolveDNS = 0,
168	Head           = 1,
169	Load           = 2,
170	HeadLoad       = 3,
171	Follow         = 4,
172	HeadFollow     = 5,
173}
174
175impl fmt::Display for LinkTarget {
176	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
177		write!(f, "{:?}", self)
178	}
179}
180
181pub type LinkIter<'a> = Box<dyn Iterator<Item = &'a Link> + Send + 'a>;
182
183#[derive(Clone)]
184pub struct Link {
185	pub url:      Url,
186	pub rel:      String,
187	pub alt:      String,
188	pub text:     String,
189	pub redirect: usize,
190	pub target:   LinkTarget,
191	pub marker:   u8,
192}
193
194impl Link {
195	pub fn host(&self) -> Option<String> {
196		Some(self.url.host()?.to_string().trim().to_lowercase())
197	}
198}
199
/// Timing recorded for a resolve step.
#[derive(Clone, Default)]
pub struct ResolveMetrics {
	/// How long the step took; zero by default.
	pub duration: Duration,
}
204
205#[derive(Clone)]
206pub struct ResolveData {
207	pub addrs:   Vec<SocketAddr>,
208	pub metrics: ResolveMetrics,
209}
210
211#[derive(Clone)]
212pub struct HeaderMap(pub http::HeaderMap<http::HeaderValue>);
213
214impl HeaderMap {
215	pub fn get_str(&self, name: http::header::HeaderName) -> anyhow::Result<&str> {
216		let name_str = String::from(name.as_str());
217		self.0
218			.get(name)
219			.ok_or_else(|| anyhow!("{}: not found", name_str))?
220			.to_str()
221			.with_context(|| format!("cannot read {} value", name_str))
222	}
223}
224
225impl Deref for HeaderMap {
226	type Target = http::HeaderMap<http::HeaderValue>;
227
228	fn deref(&self) -> &Self::Target {
229		&self.0
230	}
231}
232
233#[derive(Clone)]
234pub struct HttpStatus {
235	pub started_at: Instant,
236	pub code:       u16,
237	pub headers:    HeaderMap,
238	pub metrics:    StatusMetrics,
239	pub filter_err: Option<ExtStatusError>,
240}
241
/// Timing recorded for a status request; both durations default to zero.
#[derive(Clone, Default)]
pub struct StatusMetrics {
	/// Time spent waiting.
	pub wait_duration: Duration,
	/// Time taken by the request.
	pub duration: Duration,
}
247
/// Timing and size counters recorded for a load; every field defaults to zero.
#[derive(Clone, Default)]
pub struct LoadMetrics {
	/// Time spent waiting.
	pub wait_duration: Duration,
	/// Time taken by the load.
	pub duration: Duration,
	/// Size counter for data read.
	pub read_size: usize,
	/// Size counter for data written.
	pub write_size: usize,
}
255
/// Timing recorded for a follow step.
#[derive(Clone, Default)]
pub struct FollowMetrics {
	/// How long the step took; zero by default.
	pub duration: Duration,
}
260
261pub struct LoadData {
262	pub metrics:    LoadMetrics,
263	pub filter_err: Option<ExtStatusError>,
264}
265
266pub struct StatusResult(pub Option<Result<HttpStatus>>);
267
268impl Deref for StatusResult {
269	type Target = Option<Result<HttpStatus>>;
270
271	fn deref(&self) -> &Self::Target {
272		&self.0
273	}
274}
275
276pub struct LoadResult(pub Option<Result<LoadData>>);
277
278impl Deref for LoadResult {
279	type Target = Option<Result<LoadData>>;
280
281	fn deref(&self) -> &Self::Target {
282		&self.0
283	}
284}
285
286pub struct FollowResult(pub Option<Result<FollowData>>);
287
288impl Deref for FollowResult {
289	type Target = Option<Result<FollowData>>;
290
291	fn deref(&self) -> &Self::Target {
292		&self.0
293	}
294}
295
296pub struct JobProcessing {
297	pub resolve_data: ResolveData,
298	pub head_status:  StatusResult,
299	pub status:       StatusResult,
300	pub load:         LoadResult,
301	pub follow:       FollowResult,
302	pub links:        Vec<Arc<Link>>,
303}
304
305impl JobProcessing {
306	pub fn new(resolve_data: ResolveData) -> Self {
307		Self {
308			resolve_data,
309			head_status: StatusResult(None),
310			status: StatusResult(None),
311			load: LoadResult(None),
312			follow: FollowResult(None),
313			links: vec![],
314		}
315	}
316}
317
318pub struct FollowData {
319	pub metrics:    FollowMetrics,
320	pub filter_err: Option<ExtStatusError>,
321}
322
/// Payload of a successfully finished job; it has no fields.
pub struct JobFinished {}
324
325pub enum JobStatus {
326	Processing(Result<Box<JobProcessing>>),
327	Finished(std::result::Result<JobFinished, JobError>),
328}
329
330#[derive(Clone)]
331pub struct Task {
332	pub queued_at: Instant,
333	pub link:      Arc<Link>,
334	pub level:     usize,
335}
336
337pub struct JobUpdate<JS: JobStateValues, TS: TaskStateValues> {
338	pub task:   Arc<Task>,
339	pub status: JobStatus,
340	pub ctx:    JobCtx<JS, TS>,
341}
342
343pub struct ParserTask {
344	pub payload: Box<dyn FnOnce() -> Result<FollowData> + Send + 'static>,
345	pub time:    Instant,
346	pub res_tx:  Sender<ParserResponse>,
347}
348
349pub struct ParserResponse {
350	pub payload:       Result<FollowData>,
351	pub wait_duration: Duration,
352	pub work_duration: Duration,
353}
354
/// Bound alias for job state types: anything `Send + Sync + 'static`.
pub trait JobStateValues: Send + Sync + 'static {}

/// Blanket implementation: every eligible type is a `JobStateValues`.
impl<T> JobStateValues for T where T: Send + Sync + 'static {}
358
/// Bound alias for task state types: anything
/// `Send + Sync + Clone + Default + 'static`.
pub trait TaskStateValues: Send + Sync + Clone + Default + 'static {}

/// Blanket implementation: every eligible type is a `TaskStateValues`.
impl<T> TaskStateValues for T where T: Send + Sync + Clone + Default + 'static {}
362
/// Reference-counted, mutex-guarded map from string keys to type-erased
/// values that are `Send + Sync`.
pub type JobSharedState = Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>;
364
365#[derive(Derivative)]
366#[derivative(Clone(bound = "TS: Clone"))]
367pub struct JobCtx<JS, TS> {
368	pub settings:   Arc<config::CrawlingSettings>,
369	pub started_at: Instant,
370	pub root_url:   Url,
371	pub shared:     JobSharedState,
372	pub job_state:  Arc<Mutex<JS>>,
373	pub task_state: TS,
374	pub user_arg:   Option<u64>,
375	links:          Vec<Arc<Link>>,
376}
377
378impl<JS: JobStateValues, TS: TaskStateValues> JobCtx<JS, TS> {
379	pub fn new(
380		root_url: Url,
381		settings: Arc<config::CrawlingSettings>,
382		job_state: JS,
383		task_state: TS,
384		user_arg: Option<u64>,
385	) -> Self {
386		Self {
387			started_at: Instant::now(),
388			root_url,
389			settings,
390			shared: Arc::new(Mutex::new(HashMap::new())),
391			job_state: Arc::new(Mutex::new(job_state)),
392			task_state,
393			links: vec![],
394			user_arg,
395		}
396	}
397
398	pub fn timeout_remaining(&self, t: Duration) -> PinnedFut<()> {
399		let elapsed = self.started_at.elapsed();
400
401		Box::pin(async move {
402			if t <= elapsed {
403				return
404			}
405			tokio::time::sleep(t - elapsed).await;
406		})
407	}
408
409	pub fn push_link(&mut self, link: Link) {
410		self.links.push(Arc::new(link))
411	}
412
413	pub fn push_links<I: IntoIterator<Item = Link>>(&mut self, links: I) {
414		self.links.extend(links.into_iter().map(Arc::new))
415	}
416
417	pub fn push_shared_links<I: IntoIterator<Item = Arc<Link>>>(&mut self, links: I) {
418		self.links.extend(links.into_iter())
419	}
420
421	pub(crate) fn consume_links(&mut self) -> Vec<Arc<Link>> {
422		let mut links = vec![];
423		mem::swap(&mut links, &mut self.links);
424		links
425	}
426}
427
428impl Link {
429	pub fn new(
430		href: &str,
431		rel: &str,
432		alt: &str,
433		text: &str,
434		redirect: usize,
435		target: LinkTarget,
436		parent: &Link,
437	) -> Result<Self> {
438		let mut url = Url::parse(href)
439			.or_else(|_err| {
440				parent.url.join(href).with_context(|| format!("cannot join relative href {} to {}", href, &parent.url))
441			})
442			.context("cannot parse url")?;
443		url.set_fragment(None);
444
445		Ok(Self {
446			url,
447			rel: String::from(rel),
448			alt: alt.trim().to_string(),
449			text: text.trim().to_string(),
450			redirect,
451			target,
452			marker: 0,
453		})
454	}
455
456	pub(crate) fn new_abs(href: Url, rel: &str, alt: &str, text: &str, redirect: usize, target: LinkTarget) -> Self {
457		Self {
458			url: href,
459			rel: String::from(rel),
460			alt: alt.trim().to_string(),
461			text: text.trim().to_string(),
462			redirect,
463			target,
464			marker: 0,
465		}
466	}
467}
468
469impl Task {
470	pub(crate) fn new_root(url: &Url) -> Result<Task> {
471		let link = Arc::new(Link::new_abs(url.clone(), "", "", "", 0, LinkTarget::Follow));
472
473		Ok(Task { queued_at: Instant::now(), link, level: 0 })
474	}
475
476	pub(crate) fn new(link: Arc<Link>, parent: &Task) -> Result<Task> {
477		let scheme = link.url.scheme();
478		if scheme != "http" && scheme != "https" {
479			return Err(anyhow!("invalid scheme {:#?} in {:#?}", scheme, link.url.as_str()).into())
480		}
481
482		Ok(Task {
483			queued_at: Instant::now(),
484			level: if link.redirect > 0 { parent.level } else { parent.level + 1 },
485			link,
486		})
487	}
488
489	pub fn is_root(&self) -> bool {
490		self.level == 0
491	}
492}
493
494impl fmt::Display for ResolveData {
495	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
496		let addrs_str = self.addrs.iter().map(|a| a.ip().to_string()).collect::<Vec<String>>().join(", ");
497		write!(f, "[{}] resolve {}ms", addrs_str, self.metrics.duration.as_millis())
498	}
499}
500
501impl fmt::Display for StatusResult {
502	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
503		match self {
504			StatusResult(None) => {
505				write!(f, "")
506			}
507			StatusResult(Some(Ok(ref r))) => {
508				write!(
509					f,
510					"[{}] wait {}ms / status {}ms",
511					r.code,
512					r.metrics.wait_duration.as_millis(),
513					r.metrics.duration.as_millis()
514				)
515			}
516			StatusResult(Some(Err(ref err))) => {
517				write!(f, "[err]: {:#}", err)
518			}
519		}
520	}
521}
522
523impl fmt::Display for LoadResult {
524	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
525		match self {
526			LoadResult(Some(Ok(ref r))) => {
527				let m = &r.metrics;
528				write!(
529					f,
530					"loaded {}ms / write {} / read {}",
531					m.duration.as_millis(),
532					m.write_size.file_size(file_size_opts::CONVENTIONAL).unwrap(),
533					m.read_size.file_size(file_size_opts::CONVENTIONAL).unwrap()
534				)
535			}
536			LoadResult(Some(Err(ref err))) => {
537				write!(f, "[err loading]: {:#}", err)
538			}
539			LoadResult(None) => {
540				write!(f, "none")
541			}
542		}
543	}
544}
545
546impl fmt::Display for FollowResult {
547	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548		match self {
549			FollowResult(Some(Ok(ref r))) => {
550				let m = &r.metrics;
551				write!(f, "parsed {}ms", m.duration.as_millis())
552			}
553			FollowResult(Some(Err(ref err))) => {
554				write!(f, "[err following]: {:#}", err)
555			}
556			FollowResult(None) => {
557				write!(f, "none")
558			}
559		}
560	}
561}
562
563impl<JS: JobStateValues, TS: TaskStateValues> fmt::Display for JobUpdate<JS, TS> {
564	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
565		match self.status {
566			JobStatus::Processing(Ok(ref r)) => {
567				write!(
568					f,
569					"{} {} {} (load: {}) | (follow: {}) {}",
570					r.resolve_data, r.head_status, r.status, r.load, r.follow, self.task
571				)
572			}
573			JobStatus::Processing(Err(ref err)) => {
574				write!(f, "[dns error : {:#}] {}", err, self.task)
575			}
576			JobStatus::Finished(Ok(ref r)) => {
577				write!(f, "[finished] {} {}", self.task, r)
578			}
579			JobStatus::Finished(Err(ref err)) => {
580				write!(f, "[finished : {:?}] {}", err, self.task)
581			}
582		}
583	}
584}
585
586impl fmt::Display for Link {
587	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
588		write!(f, "{} {:?}", &self.url, self.target)
589	}
590}
591
592impl fmt::Display for JobFinished {
593	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
594		write!(f, "")
595	}
596}
597
598impl fmt::Display for Task {
599	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
600		if self.level < 1 {
601			write!(f, "{} (root)", &self.link.url)
602		} else {
603			write!(f, "{} ({}) {}", &self.link.url, self.level, self.link.text.chars().take(32).collect::<String>())
604		}
605	}
606}