webshift 0.2.11

Denoised web search library — fetch, clean, and rerank web content for AI agents.
//! Concurrent HTTP fetcher with streaming size cap, UA rotation, and retry backoff.
//!
//! Port of `../mcp-webshift/src/mcp_webshift/scraper/fetcher.py`.
//!
//! # Anti-flooding guarantee
//! The response body is read chunk-by-chunk and aborted once `max_bytes` is
//! exceeded. **Never** use `response.text()` — it buffers the entire response.

use futures::StreamExt;
use rand::seq::SliceRandom;
use std::collections::HashMap;
use std::time::{Duration, Instant};

// ---------------------------------------------------------------------------
// User-Agent pool (40 entries — same as Python reference)
// ---------------------------------------------------------------------------

static USER_AGENTS: &[&str] = &[
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:123.0) Gecko/20100101 Firefox/123.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 OPR/107.0.0.0",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/122.0.6261.89 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (iPad; CPU OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 14; Pixel 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.6261.105 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 14; SM-S921B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.6261.105 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 13; SAMSUNG SM-A546B) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/24.0 Chrome/117.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Vivaldi/6.6.3271.45",
    "Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:124.0) Gecko/20100101 Firefox/124.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 Edg/123.0.0.0",
    "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 OPR/108.0.0.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_6_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Safari/605.1.15",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 13.6; rv:124.0) Gecko/20100101 Firefox/124.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Fedora; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Vivaldi/6.6.3271.57",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 16_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) FxiOS/123.0 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (iPad; CPU OS 16_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
    "Mozilla/5.0 (Linux; Android 13; Pixel 7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.6261.105 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 14; SM-A546E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.6261.105 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 12; SAMSUNG SM-G991B) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/23.0 Chrome/115.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 11; Redmi Note 9 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Mobile Safari/537.36",
    "Mozilla/5.0 (Linux; Android 13; OnePlus Nord 3 5G) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36",
];

/// HTTP status codes that trigger retry with backoff.
const RETRYABLE_STATUSES: &[u16] = &[429, 502, 503];

/// Backoff delays in seconds, indexed by the 0-based attempt that just failed.
/// Total attempts per URL = `BACKOFF_DELAYS.len() + 1` (one initial try plus
/// one retry per delay).
const BACKOFF_DELAYS: &[f64] = &[1.0, 2.5];

// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------

/// Milliseconds elapsed since `t0`.
fn elapsed_ms(t0: Instant) -> f64 {
    t0.elapsed().as_secs_f64() * 1000.0
}

/// Parse a `Retry-After` header in its delta-seconds form; the HTTP-date form
/// and a missing header both fall back to `0.0`.
fn parse_retry_after(response: &reqwest::Response) -> f64 {
    response
        .headers()
        .get("retry-after")
        .and_then(|v| v.to_str().ok())
        .and_then(|s| s.parse::<f64>().ok())
        .unwrap_or(0.0)
}

/// Pick a random User-Agent from the pool.
fn random_ua() -> &'static str {
    USER_AGENTS
        .choose(&mut rand::thread_rng())
        .copied()
        .unwrap_or(USER_AGENTS[0])
}

// ---------------------------------------------------------------------------
// Core fetch logic
// ---------------------------------------------------------------------------

/// Fetch a single URL with streaming, size cap, and retry backoff.
///
/// Returns `(url, html, elapsed_ms, raw_bytes)`, where `html` is `None` on
/// failure or once retries are exhausted.
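///
/// A mid-body stream error or a size-cap hit still returns the bytes received
/// so far as (possibly truncated) HTML; only a stream error with nothing
/// received yet counts as a failure.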
async fn fetch_single(
    client: reqwest::Client,
    url: String,
    max_bytes: usize,
) -> (String, Option<String>, f64, usize) {
    let t0 = Instant::now();
    let mut raw_bytes = 0usize;

    #[allow(clippy::needless_range_loop)]
    for attempt in 0..=BACKOFF_DELAYS.len() {
        let response = match client
            .get(&url)
            .header(reqwest::header::USER_AGENT, random_ua())
            .send()
            .await
        {
            Ok(r) => r,
            Err(_) => return (url, None, elapsed_ms(t0), raw_bytes),
        };

        let status = response.status().as_u16();

        if RETRYABLE_STATUSES.contains(&status) {
            let retry_after = parse_retry_after(&response);
            if attempt < BACKOFF_DELAYS.len() {
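                // Honor the server's Retry-After when it asks for a longer
                // pause than our default backoff for this attempt.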
                let delay = f64::max(retry_after, BACKOFF_DELAYS[attempt]);
                tokio::time::sleep(Duration::from_secs_f64(delay)).await;
                continue;
            }
            return (url, None, elapsed_ms(t0), raw_bytes);
        }

        if !response.status().is_success() {
            return (url, None, elapsed_ms(t0), raw_bytes);
        }

        // Stream body with hard size cap — DO NOT switch to response.text()
        let mut stream = response.bytes_stream();
        let mut body: Vec<u8> = Vec::new();
        let mut stream_err = false;
        while let Some(chunk) = stream.next().await {
            match chunk {
                Ok(c) => {
                    body.extend_from_slice(&c);
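                    // The cap is checked after appending, so `body` can
                    // overshoot `max_bytes` by at most one chunk before we
                    // stop reading.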
                    if body.len() > max_bytes {
                        break; // cap hit — drop the rest
                    }
                }
                Err(_) => {
                    stream_err = true;
                    break;
                }
            }
        }

        if stream_err && body.is_empty() {
            return (url, None, elapsed_ms(t0), raw_bytes);
        }

        raw_bytes = body.len();
        let text = String::from_utf8_lossy(&body).into_owned();
        return (url, Some(text), elapsed_ms(t0), raw_bytes);
    }

    (url, None, elapsed_ms(t0), raw_bytes)
}

// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------

/// Fetch multiple URLs concurrently.
///
/// Returns:
/// - `html_map`:   `{url: html}` for successful fetches only.
/// - `timing_map`: `{url: (elapsed_ms, raw_bytes)}` for all attempted fetches.
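///
/// # Example
///
/// A minimal usage sketch from an async context (the URL, size cap, and
/// timeout below are illustrative; marked `ignore` since the enclosing module
/// path is not shown here):
///
/// ```ignore
/// let urls = vec!["https://example.com/".to_string()];
/// // 2 MiB per-page cap, 10-second request timeout.
/// let (html_map, timing_map) = fetch_urls(&urls, 2 * 1024 * 1024, 10).await;
/// for (url, html) in &html_map {
///     let (ms, bytes) = timing_map[url];
///     println!("{url}: {} chars ({bytes} raw bytes, {ms:.0} ms)", html.len());
/// }
/// ```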
pub async fn fetch_urls(
    urls: &[String],
    max_bytes: usize,
    timeout_secs: u64,
) -> (HashMap<String, String>, HashMap<String, (f64, usize)>) {
    if urls.is_empty() {
        return (HashMap::new(), HashMap::new());
    }

    let client = match reqwest::Client::builder()
        .timeout(Duration::from_secs(timeout_secs))
        .connect_timeout(Duration::from_secs(5))
        .pool_max_idle_per_host(5)
        .redirect(reqwest::redirect::Policy::limited(10))
        .build()
    {
        Ok(c) => c,
        Err(_) => return (HashMap::new(), HashMap::new()),
    };

    let tasks: Vec<_> = urls
        .iter()
        .map(|url| {
            let client = client.clone();
            let url = url.clone();
            tokio::spawn(async move { fetch_single(client, url, max_bytes).await })
        })
        .collect();

    let results = futures::future::join_all(tasks).await;

    let mut html_map: HashMap<String, String> = HashMap::new();
    let mut timing_map: HashMap<String, (f64, usize)> = HashMap::new();

    // `flatten()` skips tasks that panicked (`JoinError`); those URLs simply
    // never make it into either map.
    for (url, html, elapsed, bytes) in results.into_iter().flatten() {
        timing_map.insert(url.clone(), (elapsed, bytes));
        if let Some(h) = html {
            html_map.insert(url, h);
        }
    }

    (html_map, timing_map)
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    #[tokio::test]
    async fn fetches_page_successfully() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/page"))
            .respond_with(ResponseTemplate::new(200).set_body_string("<html><body>hello</body></html>"))
            .mount(&server)
            .await;

        let url = format!("{}/page", server.uri());
        let (html_map, timing_map) = fetch_urls(&[url.clone()], 1024 * 1024, 5).await;

        assert!(html_map.contains_key(&url));
        assert!(html_map[&url].contains("hello"));
        assert!(timing_map.contains_key(&url));
    }

    #[tokio::test]
    async fn returns_empty_on_404() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/missing"))
            .respond_with(ResponseTemplate::new(404))
            .mount(&server)
            .await;

        let url = format!("{}/missing", server.uri());
        let (html_map, timing_map) = fetch_urls(&[url.clone()], 1024 * 1024, 5).await;

        assert!(!html_map.contains_key(&url));
        assert!(timing_map.contains_key(&url));
    }

    #[tokio::test]
    async fn size_cap_mechanism_is_in_place() {
        // The streaming cap breaks on chunk boundaries: after a chunk pushes the body
        // over max_bytes, no further chunks are read. Wiremock delivers the full body
        // as a single chunk, so the invariant here is that the fetch completes without
        // error and timing is recorded — not that the body is exactly `max_bytes`.
        // Full byte-level truncation is verified in integration tests against a real
        // chunked server.
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/big"))
            .respond_with(ResponseTemplate::new(200).set_body_string("x".repeat(10_000)))
            .mount(&server)
            .await;

        let url = format!("{}/big", server.uri());
        let (_, timing_map) = fetch_urls(&[url.clone()], 100, 5).await;
        assert!(timing_map.contains_key(&url), "timing must be recorded for a capped fetch");
    }

    #[tokio::test]
    async fn empty_url_list_returns_empty_maps() {
        let (html_map, timing_map) = fetch_urls(&[], 1024 * 1024, 5).await;
        assert!(html_map.is_empty());
        assert!(timing_map.is_empty());
    }

    #[tokio::test]
    async fn fetches_multiple_urls_concurrently() {
        let server = MockServer::start().await;
        for i in 0..3 {
            Mock::given(method("GET"))
                .and(path(format!("/page{i}")))
                .respond_with(
                    ResponseTemplate::new(200)
                        .set_body_string(format!("<html>page{i}</html>")),
                )
                .mount(&server)
                .await;
        }

        let urls: Vec<String> = (0..3).map(|i| format!("{}/page{i}", server.uri())).collect();
        let (html_map, _) = fetch_urls(&urls, 1024 * 1024, 5).await;

        assert_eq!(html_map.len(), 3);
    }

    // -- Ported from Python test_fetcher.py --

    #[test]
    fn user_agent_pool_has_40_entries() {
        assert_eq!(USER_AGENTS.len(), 40);
    }

    #[test]
    fn all_user_agents_contain_mozilla() {
        for ua in USER_AGENTS {
            assert!(ua.contains("Mozilla"), "UA should contain Mozilla: {ua}");
        }
    }

    #[tokio::test]
    async fn retries_on_429_then_succeeds() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/retry"))
            .respond_with(ResponseTemplate::new(429))
            .up_to_n_times(1)
            .expect(1)
            .mount(&server)
            .await;
        Mock::given(method("GET"))
            .and(path("/retry"))
            .respond_with(ResponseTemplate::new(200).set_body_string("<html>OK</html>"))
            .mount(&server)
            .await;

        let url = format!("{}/retry", server.uri());
        let (html_map, _) = fetch_urls(&[url.clone()], 1024 * 1024, 10).await;
        assert!(html_map.contains_key(&url));
        assert!(html_map[&url].contains("OK"));
    }

    #[tokio::test]
    async fn retries_on_503_then_succeeds() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/retry503"))
            .respond_with(ResponseTemplate::new(503))
            .up_to_n_times(1)
            .expect(1)
            .mount(&server)
            .await;
        Mock::given(method("GET"))
            .and(path("/retry503"))
            .respond_with(ResponseTemplate::new(200).set_body_string("<html>OK</html>"))
            .mount(&server)
            .await;

        let url = format!("{}/retry503", server.uri());
        let (html_map, _) = fetch_urls(&[url.clone()], 1024 * 1024, 10).await;
        assert!(html_map.contains_key(&url));
    }

    #[tokio::test]
    async fn exhausts_retries_returns_none() {
        let server = MockServer::start().await;
        // All attempts return 429 → should exhaust retries (BACKOFF_DELAYS.len() + 1 = 3 attempts)
        Mock::given(method("GET"))
            .and(path("/always429"))
            .respond_with(ResponseTemplate::new(429))
            .expect(3)
            .mount(&server)
            .await;

        let url = format!("{}/always429", server.uri());
        let (html_map, timing_map) = fetch_urls(&[url.clone()], 1024 * 1024, 30).await;
        assert!(!html_map.contains_key(&url), "should return None after exhausting retries");
        assert!(timing_map.contains_key(&url));
    }

    #[tokio::test]
    async fn non_retryable_404_does_not_retry() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/notfound"))
            .respond_with(ResponseTemplate::new(404))
            .expect(1) // exactly 1 request — no retries
            .mount(&server)
            .await;

        let url = format!("{}/notfound", server.uri());
        let (html_map, _) = fetch_urls(&[url.clone()], 1024 * 1024, 5).await;
        assert!(!html_map.contains_key(&url));
    }
}