use std::{
cmp::max,
collections::HashSet,
sync::{
self,
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
};
use anyhow::Result;
use console::style;
use lazy_static::lazy_static;
use leaky_bucket::RateLimiter;
use tokio::{
sync::RwLock,
time::{sleep, Duration},
};
use crate::{
atomic_load, atomic_store,
config::RequesterPolicy,
event_handlers::{
Command::{AddError, SubtractFromUsizeField},
Handles,
},
extractor::{ExtractionTarget, ExtractorBuilder},
filters::SimilarityFilter,
nlp::{Document, TfIdf},
response::FeroxResponse,
scan_manager::{FeroxScan, ScanStatus},
statistics::{StatError::Other, StatField::TotalExpected},
url::FeroxUrl,
utils::{logged_request, send_try_recursion_command, should_deny_url},
HIGH_ERROR_RATIO, UNIQUE_DISTANCE,
};
use super::{policy_data::PolicyData, FeroxScanner, PolicyTrigger};
lazy_static! {
    /// Global TF-IDF tracker used when word collection is enabled (see the
    /// `collect_words` branch in `Requester::request`); guarded by a blocking
    /// `std::sync` RwLock and shared across all requesters
    pub(crate) static ref TF_IDF: Arc<sync::RwLock<TfIdf>> = Arc::new(sync::RwLock::new(TfIdf::new()));
}
/// Sends requests for a single target scan and enforces the configured
/// requester policy (auto-tune / auto-bail) while doing so.
pub(super) struct Requester {
    /// handles to the event-handler subsystems (stats, filters, output, scans)
    handles: Arc<Handles>,

    /// url against which formatted requests are issued
    target_url: String,

    /// optional rate limiter; `Some` when --rate-limit was given or when
    /// auto-tune has imposed a limit, `None` means unthrottled
    rate_limiter: RwLock<Option<RateLimiter>>,

    /// error counters / speed heap backing policy decisions
    policy_data: PolicyData,

    /// the scan this requester services
    ferox_scan: Arc<FeroxScan>,

    /// links already discovered via extraction, so they aren't re-requested
    seen_links: RwLock<HashSet<String>>,

    /// counter of consecutive upward speed adjustments; reset to 0 on any
    /// downward adjustment (see `adjust_limit`)
    tuning_lock: Mutex<usize>,

    /// true while an auto-tune-imposed limit is in effect
    policy_triggered: AtomicBool,
}
impl Requester {
    /// Build a `Requester` from the given scanner and the scan it will
    /// service; installs an initial rate limiter when --rate-limit > 0.
    pub fn from(scanner: &FeroxScanner, ferox_scan: Arc<FeroxScan>) -> Result<Self> {
        let limit = scanner.handles.config.rate_limit;

        let mut policy_data = PolicyData::new(
            scanner.handles.config.requester_policy,
            scanner.handles.config.timeout,
        );

        let rate_limiter = if limit > 0 {
            // remember the user-supplied cap so auto-tune can honor it later
            policy_data = policy_data.with_rate_limit(limit);
            Some(Self::build_a_bucket(limit)?)
        } else {
            None
        };

        Ok(Self {
            ferox_scan,
            policy_data,
            seen_links: RwLock::new(HashSet::<String>::new()),
            rate_limiter: RwLock::new(rate_limiter),
            handles: scanner.handles.clone(),
            target_url: scanner.target_url.to_owned(),
            tuning_lock: Mutex::new(0),
            policy_triggered: AtomicBool::new(false),
        })
    }
fn build_a_bucket(limit: usize) -> Result<RateLimiter> {
let limit = max(limit, 1);
let refill = limit;
let tokens = max((limit as f64 / 2.0).round() as usize, 1);
let interval = 1000;
Ok(RateLimiter::builder()
.interval(Duration::from_millis(interval))
.refill(refill)
.initial(tokens) .max(limit)
.build())
}
    /// Sleep through the policy's cooldown window, clear the scan bar's
    /// status message, then release the `cooling_down` flag so enforcement
    /// can run again.
    async fn cool_down(&self) {
        sleep(Duration::from_millis(self.policy_data.wait_time)).await;

        self.ferox_scan.progress_bar().set_message("");

        // Release pairs with the Acquire loads / compare_exchange performed
        // in `request` and `should_enforce_policy`
        atomic_store!(self.policy_data.cooling_down, false, Ordering::Release);
    }
pub async fn limit(&self) -> Result<()> {
let guard = self.rate_limiter.read().await;
if let Some(limiter) = guard.as_ref() {
limiter.acquire_one().await;
}
Ok(())
}
fn too_many_errors(&self) -> bool {
let total = self.ferox_scan.num_errors(PolicyTrigger::Errors);
let threshold = max(self.handles.config.threads / 2, 25);
total >= threshold
}
fn too_many_status_errors(&self, trigger: PolicyTrigger) -> bool {
let total = self.ferox_scan.num_errors(trigger);
let requests = self.ferox_scan.requests();
let ratio = total as f64 / requests as f64;
match trigger {
PolicyTrigger::Status403 => ratio >= HIGH_ERROR_RATIO,
PolicyTrigger::Status429 => ratio >= HIGH_ERROR_RATIO / 3.0,
_ => false,
}
}
    /// Decide whether a policy trigger has fired for this scan.
    ///
    /// Claims the `cooling_down` flag via compare-exchange so only one task
    /// performs enforcement at a time. On `Some(trigger)` the flag is left
    /// set; the caller's tune path clears it via `cool_down`.
    /// NOTE(review): on the AutoBail path a successful `bail` never clears
    /// the flag — presumably fine because the scan is cancelled; confirm.
    fn should_enforce_policy(&self) -> Option<PolicyTrigger> {
        if self
            .policy_data
            .cooling_down
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            // another task already owns enforcement / a cooldown is running
            return None;
        }

        let requests = self.ferox_scan.requests() as usize;

        if requests < max(self.handles.config.threads, 50) {
            // not enough samples for this scan to decide; release the flag
            atomic_store!(self.policy_data.cooling_down, false, Ordering::Release);
            return None;
        }

        if self.too_many_errors() {
            return Some(PolicyTrigger::Errors);
        }

        if self.too_many_status_errors(PolicyTrigger::Status403) {
            return Some(PolicyTrigger::Status403);
        }

        if self.too_many_status_errors(PolicyTrigger::Status429) {
            return Some(PolicyTrigger::Status429);
        }

        // nothing triggered; release the flag for the next caller
        atomic_store!(self.policy_data.cooling_down, false, Ordering::Release);
        None
    }
    /// Move the scan's speed limit down (errors grew since last check) or up
    /// (no new errors), then rebuild or remove the rate limiter as requested.
    ///
    /// `create_limiter` controls whether a new bucket is actually installed
    /// after the limit value changes.
    async fn adjust_limit(&self, trigger: PolicyTrigger, create_limiter: bool) -> Result<()> {
        let scan_errors = self.ferox_scan.num_errors(trigger);
        let policy_errors = self.policy_data.get_errors(trigger);

        let pb_message: Option<String>;

        {
            // guarded streak counter: consecutive upward adjustments
            let mut guard = match self.tuning_lock.lock() {
                Ok(g) => g,
                Err(e) => {
                    // poisoned by a panicked holder; skip this adjustment
                    log::error!("tuning_lock poisoned in adjust_limit: {}", e);
                    return Ok(());
                }
            };

            if scan_errors > policy_errors {
                // errors increased since the last check -> reset the streak
                *guard = 0;

                if policy_errors != 0 {
                    self.policy_data.adjust_down();
                    log::info!(
                        "auto-tune: errors increased; reducing speed to {} reqs/sec for {}",
                        self.policy_data.get_limit(),
                        self.target_url
                    );
                    let styled_direction = style("reduced").red();
                    pb_message = Some(format!(
                        "=> 🚦 {styled_direction} scan speed ({}/s)",
                        self.policy_data.get_limit()
                    ));
                } else {
                    // first errors seen for this trigger; record but don't slow down
                    pb_message = None;
                }
                self.policy_data.set_errors(trigger, scan_errors);
            } else {
                // no new errors -> extend the streak and speed back up
                *guard += 1;
                self.policy_data.adjust_up(&guard);
                log::info!(
                    "auto-tune: errors decreased; increasing speed to {} reqs/sec for {}",
                    self.policy_data.get_limit(),
                    self.target_url
                );
                let styled_direction = style("increased").green();
                pb_message = Some(format!(
                    "=> 🚦 {styled_direction} scan speed ({}/s)",
                    self.policy_data.get_limit()
                ));
            }

            if let Some(ref msg) = pb_message {
                self.ferox_scan.progress_bar().set_message(msg.clone());
            }
        }

        if atomic_load!(self.policy_data.remove_limit) {
            // speed has fully recovered: fall back to the user-supplied cap
            // if one exists, otherwise drop the limiter entirely
            if let Some(rate_limit) = self.policy_data.rate_limit {
                self.set_rate_limiter(Some(rate_limit)).await?;
            } else {
                self.set_rate_limiter(None).await?;
            }
            atomic_store!(self.policy_data.remove_limit, false);
            atomic_store!(self.policy_triggered, false, Ordering::Release);
            self.policy_data.reset_heap();

            // best-effort message; skip if the tuning lock is contended
            if let Ok(_guard) = self.tuning_lock.try_lock() {
                self.ferox_scan
                    .progress_bar()
                    .set_message("=> 🚦 removed rate limiter 🚀");
            }
        } else if create_limiter {
            let new_limit = self.policy_data.get_limit();
            self.set_rate_limiter(Some(new_limit)).await?;
        }

        Ok(())
    }
async fn set_rate_limiter(&self, new_limit: Option<usize>) -> Result<()> {
let mut guard = self.rate_limiter.write().await;
let new_bucket = if let Some(limit) = new_limit {
if guard.is_some() && guard.as_ref().unwrap().max() == limit {
return Ok(());
} else {
Some(Self::build_a_bucket(limit)?)
}
} else {
None
};
let _ = std::mem::replace(&mut *guard, new_bucket);
Ok(())
}
    /// One iteration of the auto-tune algorithm: on first trigger, seed the
    /// speed heap from the observed requests/second and install an initial
    /// limit; then adjust the limit and sleep through the cooldown window.
    async fn tune(&self, trigger: PolicyTrigger) -> Result<()> {
        if !self.policy_data.heap_initialized() {
            let reqs_sec = self.ferox_scan.requests_per_second() as usize;

            if reqs_sec < 2 {
                // too slow to meaningfully halve; back out and retry later
                log::debug!("auto-tune: {} reqs/sec is too low; not initializing heap and resetting cooldown period", reqs_sec);
                self.policy_data.reset_heap();
                atomic_store!(self.policy_data.cooling_down, false, Ordering::Release);
                atomic_store!(self.policy_triggered, false, Ordering::Release);
                return Ok(());
            }

            // never seed the heap above a user-supplied --rate-limit cap
            let seed = if let Some(cap) = self.policy_data.rate_limit {
                reqs_sec.min(cap)
            } else {
                reqs_sec
            };

            self.policy_data.set_reqs_sec(seed);
            atomic_store!(self.policy_triggered, true);

            let new_limit = self.policy_data.get_limit();
            log::info!(
                "auto-tune: {} reqs/sec was too fast; enforcing limit {} reqs/sec for {}",
                reqs_sec,
                new_limit,
                self.target_url
            );
            self.set_rate_limiter(Some(new_limit)).await?;

            self.ferox_scan
                .progress_bar()
                .set_message(format!("=> 🚦 set rate limit ({new_limit}/s)"));
        }

        self.adjust_limit(trigger, true).await?;
        self.cool_down().await;

        Ok(())
    }
    /// Cancel this requester's scan: mark it cancelled, abort its task, and
    /// subtract the now-skipped requests from the overall expected total.
    /// No-op when the scan is already inactive.
    async fn bail(&self, trigger: PolicyTrigger) -> Result<()> {
        if self.ferox_scan.is_active() {
            log::warn!(
                "too many {:?} ({}) triggered {:?} Policy on {}",
                trigger,
                self.ferox_scan.num_errors(trigger),
                self.handles.config.requester_policy,
                self.ferox_scan
            );

            self.ferox_scan
                .set_status(ScanStatus::Cancelled)
                .unwrap_or_else(|e| log::warn!("Could not set scan status: {e}"));

            let scans = self.handles.ferox_scans()?;
            let active_bars = scans.number_of_bars();

            self.ferox_scan
                .abort(active_bars)
                .await
                .unwrap_or_else(|e| log::warn!("Could not bail on scan: {e}"));

            // requests that will never be sent because this scan bailed
            let pb = self.ferox_scan.progress_bar();
            let num_skipped = pb.length().unwrap_or(0).saturating_sub(pb.position()) as usize;

            let styled_trigger = style(format!("{trigger:?}")).red();

            pb.set_message(format!(
                "=> 💀 too many {} ({}) 💀 bailing",
                styled_trigger,
                self.ferox_scan.num_errors(trigger),
            ));

            self.handles
                .stats
                .send(SubtractFromUsizeField(TotalExpected, num_skipped))
                .unwrap_or_else(|e| log::warn!("Could not update overall scan bar: {e}"));
        }

        Ok(())
    }
    /// Issue one request per (url, method) pair formatted from `word`,
    /// applying (in order): denylist checks, rate limiting, policy
    /// enforcement, recursion, response filtering, uniqueness filtering,
    /// extension/word collection, link extraction, and reporting.
    pub async fn request(&self, word: &str) -> Result<()> {
        log::trace!("enter: request({word})");

        let collected = self.handles.collected_extensions();

        let urls = FeroxUrl::from_string(&self.target_url, self.handles.clone())
            .formatted_urls(word, collected)?;

        // only test denylists when at least one denylist entry exists
        let should_test_deny = !self.handles.config.url_denylist.is_empty()
            || !self.handles.config.regex_denylist.is_empty();

        for url in urls {
            for method in self.handles.config.methods.iter() {
                if should_test_deny && should_deny_url(&url, self.handles.clone())? {
                    // denied urls are never requested
                    continue;
                }

                let should_tune =
                    self.handles.config.auto_tune || self.handles.config.rate_limit > 0;
                let should_limit = should_tune && self.rate_limiter.read().await.is_some();

                if should_limit {
                    if let Err(e) = self.limit().await {
                        // failing to rate limit is logged but never fatal
                        log::warn!("Could not rate limit scan: {e}");
                        self.handles.stats.send(AddError(Other)).unwrap_or_default();
                    }
                }

                let data = if self.handles.config.data.is_empty() {
                    None
                } else {
                    Some(self.handles.config.data.as_slice())
                };

                let response =
                    logged_request(&url, method.as_str(), data, self.handles.clone()).await?;

                // policy enforcement is skipped while a cooldown is in flight
                if (should_tune || self.handles.config.auto_bail)
                    && !atomic_load!(self.policy_data.cooling_down, Ordering::Acquire)
                {
                    match self.policy_data.policy {
                        RequesterPolicy::AutoTune => {
                            if let Some(trigger) = self.should_enforce_policy() {
                                if let Err(e) = self.tune(trigger).await {
                                    // tune failed mid-flight; release the flags it owned
                                    atomic_store!(
                                        self.policy_data.cooling_down,
                                        false,
                                        Ordering::Release
                                    );
                                    atomic_store!(self.policy_triggered, false, Ordering::Release);
                                    return Err(e);
                                }
                            } else if atomic_load!(self.policy_triggered) {
                                // a limit is active but nothing triggered; claim the
                                // cooldown flag and try nudging the speed upward
                                if self
                                    .policy_data
                                    .cooling_down
                                    .compare_exchange(
                                        false,
                                        true,
                                        Ordering::AcqRel,
                                        Ordering::Acquire,
                                    )
                                    .is_ok()
                                {
                                    self.adjust_limit(PolicyTrigger::TryAdjustUp, true).await?;
                                    self.cool_down().await;
                                }
                            }
                        }
                        RequesterPolicy::AutoBail => {
                            if let Some(trigger) = self.should_enforce_policy() {
                                if let Err(e) = self.bail(trigger).await {
                                    atomic_store!(
                                        self.policy_data.cooling_down,
                                        false,
                                        Ordering::Release
                                    );
                                    return Err(e);
                                }
                            }
                        }
                        RequesterPolicy::Default => {}
                    }
                }

                let mut ferox_response = FeroxResponse::from(
                    response,
                    &self.target_url,
                    method,
                    self.handles.config.output_level,
                    self.handles.config.response_size_limit,
                )
                .await;

                // normal recursion path; the force_recursion variant is
                // handled after filtering, further below
                if !self.handles.config.no_recursion && !self.handles.config.force_recursion {
                    send_try_recursion_command(self.handles.clone(), ferox_response.clone())
                        .await?;
                }

                if self
                    .handles
                    .filters
                    .data
                    .should_filter_response(&ferox_response, self.handles.stats.tx.clone())
                {
                    continue;
                }

                if self.handles.config.unique {
                    // filter out future responses too similar to this one
                    let mut unique_filter = SimilarityFilter::from(&ferox_response);
                    unique_filter.cutoff = UNIQUE_DISTANCE;
                    self.handles.filters.data.push(Box::new(unique_filter))?;
                }

                if !self.handles.config.no_recursion && self.handles.config.force_recursion {
                    if self.handles.config.filter_status.is_empty() {
                        // without --filter-status, only recurse on allowed codes
                        if self
                            .handles
                            .config
                            .status_codes
                            .contains(&ferox_response.status().as_u16())
                        {
                            send_try_recursion_command(
                                self.handles.clone(),
                                ferox_response.clone(),
                            )
                            .await?;
                        }
                    } else {
                        send_try_recursion_command(self.handles.clone(), ferox_response.clone())
                            .await?;
                    }
                }

                if self.handles.config.collect_extensions {
                    ferox_response.parse_extension(self.handles.clone())?;
                }

                if self.handles.config.collect_words {
                    if let Ok(mut guard) = TF_IDF.write() {
                        if let Some(doc) = Document::from_html(ferox_response.text()) {
                            guard.add_document(doc);

                            // recompute scores every 12 docs, plus early on
                            // (every 2nd doc) while the corpus is still small
                            if guard.num_documents().is_multiple_of(12)
                                || (guard.num_documents() < 5
                                    && guard.num_documents().is_multiple_of(2))
                            {
                                guard.calculate_tf_idf_scores();
                            }
                        }
                    }
                }

                if self.handles.config.extract_links {
                    let mut extractor = ExtractorBuilder::default()
                        .target(ExtractionTarget::ResponseBody)
                        .response(&ferox_response)
                        .handles(self.handles.clone())
                        .url(self.ferox_scan.url())
                        .build()?;

                    let new_links: HashSet<_>;
                    let result = extractor.extract().await?;

                    {
                        // hold the read lock only long enough to diff the sets
                        let read_links = self.seen_links.read().await;
                        new_links = result.difference(&read_links).cloned().collect();
                    }

                    if !new_links.is_empty() {
                        // record the new links before requesting them
                        let mut write_links = self.seen_links.write().await;
                        for new_link in &new_links {
                            write_links.insert(new_link.to_owned());
                        }
                    }

                    if !new_links.is_empty() {
                        let extraction_task = extractor.request_links(new_links).await?;
                        if let Some(task) = extraction_task {
                            _ = task.await;
                        }
                    }
                }

                if let Err(e) = ferox_response.send_report(self.handles.output.tx.clone()) {
                    log::warn!("Could not send FeroxResponse to output handler: {e}");
                }
            }
        }

        log::trace!("exit: request");
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use std::time::Instant;
use reqwest::StatusCode;
use crate::{
config::Configuration,
config::OutputLevel,
event_handlers::Command::AddStatus,
event_handlers::{FiltersHandler, ScanHandler, StatsHandler, Tasks, TermOutHandler},
filters,
scan_manager::{ScanOrder, ScanType},
statistics::StatError,
};
use super::*;
    /// Spin up stats/filters/output/scan handlers around the given (or a
    /// default) configuration and return the shared `Handles` plus the
    /// spawned handler tasks.
    async fn setup_requester_test(config: Option<Arc<Configuration>>) -> (Arc<Handles>, Tasks) {
        let configuration = config.unwrap_or_else(|| Arc::new(Configuration::new().unwrap()));

        let (stats_task, stats_handle) = StatsHandler::initialize(configuration.clone());
        let (filters_task, filters_handle) = FiltersHandler::initialize();
        let (out_task, out_handle) =
            TermOutHandler::initialize(configuration.clone(), stats_handle.tx.clone());

        let wordlist = Arc::new(vec![String::from("this_is_a_test")]);

        let handles = Arc::new(Handles::new(
            stats_handle,
            filters_handle,
            out_handle,
            configuration.clone(),
            wordlist,
        ));

        let (scan_task, scan_handle) = ScanHandler::initialize(handles.clone());
        handles.set_scan_handle(scan_handle);
        filters::initialize(handles.clone()).await.unwrap();

        let tasks = Tasks::new(out_task, stats_task, filters_task, scan_task);

        (handles, tasks)
    }
async fn increment_errors(handles: Arc<Handles>, scan: Arc<FeroxScan>, num_errors: usize) {
for _ in 0..num_errors {
handles.stats.send(AddError(StatError::Other)).unwrap();
scan.add_error();
scan.progress_bar().inc(1);
}
handles.stats.sync().await.unwrap();
}
async fn increment_scan_errors(handles: Arc<Handles>, url: &str, num_errors: usize) {
let scans = handles.ferox_scans().unwrap();
for _ in 0..num_errors {
scans.increment_error(format!("{url}/").as_str());
}
}
async fn increment_scan_status_codes(
handles: Arc<Handles>,
url: &str,
code: StatusCode,
num_errors: usize,
) {
let scans = handles.ferox_scans().unwrap();
for _ in 0..num_errors {
scans.increment_status_code(format!("{url}/").as_str(), code);
}
}
async fn increment_status_codes(
handles: Arc<Handles>,
scan: Arc<FeroxScan>,
num_codes: usize,
code: StatusCode,
) {
for _ in 0..num_codes {
handles.stats.send(AddStatus(code)).unwrap();
scan.progress_bar().inc(1);
if code == StatusCode::FORBIDDEN {
scan.add_403();
} else {
scan.add_429();
}
}
handles.stats.sync().await.unwrap();
}
    /// Create a running directory scan for `url`, register it with the scan
    /// tracker, then seed it with `num_errors` of the given trigger type.
    async fn create_scan(
        handles: Arc<Handles>,
        url: &str,
        num_errors: usize,
        trigger: PolicyTrigger,
    ) -> Arc<FeroxScan> {
        let scan = FeroxScan::new(
            url,
            ScanType::Directory,
            ScanOrder::Initial,
            1000,
            OutputLevel::Default,
            None,
            true,
            handles.clone(),
        );

        scan.set_status(ScanStatus::Running).unwrap();
        scan.progress_bar();

        let scans = handles.ferox_scans().unwrap();
        scans.insert(scan.clone());

        match trigger {
            PolicyTrigger::Status403 => {
                increment_scan_status_codes(
                    handles.clone(),
                    url,
                    StatusCode::FORBIDDEN,
                    num_errors,
                )
                .await;
            }
            PolicyTrigger::Status429 => {
                increment_scan_status_codes(
                    handles.clone(),
                    url,
                    StatusCode::TOO_MANY_REQUESTS,
                    num_errors,
                )
                .await;
            }
            PolicyTrigger::Errors => {
                increment_scan_errors(handles.clone(), url, num_errors).await;
            }
            _ => {}
        }

        // sanity check that the seeding above actually registered
        assert_eq!(scan.num_errors(trigger), num_errors);

        scan
    }
    /// Fewer requests than `max(threads, 50)` must never trigger enforcement.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn should_enforce_policy_returns_false_on_not_enough_requests_seen() {
        let (handles, _) = setup_requester_test(None).await;

        let requester = Requester {
            handles,
            target_url: "http://localhost".to_string(),
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: Arc::new(FeroxScan::default()),
            rate_limiter: RwLock::new(None),
            policy_data: Default::default(),
            policy_triggered: AtomicBool::new(false),
        };

        let ferox_scan = Arc::new(FeroxScan::default());

        // 49 requests is one short of the default 50-request floor
        increment_errors(requester.handles.clone(), ferox_scan.clone(), 49).await;
        assert_eq!(atomic_load!(requester.handles.stats.data.requests), 49);

        assert_eq!(requester.should_enforce_policy(), None);
    }
    /// Hitting `max(threads / 2, 25)` errors after enough requests fires the
    /// Errors trigger.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn should_enforce_policy_returns_true_on_error_times_threads() {
        let mut config = Configuration::new().unwrap_or_default();
        config.threads = 50;

        let (handles, _) = setup_requester_test(Some(Arc::new(config))).await;
        let ferox_scan = Arc::new(FeroxScan::default());

        let requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: ferox_scan.clone(),
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(None),
            policy_data: Default::default(),
            policy_triggered: AtomicBool::new(false),
        };

        // 25 errors / 25 requests: below the 50-request floor, no trigger yet
        increment_errors(requester.handles.clone(), ferox_scan.clone(), 25).await;
        assert_eq!(requester.should_enforce_policy(), None);

        // 50 errors / 50 requests: both thresholds reached
        increment_errors(requester.handles.clone(), ferox_scan, 25).await;
        assert_eq!(
            requester.should_enforce_policy(),
            Some(PolicyTrigger::Errors)
        );
    }
    /// A high enough ratio of 403 responses fires the Status403 trigger.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn should_enforce_policy_returns_true_on_excessive_403s() {
        let (handles, _) = setup_requester_test(None).await;
        let ferox_scan = Arc::new(FeroxScan::default());

        let requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: ferox_scan.clone(),
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(None),
            policy_data: Default::default(),
            policy_triggered: AtomicBool::new(false),
        };

        // 45 requests: still under the request floor, no trigger
        increment_status_codes(
            requester.handles.clone(),
            ferox_scan.clone(),
            45,
            StatusCode::FORBIDDEN,
        )
        .await;
        assert_eq!(requester.should_enforce_policy(), None);

        // 50 total, 45 of them 403s: ratio crosses HIGH_ERROR_RATIO
        increment_status_codes(
            requester.handles.clone(),
            ferox_scan.clone(),
            5,
            StatusCode::OK,
        )
        .await;
        assert_eq!(
            requester.should_enforce_policy(),
            Some(PolicyTrigger::Status403)
        );
    }
    /// 429s trigger at a third of the 403 ratio threshold.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn should_enforce_policy_returns_true_on_excessive_429s() {
        let mut config = Configuration::new().unwrap_or_default();
        config.threads = 50;

        let (handles, _) = setup_requester_test(Some(Arc::new(config))).await;
        let ferox_scan = Arc::new(FeroxScan::default());

        let requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: ferox_scan.clone(),
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(None),
            policy_data: Default::default(),
            policy_triggered: AtomicBool::new(false),
        };

        // 15 requests: under the request floor, no trigger
        increment_status_codes(
            requester.handles.clone(),
            ferox_scan.clone(),
            15,
            StatusCode::TOO_MANY_REQUESTS,
        )
        .await;
        assert_eq!(requester.should_enforce_policy(), None);

        // 50 total, 15 of them 429s: crosses HIGH_ERROR_RATIO / 3
        increment_status_codes(
            requester.handles.clone(),
            ferox_scan.clone(),
            35,
            StatusCode::OK,
        )
        .await;
        assert_eq!(
            requester.should_enforce_policy(),
            Some(PolicyTrigger::Status429)
        );
    }
    /// `bail` aborts only the scan attached to the requester, leaving the
    /// other active scans running.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn bail_calls_abort_on_highest_errored_feroxscan() {
        let (handles, _) = setup_requester_test(None).await;

        let scan_one = create_scan(handles.clone(), "http://one", 10, PolicyTrigger::Errors).await;
        let scan_two = create_scan(handles.clone(), "http://two", 14, PolicyTrigger::Errors).await;
        let scan_three =
            create_scan(handles.clone(), "http://three", 4, PolicyTrigger::Errors).await;
        let scan_four = create_scan(handles.clone(), "http://four", 7, PolicyTrigger::Errors).await;

        // give scan_two a long-running task so abort has something to cancel
        let dummy_task =
            tokio::spawn(async move { tokio::time::sleep(Duration::new(15, 0)).await });
        scan_two.set_task(dummy_task).await.unwrap();

        assert!(scan_one.is_active());
        assert!(scan_two.is_active());

        let scans = handles.ferox_scans().unwrap();
        assert_eq!(scans.get_active_scans().len(), 4);

        let req_clone = scan_two.clone();
        let requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: req_clone,
            target_url: "http://one/one/stuff.php".to_string(),
            rate_limiter: RwLock::new(None),
            policy_data: Default::default(),
            policy_triggered: AtomicBool::new(false),
        };

        requester.bail(PolicyTrigger::Errors).await.unwrap();

        // only scan_two (the requester's scan) should have been cancelled
        assert_eq!(scans.get_active_scans().len(), 3);
        assert!(scan_one.is_active());
        assert!(scan_three.is_active());
        assert!(scan_four.is_active());
        assert!(!scan_two.is_active());
    }
    /// `bail` is a no-op (but still Ok) when its scan is not active.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn bail_returns_ok_on_no_active_scans() {
        let (handles, _) = setup_requester_test(None).await;

        let scan_one =
            create_scan(handles.clone(), "http://one", 10, PolicyTrigger::Status403).await;
        let scan_two =
            create_scan(handles.clone(), "http://two", 10, PolicyTrigger::Status429).await;

        scan_one.set_status(ScanStatus::Complete).unwrap();
        scan_two.set_status(ScanStatus::Cancelled).unwrap();

        let scans = handles.ferox_scans().unwrap();
        assert_eq!(scans.get_active_scans().len(), 0);

        let requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: Arc::new(FeroxScan::default()),
            target_url: "http://one/one/stuff.php".to_string(),
            rate_limiter: RwLock::new(None),
            policy_data: Default::default(),
            policy_triggered: AtomicBool::new(false),
        };

        let result = requester.bail(PolicyTrigger::Status403).await;
        assert!(result.is_ok());
    }
    /// While `cooling_down` is set, enforcement must be skipped entirely.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn should_enforce_policy_returns_none_on_cooldown() {
        let mut config = Configuration::new().unwrap_or_default();
        config.threads = 50;

        let (handles, _) = setup_requester_test(Some(Arc::new(config))).await;

        let requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: Arc::new(FeroxScan::default()),
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(None),
            policy_data: Default::default(),
            policy_triggered: AtomicBool::new(false),
        };

        // simulate an in-flight cooldown; the CAS in should_enforce_policy
        // must fail and return None
        requester
            .policy_data
            .cooling_down
            .store(true, Ordering::Relaxed);

        assert_eq!(requester.should_enforce_policy(), None);
    }
    /// `cool_down` sleeps for the policy's wait_time and then clears the
    /// cooling_down flag.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn cooldown_pauses_for_wait_time() {
        let (handles, _) = setup_requester_test(None).await;

        let requester = Arc::new(Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: Arc::new(FeroxScan::default()),
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(None),
            // timeout of 7 yields a wait_time of 3500ms
            policy_data: PolicyData::new(RequesterPolicy::AutoBail, 7),
            policy_triggered: AtomicBool::new(false),
        });

        let start = Instant::now();
        requester.cool_down().await;

        assert!(start.elapsed().as_millis() >= 3500);
        assert!(!requester.policy_data.cooling_down.load(Ordering::Relaxed));
    }
    /// With no new errors, `adjust_limit` bumps the streak counter, raises
    /// the limit, and installs a matching rate limiter.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn adjust_limit_increments_streak_counter_on_upward_movement() {
        let (handles, _) = setup_requester_test(None).await;

        let requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: Arc::new(FeroxScan::default()),
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(None),
            policy_data: PolicyData::new(RequesterPolicy::AutoBail, 7),
            policy_triggered: AtomicBool::new(false),
        };

        // seed the heap at 400 reqs/sec; one upward step lands on 300
        requester.policy_data.set_reqs_sec(400);

        requester
            .adjust_limit(PolicyTrigger::Errors, true)
            .await
            .unwrap();

        assert_eq!(*requester.tuning_lock.lock().unwrap(), 1);
        assert_eq!(requester.policy_data.get_limit(), 300);
        assert_eq!(
            requester.rate_limiter.read().await.as_ref().unwrap().max(),
            300
        );
    }
    /// New errors reset the upward streak to 0, halve the limit, and record
    /// the new error count in the policy data.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn adjust_limit_resets_streak_counter_on_downward_movement() {
        let (handles, _) = setup_requester_test(None).await;

        let limiter = RateLimiter::builder()
            .interval(Duration::from_secs(1))
            .max(200)
            .build();

        // scan has 2 errors vs. the policy's recorded 1 -> downward movement
        let scan = FeroxScan::default();
        scan.add_error();
        scan.add_error();

        let requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: Arc::new(scan),
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(Some(limiter)),
            policy_data: PolicyData::new(RequesterPolicy::AutoBail, 7),
            policy_triggered: AtomicBool::new(false),
        };

        requester.policy_data.set_reqs_sec(400);
        requester.policy_data.set_errors(PolicyTrigger::Errors, 1);

        // pretend a 2-step upward streak was in progress
        {
            let mut guard = requester.tuning_lock.lock().unwrap();
            *guard = 2;
        }

        requester
            .adjust_limit(PolicyTrigger::Errors, false)
            .await
            .unwrap();

        assert_eq!(*requester.tuning_lock.lock().unwrap(), 0);
        assert_eq!(requester.policy_data.get_limit(), 100);
        assert_eq!(requester.policy_data.get_errors(PolicyTrigger::Errors), 2);
    }
    /// When `remove_limit` is set and no user cap exists, `adjust_limit`
    /// drops the rate limiter entirely.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn adjust_limit_removes_rate_limiter() {
        let (handles, _) = setup_requester_test(None).await;

        let scan = FeroxScan::default();
        scan.add_error();
        scan.add_error();

        let requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: Arc::new(scan),
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(None),
            policy_data: PolicyData::new(RequesterPolicy::AutoBail, 7),
            policy_triggered: AtomicBool::new(false),
        };

        requester.policy_data.set_reqs_sec(400);
        requester
            .policy_data
            .remove_limit
            .store(true, Ordering::Relaxed);

        requester
            .adjust_limit(PolicyTrigger::Errors, true)
            .await
            .unwrap();

        assert!(requester.rate_limiter.read().await.is_none());
    }
    /// 429s trigger at a 30%-ish ratio while 403s need the full ratio; an
    /// empty scan triggers neither.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn too_many_status_errors_returns_correct_values() {
        let (handles, _) = setup_requester_test(None).await;

        let mut requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: Arc::new(FeroxScan::default()),
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(None),
            policy_data: PolicyData::new(RequesterPolicy::AutoBail, 7),
            policy_triggered: AtomicBool::new(false),
        };

        // zero requests: nothing can trigger
        assert!(!requester.too_many_status_errors(PolicyTrigger::Errors));
        assert!(!requester.too_many_status_errors(PolicyTrigger::Status429));

        // 3 of 10 requests were 429s -> exceeds the lower 429 threshold
        requester.ferox_scan.progress_bar().set_position(10);
        requester.ferox_scan.add_429();
        requester.ferox_scan.add_429();
        requester.ferox_scan.add_429();
        assert!(requester.too_many_status_errors(PolicyTrigger::Status429));
        assert!(!requester.too_many_status_errors(PolicyTrigger::Status403));

        // 9 of 10 requests were 403s -> exceeds the full threshold
        requester.ferox_scan = Arc::new(FeroxScan::default());
        requester.ferox_scan.progress_bar().set_position(10);
        requester.ferox_scan.add_403();
        requester.ferox_scan.add_403();
        requester.ferox_scan.add_403();
        requester.ferox_scan.add_403();
        requester.ferox_scan.add_403();
        requester.ferox_scan.add_403();
        requester.ferox_scan.add_403();
        requester.ferox_scan.add_403();
        requester.ferox_scan.add_403();
        assert!(requester.too_many_status_errors(PolicyTrigger::Status403));
    }
    /// Setting the same limit twice keeps the existing bucket (early return).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn set_rate_limiter_early_exit() {
        let (handles, _) = setup_requester_test(None).await;

        let limiter = RateLimiter::builder()
            .interval(Duration::from_secs(1))
            .max(200)
            .build();

        let requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: Arc::new(FeroxScan::default()),
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(Some(limiter)),
            policy_data: PolicyData::new(RequesterPolicy::AutoBail, 7),
            policy_triggered: AtomicBool::new(false),
        };

        requester.set_rate_limiter(Some(200)).await.unwrap();
        assert_eq!(
            requester.rate_limiter.read().await.as_ref().unwrap().max(),
            200
        );

        // same limit again: bucket must stay in place with the same max
        requester.set_rate_limiter(Some(200)).await.unwrap();
        assert_eq!(
            requester.rate_limiter.read().await.as_ref().unwrap().max(),
            200
        );
    }
    /// First tune seeds the heap from observed reqs/sec (~400), halves the
    /// limit (~200), installs a matching limiter, and waits out the cooldown.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn tune_sets_expected_values_and_then_waits() {
        let (handles, _) = setup_requester_test(None).await;

        let limiter = RateLimiter::builder()
            .interval(Duration::from_secs(1))
            .max(200)
            .build();

        let scan = FeroxScan::new(
            "http://localhost",
            ScanType::Directory,
            ScanOrder::Initial,
            1000,
            OutputLevel::Default,
            None,
            true,
            Arc::new(Handles::for_testing(None, None).0),
        );

        scan.set_status(ScanStatus::Running).unwrap();
        scan.add_429();

        let requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: scan.clone(),
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(Some(limiter)),
            // timeout of 4 yields a 2000ms cooldown
            policy_data: PolicyData::new(RequesterPolicy::AutoTune, 4),
            policy_triggered: AtomicBool::new(false),
        };

        let start = Instant::now();

        // 400 requests over ~1 second of elapsed time -> ~400 reqs/sec
        let pb = scan.progress_bar();
        pb.set_length(1000);
        pb.set_position(400);
        sleep(Duration::new(1, 0)).await;

        assert_eq!(
            requester.policy_data.get_errors(PolicyTrigger::Status429),
            0
        );

        requester.tune(PolicyTrigger::Status429).await.unwrap();

        // ranges allow for timing jitter in requests_per_second
        let original = requester.policy_data.heap.read().unwrap().original;
        assert!(
            (399..=401).contains(&original),
            "Expected ~400 req/s original, got {}",
            original
        );

        let limit = requester.policy_data.get_limit();
        assert!(
            (199..=201).contains(&limit),
            "Expected limit ~200, got {}",
            limit
        );

        let rate_limiter_max = requester.rate_limiter.read().await.as_ref().unwrap().max();
        assert!(
            (199..=201).contains(&rate_limiter_max),
            "Expected rate limiter max ~200, got {}",
            rate_limiter_max
        );

        scan.finish(0).unwrap();

        // tune must have slept through the 2000ms cooldown
        assert!(start.elapsed().as_millis() >= 2000);
    }
#[test]
fn build_a_bucket_handles_low_rates_correctly() {
for limit in 1..=20 {
let result = Requester::build_a_bucket(limit);
assert!(result.is_ok(), "build_a_bucket failed for limit {}", limit);
let bucket = result.unwrap();
assert_eq!(
bucket.max(),
limit,
"Bucket max should equal requested limit {} but got {}",
limit,
bucket.max()
);
}
}
    /// Removing the limiter must also clear `policy_triggered` and reset the
    /// speed heap so a future trigger starts from scratch.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn policy_triggered_reset_when_limiter_removed() {
        let (handles, _) = setup_requester_test(None).await;
        let ferox_scan = Arc::new(FeroxScan::default());

        let requester = Requester {
            handles,
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan,
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(None),
            policy_data: PolicyData::new(RequesterPolicy::AutoTune, 7),
            policy_triggered: AtomicBool::new(false),
        };

        // simulate an active auto-tune limit awaiting removal
        atomic_store!(requester.policy_triggered, true, Ordering::Release);
        requester.policy_data.set_reqs_sec(100);
        assert!(requester.policy_data.heap_initialized());

        atomic_store!(requester.policy_data.remove_limit, true);

        requester
            .adjust_limit(PolicyTrigger::Errors, true)
            .await
            .unwrap();

        assert!(
            !atomic_load!(requester.policy_triggered),
            "policy_triggered should be reset to false when limiter is removed"
        );
        assert!(
            !requester.policy_data.heap_initialized(),
            "heap should be reset when limiter is removed"
        );
    }
    /// Global stats errors must not count toward a single scan's trigger;
    /// only the per-scan request/error tallies matter.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn should_enforce_policy_uses_per_scan_requests() {
        let mut config = Configuration::new().unwrap_or_default();
        config.threads = 50;

        let (handles, _) = setup_requester_test(Some(Arc::new(config))).await;
        let ferox_scan = Arc::new(FeroxScan::default());

        let requester = Requester {
            handles: handles.clone(),
            seen_links: RwLock::new(HashSet::<String>::new()),
            tuning_lock: Mutex::new(0),
            ferox_scan: ferox_scan.clone(),
            target_url: "http://localhost".to_string(),
            rate_limiter: RwLock::new(None),
            policy_data: PolicyData::new(RequesterPolicy::AutoTune, 7),
            policy_triggered: AtomicBool::new(false),
        };

        // flood the *global* stats with errors...
        for _ in 0..100 {
            handles.stats.send(AddError(StatError::Other)).unwrap();
        }
        handles.stats.sync().await.unwrap();

        // ...while the scan itself has only 5 requests/errors
        ferox_scan.progress_bar().inc(5);
        for _ in 0..5 {
            ferox_scan.add_error();
        }

        assert_eq!(
            requester.should_enforce_policy(),
            None,
            "should_enforce_policy should use per-scan requests, not global"
        );
    }
    /// With a --rate-limit cap, every populated heap value is clamped to it.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn heap_values_clamped_to_rate_limit_cap() {
        let policy_data = PolicyData::new(RequesterPolicy::AutoTune, 7).with_rate_limit(100);

        // observed speed (500) far exceeds the cap (100)
        policy_data.set_reqs_sec(500);

        let heap = policy_data.heap.read().unwrap();
        for i in 0..heap.inner.len() {
            if heap.inner[i] > 0 {
                assert!(
                    heap.inner[i] <= 100,
                    "Heap value at index {} is {}, expected <= 100",
                    i,
                    heap.inner[i]
                );
            }
        }
        assert_eq!(heap.inner[0], 100, "Root should be clamped to cap");
    }
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
/// with a cap in place, downward adjustments still reduce the limit
/// on every call
async fn auto_tune_with_cap_adjusts_down_on_errors() {
    let policy_data = PolicyData::new(RequesterPolicy::AutoTune, 7).with_rate_limit(100);
    policy_data.set_reqs_sec(100);

    // seeding at the cap leaves the starting limit at half the cap
    assert_eq!(policy_data.get_limit(), 50);

    policy_data.adjust_down();
    assert_eq!(policy_data.get_limit(), 25);

    policy_data.adjust_down();
    let limit_after_two = policy_data.get_limit();
    assert!(limit_after_two < 25, "Limit should decrease further");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
/// repeated upward adjustments against a capped policy must never push
/// the limit past the configured cap
async fn auto_tune_with_cap_never_exceeds_cap_on_upward_adjustment() {
    let policy_data = PolicyData::new(RequesterPolicy::AutoTune, 7).with_rate_limit(100);
    policy_data.set_reqs_sec(100);

    // move the heap cursor away from the root before tuning upward
    {
        let mut heap = policy_data.heap.write().unwrap();
        heap.move_to(15);
    }

    for _ in 0..10 {
        policy_data.adjust_up(&3);

        let current_limit = policy_data.get_limit();
        assert!(
            current_limit <= 100,
            "Limit {} exceeded cap of 100",
            current_limit
        );
    }

    let final_limit = policy_data.get_limit();
    assert!(
        (50..=100).contains(&final_limit),
        "Final limit {} should be between 50 and 100",
        final_limit
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
/// when a hard rate cap exists, a remove_limit request should reset the
/// limiter to the cap instead of dropping it entirely
async fn remove_limit_with_cap_sets_to_cap_instead_of_removing() {
    let mut config = Configuration::new().unwrap_or_default();
    config.rate_limit = 100;
    config.auto_tune = true;
    config.requester_policy = RequesterPolicy::AutoTune;

    let (handles, _) = setup_requester_test(Some(Arc::new(config))).await;
    let scan = Arc::new(FeroxScan::default());
    let policy_data = PolicyData::new(RequesterPolicy::AutoTune, 7).with_rate_limit(100);

    // start below the cap so the reset to 100 is observable
    let requester = Requester {
        handles: handles.clone(),
        target_url: "http://localhost".to_string(),
        ferox_scan: scan.clone(),
        rate_limiter: RwLock::new(Some(Requester::build_a_bucket(50).unwrap())),
        seen_links: RwLock::new(HashSet::<String>::new()),
        tuning_lock: Mutex::new(0),
        policy_data,
        policy_triggered: AtomicBool::new(true),
    };

    atomic_store!(requester.policy_data.remove_limit, true);
    requester
        .adjust_limit(PolicyTrigger::Errors, true)
        .await
        .unwrap();

    let limiter = requester.rate_limiter.read().await;
    assert!(
        limiter.is_some(),
        "Limiter should not be removed when cap exists"
    );
    assert_eq!(
        limiter.as_ref().unwrap().max(),
        100,
        "Limiter should be set to cap value"
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
/// when both --rate-limit and --auto-tune are given, the limiter starts
/// at the rate_limit value and PolicyData records the same cap
async fn initial_limiter_set_to_cap_when_both_flags_present() {
    let mut config = Configuration::new().unwrap_or_default();
    config.rate_limit = 100;
    config.auto_tune = true;

    let (handles, _) = setup_requester_test(Some(Arc::new(config))).await;
    let scan = Arc::new(FeroxScan::default());
    let policy_data = PolicyData::new(RequesterPolicy::AutoTune, 7).with_rate_limit(100);

    let requester = Requester {
        handles: handles.clone(),
        target_url: "http://localhost".to_string(),
        ferox_scan: scan.clone(),
        rate_limiter: RwLock::new(Some(Requester::build_a_bucket(100).unwrap())),
        seen_links: RwLock::new(HashSet::<String>::new()),
        tuning_lock: Mutex::new(0),
        policy_data,
        policy_triggered: AtomicBool::new(false),
    };

    let limiter = requester.rate_limiter.read().await;
    assert!(limiter.is_some(), "Limiter should be initialized");
    assert_eq!(
        limiter.as_ref().unwrap().max(),
        100,
        "Initial limiter should be set to rate_limit value"
    );
    assert_eq!(
        requester.policy_data.rate_limit,
        Some(100),
        "PolicyData should have rate_limit set"
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
/// end-to-end capped auto-tune: tune() seeds the heap, errors drive the
/// limit down, repeated adjustments stay under the cap, and a final
/// remove_limit request lands on the cap instead of removing the limiter
async fn capped_auto_tune_full_lifecycle() {
    let mut config = Configuration::new().unwrap_or_default();
    config.rate_limit = 100;
    config.auto_tune = true;
    config.requester_policy = RequesterPolicy::AutoTune;
    config.threads = 50;

    let (handles, _) = setup_requester_test(Some(Arc::new(config))).await;

    // a running scan with a real start time so tune() has timing data
    let scan = FeroxScan::new(
        "http://localhost",
        ScanType::Directory,
        ScanOrder::Latest,
        0,
        OutputLevel::Default,
        None,
        true,
        handles.clone(),
    );
    scan.set_status(ScanStatus::Running).unwrap();
    scan.set_start_time(Instant::now()).unwrap();
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    scan.progress_bar().inc(100);

    let policy_data = PolicyData::new(RequesterPolicy::AutoTune, 7).with_rate_limit(100);

    let requester = Requester {
        handles: handles.clone(),
        target_url: "http://localhost".to_string(),
        ferox_scan: scan.clone(),
        rate_limiter: RwLock::new(Some(Requester::build_a_bucket(100).unwrap())),
        seen_links: RwLock::new(HashSet::<String>::new()),
        tuning_lock: Mutex::new(0),
        policy_data,
        policy_triggered: AtomicBool::new(false),
    };

    // phase 1: errors trigger the first tune(), which seeds the heap
    for _ in 0..50 {
        scan.add_error();
    }
    requester.tune(PolicyTrigger::Errors).await.unwrap();

    assert!(
        requester.policy_data.heap_initialized(),
        "Heap should be initialized after tune()"
    );

    let initial_limit = requester.policy_data.get_limit();
    assert!(
        initial_limit <= 100,
        "Initial limit {} should not exceed cap",
        initial_limit
    );
    assert_eq!(
        initial_limit, 50,
        "Initial limit should be 50 (half of capped seed 100)"
    );

    // phase 2: more errors push the limit further down
    for _ in 0..25 {
        scan.add_error();
    }
    requester
        .adjust_limit(PolicyTrigger::Errors, true)
        .await
        .unwrap();

    let reduced_limit = requester.policy_data.get_limit();
    assert!(
        reduced_limit < initial_limit,
        "Limit should decrease on errors: {} < {}",
        reduced_limit,
        initial_limit
    );

    // phase 3: hammer adjust_limit repeatedly; the cap must always hold
    requester.policy_data.set_errors(PolicyTrigger::Errors, 200);
    for i in 0..5 {
        requester
            .adjust_limit(PolicyTrigger::Errors, true)
            .await
            .unwrap();

        let current_limit = requester.policy_data.get_limit();
        assert!(
            current_limit <= 100,
            "Iteration {}: Limit {} exceeded cap of 100",
            i,
            current_limit
        );
    }

    // phase 4: a remove_limit request resolves to the cap, not removal
    atomic_store!(requester.policy_data.remove_limit, true);
    requester
        .adjust_limit(PolicyTrigger::Errors, true)
        .await
        .unwrap();

    let final_limiter = requester.rate_limiter.read().await;
    assert!(
        final_limiter.is_some(),
        "Limiter should not be removed when cap exists"
    );
    assert_eq!(
        final_limiter.as_ref().unwrap().max(),
        100,
        "Limiter should be at cap value"
    );
}
}