Skip to main content

spider_util/
stream_response.rs

//! Streaming response implementation for memory-efficient web scraping.
//!
//! This module provides streaming response capabilities that allow large
//! responses to be processed without loading the entire body into memory at once.
5
6use crate::response::{Link, LinkType, Response};
7use bytes::Bytes;
8use dashmap::DashMap;
9use futures_util::StreamExt;
10use futures_util::stream::Stream;
11use http::StatusCode;
12use reqwest::header::HeaderMap;
13use scraper::Html;
14use serde_json::Value;
15use std::{borrow::Cow, pin::Pin};
16use url::Url;
17
18use std::fmt;
19
20/// A stream response that allows processing of large responses without
21/// loading the entire body into memory at once.
22pub struct StreamResponse {
23    /// The final URL of the response after any redirects.
24    pub url: Url,
25    /// The HTTP status code of the response.
26    pub status: StatusCode,
27    /// The headers of the response.
28    pub headers: HeaderMap,
29    /// The body of the response as a stream of Bytes chunks.
30    pub body_stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
31    /// The original URL of the request that led to this response.
32    pub request_url: Url,
33    /// Metadata associated with the response, carried over from the request.
34    pub meta: DashMap<Cow<'static, str>, Value>,
35    /// Indicates if the response was served from a cache.
36    pub cached: bool,
37}
38
39impl fmt::Debug for StreamResponse {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        f.debug_struct("StreamResponse")
42            .field("url", &self.url)
43            .field("status", &self.status)
44            .field("headers", &self.headers)
45            .field("request_url", &self.request_url)
46            .field("cached", &self.cached)
47            .finish()
48    }
49}
50
51impl StreamResponse {
52    /// Converts the stream response to a regular response by collecting all body chunks.
53    /// This defeats the purpose of streaming but provides compatibility with existing code.
54    pub async fn to_response(self) -> Result<Response, std::io::Error> {
55        let mut body_bytes = Vec::new();
56        let mut stream = self.body_stream;
57
58        while let Some(chunk_result) = stream.next().await {
59            let chunk = chunk_result?;
60            body_bytes.extend_from_slice(&chunk);
61        }
62
63        Ok(Response {
64            url: self.url,
65            status: self.status,
66            headers: self.headers,
67            body: bytes::Bytes::from(body_bytes),
68            request_url: self.request_url,
69            meta: self.meta,
70            cached: self.cached,
71        })
72    }
73
74    /// Provides a way to parse the stream response as HTML by collecting chunks
75    /// until enough data is available for parsing.
76    /// Note: This consumes the stream response to collect all data.
77    pub async fn into_html(self) -> Result<Html, std::io::Error> {
78        let mut body_bytes = Vec::new();
79        let mut stream = self.body_stream;
80
81        while let Some(chunk_result) = stream.next().await {
82            let chunk = chunk_result?;
83            body_bytes.extend_from_slice(&chunk);
84        }
85
86        let body_str = std::str::from_utf8(&body_bytes)
87            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
88
89        Ok(Html::parse_document(body_str))
90    }
91
92    /// Extracts links from the stream response by consuming and parsing the content.
93    /// Note: This consumes the stream response to collect all data.
94    pub async fn into_links(self) -> Result<Vec<Link>, std::io::Error> {
95        let base_url = self.url.clone();
96        let html = self.into_html().await?;
97        let mut links = Vec::new();
98
99        let selectors = vec![
100            ("a[href]", "href"),
101            ("link[href]", "href"),
102            ("script[src]", "src"),
103            ("img[src]", "src"),
104            ("audio[src]", "src"),
105            ("video[src]", "src"),
106            ("source[src]", "src"),
107        ];
108
109        for (selector_str, attr_name) in selectors {
110            if let Ok(selector) = scraper::Selector::parse(selector_str) {
111                for element in html.select(&selector) {
112                    if let Some(attr_value) = element.value().attr(attr_name)
113                        && let Ok(url) = base_url.join(attr_value)
114                    {
115                        let link_type = match element.value().name() {
116                            "a" => LinkType::Page,
117                            "link" => LinkType::Stylesheet,
118                            "script" => LinkType::Script,
119                            "img" => LinkType::Image,
120                            "audio" | "video" | "source" => LinkType::Media,
121                            _ => LinkType::Other(element.value().name().to_string()),
122                        };
123                        links.push(Link { url, link_type });
124                    }
125                }
126            }
127        }
128
129        Ok(links)
130    }
131}
132