#[allow(unused_imports)]
use crate::prelude::*;
use crate::{
config::CONFIG,
clickhouse_utils as chu,
};
use crusty_core::{types as ct};
use clickhouse::Reflection;
use serde::{Deserialize, Serialize};
use crusty_core::types::StatusResult;
/// Result alias for `anyhow::Result`.
pub type Result<T> = anyhow::Result<T>;
/// `crusty_core` job type instantiated with this file's `JobState` and `TaskState`.
pub type Job = ct::Job<JobState, TaskState>;
/// A domain name split into `head`/`tail`, together with its shard assignment.
///
/// Construct it with `Domain::new`, which performs the split and computes `shard`.
#[derive(Debug, Clone)]
pub struct Domain {
/// When true, `DomainDBEntry::from` writes a fixed constant into `updated_at`
/// instead of the current time.
is_discovery: bool,
/// Shard index derived from the CRC32 of `head` modulo `shards_total`, plus one.
pub shard: u16,
/// Number of shards used as the modulus when computing `shard`.
pub shards_total: usize,
/// URL stored exactly as passed to the constructor; not read anywhere in this file.
pub url: Option<Url>,
/// The full domain string as passed to the constructor.
pub domain: String,
/// The last two dot-separated labels of `domain` (the whole string if it has two or fewer).
pub head: String,
/// The labels preceding `head`, joined with `.`; empty if `domain` has two labels or fewer.
pub tail: String
}
impl Domain {
    /// Splits `domain` into `head`/`tail` and assembles a `Domain` whose `shard` is still 0.
    ///
    /// * `head` is the last two dot-separated labels (`"a.b.example.com"` gives
    ///   `"example.com"`); with two labels or fewer it is the whole input.
    /// * `tail` is everything before `head` (`"a.b"` in the example), or empty.
    ///
    /// Use `Domain::new` to also get the shard assigned.
    fn _new(domain: String, shards_total: usize, url: Option<Url>, is_discovery: bool) -> Domain {
        // `rsplitn(3, '.')` yields, right to left: the last label, the second-to-last
        // label, and then the untouched remainder in front of them.
        let mut labels = domain.rsplitn(3, '.');
        let (head, tail) = match (labels.next(), labels.next(), labels.next()) {
            (Some(last), Some(second_last), Some(rest)) => {
                (format!("{}.{}", second_last, last), rest.to_string())
            }
            // Two labels or fewer: the whole name is the head and there is no tail.
            _ => (domain.clone(), String::new()),
        };

        Domain {
            is_discovery,
            shard: 0,
            shards_total,
            url,
            domain,
            head,
            tail,
        }
    }

    /// Creates a `Domain` from its name and assigns it to a shard.
    ///
    /// * `domain` - the domain name to split into `head` and `tail`.
    /// * `shards_total` - number of shards to distribute domains over.
    /// * `url` - stored as-is in the `url` field.
    /// * `is_discovery` - stored as-is; consulted when converting to `DomainDBEntry`.
    ///
    /// The shard is `crc32(head) % shards_total + 1`, so names sharing the same `head`
    /// always land on the same shard.
    pub fn new(domain: String, shards_total: usize, url: Option<Url>, is_discovery: bool) -> Domain {
        let mut domain = Domain::_new(domain, shards_total, url, is_discovery);
        domain.calc_shard();
        domain
    }

    /// Computes the 1-based shard index from the CRC32 of `head` and stores it in `shard`.
    ///
    /// The shard count is clamped to `1..=u16::MAX`: a count of 0 would otherwise panic
    /// with a modulo by zero, and a count above `u16::MAX` could not be represented in
    /// the `u16` shard field (the cast would wrap, possibly to 0). Counts inside that
    /// range give exactly `crc32(head) % shards_total + 1`.
    fn calc_shard(&mut self) {
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(self.head.as_bytes());
        let shards = self.shards_total.max(1).min(usize::from(u16::MAX)) as u32;
        // `shards <= u16::MAX`, so the remainder plus one always fits in a u16.
        self.shard = (hasher.finalize() % shards + 1) as u16;
    }
}
/// Serializable record built from a `Domain` (see the `From<Domain>` impl).
// NOTE(review): the `Reflection` derive suggests this mirrors a ClickHouse table row;
// confirm field order and types against the table schema before changing them.
#[derive(Clone, Debug, Reflection, Serialize, Deserialize)]
pub struct DomainDBEntry {
/// Copied from `Domain::shard`.
pub shard: u16,
/// Holds `Domain::head`, not the full domain string.
pub domain: String,
/// Holds `Domain::tail`.
pub domain_tail: String,
/// Value of `now()` at conversion time.
pub created_at: u32,
/// Value of `now()` at conversion time, or a fixed constant for discovery domains.
pub updated_at: u32,
}
impl From<Domain> for DomainDBEntry {
    /// Converts a `Domain` into its record form.
    ///
    /// `head` and `tail` become `domain` and `domain_tail`; `created_at` is the current
    /// `now()` value. `updated_at` equals `created_at` unless the domain is flagged as a
    /// discovery, in which case a fixed constant is written instead.
    fn from(s: Domain) -> Self {
        // Fixed `updated_at` value used for discovery domains.
        // NOTE(review): presumably a deliberately old timestamp so that freshly
        // discovered domains look stale; confirm against whatever selects by `updated_at`.
        const DISCOVERY_UPDATED_AT: u32 = 644616000;

        let created_at = now();
        let updated_at = if s.is_discovery {
            DISCOVERY_UPDATED_AT
        } else {
            created_at
        };

        Self {
            shard: s.shard,
            domain: s.head,
            domain_tail: s.tail,
            created_at,
            updated_at,
        }
    }
}
/// Job-level state type used as the first type parameter of `Job` and of the
/// `ct::JobRules` implementation in this file.
#[derive(Debug, Clone)]
pub struct JobState {
/// The `Domain` associated with the job.
pub selected_domain: Domain,
}
/// Task-level state type used as the second type parameter of `Job`; currently carries no data.
#[derive(Debug, Default, Clone)]
pub struct TaskState {
}
pub struct CrawlingRules {}
impl ct::JobRules<JobState, TaskState> for CrawlingRules {
fn task_filters(
&self,
) -> ct::TaskFilters<JobState, TaskState> {
vec![
Box::new(crusty_core::task_filters::MaxRedirect::new(5)),
Box::new(crusty_core::task_filters::SameDomain::new(true)),
Box::new(crusty_core::task_filters::TotalPageBudget::new(50)),
Box::new(crusty_core::task_filters::LinkPerPageBudget::new(10)),
Box::new(crusty_core::task_filters::PageLevel::new(10)),
Box::new(crusty_core::task_filters::HashSetDedup::new()),
]
}
fn status_filters(&self) -> ct::StatusFilters<JobState, TaskState> {
vec![
Box::new(crusty_core::status_filters::ContentType::new(vec![
String::from("text/html"),
])),
Box::new(crusty_core::status_filters::Redirect::new())
]
}
fn load_filters(&self) -> ct::LoadFilters<JobState, TaskState> {
vec![
]
}
fn task_expanders(&self, ) -> ct::TaskExpanders<JobState, TaskState> {
vec![
Box::new(crusty_core::task_expanders::FollowLinks::new(ct::LinkTarget::Follow)),
]
}
}
/// Serializable record built from a `chu::GenericNotification` plus the global `CONFIG`.
// NOTE(review): the `Reflection` derive suggests this mirrors a ClickHouse table row;
// confirm field order and types against the table schema before changing them.
#[derive(Clone, Debug, Reflection, Serialize, Deserialize)]
pub struct DBRWNotificationDBEntry {
/// Copied from `CONFIG.host`.
host: String,
/// Copied from `CONFIG.app_id`.
app_id: String,
/// Value of `now()` at conversion time.
created_at: u32,
/// Copied from the notification's `table_name`.
table_name: String,
/// Copied from the notification's `label`.
label: String,
/// The notification's `duration`, in milliseconds.
took_ms: u32,
/// The notification's `since_last`, in milliseconds.
since_last_ms: u32,
/// The notification's `items`, cast to `u32`.
items: u32,
}
impl From<chu::GenericNotification> for DBRWNotificationDBEntry {
    /// Converts a notification into its record form, stamping it with `host`/`app_id`
    /// from `CONFIG` and the current `now()` value.
    ///
    /// Millisecond durations that do not fit in a `u32` (more than roughly 49.7 days)
    /// saturate at `u32::MAX` instead of silently wrapping around.
    ///
    /// # Panics
    /// Panics if `CONFIG.lock()` returns an error (the result is unwrapped).
    fn from(s: chu::GenericNotification) -> Self {
        // `as_millis()` is wider than `u32`; a bare `as u32` keeps only the low 32 bits,
        // so clamp first and let the cast be lossless.
        let took_ms = s.duration.as_millis().min(u128::from(u32::MAX)) as u32;
        let since_last_ms = s.since_last.as_millis().min(u128::from(u32::MAX)) as u32;

        let c = CONFIG.lock().unwrap();
        DBRWNotificationDBEntry {
            host: c.host.clone(),
            app_id: c.app_id.clone(),
            created_at: now(),
            table_name: s.table_name,
            label: s.label,
            took_ms,
            since_last_ms,
            // NOTE(review): the type of `items` is declared outside this file, so the
            // original cast is kept unchanged; confirm it cannot exceed `u32::MAX`.
            items: s.items as u32,
        }
    }
}
/// Timing and size figures for one task, filled in by the
/// `From<ct::JobUpdate>` conversion for `TaskMeasurement`.
#[derive(Debug, Clone)]
pub struct TaskMeasurementData {
/// Status code taken from the task's status data.
pub status_code: u16,
/// Status metrics `wait_dur`, in milliseconds.
pub wait_time_ms: u32,
/// Status metrics `status_dur`, in milliseconds.
pub status_time_ms: u32,
/// Load metrics `load_dur`, in milliseconds; 0 when the load result is not `Ok`.
pub load_time_ms: u32,
/// Follow metrics `parse_dur`, in milliseconds; 0 when the follow result is not `Ok`.
pub parse_time_ms: u32,
/// Load metrics `write_size`; 0 when the load result is not `Ok`.
// NOTE(review): the `_b` suffix presumably means bytes; confirm in crusty_core.
pub write_size_b: u32,
/// Load metrics `read_size`; 0 when the load result is not `Ok`.
pub read_size_b: u32,
}
/// A measurement for a single task, produced from a `ct::JobUpdate`.
#[derive(Clone, Debug)]
pub struct TaskMeasurement {
/// Value of `now()` when the measurement was created.
pub time: u32,
/// The task's link URL rendered as a string.
pub url: String,
/// Measured figures; `None` when the update carried no `Ok` status data, which
/// `TaskMeasurementDBEntry` records as an error row.
pub md: Option<TaskMeasurementData>,
}
/// Serializable record built from a `TaskMeasurement` plus the global `CONFIG`.
// NOTE(review): the `Reflection` derive suggests this mirrors a ClickHouse table row;
// confirm field order and types against the table schema before changing them.
#[derive(Clone, Debug, Serialize, Reflection, Deserialize)]
pub struct TaskMeasurementDBEntry {
/// Copied from `CONFIG.host`.
host: String,
/// Copied from `CONFIG.app_id`.
app_id: String,
/// Copied from `TaskMeasurement::time`.
created_at: u32,
/// Copied from `TaskMeasurement::url`.
url: String,
/// 1 when the measurement had no data (`md` was `None`), 0 otherwise.
error: u8,
/// The remaining fields mirror `TaskMeasurementData`; all are 0 when `error` is 1.
status_code: u16,
wait_time_ms: u32,
status_time_ms: u32,
load_time_ms: u32,
parse_time_ms: u32,
write_size_b: u32,
read_size_b: u32,
}
impl From<TaskMeasurement> for TaskMeasurementDBEntry {
fn from(s: TaskMeasurement) -> Self {
let c = CONFIG.lock().unwrap();
if s.md.is_none() {
return Self {
host: c.host.clone(),
app_id: c.app_id.clone(),
created_at: s.time,
url: s.url,
error: 1,
status_code: 0,
wait_time_ms: 0,
status_time_ms: 0,
load_time_ms: 0,
parse_time_ms: 0,
write_size_b: 0,
read_size_b: 0,
};
}
let md = s.md.unwrap();
Self {
host: c.host.clone(),
app_id: c.app_id.clone(),
created_at: s.time,
url: s.url,
error: 0,
status_code: md.status_code,
wait_time_ms: md.wait_time_ms,
status_time_ms: md.status_time_ms,
load_time_ms: md.load_time_ms,
parse_time_ms: md.parse_time_ms,
write_size_b: md.write_size_b,
read_size_b: md.read_size_b,
}
}
}
/// A point-in-time reading of a queue, carried inside `QueueMeasurement`.
#[derive(Debug, Clone)]
pub struct QueueStats {
/// Queue length; stored (cast to `u32`) as `QueueMeasurementDBEntry::len`.
pub len: usize,
/// Timestamp of the reading; stored as `QueueMeasurementDBEntry::updated_at`.
pub time: u32,
}
/// Identifies which queue a `QueueMeasurement` refers to.
///
/// The variant's `Debug` representation is what gets stored in
/// `QueueMeasurementDBEntry::name`, so renaming a variant changes the stored name.
#[derive(Debug, Clone)]
pub enum QueueKind {
Job,
JobUpdate,
MetricsTask,
MetricsQueue,
MetricsDB,
DomainUpdate,
DomainInsert,
DomainUpdateNotifyP,
DomainUpdateNotify,
DomainInsertNotify,
}
/// A queue reading tagged with the queue it belongs to.
#[derive(Debug, Clone)]
pub struct QueueMeasurement {
/// Which queue was measured.
pub kind: QueueKind,
/// The measured length and its timestamp.
pub stats: QueueStats,
}
/// Serializable record built from a `QueueMeasurement` plus the global `CONFIG`.
// NOTE(review): the `Reflection` derive suggests this mirrors a ClickHouse table row;
// confirm field order and types against the table schema before changing them.
#[derive(Debug, Serialize, Deserialize, Reflection)]
pub struct QueueMeasurementDBEntry {
/// Copied from `CONFIG.host`.
host: String,
/// Copied from `CONFIG.app_id`.
app_id: String,
/// `Debug` rendering of the measurement's `QueueKind`.
pub name: String,
/// Copied from `QueueStats::time`.
pub updated_at: u32,
/// `QueueStats::len` cast to `u32`.
pub len: u32,
}
impl From<QueueMeasurement> for QueueMeasurementDBEntry {
fn from(s: QueueMeasurement) -> Self {
let c = CONFIG.lock().unwrap();
Self {
host: c.host.clone(),
app_id: c.app_id.clone(),
name: format!("{:?}", s.kind),
updated_at: s.stats.time,
len: s.stats.len as u32,
}
}
}
impl<JS: ct::JobStateValues, TS: ct::TaskStateValues> From<ct::JobUpdate<JS, TS>> for TaskMeasurement {
    /// Extracts a `TaskMeasurement` from a job update.
    ///
    /// Measurement data is present only when the update's status is
    /// `Processing(Ok(..))` and the status result inside it is `Ok`. In that case the
    /// load and parse figures default to 0 if their respective results are not `Ok`.
    /// In every other case `md` is `None`. `time` is the current `now()` value and
    /// `url` is the task's link URL rendered as a string.
    fn from(r: ct::JobUpdate<JS, TS>) -> Self {
        let md = match r.status {
            ct::JobStatus::Processing(Ok(ref processed)) => match &processed.status {
                StatusResult::Ok(status) => {
                    let parse_time_ms = match processed.follow_data {
                        ct::FollowResult::Ok(ref followed) => {
                            followed.metrics.parse_dur.as_millis() as u32
                        }
                        _ => 0,
                    };
                    let (load_time_ms, write_size_b, read_size_b) = match processed.load_data {
                        ct::LoadResult::Ok(ref loaded) => (
                            loaded.metrics.load_dur.as_millis() as u32,
                            loaded.metrics.write_size as u32,
                            loaded.metrics.read_size as u32,
                        ),
                        _ => (0, 0, 0),
                    };

                    Some(TaskMeasurementData {
                        status_code: status.status_code as u16,
                        wait_time_ms: status.status_metrics.wait_dur.as_millis() as u32,
                        status_time_ms: status.status_metrics.status_dur.as_millis() as u32,
                        load_time_ms,
                        write_size_b,
                        read_size_b,
                        parse_time_ms,
                    })
                }
                // Processed, but without usable status data.
                _ => None,
            },
            // Any status other than a successfully processed task.
            _ => None,
        };

        TaskMeasurement {
            time: now(),
            url: r.task.link.url.to_string(),
            md,
        }
    }
}