use bytes::Bytes;
use http::HeaderMap;
use parlov_analysis::StopRule;
use parlov_core::{Error, ResponseClass, StrategyMetaForStop, StrategyOutcome};
use parlov_elicit::ProbeSpec;
use parlov_output::ScanFinding;
use parlov_probe::http::HttpProbe;
use tracing::warn;
use crate::pipeline_state::ScanPipelineState;
use crate::scan_runner::{run_burst, run_header_diff, run_pair, FindingOpts};
use crate::strategy_filter::spec_strategy_id;
use parlov_analysis::StopDecision;
#[derive(Debug, Clone, Copy)]
pub(crate) struct RunOpts {
pub exhaustive: bool,
pub confirm_threshold: f64,
pub repro: bool,
pub verbose: bool,
}
impl RunOpts {
fn finding_opts(self) -> FindingOpts {
FindingOpts {
repro: self.repro,
verbose: self.verbose,
}
}
}
pub(crate) async fn run_plan_specs(
plan: &[ProbeSpec],
target: &str,
state: &mut ScanPipelineState,
stop_rule: &StopRule,
probe: &HttpProbe,
opts: RunOpts,
) -> Vec<(ResponseClass, HeaderMap, Bytes)> {
let mut exchanges: Vec<(ResponseClass, HeaderMap, Bytes)> = Vec::new();
for (idx, spec) in plan.iter().enumerate() {
let strategy_id = spec_strategy_id(spec);
match dispatch_spec(target, spec, probe, opts.finding_opts()).await {
Ok((finding, outcome, exchange)) => {
ingest_outcome(
state,
spec,
&outcome,
&finding,
opts.exhaustive,
strategy_id,
opts.confirm_threshold,
);
if let Some((status, ref headers, ref body)) = exchange {
admit_exchange(status, headers, body, &mut exchanges);
}
let remaining = build_remaining_meta(&plan[idx + 1..]);
let decision = stop_rule.evaluate(&state.accumulator, &remaining);
if decision != StopDecision::Continue && !opts.exhaustive {
state.stop_decision = Some(decision);
break;
}
}
Err(e) => warn!("strategy failed, skipping: {e}"),
}
}
exchanges
}
pub(crate) fn rate_limit_gate(exchanges: &[(ResponseClass, HeaderMap, Bytes)]) -> bool {
exchanges.iter().any(|(class, headers, _)| {
if !matches!(class, ResponseClass::RateLimited) {
return false;
}
if headers.contains_key(http::header::RETRY_AFTER) {
return true;
}
parse_rate_limit_remaining(headers) == Some(0)
})
}
fn parse_rate_limit_remaining(headers: &HeaderMap) -> Option<u64> {
for name in &["ratelimit-remaining", "x-ratelimit-remaining"] {
if let Some(val) = headers.get(*name) {
if let Ok(s) = val.to_str() {
if let Ok(n) = s.trim().parse::<u64>() {
return Some(n);
}
}
}
}
None
}
fn ingest_outcome(
state: &mut ScanPipelineState,
spec: &ProbeSpec,
outcome: &StrategyOutcome,
finding: &ScanFinding,
exhaustive: bool,
strategy_id: &str,
confirm_threshold: f64,
) {
let family_vector = spec.technique().vector;
state.accumulator.ingest(outcome, family_vector);
state.findings.push((finding.clone(), outcome.clone()));
state.strategies_run += 1;
if exhaustive
&& state.first_threshold_crossed_by.is_none()
&& state.accumulator.log_odds_current() >= confirm_threshold
{
state.first_threshold_crossed_by = Some(strategy_id.to_owned());
}
}
fn admit_exchange(
status: http::StatusCode,
headers: &HeaderMap,
body: &Bytes,
exchanges: &mut Vec<(ResponseClass, HeaderMap, Bytes)>,
) {
let class = ResponseClass::classify(status, headers);
if matches!(
class,
ResponseClass::Success
| ResponseClass::Redirect
| ResponseClass::PartialContent
| ResponseClass::RangeNotSatisfiable
| ResponseClass::StructuredError
| ResponseClass::AuthChallenge
| ResponseClass::RateLimited
) {
exchanges.push((class, headers.clone(), body.clone()));
}
}
pub(crate) fn build_remaining_meta(remaining: &[ProbeSpec]) -> Vec<StrategyMetaForStop> {
remaining
.iter()
.map(|spec| {
let t = spec.technique();
StrategyMetaForStop {
vector: t.vector,
normative_strength: t.strength,
}
})
.collect()
}
async fn dispatch_spec(
target: &str,
spec: &ProbeSpec,
probe: &HttpProbe,
opts: FindingOpts,
) -> Result<
(
ScanFinding,
StrategyOutcome,
Option<(http::StatusCode, HeaderMap, Bytes)>,
),
Error,
> {
match spec {
ProbeSpec::Pair(pair) => run_pair(target, pair, probe, opts).await,
ProbeSpec::Burst(burst) => run_burst(target, burst, probe, opts).await,
ProbeSpec::HeaderDiff(pair) => run_header_diff(target, pair, probe, opts).await,
}
}
#[cfg(test)]
#[path = "scan_exec_tests.rs"]
mod tests;