// swiftide_integrations/scraping/loader.rs
use derive_builder::Builder;
use spider::website::Website;

use swiftide_core::{
    Loader,
    indexing::{IndexingStream, Node},
};
9#[derive(Debug, Builder, Clone)]
10#[builder(pattern = "owned")]
11pub struct ScrapingLoader {
16 spider_website: Website,
17}
18
19impl ScrapingLoader {
20 pub fn builder() -> ScrapingLoaderBuilder {
21 ScrapingLoaderBuilder::default()
22 }
23
24 #[allow(dead_code)]
26 pub fn from_spider(spider_website: Website) -> Self {
27 Self { spider_website }
28 }
29
30 pub fn from_url(url: impl AsRef<str>) -> Self {
32 Self::from_spider(Website::new(url.as_ref()))
33 }
34}
35
36impl Loader for ScrapingLoader {
37 fn into_stream(mut self) -> IndexingStream {
38 let (tx, rx) = tokio::sync::mpsc::channel(1000);
39 let mut spider_rx = self
40 .spider_website
41 .subscribe(0)
42 .expect("Failed to subscribe to spider");
43 tracing::info!("Subscribed to spider");
44
45 let _recv_thread = tokio::spawn(async move {
46 while let Ok(res) = spider_rx.recv().await {
47 let html = res.get_html();
48 let original_size = html.len();
49
50 let node = Node::builder()
51 .chunk(html)
52 .original_size(original_size)
53 .path(res.get_url())
54 .build();
55
56 tracing::debug!(?node, "[Spider] Received node from spider");
57
58 if let Err(error) = tx.send(node).await {
59 tracing::error!(?error, "[Spider] Failed to send node to stream");
60 break;
61 }
62 }
63 });
64
65 let mut spider_website = self.spider_website;
66
67 let _scrape_thread = tokio::spawn(async move {
68 tracing::info!("[Spider] Starting scrape loop");
69 spider_website.crawl().await;
72 tracing::info!("[Spider] Scrape loop finished");
73 });
74
75 rx.into()
77 }
78
79 fn into_stream_boxed(self: Box<Self>) -> IndexingStream {
80 self.into_stream()
81 }
82}
83
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use futures_util::StreamExt;
    use swiftide_core::indexing::Loader;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, Request, ResponseTemplate};

    /// Scraping a single-page site yields exactly one node with the page HTML.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_scraping_loader_with_wiremock() {
        // Serve one page at the mock server root.
        let server = MockServer::start().await;

        let body = "<html><body><h1>Test Page</h1></body></html>";
        Mock::given(method("GET"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_string(body))
            .mount(&server)
            .await;

        // Crawl the mock server and drain the resulting stream.
        let nodes: Vec<Result<Node>> = ScrapingLoader::from_url(server.uri())
            .into_stream()
            .collect()
            .await;

        assert_eq!(nodes.len(), 1);

        let node = nodes.first().unwrap().as_ref().unwrap();
        assert_eq!(node.chunk, body);
    }

    /// The spider follows the link on the root page, so two nodes come out;
    /// the delayed /other page arrives last.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_scraping_loader_multiple_pages() {
        let server = MockServer::start().await;

        // Root page links to /other so the crawler discovers it.
        let body = "<html><body><h1>Test Page</h1><a href=\"/other\">link</a></body></html>";
        Mock::given(method("GET"))
            .and(path("/"))
            .respond_with(ResponseTemplate::new(200).set_body_string(body))
            .mount(&server)
            .await;

        // /other responds slowly so its node reliably arrives second.
        let body2 = "<html><body><h1>Test Page 2</h1></body></html>";
        Mock::given(method("GET"))
            .and(path("/other"))
            .respond_with(move |_req: &Request| {
                std::thread::sleep(std::time::Duration::from_secs(1));
                ResponseTemplate::new(200).set_body_string(body2)
            })
            .mount(&server)
            .await;

        let mut nodes: Vec<Result<Node>> = ScrapingLoader::from_url(server.uri())
            .into_stream()
            .collect()
            .await;

        assert_eq!(nodes.len(), 2);

        // Last node out is the slow /other page.
        let last = nodes.pop().unwrap().unwrap();
        assert_eq!(last.chunk, body2);

        let first = nodes.pop().unwrap().unwrap();
        assert_eq!(first.chunk, body);
    }
}
164}