seedframe_webscraper/
lib.rs1use async_trait::async_trait;
7use chrono::Utc;
8use scraper::{Html, Selector};
9use seedframe::document::Document;
10use seedframe::loader::Loader;
11use serde::de::Error;
12use serde::Deserialize;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::broadcast::{self, Receiver, Sender};
16use tokio::sync::Mutex;
17
18#[derive(Debug, Deserialize)]
33#[serde(deny_unknown_fields)]
34struct Config {
35 url: String,
36 interval: Option<u64>,
37 selector: Option<String>,
38}
39
40pub struct WebScraper {
62 sender: Arc<Mutex<Sender<Document>>>,
63}
64
65impl WebScraper {
66 #[allow(clippy::missing_panics_doc)]
67 pub fn new(json_str: Option<&str>) -> Result<Self, serde_json::Error> {
74 if json_str.is_none() {
75 Err(serde_json::Error::custom(
76 "Expected a json config with atleast the `url` field specified! ",
77 ))?;
78 }
79 let config: Config = serde_json::from_str(json_str.unwrap())?;
80 let (sender, _) = broadcast::channel(1);
81 let sender = Arc::new(Mutex::new(sender));
82
83 let url = config.url;
84 let interval = config.interval.map(Duration::from_secs);
85 let selector = config.selector;
86
87 let task_sender = Arc::clone(&sender);
88 tokio::spawn(async move {
89 let run_once = interval.is_none();
90 let selector = selector.and_then(|s| Selector::parse(&s).ok());
91
92 loop {
93 match Self::fetch_and_parse(&url, selector.as_ref()).await {
94 Ok(document) => {
95 let sender = task_sender.lock().await;
96 let _ = sender.send(document);
97 }
98 Err(e) => eprintln!("Scraping failed: {e}"),
99 }
100
101 if run_once {
102 break;
103 }
104
105 if let Some(dur) = interval {
106 tokio::time::sleep(dur).await;
107 } else {
108 break;
109 }
110 }
111 });
112
113 Ok(Self { sender })
114 }
115
116 async fn fetch_and_parse(
118 url: &str,
119 selector: Option<&Selector>,
120 ) -> Result<Document, reqwest::Error> {
121 let html = reqwest::get(url).await?.text().await?;
122 let data = match selector {
123 Some(sel) => Html::parse_document(&html)
124 .select(sel)
125 .map(|e| e.html())
126 .collect::<Vec<_>>()
127 .join("\n"),
128 None => html,
129 };
130
131 Ok(Document {
132 id: format!("{}-{}", url, Utc::now().timestamp_millis()),
133 data,
134 })
135 }
136}
137
138#[async_trait]
139impl Loader for WebScraper {
140 async fn subscribe(&self) -> Receiver<Document> {
141 self.sender.lock().await.subscribe()
142 }
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{self, Duration};

    /// A fully populated config object deserializes into the matching `Config` fields.
    #[test]
    fn test_config_deserialization() {
        let json = r#"{
            "url": "https://example.com",
            "interval": 60,
            "selector": "div.content"
        }"#;

        let config: Config = serde_json::from_str(json).expect("config should deserialize");
        assert_eq!(config.url, "https://example.com");
        assert_eq!(config.interval, Some(60));
        assert_eq!(config.selector, Some("div.content".to_string()));
    }

    /// With a selector, only the matching elements end up in the document.
    #[tokio::test]
    async fn test_fetch_and_parse_with_selector() {
        let mut server = mockito::Server::new_async().await;
        let url = server.url();
        let mock = server
            .mock("GET", "/")
            .with_status(200)
            .with_body(r#"<html><div class="content">Test</div></html>"#)
            .create();

        let selector = Selector::parse("div.content").unwrap();
        let result = WebScraper::fetch_and_parse(&url, Some(&selector)).await;

        mock.assert();
        let doc = result.expect("fetch should succeed");
        assert!(doc.data.contains("Test"));
        assert!(!doc.data.contains("html"));
    }

    /// With an interval, the loader keeps publishing and picks up changed content.
    #[tokio::test]
    async fn test_full_loader_cycle() {
        let mut mock_server = mockito::Server::new_async().await;
        let url = mock_server.url();
        let _ = mock_server
            .mock("GET", "/")
            .with_body("Test Content")
            .create();

        let json = format!(
            r#"{{
                "url": "{}",
                "interval": 1,
                "selector": null
            }}"#,
            url
        );

        let scraper = WebScraper::new(Some(&json)).unwrap();
        let mut receiver = scraper.subscribe().await;

        let first = receiver.recv().await.unwrap();

        let _ = mock_server
            .mock("GET", "/")
            .with_body("Just Content")
            .create();

        let second = time::timeout(Duration::from_secs(2), receiver.recv())
            .await
            .expect("Didn't receive second message")
            .unwrap();

        assert_ne!(first.id, second.id);
        assert_eq!(first.data, "Test Content");
        assert_eq!(second.data, "Just Content");
    }

    /// Without an interval, exactly one document is published.
    // NOTE(review): still marked `#[ignore]`; the reason is not recorded in this file.
    // Re-enable once it is confirmed to be stable.
    #[tokio::test]
    #[ignore]
    async fn test_one_time_scraping() {
        // Bind the server for the whole test. Awaiting it inline and only keeping the URL
        // drops the server guard immediately, so the scraper would be talking to a server
        // this test no longer owns.
        let mut server = mockito::Server::new_async().await;
        let url = server.url();
        let _mock = server.mock("GET", "/").with_body("One Shot").create();
        let json = format!("{{\"url\": \"{url}\"}}");

        let scraper = WebScraper::new(Some(&json)).unwrap();
        let mut receiver = scraper.subscribe().await;

        // Drain until the channel has been quiet for 100 ms.
        let mut received = Vec::new();
        while let Ok(doc) = time::timeout(Duration::from_millis(100), receiver.recv()).await {
            received.push(doc.unwrap());
        }

        assert_eq!(received.len(), 1);
        assert!(received[0].id.starts_with(url.as_str()));
        assert_eq!(received[0].data, "One Shot");
    }

    /// A URL that cannot be fetched produces no document at all.
    #[tokio::test]
    async fn test_invalid_url_handling() {
        let json = r#"{"url": "invalid://url", "interval": null}"#;
        let scraper = WebScraper::new(Some(json)).unwrap();
        let mut receiver = scraper.subscribe().await;

        let result = time::timeout(Duration::from_secs(1), receiver.recv()).await;
        assert!(result.is_err());
    }
}