crusty 0.11.0

Fast and scalable broad web crawler built on top of crusty-core
use clickhouse::Row;
use crusty_core::{types as ct, types::StatusResult};
use serde::{Deserialize, Serialize};

#[allow(unused_imports)]
use crate::prelude::*;
use crate::{clickhouse_utils as chu, config::CONFIG};

pub type Result<T> = anyhow::Result<T>;

/// A domain together with the shard it has been assigned to.
///
/// Construct it with `Domain::new`, which derives the private `addr_key` from the
/// domain's addresses and then computes `shard` from that key.
#[derive(Debug, Clone)]
pub struct Domain {
	/// Caller-supplied flag; `From<Domain> for DomainDBEntry` uses it to write a fixed
	/// past `updated_at` instead of the current time.
	is_discovery:     bool,
	/// Shard index in `1..=shards_total`, computed from `addr_key` by `Domain::new`.
	pub shard:        u16,
	/// Number of shards the address key is distributed over.
	pub shards_total: usize,

	/// Optional URL passed to `Domain::new` (meaning is up to the caller).
	pub url:    Option<Url>,
	/// The domain name.
	pub domain: String,
	/// Masked 4-byte address that is hashed to pick `shard` (presumably IPv4 octets in
	/// network order — confirm with the resolver that produces `addrs`).
	addr_key:   [u8; 4],
}

impl Domain {
	/// Creates a domain descriptor and assigns it to a shard.
	///
	/// The shard is derived from an address key: the smallest of `addrs` (compared octet by
	/// octet), or `255.255.255.255` when `addrs` is empty, with every bit after the leading
	/// `addr_key_mask` bits cleared. `addr_key_mask` therefore acts as a CIDR-style prefix
	/// length (e.g. 24 keeps the first three octets), so domains resolving into the same
	/// network prefix land on the same shard. Values >= 32 keep the whole address; 0 clears it.
	///
	/// # Panics
	/// Panics if `shards_total` is 0.
	pub fn new(
		domain: String,
		addrs: Vec<[u8; 4]>,
		shards_total: usize,
		addr_key_mask: u8,
		url: Option<Url>,
		is_discovery: bool,
	) -> Domain {
		// Lexicographic order on `[u8; 4]` is the numeric order of the address, so `min`
		// picks exactly what sort-then-take-first would, without sorting.
		let addr = addrs.into_iter().min().unwrap_or([255, 255, 255, 255]);
		let addr_key = Self::mask_addr(addr, addr_key_mask);

		let mut domain = Domain { addr_key, is_discovery, shard: 0, shards_total, url, domain };
		domain.calc_shard();
		domain
	}

	/// Keeps the leading `prefix_bits` bits of `addr` (first octet most significant) and
	/// zeroes the rest.
	fn mask_addr(addr: [u8; 4], prefix_bits: u8) -> [u8; 4] {
		let prefix = u32::from(prefix_bits.min(32));
		// A prefix of 0 means shifting by 32, which overflows a plain `<<`; `checked_shl`
		// turns that case into an all-zero mask.
		let mask = u32::MAX.checked_shl(32 - prefix).unwrap_or(0);
		(u32::from_be_bytes(addr) & mask).to_be_bytes()
	}

	/// Sets `self.shard` to `crc32(addr_key) % shards_total + 1`, a 1-based index in
	/// `1..=shards_total`.
	///
	/// NOTE(review): the result is cast to `u16`, so more than 65535 shards would truncate.
	///
	/// # Panics
	/// Panics if `shards_total` is 0.
	fn calc_shard(&mut self) {
		assert!(self.shards_total > 0, "shards_total must be greater than zero");
		let mut hasher = crc32fast::Hasher::new();
		hasher.update(&self.addr_key);
		self.shard = (hasher.finalize() % self.shards_total as u32 + 1) as u16;
	}
}

/// A ClickHouse row describing one domain (serialized through the `clickhouse` crate's `Row`).
///
/// NOTE(review): ClickHouse row (de)serialization follows serde field order, so this field
/// order presumably has to match the table's column order — confirm against the table DDL
/// before reordering anything.
#[derive(Clone, Debug, Serialize, Deserialize, Row)]
pub struct DomainDBEntry {
	/// Shard index, copied from `Domain::shard` when built via `From<Domain>`.
	pub shard:      u16,
	/// Masked address key the shard was hashed from.
	pub addr_key:   [u8; 4],
	/// The domain name.
	pub domain:     String,
	/// `now()` at conversion time (presumably Unix seconds).
	pub created_at: u32,
	/// Via `From<Domain>`: equal to `created_at` for regular domains, a fixed past
	/// timestamp for discovery domains.
	pub updated_at: u32,
}

impl From<Domain> for DomainDBEntry {
	fn from(s: Domain) -> Self {
		let now = now();

		Self {
			shard:      s.shard,
			addr_key:   s.addr_key,
			domain:     s.domain,
			created_at: now,
			updated_at: if s.is_discovery { 644616000 } else { now },
		}
	}
}

/// A ClickHouse row recording one database notification (presumably "DB read/write"),
/// built from a `clickhouse_utils::GenericNotification`.
///
/// NOTE(review): field order is the serde/row column order — keep it in sync with the table.
#[derive(Clone, Debug, Serialize, Deserialize, Row)]
pub struct DBRWNotificationDBEntry {
	/// `CONFIG.host` at conversion time.
	host:          String,
	/// `CONFIG.app_id` at conversion time.
	app_id:        String,
	/// `now()` at conversion time.
	created_at:    u32,
	/// Table the notification refers to.
	table_name:    String,
	/// Free-form label carried over from the notification.
	label:         String,
	/// Notification `duration`, in milliseconds.
	took_ms:       u32,
	/// Notification `since_last` interval, in milliseconds.
	since_last_ms: u32,
	/// Notification `items` count.
	items:         u32,
}

impl From<chu::GenericNotification> for DBRWNotificationDBEntry {
	/// Builds a notification row from a `clickhouse_utils` notification, stamping it with this
	/// process's `host`/`app_id` (read from `CONFIG`) and the current time.
	///
	/// `took_ms`, `since_last_ms` and `items` are clamped to `u32::MAX`. A plain `as u32`
	/// silently wraps on overflow and records a misleadingly small value.
	///
	/// # Panics
	/// Panics if locking `CONFIG` fails (e.g. a poisoned mutex).
	fn from(s: chu::GenericNotification) -> Self {
		/// Converts to `u32`, saturating at `u32::MAX` instead of wrapping.
		fn saturate_u32<T: std::convert::TryInto<u32>>(v: T) -> u32 {
			<T as std::convert::TryInto<u32>>::try_into(v).unwrap_or(u32::MAX)
		}

		// Copy the identity fields out and release the lock right away.
		let (host, app_id) = {
			let c = CONFIG.lock().unwrap();
			(c.host.clone(), c.app_id.clone())
		};

		DBRWNotificationDBEntry {
			host,
			app_id,
			created_at: now(),
			table_name: s.table_name,
			label: s.label,
			took_ms: saturate_u32(s.duration.as_millis()),
			since_last_ms: saturate_u32(s.since_last.as_millis()),
			items: saturate_u32(s.items),
		}
	}
}

/// Timing and size metrics for one crawled task.
///
/// `From<ct::JobUpdate>` for `TaskMeasurement` fills this in only when the task's status
/// stage succeeded.
#[derive(Debug, Clone)]
pub struct TaskMeasurementData {
	/// Status code reported by crusty-core's status stage (presumably the HTTP status).
	pub status_code:    u16,
	/// Status-stage `wait_duration`, in milliseconds.
	pub wait_time_ms:   u32,
	/// Status-stage duration, in milliseconds.
	pub status_time_ms: u32,
	/// Load-stage duration in milliseconds; 0 when the load stage did not succeed.
	pub load_time_ms:   u32,
	/// Follow (parse) stage duration in milliseconds; 0 when that stage did not succeed.
	pub parse_time_ms:  u32,
	/// Load-stage write size (presumably bytes); 0 when the load stage did not succeed.
	pub write_size_b:   u32,
	/// Load-stage read size (presumably bytes); 0 when the load stage did not succeed.
	pub read_size_b:    u32,
}

/// A single task's measurement.
///
/// `md` is `None` when no usable metrics were collected. `TaskMeasurementDBEntry`
/// stores such a measurement as an error row.
#[derive(Clone, Debug)]
pub struct TaskMeasurement {
	/// Measurement timestamp taken from `now()` (presumably Unix seconds).
	pub time: u32,
	/// The task's URL, stringified.
	pub url:  String,
	/// Collected metrics, if any.
	pub md:   Option<TaskMeasurementData>,
}

/// A ClickHouse row for one task measurement, stamped with this process's host/app id.
///
/// When built via `From<TaskMeasurement>`, `error` is 1 (and every metric column is 0) if the
/// measurement had no data, otherwise 0 with the metric columns copied over.
///
/// NOTE(review): field order is the serde/row column order — keep it in sync with the table.
#[derive(Clone, Debug, Serialize, Deserialize, Row)]
pub struct TaskMeasurementDBEntry {
	/// `CONFIG.host` at conversion time.
	host:           String,
	/// `CONFIG.app_id` at conversion time.
	app_id:         String,
	/// The measurement's `time`.
	created_at:     u32,
	/// The task URL.
	url:            String,
	/// 1 when the measurement had no data, 0 otherwise.
	error:          u8,
	status_code:    u16,
	wait_time_ms:   u32,
	status_time_ms: u32,
	load_time_ms:   u32,
	parse_time_ms:  u32,
	write_size_b:   u32,
	read_size_b:    u32,
}

impl From<TaskMeasurement> for TaskMeasurementDBEntry {
	fn from(s: TaskMeasurement) -> Self {
		let c = CONFIG.lock().unwrap();
		if s.md.is_none() {
			return Self {
				host:           c.host.clone(),
				app_id:         c.app_id.clone(),
				created_at:     s.time,
				url:            s.url,
				error:          1,
				status_code:    0,
				wait_time_ms:   0,
				status_time_ms: 0,
				load_time_ms:   0,
				parse_time_ms:  0,
				write_size_b:   0,
				read_size_b:    0,
			}
		}
		let md = s.md.unwrap();
		Self {
			host:           c.host.clone(),
			app_id:         c.app_id.clone(),
			created_at:     s.time,
			url:            s.url,
			error:          0,
			status_code:    md.status_code,
			wait_time_ms:   md.wait_time_ms,
			status_time_ms: md.status_time_ms,
			load_time_ms:   md.load_time_ms,
			parse_time_ms:  md.parse_time_ms,
			write_size_b:   md.write_size_b,
			read_size_b:    md.read_size_b,
		}
	}
}

/// Snapshot of a queue's length at a point in time.
#[derive(Debug, Clone)]
pub struct QueueStats {
	/// Queue length at snapshot time.
	pub len:  usize,
	/// Snapshot timestamp, stored as `updated_at` in `QueueMeasurementDBEntry`
	/// (presumably Unix seconds).
	pub time: u32,
}

/// Identifies which internal queue a `QueueMeasurement` describes.
///
/// The variant's `Debug` name is stored verbatim as the `name` column of
/// `QueueMeasurementDBEntry`. Renaming a variant therefore changes the data written to the DB.
#[derive(Debug, Clone)]
pub enum QueueKind {
	Job,
	JobUpdate,
	MetricsTask,
	MetricsQueue,
	MetricsDB,
	DomainUpdate,
	DomainInsert,

	// NOTE(review): these presumably track notification queues for the domain
	// update/insert writers — confirm with their producers.
	DomainUpdateNotifyP,
	DomainUpdateNotify,
	DomainInsertNotify,
}

/// A length snapshot for one named queue.
#[derive(Debug, Clone)]
pub struct QueueMeasurement {
	/// Which queue was measured.
	pub kind:  QueueKind,
	/// The measured length and timestamp.
	pub stats: QueueStats,
}

/// A ClickHouse row for one queue-length snapshot, stamped with this process's host/app id.
///
/// NOTE(review): field order is the serde/row column order — keep it in sync with the table.
#[derive(Debug, Serialize, Deserialize, Row)]
pub struct QueueMeasurementDBEntry {
	/// `CONFIG.host` at conversion time.
	host:           String,
	/// `CONFIG.app_id` at conversion time.
	app_id:         String,
	/// `Debug` name of the `QueueKind`.
	pub name:       String,
	/// Snapshot timestamp (`QueueStats::time`).
	pub updated_at: u32,
	/// Queue length (`QueueStats::len`).
	pub len:        u32,
}

impl From<QueueMeasurement> for QueueMeasurementDBEntry {
	/// Converts a queue snapshot into a DB row stamped with this process's `host`/`app_id`
	/// (read from `CONFIG`).
	///
	/// `len` is clamped to `u32::MAX`. A plain `usize as u32` would silently truncate on
	/// 64-bit targets.
	///
	/// # Panics
	/// Panics if locking `CONFIG` fails (e.g. a poisoned mutex).
	fn from(s: QueueMeasurement) -> Self {
		// Copy the identity fields out and release the lock right away.
		let (host, app_id) = {
			let c = CONFIG.lock().unwrap();
			(c.host.clone(), c.app_id.clone())
		};

		Self {
			host,
			app_id,
			// The variant's `Debug` name is what gets stored, so renaming a `QueueKind`
			// variant changes the values written to the DB.
			name: format!("{:?}", s.kind),
			updated_at: s.stats.time,
			len: s.stats.len.min(u32::MAX as usize) as u32,
		}
	}
}
impl<JS: ct::JobStateValues, TS: ct::TaskStateValues> From<ct::JobUpdate<JS, TS>> for TaskMeasurement {
	/// Extracts timing/size metrics from a crusty-core job update.
	///
	/// `md` is `Some` only when the job is `Processing(Ok(..))` *and* its status stage
	/// returned `Ok`. Every other update yields `md: None`. Inside `md`:
	/// - the load fields (`load_time_ms`, `write_size_b`, `read_size_b`) are 0 when the load
	///   stage did not return `Ok`;
	/// - `parse_time_ms` is 0 when the follow stage did not return `Ok`.
	///
	/// Millisecond durations and sizes are clamped to `u32::MAX`. A plain `as u32` silently
	/// wraps, so e.g. a duration longer than ~49.7 days would be recorded as a tiny value.
	fn from(r: ct::JobUpdate<JS, TS>) -> Self {
		/// Converts to `u32`, saturating at `u32::MAX` instead of wrapping.
		fn saturate_u32<T: std::convert::TryInto<u32>>(v: T) -> u32 {
			<T as std::convert::TryInto<u32>>::try_into(v).unwrap_or(u32::MAX)
		}

		let md = if let ct::JobStatus::Processing(Ok(ref load_data)) = r.status {
			if let StatusResult::Ok(status) = &load_data.status {
				let parse_time_ms = if let ct::FollowResult::Ok(ref follow) = load_data.follow {
					saturate_u32(follow.metrics.duration.as_millis())
				} else {
					0
				};

				let (load_time_ms, write_size_b, read_size_b) = if let ct::LoadResult::Ok(ref load) = load_data.load {
					(
						saturate_u32(load.metrics.duration.as_millis()),
						saturate_u32(load.metrics.write_size),
						saturate_u32(load.metrics.read_size),
					)
				} else {
					(0, 0, 0)
				};

				Some(TaskMeasurementData {
					status_code: status.code as u16,
					wait_time_ms: saturate_u32(status.metrics.wait_duration.as_millis()),
					status_time_ms: saturate_u32(status.metrics.duration.as_millis()),
					load_time_ms,
					write_size_b,
					read_size_b,
					parse_time_ms,
				})
			} else {
				None
			}
		} else {
			None
		};

		TaskMeasurement { time: now(), url: r.task.link.url.to_string(), md }
	}
}