kumo 0.2.9

An async web crawling framework for Rust — Scrapy for Rust
Documentation
mod support;

use kumo::{
    engine::CrawlEngine,
    error::KumoError,
    extract::Response,
    fetch::Fetcher,
    middleware::{FetchRequest, StatusRetry},
    retry::RetryPolicy,
    spider::{Output, Spider},
    store::StdoutStore,
};
use std::sync::{
    Arc,
    atomic::{AtomicU32, Ordering},
};
use std::time::{Duration, Instant};
use support::VecStore;

/// Test double for `Fetcher` that answers with 503 a fixed number of times
/// and with 200 on every call after that.
struct SequentialFetcher {
    /// Shared counter incremented once per `fetch` call.
    calls: Arc<AtomicU32>,
    /// How many of the leading calls are answered with status 503.
    fail_times: u32,
}

#[async_trait::async_trait]
impl Fetcher for SequentialFetcher {
    /// Returns a canned HTML response for the request URL; this never yields `Err`.
    async fn fetch(&self, req: &FetchRequest) -> Result<Response, KumoError> {
        // `fetch_add` hands back the value from before the increment,
        // which is the zero-based index of this call.
        let call_index = self.calls.fetch_add(1, Ordering::SeqCst);
        let still_failing = call_index < self.fail_times;
        let status = if still_failing { 503 } else { 200 };
        let body = "<html><body><h1>ok</h1></body></html>";

        Ok(Response::from_parts(req.url(), status, body))
    }
}

#[tokio::test]
async fn status_retry_retries_on_429_and_succeeds() {
    let mut server = mockito::Server::new_async().await;
    let _m1 = server
        .mock("GET", "/")
        .with_status(429)
        .create_async()
        .await;
    let _m2 = server
        .mock("GET", "/")
        .with_status(200)
        .with_header("content-type", "text/html")
        .with_body("<html><body><h1>ok</h1></body></html>")
        .create_async()
        .await;

    struct RetrySpider(String);

    #[async_trait::async_trait]
    impl Spider for RetrySpider {
        type Item = serde_json::Value;

        fn name(&self) -> &str {
            "retry"
        }

        fn start_urls(&self) -> Vec<String> {
            vec![self.0.clone()]
        }

        async fn parse(&self, _res: &Response) -> Result<Output<Self::Item>, KumoError> {
            Ok(Output::new())
        }
    }

    let stats = CrawlEngine::builder()
        .respect_robots_txt(false)
        .retry(1, std::time::Duration::from_millis(1))
        .middleware(StatusRetry::new())
        .store(StdoutStore)
        .run(RetrySpider(server.url()))
        .await
        .unwrap();

    assert_eq!(stats.pages_crawled, 1);
    assert_eq!(stats.errors, 0);
}

#[tokio::test]
async fn retry_policy_exhausted_counts_as_single_error() {
    let calls = Arc::new(AtomicU32::new(0));
    let fetcher = SequentialFetcher {
        calls: calls.clone(),
        fail_times: 100,
    };

    struct S;

    #[async_trait::async_trait]
    impl Spider for S {
        type Item = serde_json::Value;

        fn name(&self) -> &str {
            "retry-exhaust"
        }

        fn start_urls(&self) -> Vec<String> {
            vec!["https://example.com/".into()]
        }

        async fn parse(&self, _r: &Response) -> Result<Output<Self::Item>, KumoError> {
            Ok(Output::new())
        }
    }

    let stats = CrawlEngine::builder()
        .fetcher(fetcher)
        .middleware(StatusRetry::new())
        .retry_policy(
            RetryPolicy::new(3)
                .base_delay(std::time::Duration::from_millis(1))
                .max_delay(std::time::Duration::from_millis(4)),
        )
        .respect_robots_txt(false)
        .store(StdoutStore)
        .run(S)
        .await
        .unwrap();

    assert_eq!(stats.errors, 1, "all retries exhausted = one error");
    assert_eq!(stats.pages_crawled, 0);
    assert_eq!(
        calls.load(Ordering::SeqCst),
        4,
        "initial + 3 retries = 4 fetches"
    );
}

#[tokio::test]
async fn retry_policy_succeeds_on_later_attempt() {
    let calls = Arc::new(AtomicU32::new(0));
    let fetcher = SequentialFetcher {
        calls: calls.clone(),
        fail_times: 2,
    };

    struct S;

    #[async_trait::async_trait]
    impl Spider for S {
        type Item = serde_json::Value;

        fn name(&self) -> &str {
            "retry-success"
        }

        fn start_urls(&self) -> Vec<String> {
            vec!["https://example.com/".into()]
        }

        async fn parse(&self, res: &Response) -> Result<Output<Self::Item>, KumoError> {
            let title = res.css("h1").first().map(|e| e.text()).unwrap_or_default();
            Ok(Output::new().item(serde_json::json!({ "title": title })))
        }
    }

    let store = VecStore::default();
    let stats = CrawlEngine::builder()
        .fetcher(fetcher)
        .middleware(StatusRetry::new())
        .retry_policy(
            RetryPolicy::new(3)
                .base_delay(std::time::Duration::from_millis(1))
                .max_delay(std::time::Duration::from_millis(4)),
        )
        .respect_robots_txt(false)
        .store(store.clone())
        .run(S)
        .await
        .unwrap();

    assert_eq!(stats.errors, 0, "succeeded after 2 failures");
    assert_eq!(stats.pages_crawled, 1);
    assert_eq!(stats.items_scraped, 1);
    assert_eq!(
        calls.load(Ordering::SeqCst),
        3,
        "2 failures + 1 success = 3 fetches"
    );
    assert_eq!(store.collected()[0]["title"], "ok");
}

/// A 429 response carrying `retry-after: 1` must delay the retry: the crawl
/// still ends with one page and no errors, but takes at least 900 ms overall.
#[tokio::test]
async fn status_retry_uses_retry_after_before_requeueing() {
    // Spider that starts at the wrapped URL and emits nothing.
    struct MockTargetSpider(String);

    #[async_trait::async_trait]
    impl Spider for MockTargetSpider {
        type Item = serde_json::Value;

        fn name(&self) -> &str {
            "retry-after"
        }

        fn start_urls(&self) -> Vec<String> {
            vec![self.0.clone()]
        }

        async fn parse(&self, _res: &Response) -> Result<Output<Self::Item>, KumoError> {
            Ok(Output::new())
        }
    }

    let mut server = mockito::Server::new_async().await;

    // Mock handles stay bound for the whole test so the mocks remain registered.
    let _rate_limited = server
        .mock("GET", "/")
        .with_status(429)
        .with_header("retry-after", "1")
        .create_async()
        .await;
    let _ok_page = server
        .mock("GET", "/")
        .with_status(200)
        .with_header("content-type", "text/html")
        .with_body("<html><body><h1>ok</h1></body></html>")
        .create_async()
        .await;

    let started = Instant::now();

    // `max_delay` (2 s) is above the 1 s `retry-after` value and `base_delay`
    // (1 ms) is far below the 900 ms threshold asserted at the end.
    let policy = RetryPolicy::new(1)
        .base_delay(Duration::from_millis(1))
        .max_delay(Duration::from_secs(2));
    let spider = MockTargetSpider(server.url());

    let stats = CrawlEngine::builder()
        .respect_robots_txt(false)
        .retry_policy(policy)
        .middleware(StatusRetry::new())
        .store(StdoutStore)
        .run(spider)
        .await
        .unwrap();

    assert_eq!(stats.pages_crawled, 1);
    assert_eq!(stats.errors, 0);

    // 900 ms rather than a full second, leaving slack below the 1 s header value.
    let minimum_wait = Duration::from_millis(900);
    assert!(
        started.elapsed() >= minimum_wait,
        "retry should respect Retry-After delay"
    );
}