kumo 0.2.1

An async web crawling framework for Rust — Scrapy for Rust
Documentation
use kumo::{
    engine::CrawlEngine,
    error::KumoError,
    extract::Response,
    fetch::{Fetcher, MockFetcher},
    middleware::FetchRequest,
    spider::{Output, Spider},
};
use std::sync::{
    Arc,
    atomic::{AtomicU32, Ordering},
};
use tokio_stream::StreamExt;

/// A single-page crawl consumed as a stream yields its one scraped item and
/// then terminates.
#[tokio::test]
async fn stream_yields_items_as_scraped() {
    // Spider with one start URL; emits the text of the first `<h1>` as `title`.
    struct TitleStreamSpider;

    #[async_trait::async_trait]
    impl Spider for TitleStreamSpider {
        type Item = serde_json::Value;

        fn name(&self) -> &str {
            "title-stream"
        }

        fn start_urls(&self) -> Vec<String> {
            vec![String::from("https://example.com")]
        }

        async fn parse(&self, res: &Response) -> Result<Output<Self::Item>, KumoError> {
            // Fall back to the default (empty) text when the page has no `<h1>`.
            let headings = res.css("h1");
            let heading = headings.first().map(|node| node.text()).unwrap_or_default();
            let payload = serde_json::json!({ "title": heading });
            Ok(Output::new().item(payload))
        }
    }

    // Canned response for the spider's only start URL.
    let canned =
        MockFetcher::new().with_response("https://example.com", 200, "<h1>Stream Works</h1>");

    let mut items = CrawlEngine::builder()
        .fetcher(canned)
        .respect_robots_txt(false)
        .stream(TitleStreamSpider)
        .await
        .unwrap();

    // First poll: the scraped item carrying the heading text.
    let scraped = items.next().await.expect("stream should yield one item");
    assert_eq!(scraped["title"], "Stream Works");

    // Second poll: the stream must be exhausted.
    let trailing = items.next().await;
    assert!(trailing.is_none(), "stream should end after all items");
}

/// Dropping the item stream before polling it must not cause a panic.
#[tokio::test]
async fn dropping_stream_does_not_panic() {
    // Spider with two start pages; each page yields one item holding its URL.
    struct MultiPageSpider;

    #[async_trait::async_trait]
    impl Spider for MultiPageSpider {
        type Item = serde_json::Value;

        fn name(&self) -> &str {
            "multi-stream"
        }

        fn start_urls(&self) -> Vec<String> {
            ["https://example.com/1", "https://example.com/2"]
                .iter()
                .map(|url| url.to_string())
                .collect()
        }

        async fn parse(&self, res: &Response) -> Result<Output<Self::Item>, KumoError> {
            let record = serde_json::json!({ "url": res.url() });
            Ok(Output::new().item(record))
        }
    }

    // Every request gets the same canned page.
    let canned = MockFetcher::new().with_default(200, "<p>page</p>");

    let items = CrawlEngine::builder()
        .fetcher(canned)
        .respect_robots_txt(false)
        .stream(MultiPageSpider)
        .await
        .unwrap();

    // Discard the stream without ever polling it, then wait briefly so that
    // anything still running in the background has a chance to run.
    drop(items);
    let grace_period = std::time::Duration::from_millis(50);
    tokio::time::sleep(grace_period).await;
}

/// After the item stream is dropped, the crawl must stop issuing fetches.
///
/// The spider used here follows a new link from every page, so the crawl
/// never finishes by itself; a shared counter records how many fetches were
/// issued.
#[tokio::test]
async fn dropping_stream_stops_background_crawl() {
    // Fetcher that counts every call and answers with an empty HTML page.
    struct CountingFetcher {
        calls: Arc<AtomicU32>,
    }

    #[async_trait::async_trait]
    impl Fetcher for CountingFetcher {
        async fn fetch(&self, req: &FetchRequest) -> Result<Response, KumoError> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            Ok(Response::from_parts(req.url(), 200, "<html></html>"))
        }
    }

    // Spider that emits the current page number and follows page N + 1.
    struct EndlessSpider;

    #[async_trait::async_trait]
    impl Spider for EndlessSpider {
        type Item = serde_json::Value;

        fn name(&self) -> &str {
            "endless-stream"
        }

        fn start_urls(&self) -> Vec<String> {
            vec!["https://example.com/page/0".into()]
        }

        async fn parse(&self, res: &Response) -> Result<Output<Self::Item>, KumoError> {
            // The page number is the last path segment; anything that does
            // not parse as a number is treated as page 0.
            let current = res
                .url()
                .rsplit('/')
                .next()
                .and_then(|n| n.parse::<u32>().ok())
                .unwrap_or_default();
            Ok(Output::new()
                .item(serde_json::json!({ "page": current }))
                .follow(format!("https://example.com/page/{}", current + 1)))
        }
    }

    let calls = Arc::new(AtomicU32::new(0));
    let fetcher = CountingFetcher {
        calls: calls.clone(),
    };

    // NOTE(review): concurrency and stream buffer are both set to 1,
    // presumably to bound how far the crawl can run ahead of the consumer;
    // confirm against the builder documentation.
    let mut stream = CrawlEngine::builder()
        .concurrency(1)
        .stream_buffer(1)
        .fetcher(fetcher)
        .respect_robots_txt(false)
        .stream(EndlessSpider)
        .await
        .unwrap();

    let first = stream.next().await.expect("stream should yield one item");
    assert_eq!(first["page"], 0);

    // Drop the consumer side, then wait briefly before inspecting the counter.
    drop(stream);
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    // Read the counter once so the failure message reports exactly the value
    // that was compared; a second load could see a later increment.
    let observed_calls = calls.load(Ordering::SeqCst);
    assert!(
        observed_calls <= 2,
        "background crawl kept fetching after stream drop: {} calls",
        observed_calls
    );
}