use std::collections::hash_map::Iter;
use std::collections::HashMap;
use std::ops::{Add, Deref, Sub};
use std::str::FromStr;
use std::sync::Arc;
use parking_lot::Mutex;
use std::time::Duration;
use chrono::{DateTime, NaiveDateTime, Utc};
use serde_json::Value;
use serde::{ Serialize, Deserialize };
use product_os_proxy::Proxy;
use product_os_store::{ProductOSKeyValueStore, ProductOSQueueStore};
use product_os_browser::{ProductOSBrowser, BrowserStatus, Request, WindowSelect, ProductOSError};
/// How a seeded URL should be crawled.
///
/// String forms accepted by `CrawlMethod::from_str` are "focus", "deep" and
/// "broad_deep"; anything else maps to `Ignore` (seed nothing).
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum CrawlMethod {
    // Do not crawl the URL at all.
    Ignore,
    FocusCrawl,
    DeepCrawl,
    BroadDeepCrawl
}
impl CrawlMethod {
    /// Map a configuration string onto a crawl method.
    ///
    /// Recognised values: "focus", "deep", "broad_deep". Any other value
    /// falls back to `CrawlMethod::Ignore`.
    pub fn from_str(method: &str) -> CrawlMethod {
        if method == "focus" {
            return CrawlMethod::FocusCrawl;
        }
        if method == "deep" {
            return CrawlMethod::DeepCrawl;
        }
        if method == "broad_deep" {
            return CrawlMethod::BroadDeepCrawl;
        }
        CrawlMethod::Ignore
    }
}
/// Direction for `Seeder::adjust_throttle`: as implemented there, `Tighten`
/// increases waits / shrinks the per-window download budget, while `Loosen`
/// moves the settings the other way.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum ThrottleDirection {
    Loosen,
    Tighten
}
/// A crawl currently in progress for one domain, as serialized into the
/// per-method "active_crawl_domains" key-value groups.
#[derive(Clone, Serialize, Deserialize, Debug)]
struct ActiveCrawl {
    // The request that seeded this crawl.
    pub request: Request,
    pub crawl_method: CrawlMethod,
    // Counter; every creation site in this file initialises it to 0 —
    // NOTE(review): never incremented in the code visible here.
    pub count: u64
}
/// Seeds and schedules crawl requests across a pool of browser instances.
///
/// Active-crawl and per-domain throttle state is persisted in the key-value
/// store; pending requests live in the "crawler_request_queue" queue.
pub struct Seeder {
    // Browser pool keyed by random instance id; mutex-wrapped so downloads
    // can run on spawned tasks.
    instances: HashMap<String, Arc<Mutex<ProductOSBrowser>>>,
    config: product_os_configuration::Crawler,
    browser_config: product_os_configuration::Browser,
    key_value_store: Arc<ProductOSKeyValueStore>,
    queue_store: Arc<ProductOSQueueStore>
}
impl Seeder {
/// Build a `Seeder`, creating either a pool of browser instances (when
/// `config.seeder.multi_instance` is set) or a single one. Each browser gets
/// a random id, the optional proxy, and every supplied helper.
pub fn new(config: product_os_configuration::Crawler, browser_config: product_os_configuration::Browser,
           proxy: Option<Arc<Mutex<Proxy>>>, key_value_store: Arc<ProductOSKeyValueStore>, queue_store: Arc<ProductOSQueueStore>,
           browser_helpers: Vec<Arc<dyn product_os_browser::BrowserHelper>>) -> Self {
    let total_instances = if config.seeder.multi_instance { config.seeder.instances } else { 1 };
    let mut instances = HashMap::new();
    for _ in 0..total_instances {
        // Random id doubles as the map key and the browser's identifier.
        let id = product_os_security::RandomGenerator::get_random_string_one_time(10, &mut None::<product_os_random::RNG>);
        let mut browser = ProductOSBrowser::new(id.clone(), Some(key_value_store.clone()), browser_config.clone());
        browser.set_proxy(proxy.clone());
        for helper in browser_helpers.iter() {
            browser.add_helper(helper.clone());
        }
        instances.insert(id, Arc::new(Mutex::new(browser)));
    }
    Self { instances, config, browser_config, key_value_store, queue_store }
}
/// Iterate over the browser pool as `(instance id, shared browser)` pairs.
/// NOTE(review): takes `&mut self` although only a shared borrow is needed,
/// and the name says "downloaders" while it exposes the `instances` map.
pub fn get_downloaders(&mut self) -> Iter<String, Arc<Mutex<ProductOSBrowser>>> {
    self.instances.iter()
}
/// Push `request` onto the crawler queue unless its URL was visited more
/// recently than `config.revisit_delay` seconds ago.
///
/// Returns `Err(())` only when the queue push itself fails; a skipped
/// enqueue (delay not yet elapsed) still returns `Ok(())`.
pub fn enqueue(&self, request: Request) -> Result<(), ()> {
    tracing::info!("Enqueuing request: {:?}", request);
    let url_last_visit_tracker_group = String::from("url_last_visit_tracker");
    // Decide eligibility from the last-visit tracker; a missing entry means
    // the page has never been visited.
    let okay_to_enqueue = match self.key_value_store.group_specific_get(request.url.to_string().as_str(), url_last_visit_tracker_group.as_str()) {
        Ok(timestamp) => {
            // Stored value is a JSON-encoded unix timestamp (seconds).
            let last_download: DateTime<Utc> = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp_opt(serde_json::from_str(timestamp.as_str()).unwrap(), 0).unwrap(), Utc);
            let time_since_last_visit = Utc::now().timestamp() - last_download.timestamp();
            let elapsed_enough = time_since_last_visit >= i64::from(self.config.revisit_delay);
            if elapsed_enough {
                tracing::info!("Time since last visit to {:?} in seconds {} meets revisit delay - okay to enqueue", request.url, time_since_last_visit);
            } else {
                tracing::info!("Time since last visit to {:?} in seconds {} too short - skipping enqueue", request.url, time_since_last_visit);
            }
            elapsed_enough
        }
        Err(_) => {
            tracing::info!("Page not yet visited according to last visit tracker: {}", request.url);
            true
        }
    };
    if !okay_to_enqueue {
        return Ok(());
    }
    self.queue_store.queue_specific_push(serde_json::to_string(&request).unwrap().as_str(), "crawler_request_queue")
        .map(|_| ())
        .map_err(|_| ())
}
/// Activate a previously queued crawl: register the request's domain in the
/// active-crawl map matching `crawl_method`, reset the domain's throttle
/// bookkeeping to configured defaults, then enqueue the request itself.
pub fn enqueue_queued_domain(&self, request: Request, crawl_method: CrawlMethod) {
    // All three per-method maps are loaded from the store, though only the
    // one matching `crawl_method` is modified and written back below.
    let mut focus_active_crawl_domains: HashMap<String, ActiveCrawl> = match self.key_value_store.group_specific_get("focus", "active_crawl_domains") { Ok(v) => serde_json::from_str(v.as_str()).unwrap(), Err(_) => HashMap::new() };
    let mut deep_active_crawl_domains: HashMap<String, ActiveCrawl> = match self.key_value_store.group_specific_get("deep", "active_crawl_domains") { Ok(v) => serde_json::from_str(v.as_str()).unwrap(), Err(_) => HashMap::new() };
    let mut broad_deep_active_crawl_domains: HashMap<String, ActiveCrawl> = match self.key_value_store.group_specific_get("broad_deep", "active_crawl_domains") { Ok(v) => serde_json::from_str(v.as_str()).unwrap(), Err(_) => HashMap::new() };
    // NOTE(review): panics for URLs whose host is not a domain (e.g. an IP
    // literal) — confirm queued requests always carry domain-based URLs.
    let domain = request.url.domain().unwrap().to_string();
    match crawl_method {
        CrawlMethod::FocusCrawl => {
            focus_active_crawl_domains.insert(domain.to_string(), ActiveCrawl { request: request.clone(), crawl_method: CrawlMethod::FocusCrawl, count: 0 });
            // Persist the updated map; store errors are deliberately ignored.
            match self.key_value_store.group_specific_set("focus", serde_json::to_string(&focus_active_crawl_domains).unwrap().as_str(), "active_crawl_domains") {
                Ok(_) => {},
                Err(_) => {}
            };
        }
        CrawlMethod::DeepCrawl => {
            deep_active_crawl_domains.insert(domain.to_string(), ActiveCrawl { request: request.clone(), count: 0, crawl_method: CrawlMethod::DeepCrawl });
            match self.key_value_store.group_specific_set("deep", serde_json::to_string(&deep_active_crawl_domains).unwrap().as_str(), "active_crawl_domains") {
                Ok(_) => {},
                Err(_) => {}
            };
        }
        CrawlMethod::BroadDeepCrawl => {
            broad_deep_active_crawl_domains.insert(domain.to_string(), ActiveCrawl { request: request.clone(), count: 0, crawl_method: CrawlMethod::BroadDeepCrawl });
            match self.key_value_store.group_specific_set("broad_deep", serde_json::to_string(&broad_deep_active_crawl_domains).unwrap().as_str(), "active_crawl_domains") {
                Ok(_) => {},
                Err(_) => {}
            };
        }
        CrawlMethod::Ignore => {}
    }
    // Reset this domain's throttle state to the configured defaults (same
    // key set that `seed` initialises for newly activated domains).
    let throttling_config = &self.config.seeder.throttling;
    match self.key_value_store.group_specific_set(domain.as_str(), throttling_config.default_frequency.to_string().as_str(), "current_frequencies") { Ok(_) => {}, Err(_) => {} };
    match self.key_value_store.group_specific_set(domain.as_str(), Duration::new(u64::from(throttling_config.default_rate), 0).as_secs().to_string().as_str(), "current_rates") { Ok(_) => {}, Err(_) => {} };
    match self.key_value_store.group_specific_set(domain.as_str(), throttling_config.default_wait.to_string().as_str(), "current_waits") { Ok(_) => {}, Err(_) => {} };
    match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "number_of_downloads_within_rates") { Ok(_) => {}, Err(_) => {} };
    match self.key_value_store.group_specific_set(domain.as_str(), Utc::now().to_rfc3339().as_str(), "last_throttle_window_times") { Ok(_) => {}, Err(_) => {} };
    match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "wait_counters") { Ok(_) => {}, Err(_) => {} };
    match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "change_tight_counters") { Ok(_) => {}, Err(_) => {} };
    match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "change_loose_counters") { Ok(_) => {}, Err(_) => {} };
    match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "last_adjust_requests") { Ok(_) => {}, Err(_) => {} };
    match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "crawl_download_count") { Ok(_) => {}, Err(_) => {} };
    // Finally push the request onto the crawler queue (revisit delay permitting).
    match self.enqueue(request) {
        Ok(_) => {}
        Err(_) => {}
    }
}
pub fn seed(&mut self, url: url::Url, crawl_method: CrawlMethod, method: product_os_request::Method, payload: Option<Value>) {
let request = Request {
url: url.clone(),
link_chain: vec![],
method,
payload,
headers: HashMap::new()
};
tracing::info!("Performing crawler seeding process");
let mut focus_active_crawl_domains: HashMap<String, ActiveCrawl> = match self.key_value_store.group_specific_get("focus", "active_crawl_domains") { Ok(v) => serde_json::from_str(v.as_str()).unwrap(), Err(_) => HashMap::new() };
let mut deep_active_crawl_domains: HashMap<String, ActiveCrawl> = match self.key_value_store.group_specific_get("broad", "active_crawl_domains") { Ok(v) => serde_json::from_str(v.as_str()).unwrap(), Err(_) => HashMap::new() };
let mut broad_deep_active_crawl_domains: HashMap<String, ActiveCrawl> = match self.key_value_store.group_specific_get("broad", "active_crawl_domains") { Ok(v) => serde_json::from_str(v.as_str()).unwrap(), Err(_) => HashMap::new() };
let mut queue_crawl_domains: Vec<Request> = match self.key_value_store.group_specific_get("queue", "active_crawl_domains") { Ok(v) => serde_json::from_str(v.as_str()).unwrap(), Err(_) => vec!() };
let concurrent_crawls = focus_active_crawl_domains.len() + deep_active_crawl_domains.len() + broad_deep_active_crawl_domains.len();
let mut crawl_domain = false;
let domain = url.domain().unwrap().to_string();
let mut is_queued = false;
for request in &queue_crawl_domains {
if request.url.domain().unwrap() == domain { is_queued = true };
}
if !focus_active_crawl_domains.contains_key(&domain.to_string()) && !deep_active_crawl_domains.contains_key(&domain.to_string()) && !broad_deep_active_crawl_domains.contains_key(&domain.to_string()) && !is_queued {
match crawl_method {
CrawlMethod::FocusCrawl => {
if concurrent_crawls < self.config.max_concurrent_crawls {
crawl_domain = true;
focus_active_crawl_domains.insert(domain.to_string(), ActiveCrawl { request: request.clone(), count: 0, crawl_method: CrawlMethod::FocusCrawl });
match self.key_value_store.group_specific_set("focus", serde_json::to_string(&focus_active_crawl_domains).unwrap().as_str(), "active_crawl_domains") {
Ok(_) => {},
Err(_) => {}
};
}
else {
queue_crawl_domains.push(request.clone());
match self.key_value_store.group_specific_set("queue", serde_json::to_string(&queue_crawl_domains).unwrap().as_str(), "active_crawl_domains") {
Ok(_) => {},
Err(_) => {}
};
}
}
CrawlMethod::DeepCrawl => {
if concurrent_crawls < self.config.max_concurrent_crawls {
crawl_domain = true;
deep_active_crawl_domains.insert(domain.to_string(), ActiveCrawl { request: request.clone(), count: 0, crawl_method: CrawlMethod::DeepCrawl });
match self.key_value_store.group_specific_set("deep", serde_json::to_string(&deep_active_crawl_domains).unwrap().as_str(), "active_crawl_domains") {
Ok(_) => {},
Err(_) => {}
};
}
else {
queue_crawl_domains.push(request.clone());
match self.key_value_store.group_specific_set("queue", serde_json::to_string(&queue_crawl_domains).unwrap().as_str(), "active_crawl_domains") {
Ok(_) => {},
Err(_) => {}
};
}
}
CrawlMethod::BroadDeepCrawl => {
if concurrent_crawls < self.config.max_concurrent_crawls {
crawl_domain = true;
broad_deep_active_crawl_domains.insert(domain.to_string(), ActiveCrawl { request: request.clone(), count: 0, crawl_method: CrawlMethod::BroadDeepCrawl });
match self.key_value_store.group_specific_set("broad_deep", serde_json::to_string(&broad_deep_active_crawl_domains).unwrap().as_str(), "active_crawl_domains") {
Ok(_) => {},
Err(_) => {}
};
}
else {
queue_crawl_domains.push(request.clone());
match self.key_value_store.group_specific_set("queue", serde_json::to_string(&queue_crawl_domains).unwrap().as_str(), "active_crawl_domains") {
Ok(_) => {},
Err(_) => {}
};
}
}
CrawlMethod::Ignore => {}
}
}
if crawl_domain {
let throttling_config = &self.config.seeder.throttling;
match self.key_value_store.group_specific_set(domain.as_str(), throttling_config.default_frequency.to_string().as_str(), "current_frequencies") { Ok(_) => {}, Err(_) => {} };
match self.key_value_store.group_specific_set(domain.as_str(), Duration::new(u64::from(throttling_config.default_rate), 0).as_secs().to_string().as_str(), "current_rates") { Ok(_) => {}, Err(_) => {} };
match self.key_value_store.group_specific_set(domain.as_str(), throttling_config.default_wait.to_string().as_str(), "current_waits") { Ok(_) => {}, Err(_) => {} };
match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "number_of_downloads_within_rates") { Ok(_) => {}, Err(_) => {} };
match self.key_value_store.group_specific_set(domain.as_str(), Utc::now().to_rfc3339().as_str(), "last_throttle_window_times") { Ok(_) => {}, Err(_) => {} };
match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "wait_counters") { Ok(_) => {}, Err(_) => {} };
match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "change_tight_counters") { Ok(_) => {}, Err(_) => {} };
match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "change_loose_counters") { Ok(_) => {}, Err(_) => {} };
match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "last_adjust_requests") { Ok(_) => {}, Err(_) => {} };
match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "crawl_download_count") { Ok(_) => {}, Err(_) => {} };
match self.enqueue(request) {
Ok(_) => {}
Err(_) => {}
}
}
}
/// Dispatch at most one download per idle browser instance.
///
/// Each browser is briefly locked to read its status; if it is idle, the
/// next eligible request is fetched via `get_request` and a task is spawned
/// that re-locks the browser and performs the download.
pub async fn download(&mut self) {
    // Clone the map so iteration doesn't borrow `self` across the awaits below.
    let browsers = self.instances.to_owned();
    for browser in browsers.values() {
        let browser_unlocked = browser.clone();
        // Give up after 10s if another task currently holds this browser.
        // NOTE(review): parking_lot's try_lock_for blocks the calling thread
        // while waiting — confirm this is acceptable on the async executor.
        match browser_unlocked.try_lock_for(core::time::Duration::new(10, 0)) {
            None => {},
            Some(browser_locked) => {
                tracing::info!("Attempting download using browser ID: {}", browser_locked.id());
                let status = match browser_locked.get_status(&WindowSelect::Visible) {
                    Ok(s) => s,
                    Err(_) => BrowserStatus::Error("Problem getting status".to_string())
                };
                // Release the lock before the (potentially slow) request fetch
                // so the spawned task can re-acquire it.
                drop(browser_locked);
                if status == BrowserStatus::Idle {
                    let browser_unlocked = browser.clone();
                    // Crawl method comes from configuration, not from the request.
                    let crawl_method = CrawlMethod::from_str(self.config.method.as_str());
                    match self.get_request(crawl_method).await {
                        None => {}
                        Some(request) => {
                            // Detached task: handle is not awaited, so the
                            // download completes (or fails) in the background.
                            tokio::spawn(async move {
                                match browser_unlocked.try_lock_for(core::time::Duration::new(10, 0)) {
                                    None => { tracing::info!("Failed to lock browser"); },
                                    Some(mut browser_locked) => {
                                        tracing::info!("Starting download: {:?}", request);
                                        match browser_locked.download_content(&WindowSelect::Visible, request).await {
                                            Ok(_) => {}
                                            Err(_) => {}
                                        }
                                    }
                                }
                            });
                        }
                    }
                }
            }
        };
    }
}
/// Pop the next downloadable request, applying per-domain throttling.
///
/// Loops until a dequeued request passes the revisit and throttle checks
/// (returned as `Some`) or the queue is exhausted (`None`). When the queue
/// is empty, one queued domain (if any) is promoted into an active crawl via
/// `enqueue_queued_domain` before the next iteration.
async fn get_request(&mut self, crawl_method: CrawlMethod) -> Option<Request> {
    // Placeholder so the loop body runs at least once.
    let mut request = Some(Request::new());
    while request.clone().is_some() {
        // Pull the next serialized request; a store error is treated as "queue empty".
        request = match self.queue_store.queue_specific_remove("crawler_request_queue") { Ok(v) => Some(serde_json::from_str(v.as_str()).unwrap()), Err(_) => None };
        tracing::info!("Crawl download request result: {:?}", request);
        if request.is_none() {
            // Queue drained: promote the oldest queued domain, if any, so it
            // gets activated and its first request enqueued.
            let mut queue_crawl_domains: Vec<Request> = match self.key_value_store.group_specific_get("queue", "active_crawl_domains") { Ok(v) => serde_json::from_str(v.as_str()).unwrap(), Err(_) => vec!() };
            let queued_domain = if queue_crawl_domains.len() > 0 { Some(queue_crawl_domains.remove(0)) } else { None };
            // Write the shortened queue back before enqueuing the promoted entry.
            match self.key_value_store.group_specific_set("queue", serde_json::to_string(&queue_crawl_domains).unwrap().as_str(), "active_crawl_domains") {
                Ok(_) => {},
                Err(_) => {}
            };
            match queued_domain {
                None => {
                    tracing::info!("No new domain to queue");
                }
                Some(d) => {
                    tracing::info!("Queuing new domain to crawl: {:?}", d);
                    self.enqueue_queued_domain(d, crawl_method.to_owned());
                }
            }
        }
        match request.clone() {
            // Still empty after promotion: give up (returns None).
            None => return request,
            Some(req) => {
                tracing::info!("Request evaluating: {:?}", req);
                // First gate: revisit delay, same check as in `enqueue`.
                let url_last_visit_tracker_group = String::from("url_last_visit_tracker");
                let mut okay_to_visit = false;
                match self.key_value_store.group_specific_get(req.url.to_string().as_str(),url_last_visit_tracker_group.as_str()) {
                    Ok(timestamp) => {
                        // Stored value is a JSON-encoded unix timestamp (seconds).
                        let last_download: DateTime<Utc> = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp_opt(serde_json::from_str(timestamp.as_str()).unwrap(), 0).unwrap(), Utc);
                        let time_since_last_visit = Utc::now().timestamp() - last_download.timestamp();
                        if time_since_last_visit >= i64::from(self.config.revisit_delay) {
                            tracing::info!("Time since last visit to {:?} in seconds {} meets revisit delay", req.url, time_since_last_visit);
                            okay_to_visit = true;
                        }
                        else {
                            // Too soon: the request is dropped (not re-enqueued).
                            tracing::info!("Time since last visit to {:?} in seconds {} too short - skipping visit", req.url, time_since_last_visit);
                        }
                    }
                    Err(_) => {
                        tracing::info!("Page not yet visited according to last visit tracker: {}", req.url);
                        okay_to_visit = true;
                    }
                }
                if okay_to_visit {
                    // Second gate: per-domain throttling.
                    let domain = match req.url.domain() {
                        None => String::from("unknown"),
                        Some(d) => String::from(d)
                    };
                    let throttling_config = &self.config.seeder.throttling;
                    match self.key_value_store.group_specific_get(domain.as_str(), "last_throttle_window_times") {
                        Ok(_) => {
                            tracing::info!("Last throttle match: {:?}", req);
                            // Re-read the window start (first read above only probed existence).
                            let last_throttle_window_time = match self.key_value_store.group_specific_get(domain.as_str(), "last_throttle_window_times") { Ok(v) => DateTime::parse_from_rfc3339(v.as_str()).unwrap(), Err(_) => DateTime::parse_from_rfc3339(Utc::now().to_rfc3339().as_str()).unwrap() };
                            let time_passed = Utc::now().signed_duration_since(last_throttle_window_time);
                            let last_adjust_request = match self.key_value_store.group_specific_get(domain.as_str(), "last_adjust_requests") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                            let wait_counter = match self.key_value_store.group_specific_get(domain.as_str(), "wait_counters") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                            // Expire stale tighten/loosen counters when the last
                            // adjustment happened outside the respective range.
                            let tight_adjuster = if throttling_config.change_tight_range > wait_counter { wait_counter } else { wait_counter - throttling_config.change_tight_range };
                            let change_tight_counter = match self.key_value_store.group_specific_get(domain.as_str(), "change_tight_counters") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                            if change_tight_counter > 0 && last_adjust_request < tight_adjuster {
                                match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "change_tight_counters") { Ok(_) => {}, Err(_) => {} };
                            }
                            let loose_adjuster = if throttling_config.change_loose_range > wait_counter { wait_counter } else { wait_counter - throttling_config.change_loose_range };
                            let change_loose_counter = match self.key_value_store.group_specific_get(domain.as_str(), "change_loose_counters") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                            if change_loose_counter > 0 && last_adjust_request < loose_adjuster {
                                match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "change_loose_counters") { Ok(_) => {}, Err(_) => {} };
                            }
                            let current_rate = match self.key_value_store.group_specific_get(domain.as_str(), "current_rates") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                            let current_frequency = match self.key_value_store.group_specific_get(domain.as_str(), "current_frequencies") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                            let number_of_downloads_within_rate = match self.key_value_store.group_specific_get(domain.as_str(), "number_of_downloads_within_rates") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                            // Record per-domain statistics (downloads in window, window age).
                            let seeder_statistics_group = String::from("seeder_statistics");
                            match self.key_value_store.group_specific_get(domain.as_str(),seeder_statistics_group.as_str()) {
                                Ok(stats) => {
                                    let mut statistics: serde_json::Map<String, serde_json::Value> = match serde_json::from_str(stats.as_str()) {
                                        Ok(value) => value,
                                        Err(_) => serde_json::Map::new()
                                    };
                                    // Shadows the outer count with a fresh read.
                                    let number_of_downloads_within_rate = match self.key_value_store.group_specific_get(domain.as_str(), "number_of_downloads_within_rates") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                                    statistics.insert("downloads_within_rate".to_string(), serde_json::Value::Number(serde_json::Number::from(number_of_downloads_within_rate)));
                                    statistics.insert("time_passed".to_string(), serde_json::Value::Number(serde_json::Number::from(time_passed.num_seconds())));
                                    match self.key_value_store.group_specific_set(domain.as_str(), serde_json::to_string(&statistics).unwrap().as_str(), seeder_statistics_group.as_str()) {
                                        Ok(_) => {},
                                        Err(_) => {}
                                    }
                                }
                                Err(_) => {
                                    // No statistics yet: start a fresh map.
                                    let mut statistics = serde_json::Map::new();
                                    let number_of_downloads_within_rate = match self.key_value_store.group_specific_get(domain.as_str(), "number_of_downloads_within_rates") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                                    statistics.insert("downloads_within_rate".to_string(), serde_json::Value::Number(serde_json::Number::from(number_of_downloads_within_rate)));
                                    statistics.insert("time_passed".to_string(), serde_json::Value::Number(serde_json::Number::from(time_passed.num_seconds())));
                                    match self.key_value_store.group_specific_set(domain.as_str(), serde_json::to_string(&statistics).unwrap().as_str(), seeder_statistics_group.as_str()) {
                                        Ok(_) => {},
                                        Err(_) => {}
                                    }
                                }
                            }
                            if time_passed.gt(&chrono::Duration::seconds(i64::try_from(current_rate).unwrap())) {
                                // Rate window elapsed: start a new window and release the request.
                                tracing::info!("Throttle time threshold met");
                                let now = Utc::now();
                                match self.key_value_store.group_specific_set(domain.as_str(), now.to_rfc3339().as_str(), "last_throttle_window_times") { Ok(_) => {}, Err(_) => {} };
                                match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "number_of_downloads_within_rates") { Ok(_) => {}, Err(_) => {} };
                                match self.key_value_store.group_specific_set(domain.as_str(), (wait_counter + 1).to_string().as_str(), "wait_counters") { Ok(_) => {}, Err(_) => {} };
                                // NOTE(review): this immediately overwrites the 0 written two
                                // lines above with the PRE-reset count + 1; if the window was
                                // just reset, storing 1 looks intended — confirm.
                                match self.key_value_store.group_specific_set(domain.as_str(), (number_of_downloads_within_rate + 1).to_string().as_str(), "number_of_downloads_within_rates") { Ok(_) => {}, Err(_) => {} };
                                return Some(req)
                            }
                            else if throttling_config.enable && number_of_downloads_within_rate >= current_frequency {
                                // Window budget exhausted: sleep out the remainder and re-enqueue.
                                let wait_time = chrono::Duration::seconds(i64::try_from(current_rate).unwrap()).sub(time_passed).add(chrono::Duration::seconds(1));
                                tracing::info!("Going to throttle, downloads excessive - waiting for: {} seconds", wait_time.num_seconds());
                                // NOTE(review): downloads exceeded the budget yet the throttle
                                // is adjusted with `Loosen` (which relaxes limits in
                                // adjust_throttle) — confirm `Tighten` wasn't intended here.
                                self.adjust_throttle(domain.to_string(), ThrottleDirection::Loosen);
                                tokio::time::sleep(Duration::from_secs(u64::try_from(wait_time.num_seconds()).unwrap())).await;
                                match self.enqueue(req) {
                                    Ok(_) => {}
                                    Err(_) => {}
                                }
                            }
                            else {
                                tracing::info!("No need to throttle");
                                return Some(req)
                            }
                        }
                        Err(_) => {
                            // Domain has no throttle state yet (never seeded through
                            // seed/enqueue_queued_domain): initialise defaults and release.
                            match self.key_value_store.group_specific_set(domain.as_str(), throttling_config.default_frequency.to_string().as_str(), "current_frequencies") { Ok(_) => {}, Err(_) => {} };
                            match self.key_value_store.group_specific_set(domain.as_str(), Duration::new(u64::from(throttling_config.default_rate), 0).as_secs().to_string().as_str(), "current_rates") { Ok(_) => {}, Err(_) => {} };
                            match self.key_value_store.group_specific_set(domain.as_str(), throttling_config.default_wait.to_string().as_str(), "current_waits") { Ok(_) => {}, Err(_) => {} };
                            match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "number_of_downloads_within_rates") { Ok(_) => {}, Err(_) => {} };
                            match self.key_value_store.group_specific_set(domain.as_str(), Utc::now().to_rfc3339().as_str(), "last_throttle_window_times") { Ok(_) => {}, Err(_) => {} };
                            match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "wait_counters") { Ok(_) => {}, Err(_) => {} };
                            match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "change_tight_counters") { Ok(_) => {}, Err(_) => {} };
                            match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "change_loose_counters") { Ok(_) => {}, Err(_) => {} };
                            match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "last_adjust_requests") { Ok(_) => {}, Err(_) => {} };
                            return Some(req)
                        }
                    }
                }
            }
        }
    }
    return None
}
/// Move a domain's throttle parameters one step in `direction`.
///
/// Per-domain state keys: "current_waits"/"current_rates"/
/// "current_frequencies" hold the active settings; "change_tight_counters"/
/// "change_loose_counters" count repeated adjustments within a range of
/// "wait_counters", and once a counter crosses its threshold the rate or
/// frequency itself is changed.
fn adjust_throttle(&mut self, domain: String, direction: ThrottleDirection) {
    let throttling_config = &self.config.seeder.throttling;
    let last_adjust_request = match self.key_value_store.group_specific_get(domain.as_str(), "last_adjust_requests") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
    let wait_counter = match self.key_value_store.group_specific_get(domain.as_str(), "wait_counters") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
    let current_wait = match self.key_value_store.group_specific_get(domain.as_str(), "current_waits") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
    match direction {
        ThrottleDirection::Tighten => {
            // Lower edge of the window in which repeated tightens accumulate
            // (saturating subtraction to avoid underflow).
            let tight_adjuster = if throttling_config.change_tight_range > wait_counter { wait_counter } else { wait_counter - throttling_config.change_tight_range };
            if last_adjust_request >= tight_adjuster {
                // Recent enough: count this tighten toward the threshold.
                let change_tight_counter = match self.key_value_store.group_specific_get(domain.as_str(), "change_tight_counters") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                match self.key_value_store.group_specific_set(domain.as_str(), (change_tight_counter + 1).to_string().as_str(), "change_tight_counters") { Ok(_) => {}, Err(_) => {} };
                match self.key_value_store.group_specific_set(domain.as_str(), tight_adjuster.to_string().as_str(), "last_adjust_requests") { Ok(_) => {}, Err(_) => {} };
            }
            else {
                // Stale: restart the counter at 1.
                match self.key_value_store.group_specific_set(domain.as_str(), 1.to_string().as_str(), "change_tight_counters") { Ok(_) => {}, Err(_) => {} };
            }
            // Multiply the wait, capped at wait_limit. (If current_wait is 0 it stays 0.)
            if current_wait * throttling_config.multiplier <= throttling_config.wait_limit {
                match self.key_value_store.group_specific_set(domain.as_str(), (current_wait * throttling_config.multiplier).to_string().as_str(), "current_waits") { Ok(_) => {}, Err(_) => {} };
            }
            // Threshold reached: escalate — shrink frequency first, then grow the rate window.
            let change_tight_counter = match self.key_value_store.group_specific_get(domain.as_str(), "change_tight_counters") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
            if change_tight_counter >= throttling_config.change_tight_threshold {
                let current_frequency = match self.key_value_store.group_specific_get(domain.as_str(), "current_frequencies") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                let current_rate = match self.key_value_store.group_specific_get(domain.as_str(), "current_rates") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                if current_frequency > 1 {
                    let new_frequency = (f64::from(current_frequency) / f64::from(throttling_config.multiplier)).ceil();
                    match self.key_value_store.group_specific_set(domain.as_str(), new_frequency.to_string().as_str(), "current_frequencies") { Ok(_) => {}, Err(_) => {} };
                }
                else if current_rate < throttling_config.rate_limit {
                    match self.key_value_store.group_specific_set(domain.as_str(), (current_rate + 1).to_string().as_str(), "current_rates") { Ok(_) => {}, Err(_) => {} };
                }
                // Escalation consumes both counters and the wait counter.
                match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "change_tight_counters") { Ok(_) => {}, Err(_) => {} };
                match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "change_loose_counters") { Ok(_) => {}, Err(_) => {} };
                match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "wait_counters") { Ok(_) => {}, Err(_) => {} };
            }
        }
        ThrottleDirection::Loosen => {
            // Mirror image of Tighten, tracked by the loose counter/range.
            let loose_adjuster = if throttling_config.change_loose_range > wait_counter { wait_counter } else { wait_counter - throttling_config.change_loose_range };
            if wait_counter > current_wait &&
                last_adjust_request >= loose_adjuster {
                let change_loose_counter = match self.key_value_store.group_specific_get(domain.as_str(), "change_loose_counters") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                match self.key_value_store.group_specific_set(domain.as_str(), (change_loose_counter + 1).to_string().as_str(), "change_loose_counters") { Ok(_) => {}, Err(_) => {} };
                match self.key_value_store.group_specific_set(domain.as_str(), loose_adjuster.to_string().as_str(), "last_adjust_requests") { Ok(_) => {}, Err(_) => {} };
            }
            else {
                match self.key_value_store.group_specific_set(domain.as_str(), 1.to_string().as_str(), "change_loose_counters") { Ok(_) => {}, Err(_) => {} };
            }
            if wait_counter <= current_wait {
                // Few waits so far: nudge the wait up slightly within limits.
                if wait_counter <= throttling_config.change_loose_range && current_wait + 1 <= throttling_config.wait_limit {
                    match self.key_value_store.group_specific_set(domain.as_str(), (current_wait + 1).to_string().as_str(), "current_waits") { Ok(_) => {}, Err(_) => {} };
                }
            }
            else {
                // Many waits: divide the wait down, and possibly loosen rate/frequency.
                let new_wait = (f64::from(current_wait) / f64::from(throttling_config.multiplier)).ceil();
                if new_wait >= 1.0 {
                    match self.key_value_store.group_specific_set(domain.as_str(), new_wait.to_string().as_str(), "current_waits") { Ok(_) => {}, Err(_) => {} };
                }
                let change_loose_counter = match self.key_value_store.group_specific_get(domain.as_str(), "change_loose_counters") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                if change_loose_counter >= throttling_config.change_loose_threshold {
                    let current_frequency = match self.key_value_store.group_specific_get(domain.as_str(), "current_frequencies") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                    let current_rate = match self.key_value_store.group_specific_get(domain.as_str(), "current_rates") { Ok(v) => u32::from_str(v.as_str()).unwrap(), Err(_) => 0 };
                    if current_rate > 1 {
                        // Shrink the rate window first, then raise frequency.
                        let new_rate = (f64::from(current_rate) / f64::from(throttling_config.multiplier)).ceil();
                        match self.key_value_store.group_specific_set(domain.as_str(), new_rate.to_string().as_str(), "current_rates") { Ok(_) => {}, Err(_) => {} };
                    }
                    else if current_frequency < throttling_config.frequency_limit {
                        match self.key_value_store.group_specific_set(domain.as_str(), (current_frequency + 1).to_string().as_str(), "current_frequencies") { Ok(_) => {}, Err(_) => {} };
                    }
                }
                match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "change_tight_counters") { Ok(_) => {}, Err(_) => {} };
                match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "change_loose_counters") { Ok(_) => {}, Err(_) => {} };
                match self.key_value_store.group_specific_set(domain.as_str(), 0.to_string().as_str(), "wait_counters") { Ok(_) => {}, Err(_) => {} };
            }
        }
    }
    // NOTE(review): this unconditionally overwrites "last_adjust_requests"
    // with the wait counter read at entry, clobbering the adjuster values
    // written in the branches above — confirm the final write is intended.
    match self.key_value_store.group_specific_set(domain.as_str(), wait_counter.to_string().as_str(), "last_adjust_requests") { Ok(_) => {}, Err(_) => {} };
}
}