1use humansize::{file_size_opts, FileSize};
2use thiserror::{self, Error};
3
4use crate::{_prelude::*, config, load_filters, status_filters, task_expanders, task_filters};
5
6pub trait ParsedDocument: 'static {}
7pub type TaskFilters<JS, TS> = Vec<Box<dyn task_filters::Filter<JS, TS> + Send + Sync>>;
8pub type StatusFilters<JS, TS> = Vec<Box<dyn status_filters::Filter<JS, TS> + Send + Sync>>;
9pub type LoadFilters<JS, TS> = Vec<Box<dyn load_filters::Filter<JS, TS> + Send + Sync>>;
10pub type TaskExpanders<JS, TS, P> = Vec<Box<dyn task_expanders::Expander<JS, TS, P> + Send + Sync>>;
11pub type DocumentParser<P> = Box<dyn Fn(Box<dyn io::Read + Sync + Send>) -> Result<P> + Send + Sync + 'static>;
12
13pub type BoxedFn<T> = Box<dyn Fn() -> Box<T> + Send + Sync>;
14pub type BoxedTaskFilter<JS, TS> = BoxedFn<dyn task_filters::Filter<JS, TS> + Send + Sync>;
15pub type BoxedStatusFilter<JS, TS> = BoxedFn<dyn status_filters::Filter<JS, TS> + Send + Sync>;
16pub type BoxedLoadFilter<JS, TS> = BoxedFn<dyn load_filters::Filter<JS, TS> + Send + Sync>;
17pub type BoxedTaskExpander<JS, TS, P> = BoxedFn<dyn task_expanders::Expander<JS, TS, P> + Send + Sync>;
18
19pub struct Job<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument> {
20 pub url: url::Url,
21 pub addrs: Vec<SocketAddr>,
22 pub settings: Arc<config::CrawlingSettings>,
23 pub rules: Box<dyn JobRules<JS, TS, P>>,
24 pub user_arg: Option<u64>,
25 pub job_state: JS,
26}
27
28impl<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument> Job<JS, TS, P> {
29 pub fn new<R: JobRules<JS, TS, P>>(
30 url: &str,
31 settings: config::CrawlingSettings,
32 rules: R,
33 job_state: JS,
34 ) -> anyhow::Result<Job<JS, TS, P>> {
35 Self::new_with_shared_settings(url, Arc::new(settings), rules, job_state)
36 }
37
38 pub fn new_with_shared_settings<R: JobRules<JS, TS, P>>(
39 url: &str,
40 settings: Arc<config::CrawlingSettings>,
41 rules: R,
42 job_state: JS,
43 ) -> anyhow::Result<Job<JS, TS, P>> {
44 let url = Url::parse(url).context("cannot parse url")?;
45
46 Ok(Self { url, addrs: vec![], settings, rules: Box::new(rules), job_state, user_arg: None })
47 }
48
49 pub fn with_addrs(mut self, addrs: Vec<SocketAddr>) -> Self {
50 self.addrs = addrs;
51 self
52 }
53
54 pub fn with_user_arg(mut self, user_arg: u64) -> Self {
55 self.user_arg = Some(user_arg);
56 self
57 }
58}
59
60#[derive(Derivative)]
61#[derivative(Clone(bound = ""))]
62pub struct ResolvedJob<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument> {
63 pub url: url::Url,
64 pub addrs: Vec<SocketAddr>,
65 pub settings: Arc<config::CrawlingSettings>,
66 pub rules: Arc<Box<dyn JobRules<JS, TS, P>>>,
67 pub ctx: JobCtx<JS, TS>,
68}
69
70impl<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument> From<Job<JS, TS, P>> for ResolvedJob<JS, TS, P> {
71 fn from(job: Job<JS, TS, P>) -> ResolvedJob<JS, TS, P> {
72 ResolvedJob {
73 url: job.url.clone(),
74 addrs: job.addrs,
75 settings: Arc::clone(&job.settings),
76 rules: Arc::new(job.rules),
77 ctx: JobCtx::new(job.url, job.settings, job.job_state, TS::default(), job.user_arg),
78 }
79 }
80}
81
82pub trait JobRules<JS: JobStateValues, TS: TaskStateValues, P: ParsedDocument>: Send + Sync + 'static {
83 fn task_filters(&self) -> TaskFilters<JS, TS>;
84 fn status_filters(&self) -> StatusFilters<JS, TS>;
85 fn load_filters(&self) -> LoadFilters<JS, TS>;
86 fn task_expanders(&self) -> TaskExpanders<JS, TS, P>;
87 fn document_parser(&self) -> Arc<DocumentParser<P>>;
88}
89
90pub type BoxedJobRules<JS, TS, P> = Box<dyn JobRules<JS, TS, P>>;
91
92#[derive(Error, Debug)]
93pub enum Error {
94 #[error(transparent)]
95 Other(#[from] anyhow::Error),
96 #[error("timeout while awaiting for status response")]
97 StatusTimeout,
98 #[error("timeout during loading")]
99 LoadTimeout,
100}
101
102#[derive(Debug, Clone, IntoStaticStr)]
103pub enum FilterKind {
104 HeadStatusFilter,
105 GetStatusFilter,
106 LoadFilter,
107 FollowFilter,
108}
109
110#[derive(Error, Debug, Clone)]
111pub enum ExtStatusError {
112 #[error("terminated by {kind:?} {name:?}")]
113 Term { kind: FilterKind, name: &'static str, reason: &'static str },
114 #[error("error in {kind:?} {name:?}")]
115 Err {
116 kind: FilterKind,
117 name: &'static str,
118 #[source]
119 source: Arc<anyhow::Error>,
120 },
121 #[error("panic in {kind:?} {name:?}")]
122 Panic {
123 kind: FilterKind,
124 name: &'static str,
125 #[source]
126 source: Arc<anyhow::Error>,
127 },
128}
129
130pub type Result<T> = anyhow::Result<T, Error>;
131
132#[derive(Error, Debug)]
133pub enum ExtError {
134 #[error(transparent)]
135 Other(#[from] anyhow::Error),
136 #[error("terminated because '{reason:?}'")]
137 Term { reason: &'static str },
138}
139pub type ExtResult<T> = std::result::Result<T, ExtError>;
140
141#[derive(Error, Debug)]
142pub enum TaskError {
143 #[error("timeout while waiting for loading({limit:?})")]
144 Timeout { limit: Duration },
145 #[error("http response is too big(>{limit:?})")]
146 HttpTooBigResponse { limit: u32 },
147 #[error("http response is malformed")]
148 HttpMalformedResponse,
149 #[error("http redirect limit reached({limit:?})")]
150 HttpRedirectLimitReached { limit: u16 },
151 #[error("http response error, code {code:?}")]
152 HttpError { code: u16 },
153}
154
155#[derive(Error, Debug)]
156pub enum JobError {
157 #[error("job finished by soft timeout")]
158 JobFinishedBySoftTimeout,
159 #[error("job finished by hard timeout")]
160 JobFinishedByHardTimeout,
161 #[error("job finished by error in a root task")]
162 RootTaskError(TaskError),
163}
164
165#[derive(Clone, PartialEq, Debug, Copy, IntoStaticStr)]
166pub enum LinkTarget {
167 JustResolveDNS = 0,
168 Head = 1,
169 Load = 2,
170 HeadLoad = 3,
171 Follow = 4,
172 HeadFollow = 5,
173}
174
175impl fmt::Display for LinkTarget {
176 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
177 write!(f, "{:?}", self)
178 }
179}
180
181pub type LinkIter<'a> = Box<dyn Iterator<Item = &'a Link> + Send + 'a>;
182
183#[derive(Clone)]
184pub struct Link {
185 pub url: Url,
186 pub rel: String,
187 pub alt: String,
188 pub text: String,
189 pub redirect: usize,
190 pub target: LinkTarget,
191 pub marker: u8,
192}
193
194impl Link {
195 pub fn host(&self) -> Option<String> {
196 Some(self.url.host()?.to_string().trim().to_lowercase())
197 }
198}
199
/// Timing of the address-resolution step.
#[derive(Default, Clone)]
pub struct ResolveMetrics {
    /// Time spent resolving.
    pub duration: Duration,
}

/// Addresses obtained by resolution, together with its timing.
#[derive(Clone)]
pub struct ResolveData {
    /// Resolved socket addresses.
    pub addrs: Vec<SocketAddr>,
    /// Resolution timing.
    pub metrics: ResolveMetrics,
}
210
211#[derive(Clone)]
212pub struct HeaderMap(pub http::HeaderMap<http::HeaderValue>);
213
214impl HeaderMap {
215 pub fn get_str(&self, name: http::header::HeaderName) -> anyhow::Result<&str> {
216 let name_str = String::from(name.as_str());
217 self.0
218 .get(name)
219 .ok_or_else(|| anyhow!("{}: not found", name_str))?
220 .to_str()
221 .with_context(|| format!("cannot read {} value", name_str))
222 }
223}
224
225impl Deref for HeaderMap {
226 type Target = http::HeaderMap<http::HeaderValue>;
227
228 fn deref(&self) -> &Self::Target {
229 &self.0
230 }
231}
232
233#[derive(Clone)]
234pub struct HttpStatus {
235 pub started_at: Instant,
236 pub code: u16,
237 pub headers: HeaderMap,
238 pub metrics: StatusMetrics,
239 pub filter_err: Option<ExtStatusError>,
240}
241
/// Timing of the status stage.
#[derive(Default, Clone)]
pub struct StatusMetrics {
    /// Time spent waiting before the stage began.
    pub wait_duration: Duration,
    /// Time spent in the stage itself.
    pub duration: Duration,
}

/// Timing and transfer sizes of the load stage.
#[derive(Default, Clone)]
pub struct LoadMetrics {
    /// Time spent waiting before loading began.
    pub wait_duration: Duration,
    /// Time spent loading.
    pub duration: Duration,
    /// Bytes read.
    pub read_size: usize,
    /// Bytes written.
    pub write_size: usize,
}

/// Timing of the follow (parsing) stage.
#[derive(Default, Clone)]
pub struct FollowMetrics {
    /// Time spent in the stage.
    pub duration: Duration,
}
260
261pub struct LoadData {
262 pub metrics: LoadMetrics,
263 pub filter_err: Option<ExtStatusError>,
264}
265
266pub struct StatusResult(pub Option<Result<HttpStatus>>);
267
268impl Deref for StatusResult {
269 type Target = Option<Result<HttpStatus>>;
270
271 fn deref(&self) -> &Self::Target {
272 &self.0
273 }
274}
275
276pub struct LoadResult(pub Option<Result<LoadData>>);
277
278impl Deref for LoadResult {
279 type Target = Option<Result<LoadData>>;
280
281 fn deref(&self) -> &Self::Target {
282 &self.0
283 }
284}
285
286pub struct FollowResult(pub Option<Result<FollowData>>);
287
288impl Deref for FollowResult {
289 type Target = Option<Result<FollowData>>;
290
291 fn deref(&self) -> &Self::Target {
292 &self.0
293 }
294}
295
296pub struct JobProcessing {
297 pub resolve_data: ResolveData,
298 pub head_status: StatusResult,
299 pub status: StatusResult,
300 pub load: LoadResult,
301 pub follow: FollowResult,
302 pub links: Vec<Arc<Link>>,
303}
304
305impl JobProcessing {
306 pub fn new(resolve_data: ResolveData) -> Self {
307 Self {
308 resolve_data,
309 head_status: StatusResult(None),
310 status: StatusResult(None),
311 load: LoadResult(None),
312 follow: FollowResult(None),
313 links: vec![],
314 }
315 }
316}
317
318pub struct FollowData {
319 pub metrics: FollowMetrics,
320 pub filter_err: Option<ExtStatusError>,
321}
322
323pub struct JobFinished {}
324
325pub enum JobStatus {
326 Processing(Result<Box<JobProcessing>>),
327 Finished(std::result::Result<JobFinished, JobError>),
328}
329
330#[derive(Clone)]
331pub struct Task {
332 pub queued_at: Instant,
333 pub link: Arc<Link>,
334 pub level: usize,
335}
336
337pub struct JobUpdate<JS: JobStateValues, TS: TaskStateValues> {
338 pub task: Arc<Task>,
339 pub status: JobStatus,
340 pub ctx: JobCtx<JS, TS>,
341}
342
343pub struct ParserTask {
344 pub payload: Box<dyn FnOnce() -> Result<FollowData> + Send + 'static>,
345 pub time: Instant,
346 pub res_tx: Sender<ParserResponse>,
347}
348
349pub struct ParserResponse {
350 pub payload: Result<FollowData>,
351 pub wait_duration: Duration,
352 pub work_duration: Duration,
353}
354
/// Bound for job-wide state: thread-safe and not borrowing; blanket-implemented.
pub trait JobStateValues: Send + Sync + 'static {}

impl<T> JobStateValues for T where T: Send + Sync + 'static {}

/// Bound for per-task state: thread-safe, cloneable, defaultable; blanket-implemented.
pub trait TaskStateValues: Send + Sync + Clone + Default + 'static {}

impl<T> TaskStateValues for T where T: Send + Sync + Clone + Default + 'static {}

/// Type-erased key/value store shared by every clone of a job context.
pub type JobSharedState = Arc<Mutex<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>;
364
365#[derive(Derivative)]
366#[derivative(Clone(bound = "TS: Clone"))]
367pub struct JobCtx<JS, TS> {
368 pub settings: Arc<config::CrawlingSettings>,
369 pub started_at: Instant,
370 pub root_url: Url,
371 pub shared: JobSharedState,
372 pub job_state: Arc<Mutex<JS>>,
373 pub task_state: TS,
374 pub user_arg: Option<u64>,
375 links: Vec<Arc<Link>>,
376}
377
378impl<JS: JobStateValues, TS: TaskStateValues> JobCtx<JS, TS> {
379 pub fn new(
380 root_url: Url,
381 settings: Arc<config::CrawlingSettings>,
382 job_state: JS,
383 task_state: TS,
384 user_arg: Option<u64>,
385 ) -> Self {
386 Self {
387 started_at: Instant::now(),
388 root_url,
389 settings,
390 shared: Arc::new(Mutex::new(HashMap::new())),
391 job_state: Arc::new(Mutex::new(job_state)),
392 task_state,
393 links: vec![],
394 user_arg,
395 }
396 }
397
398 pub fn timeout_remaining(&self, t: Duration) -> PinnedFut<()> {
399 let elapsed = self.started_at.elapsed();
400
401 Box::pin(async move {
402 if t <= elapsed {
403 return
404 }
405 tokio::time::sleep(t - elapsed).await;
406 })
407 }
408
409 pub fn push_link(&mut self, link: Link) {
410 self.links.push(Arc::new(link))
411 }
412
413 pub fn push_links<I: IntoIterator<Item = Link>>(&mut self, links: I) {
414 self.links.extend(links.into_iter().map(Arc::new))
415 }
416
417 pub fn push_shared_links<I: IntoIterator<Item = Arc<Link>>>(&mut self, links: I) {
418 self.links.extend(links.into_iter())
419 }
420
421 pub(crate) fn consume_links(&mut self) -> Vec<Arc<Link>> {
422 let mut links = vec![];
423 mem::swap(&mut links, &mut self.links);
424 links
425 }
426}
427
428impl Link {
429 pub fn new(
430 href: &str,
431 rel: &str,
432 alt: &str,
433 text: &str,
434 redirect: usize,
435 target: LinkTarget,
436 parent: &Link,
437 ) -> Result<Self> {
438 let mut url = Url::parse(href)
439 .or_else(|_err| {
440 parent.url.join(href).with_context(|| format!("cannot join relative href {} to {}", href, &parent.url))
441 })
442 .context("cannot parse url")?;
443 url.set_fragment(None);
444
445 Ok(Self {
446 url,
447 rel: String::from(rel),
448 alt: alt.trim().to_string(),
449 text: text.trim().to_string(),
450 redirect,
451 target,
452 marker: 0,
453 })
454 }
455
456 pub(crate) fn new_abs(href: Url, rel: &str, alt: &str, text: &str, redirect: usize, target: LinkTarget) -> Self {
457 Self {
458 url: href,
459 rel: String::from(rel),
460 alt: alt.trim().to_string(),
461 text: text.trim().to_string(),
462 redirect,
463 target,
464 marker: 0,
465 }
466 }
467}
468
469impl Task {
470 pub(crate) fn new_root(url: &Url) -> Result<Task> {
471 let link = Arc::new(Link::new_abs(url.clone(), "", "", "", 0, LinkTarget::Follow));
472
473 Ok(Task { queued_at: Instant::now(), link, level: 0 })
474 }
475
476 pub(crate) fn new(link: Arc<Link>, parent: &Task) -> Result<Task> {
477 let scheme = link.url.scheme();
478 if scheme != "http" && scheme != "https" {
479 return Err(anyhow!("invalid scheme {:#?} in {:#?}", scheme, link.url.as_str()).into())
480 }
481
482 Ok(Task {
483 queued_at: Instant::now(),
484 level: if link.redirect > 0 { parent.level } else { parent.level + 1 },
485 link,
486 })
487 }
488
489 pub fn is_root(&self) -> bool {
490 self.level == 0
491 }
492}
493
494impl fmt::Display for ResolveData {
495 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
496 let addrs_str = self.addrs.iter().map(|a| a.ip().to_string()).collect::<Vec<String>>().join(", ");
497 write!(f, "[{}] resolve {}ms", addrs_str, self.metrics.duration.as_millis())
498 }
499}
500
501impl fmt::Display for StatusResult {
502 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
503 match self {
504 StatusResult(None) => {
505 write!(f, "")
506 }
507 StatusResult(Some(Ok(ref r))) => {
508 write!(
509 f,
510 "[{}] wait {}ms / status {}ms",
511 r.code,
512 r.metrics.wait_duration.as_millis(),
513 r.metrics.duration.as_millis()
514 )
515 }
516 StatusResult(Some(Err(ref err))) => {
517 write!(f, "[err]: {:#}", err)
518 }
519 }
520 }
521}
522
523impl fmt::Display for LoadResult {
524 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
525 match self {
526 LoadResult(Some(Ok(ref r))) => {
527 let m = &r.metrics;
528 write!(
529 f,
530 "loaded {}ms / write {} / read {}",
531 m.duration.as_millis(),
532 m.write_size.file_size(file_size_opts::CONVENTIONAL).unwrap(),
533 m.read_size.file_size(file_size_opts::CONVENTIONAL).unwrap()
534 )
535 }
536 LoadResult(Some(Err(ref err))) => {
537 write!(f, "[err loading]: {:#}", err)
538 }
539 LoadResult(None) => {
540 write!(f, "none")
541 }
542 }
543 }
544}
545
546impl fmt::Display for FollowResult {
547 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548 match self {
549 FollowResult(Some(Ok(ref r))) => {
550 let m = &r.metrics;
551 write!(f, "parsed {}ms", m.duration.as_millis())
552 }
553 FollowResult(Some(Err(ref err))) => {
554 write!(f, "[err following]: {:#}", err)
555 }
556 FollowResult(None) => {
557 write!(f, "none")
558 }
559 }
560 }
561}
562
563impl<JS: JobStateValues, TS: TaskStateValues> fmt::Display for JobUpdate<JS, TS> {
564 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
565 match self.status {
566 JobStatus::Processing(Ok(ref r)) => {
567 write!(
568 f,
569 "{} {} {} (load: {}) | (follow: {}) {}",
570 r.resolve_data, r.head_status, r.status, r.load, r.follow, self.task
571 )
572 }
573 JobStatus::Processing(Err(ref err)) => {
574 write!(f, "[dns error : {:#}] {}", err, self.task)
575 }
576 JobStatus::Finished(Ok(ref r)) => {
577 write!(f, "[finished] {} {}", self.task, r)
578 }
579 JobStatus::Finished(Err(ref err)) => {
580 write!(f, "[finished : {:?}] {}", err, self.task)
581 }
582 }
583 }
584}
585
586impl fmt::Display for Link {
587 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
588 write!(f, "{} {:?}", &self.url, self.target)
589 }
590}
591
592impl fmt::Display for JobFinished {
593 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
594 write!(f, "")
595 }
596}
597
598impl fmt::Display for Task {
599 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
600 if self.level < 1 {
601 write!(f, "{} (root)", &self.link.url)
602 } else {
603 write!(f, "{} ({}) {}", &self.link.url, self.level, self.link.text.chars().take(32).collect::<String>())
604 }
605 }
606}