parlov 0.8.0

HTTP oracle detection tool — systematic probing for RFC-compliant information leakage.
Documentation
//! Execution helpers for the two-phase scan pipeline.
//!
//! Owns `run_plan_specs` (Phase 1/2 dispatcher loop), `dispatch_spec` (spec router),
//! `build_remaining_meta` (stop-rule input builder), and the C9 rate-limit gate.

use bytes::Bytes;
use http::HeaderMap;
use parlov_analysis::StopRule;
use parlov_core::{Error, ResponseClass, StrategyMetaForStop, StrategyOutcome};
use parlov_elicit::ProbeSpec;
use parlov_output::ScanFinding;
use parlov_probe::http::HttpProbe;
use tracing::warn;

use crate::pipeline_state::ScanPipelineState;
use crate::scan_runner::{run_burst, run_header_diff, run_pair, FindingOpts};
use crate::strategy_filter::spec_strategy_id;

use parlov_analysis::StopDecision;

/// Scalar knobs for `run_plan_specs`. `confirm_threshold` is the log-odds at
/// which `first_threshold_crossed_by` is set in exhaustive mode; `repro` enables curl
/// capture on every finding; `verbose` enables filtered request/response
/// header capture and body samples in each finding.
#[derive(Debug, Clone, Copy)]
pub(crate) struct RunOpts {
    pub exhaustive: bool,
    pub confirm_threshold: f64,
    pub repro: bool,
    pub verbose: bool,
}

impl RunOpts {
    fn finding_opts(self) -> FindingOpts {
        FindingOpts {
            repro: self.repro,
            verbose: self.verbose,
        }
    }
}

/// Executes each `ProbeSpec` in `plan`, pushing `(finding, outcome)` pairs into `state`.
///
/// After each successful dispatch: ingests the outcome into the Bayesian accumulator,
/// records the strategy run, and evaluates the stop rule against the unrun tail.
/// When the stop rule fires and `opts.exhaustive` is false, sets `state.stop_decision` and halts.
///
/// Returns all qualifying `(ResponseClass, HeaderMap, Bytes)` exchanges. Strategy errors
/// are logged and skipped; the scan does not abort on failure.
pub(crate) async fn run_plan_specs(
    plan: &[ProbeSpec],
    target: &str,
    state: &mut ScanPipelineState,
    stop_rule: &StopRule,
    probe: &HttpProbe,
    opts: RunOpts,
) -> Vec<(ResponseClass, HeaderMap, Bytes)> {
    let mut exchanges: Vec<(ResponseClass, HeaderMap, Bytes)> = Vec::new();

    for (idx, spec) in plan.iter().enumerate() {
        let strategy_id = spec_strategy_id(spec);
        match dispatch_spec(target, spec, probe, opts.finding_opts()).await {
            Ok((finding, outcome, exchange)) => {
                ingest_outcome(
                    state,
                    spec,
                    &outcome,
                    &finding,
                    opts.exhaustive,
                    strategy_id,
                    opts.confirm_threshold,
                );
                if let Some((status, ref headers, ref body)) = exchange {
                    admit_exchange(status, headers, body, &mut exchanges);
                }
                let remaining = build_remaining_meta(&plan[idx + 1..]);
                let decision = stop_rule.evaluate(&state.accumulator, &remaining);
                if decision != StopDecision::Continue && !opts.exhaustive {
                    state.stop_decision = Some(decision);
                    break;
                }
            }
            Err(e) => warn!("strategy failed, skipping: {e}"),
        }
    }

    exchanges
}

/// Returns true when any 429/503 exchange signals exhausted rate-limit budget.
///
/// Gates Phase 2: if `Retry-After` is present or `RateLimit-Remaining` is zero,
/// firing chained probes would immediately saturate the limit further.
pub(crate) fn rate_limit_gate(exchanges: &[(ResponseClass, HeaderMap, Bytes)]) -> bool {
    exchanges.iter().any(|(class, headers, _)| {
        if !matches!(class, ResponseClass::RateLimited) {
            return false;
        }
        if headers.contains_key(http::header::RETRY_AFTER) {
            return true;
        }
        parse_rate_limit_remaining(headers) == Some(0)
    })
}

/// IETF draft-standard `RateLimit-Remaining` first, then legacy `X-` form.
fn parse_rate_limit_remaining(headers: &HeaderMap) -> Option<u64> {
    for name in &["ratelimit-remaining", "x-ratelimit-remaining"] {
        if let Some(val) = headers.get(*name) {
            if let Ok(s) = val.to_str() {
                if let Ok(n) = s.trim().parse::<u64>() {
                    return Some(n);
                }
            }
        }
    }
    None
}

fn ingest_outcome(
    state: &mut ScanPipelineState,
    spec: &ProbeSpec,
    outcome: &StrategyOutcome,
    finding: &ScanFinding,
    exhaustive: bool,
    strategy_id: &str,
    confirm_threshold: f64,
) {
    let family_vector = spec.technique().vector;
    state.accumulator.ingest(outcome, family_vector);
    state.findings.push((finding.clone(), outcome.clone()));
    state.strategies_run += 1;

    if exhaustive
        && state.first_threshold_crossed_by.is_none()
        && state.accumulator.log_odds_current() >= confirm_threshold
    {
        state.first_threshold_crossed_by = Some(strategy_id.to_owned());
    }
}

fn admit_exchange(
    status: http::StatusCode,
    headers: &HeaderMap,
    body: &Bytes,
    exchanges: &mut Vec<(ResponseClass, HeaderMap, Bytes)>,
) {
    let class = ResponseClass::classify(status, headers);
    // AuthChallenge admitted: C8 harvests WWW-Authenticate scheme/realm from 401/407.
    // RateLimited admitted: C9 rate-limit gate reads Retry-After/RateLimit-Remaining.
    if matches!(
        class,
        ResponseClass::Success
            | ResponseClass::Redirect
            | ResponseClass::PartialContent
            | ResponseClass::RangeNotSatisfiable
            | ResponseClass::StructuredError
            | ResponseClass::AuthChallenge
            | ResponseClass::RateLimited
    ) {
        exchanges.push((class, headers.clone(), body.clone()));
    }
}

pub(crate) fn build_remaining_meta(remaining: &[ProbeSpec]) -> Vec<StrategyMetaForStop> {
    remaining
        .iter()
        .map(|spec| {
            let t = spec.technique();
            StrategyMetaForStop {
                vector: t.vector,
                normative_strength: t.strength,
            }
        })
        .collect()
}

async fn dispatch_spec(
    target: &str,
    spec: &ProbeSpec,
    probe: &HttpProbe,
    opts: FindingOpts,
) -> Result<
    (
        ScanFinding,
        StrategyOutcome,
        Option<(http::StatusCode, HeaderMap, Bytes)>,
    ),
    Error,
> {
    match spec {
        ProbeSpec::Pair(pair) => run_pair(target, pair, probe, opts).await,
        ProbeSpec::Burst(burst) => run_burst(target, burst, probe, opts).await,
        ProbeSpec::HeaderDiff(pair) => run_header_diff(target, pair, probe, opts).await,
    }
}

#[cfg(test)]
#[path = "scan_exec_tests.rs"]
mod tests;