crusty 0.8.0

A fast and scalable broad web crawler built on top of crusty-core.
use clickhouse::Row;
use crusty_core::{types as ct, types::StatusResult};
use serde::{Deserialize, Serialize};

#[allow(unused_imports)]
use crate::prelude::*;
use crate::{clickhouse_utils as chu, config::CONFIG};

/// Shorthand for `anyhow::Result<T>`.
pub type Result<T> = anyhow::Result<T>;

/// A domain name split into a two-label `head` and the remaining `tail`,
/// together with the shard number derived from `head`.
#[derive(Debug, Clone)]
pub struct Domain {
	/// Selects which `updated_at` value is used when converting into a `DomainDBEntry`.
	is_discovery:     bool,
	/// Shard number; `Domain::new` computes it from a CRC32 of `head`, `Domain::_new` leaves it at 0.
	pub shard:        u16,
	/// Number of shards the hash of `head` is reduced over.
	pub shards_total: usize,

	/// Optional URL, stored exactly as passed to the constructor.
	pub url:    Option<Url>,
	/// The full domain string as passed to the constructor.
	pub domain: String,
	/// The last two dot-separated labels of `domain` (the whole string when it has at most two labels).
	pub head:   String,
	/// The labels preceding `head`, without the joining dot; empty when `domain` has at most two labels.
	pub tail:   String,
}

impl Domain {
	/// Builds a `Domain` with `head`/`tail` filled in and `shard` left at 0.
	///
	/// `head` is the last two dot-separated labels of `domain`; `tail` is whatever
	/// precedes them, without the joining dot. A name with at most two labels is
	/// kept whole in `head` and gets an empty `tail`.
	fn _new(domain: String, shards_total: usize, url: Option<Url>, is_discovery: bool) -> Domain {
		// The second dot from the right separates `tail` from `head`; slicing at it
		// avoids allocating a vector of labels just to join them back together.
		let (head, tail) = match domain.rmatch_indices('.').nth(1) {
			// '.' is one byte long, so `boundary + 1` is always a valid char boundary.
			Some((boundary, _)) => (domain[boundary + 1..].to_owned(), domain[..boundary].to_owned()),
			None => (domain.clone(), String::new()),
		};

		Domain { is_discovery, shard: 0, shards_total, url, domain, head, tail }
	}

	/// Creates a `Domain` and assigns its shard.
	///
	/// * `domain` - the domain name to split into `head` and `tail`.
	/// * `shards_total` - number of shards to distribute over; 0 is treated as 1.
	/// * `url` - stored unchanged on the result.
	/// * `is_discovery` - stored unchanged on the result.
	pub fn new(domain: String, shards_total: usize, url: Option<Url>, is_discovery: bool) -> Domain {
		let mut domain = Domain::_new(domain, shards_total, url, is_discovery);
		domain.calc_shard();
		domain
	}

	/// Sets `shard` to `crc32(head) % shards_total + 1`, i.e. a 1-based shard number.
	fn calc_shard(&mut self) {
		let mut hasher = crc32fast::Hasher::new();
		hasher.update(self.head.as_bytes());
		// A modulus of zero would panic. That happens when `shards_total` is 0 or when it
		// truncates to 0 in the cast, so fall back to a single shard in that case.
		let shards_total = (self.shards_total as u32).max(1);
		self.shard = (hasher.finalize() % shards_total + 1) as u16;
	}
}

/// Row form of a `Domain` (derives `clickhouse::Row`).
///
/// The field notes below describe what `From<Domain>` writes.
#[derive(Clone, Debug, Serialize, Deserialize, Row)]
pub struct DomainDBEntry {
	/// Copied from `Domain::shard`.
	pub shard:       u16,
	/// Holds `Domain::head`, not the full domain string.
	pub domain:      String,
	/// Holds `Domain::tail`.
	pub domain_tail: String,
	/// Value of `now()` at conversion time.
	pub created_at:  u32,
	/// Value of `now()` at conversion time, or the fixed value 644616000 for discovery domains.
	pub updated_at:  u32,
}

impl From<Domain> for DomainDBEntry {
	/// Converts a `Domain` into its row form, stamping it with the current time.
	fn from(s: Domain) -> Self {
		// Read the clock once so `created_at` and `updated_at` agree when both use it.
		let created_at = now();

		// Discovery domains get a fixed `updated_at` instead of the current time.
		// NOTE(review): 644616000 looks like a deliberately old Unix timestamp; confirm intent.
		let updated_at = if !s.is_discovery { created_at } else { 644616000 };

		Self { shard: s.shard, domain: s.head, domain_tail: s.tail, created_at, updated_at }
	}
}

/// Row describing one `chu::GenericNotification` (derives `clickhouse::Row`).
///
/// The field notes below describe what `From<chu::GenericNotification>` writes.
#[derive(Clone, Debug, Serialize, Deserialize, Row)]
pub struct DBRWNotificationDBEntry {
	/// `CONFIG.host` at conversion time.
	host:          String,
	/// `CONFIG.app_id` at conversion time.
	app_id:        String,
	/// Value of `now()` at conversion time.
	created_at:    u32,
	/// Copied from the notification's `table_name`.
	table_name:    String,
	/// Copied from the notification's `label`.
	label:         String,
	/// The notification's `duration` in milliseconds, cast to `u32`.
	took_ms:       u32,
	/// The notification's `since_last` in milliseconds, cast to `u32`.
	since_last_ms: u32,
	/// The notification's `items`, cast to `u32`.
	items:         u32,
}

impl From<chu::GenericNotification> for DBRWNotificationDBEntry {
	/// Builds a row from a notification, tagging it with `host` and `app_id` from `CONFIG`.
	///
	/// # Panics
	/// Panics if `CONFIG.lock()` returns an error.
	fn from(s: chu::GenericNotification) -> Self {
		let cfg = CONFIG.lock().unwrap();
		let host = cfg.host.clone();
		let app_id = cfg.app_id.clone();
		let created_at = now();

		// Durations and the item count are narrowed to `u32` with plain `as` casts.
		let took_ms = s.duration.as_millis() as u32;
		let since_last_ms = s.since_last.as_millis() as u32;
		let items = s.items as u32;

		Self {
			host,
			app_id,
			created_at,
			table_name: s.table_name,
			label: s.label,
			took_ms,
			since_last_ms,
			items,
		}
	}
}

/// Metrics attached to a `TaskMeasurement` for a task whose status step succeeded.
///
/// The field notes below describe how `From<ct::JobUpdate>` for `TaskMeasurement` fills them.
#[derive(Debug, Clone)]
pub struct TaskMeasurementData {
	/// The status result's `code`, cast to `u16`.
	pub status_code:    u16,
	/// The status metrics' `wait_duration` in milliseconds.
	pub wait_time_ms:   u32,
	/// The status metrics' `duration` in milliseconds.
	pub status_time_ms: u32,
	/// The load metrics' `duration` in milliseconds; 0 when the load result is not `Ok`.
	pub load_time_ms:   u32,
	/// The follow metrics' `duration` in milliseconds; 0 when the follow result is not `Ok`.
	pub parse_time_ms:  u32,
	/// The load metrics' `write_size`; 0 when the load result is not `Ok`.
	pub write_size_b:   u32,
	/// The load metrics' `read_size`; 0 when the load result is not `Ok`.
	pub read_size_b:    u32,
}

/// Per-task measurement derived from a `ct::JobUpdate`.
#[derive(Clone, Debug)]
pub struct TaskMeasurement {
	/// Value of `now()` when the measurement was created from a job update.
	pub time: u32,
	/// String form of the task's link URL.
	pub url:  String,
	/// Metrics, or `None`, which `TaskMeasurementDBEntry` records as an error row.
	pub md:   Option<TaskMeasurementData>,
}

/// Row form of a `TaskMeasurement` (derives `clickhouse::Row`).
///
/// The field notes below describe what `From<TaskMeasurement>` writes.
#[derive(Clone, Debug, Serialize, Deserialize, Row)]
pub struct TaskMeasurementDBEntry {
	/// `CONFIG.host` at conversion time.
	host:           String,
	/// `CONFIG.app_id` at conversion time.
	app_id:         String,
	/// Copied from `TaskMeasurement::time`.
	created_at:     u32,
	/// Copied from `TaskMeasurement::url`.
	url:            String,
	/// 1 when the measurement carried no data (`md` was `None`), 0 otherwise.
	error:          u8,
	// The remaining fields mirror `TaskMeasurementData` and are all 0 on error rows.
	status_code:    u16,
	wait_time_ms:   u32,
	status_time_ms: u32,
	load_time_ms:   u32,
	parse_time_ms:  u32,
	write_size_b:   u32,
	read_size_b:    u32,
}

impl From<TaskMeasurement> for TaskMeasurementDBEntry {
	/// Flattens a `TaskMeasurement` into a row tagged with `host` and `app_id` from `CONFIG`.
	///
	/// A measurement without data (`md` is `None`) becomes a row with `error = 1` and
	/// every metric set to zero; otherwise `error = 0` and the metrics are copied through.
	///
	/// # Panics
	/// Panics if `CONFIG.lock()` returns an error.
	fn from(s: TaskMeasurement) -> Self {
		let c = CONFIG.lock().unwrap();
		let host = c.host.clone();
		let app_id = c.app_id.clone();

		// Matching on the option removes the `is_none()` check followed by `unwrap()`.
		match s.md {
			Some(md) => Self {
				host,
				app_id,
				created_at: s.time,
				url: s.url,
				error: 0,
				status_code: md.status_code,
				wait_time_ms: md.wait_time_ms,
				status_time_ms: md.status_time_ms,
				load_time_ms: md.load_time_ms,
				parse_time_ms: md.parse_time_ms,
				write_size_b: md.write_size_b,
				read_size_b: md.read_size_b,
			},
			None => Self {
				host,
				app_id,
				created_at: s.time,
				url: s.url,
				error: 1,
				status_code: 0,
				wait_time_ms: 0,
				status_time_ms: 0,
				load_time_ms: 0,
				parse_time_ms: 0,
				write_size_b: 0,
				read_size_b: 0,
			},
		}
	}
}

/// A queue length sample.
#[derive(Debug, Clone)]
pub struct QueueStats {
	/// Queue length; converted to `u32` when written into a `QueueMeasurementDBEntry`.
	pub len:  usize,
	/// Sample time; becomes `updated_at` in a `QueueMeasurementDBEntry`.
	pub time: u32,
}

/// Identifies which queue a `QueueMeasurement` refers to.
///
/// `From<QueueMeasurement>` for `QueueMeasurementDBEntry` stores the `Debug` rendering
/// of the variant in `name`, so renaming a variant changes the stored string.
#[derive(Debug, Clone)]
pub enum QueueKind {
	Job,
	JobUpdate,
	MetricsTask,
	MetricsQueue,
	MetricsDB,
	DomainUpdate,
	DomainInsert,

	DomainUpdateNotifyP,
	DomainUpdateNotify,
	DomainInsertNotify,
}

/// A `QueueStats` sample labelled with the queue it was taken from.
#[derive(Debug, Clone)]
pub struct QueueMeasurement {
	/// Which queue the sample belongs to.
	pub kind:  QueueKind,
	/// The length/time sample itself.
	pub stats: QueueStats,
}

/// Row form of a `QueueMeasurement` (derives `clickhouse::Row`).
///
/// The field notes below describe what `From<QueueMeasurement>` writes.
#[derive(Debug, Serialize, Deserialize, Row)]
pub struct QueueMeasurementDBEntry {
	/// `CONFIG.host` at conversion time.
	host:           String,
	/// `CONFIG.app_id` at conversion time.
	app_id:         String,
	/// `Debug` rendering of the measurement's `QueueKind`.
	pub name:       String,
	/// Copied from `QueueStats::time`.
	pub updated_at: u32,
	/// `QueueStats::len` converted to `u32`.
	pub len:        u32,
}

impl From<QueueMeasurement> for QueueMeasurementDBEntry {
	/// Builds a row from a queue measurement, tagging it with `host` and `app_id` from `CONFIG`.
	///
	/// `name` is the `Debug` rendering of the queue kind. A length that does not fit
	/// in `u32` is stored as `u32::MAX` rather than wrapping around.
	///
	/// # Panics
	/// Panics if `CONFIG.lock()` returns an error.
	fn from(s: QueueMeasurement) -> Self {
		let c = CONFIG.lock().unwrap();
		Self {
			host:       c.host.clone(),
			app_id:     c.app_id.clone(),
			name:       format!("{:?}", s.kind),
			updated_at: s.stats.time,
			// Clamp before narrowing so an oversized length saturates instead of wrapping.
			len:        s.stats.len.min(u32::MAX as usize) as u32,
		}
	}
}
impl<JS: ct::JobStateValues, TS: ct::TaskStateValues> From<ct::JobUpdate<JS, TS>> for TaskMeasurement {
	/// Summarises a job update as a `TaskMeasurement`.
	///
	/// Metrics are attached only when the update is `JobStatus::Processing(Ok(..))` and its
	/// status result is `Ok`; every other update yields `md: None`. Within such an update,
	/// a load or follow result that is not `Ok` contributes zeros for its metrics.
	fn from(r: ct::JobUpdate<JS, TS>) -> Self {
		let md = match r.status {
			ct::JobStatus::Processing(Ok(ref load_data)) => match &load_data.status {
				StatusResult::Ok(status) => {
					// Parse time comes from the follow step, when it succeeded.
					let parse_time_ms = match load_data.follow {
						ct::FollowResult::Ok(ref follow) => follow.metrics.duration.as_millis() as u32,
						_ => 0,
					};

					// Load time and transfer sizes come from the load step, when it succeeded.
					let (load_time_ms, write_size_b, read_size_b) = match load_data.load {
						ct::LoadResult::Ok(ref load) => (
							load.metrics.duration.as_millis() as u32,
							load.metrics.write_size as u32,
							load.metrics.read_size as u32,
						),
						_ => (0, 0, 0),
					};

					Some(TaskMeasurementData {
						status_code: status.code as u16,
						wait_time_ms: status.metrics.wait_duration.as_millis() as u32,
						status_time_ms: status.metrics.duration.as_millis() as u32,
						load_time_ms,
						write_size_b,
						read_size_b,
						parse_time_ms,
					})
				}
				_ => None,
			},
			_ => None,
		};

		TaskMeasurement { time: now(), url: r.task.link.url.to_string(), md }
	}
}