// spider_util/stream_response.rs
use crate::response::{Link, LinkType, Response};
use bytes::Bytes;
use dashmap::DashMap;
use futures_util::StreamExt;
use futures_util::stream::Stream;
use http::StatusCode;
use reqwest::header::HeaderMap;
use scraper::Html;
use serde_json::Value;
use std::{borrow::Cow, pin::Pin};
use url::Url;

use std::fmt;
/// An HTTP response whose body is consumed incrementally as a byte stream
/// instead of being buffered up front. Convert to a fully materialized
/// `Response` (or parsed HTML/links) via the methods on the `impl` below.
pub struct StreamResponse {
    /// URL of the response; may differ from `request_url` — presumably after
    /// redirects. NOTE(review): confirm against the client that builds this.
    pub url: Url,
    /// HTTP status code of the response.
    pub status: StatusCode,
    /// Response headers as received.
    pub headers: HeaderMap,
    /// Lazily consumed body: each item is one chunk of bytes or an I/O error.
    /// Not `Debug`; omitted from the `Debug` impl.
    pub body_stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
    /// The URL that was originally requested.
    pub request_url: Url,
    /// Arbitrary per-response metadata, keyed by name.
    pub meta: DashMap<Cow<'static, str>, Value>,
    /// Whether this response was served from a cache.
    pub cached: bool,
}
38
39impl fmt::Debug for StreamResponse {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 f.debug_struct("StreamResponse")
42 .field("url", &self.url)
43 .field("status", &self.status)
44 .field("headers", &self.headers)
45 .field("request_url", &self.request_url)
46 .field("cached", &self.cached)
47 .finish()
48 }
49}
50
51impl StreamResponse {
52 pub async fn to_response(self) -> Result<Response, std::io::Error> {
55 let mut body_bytes = Vec::new();
56 let mut stream = self.body_stream;
57
58 while let Some(chunk_result) = stream.next().await {
59 let chunk = chunk_result?;
60 body_bytes.extend_from_slice(&chunk);
61 }
62
63 Ok(Response {
64 url: self.url,
65 status: self.status,
66 headers: self.headers,
67 body: bytes::Bytes::from(body_bytes),
68 request_url: self.request_url,
69 meta: self.meta,
70 cached: self.cached,
71 })
72 }
73
74 pub async fn into_html(self) -> Result<Html, std::io::Error> {
78 let mut body_bytes = Vec::new();
79 let mut stream = self.body_stream;
80
81 while let Some(chunk_result) = stream.next().await {
82 let chunk = chunk_result?;
83 body_bytes.extend_from_slice(&chunk);
84 }
85
86 let body_str = std::str::from_utf8(&body_bytes)
87 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
88
89 Ok(Html::parse_document(body_str))
90 }
91
92 pub async fn into_links(self) -> Result<Vec<Link>, std::io::Error> {
95 let base_url = self.url.clone();
96 let html = self.into_html().await?;
97 let mut links = Vec::new();
98
99 let selectors = vec![
100 ("a[href]", "href"),
101 ("link[href]", "href"),
102 ("script[src]", "src"),
103 ("img[src]", "src"),
104 ("audio[src]", "src"),
105 ("video[src]", "src"),
106 ("source[src]", "src"),
107 ];
108
109 for (selector_str, attr_name) in selectors {
110 if let Ok(selector) = scraper::Selector::parse(selector_str) {
111 for element in html.select(&selector) {
112 if let Some(attr_value) = element.value().attr(attr_name)
113 && let Ok(url) = base_url.join(attr_value)
114 {
115 let link_type = match element.value().name() {
116 "a" => LinkType::Page,
117 "link" => LinkType::Stylesheet,
118 "script" => LinkType::Script,
119 "img" => LinkType::Image,
120 "audio" | "video" | "source" => LinkType::Media,
121 _ => LinkType::Other(element.value().name().to_string()),
122 };
123 links.push(Link { url, link_type });
124 }
125 }
126 }
127 }
128
129 Ok(links)
130 }
131}
132