// recursive_scraper/schedule.rs

1use anyhow::Result;
2use bytes::Bytes;
3use futures::StreamExt;
4use log::{debug, error, info};
5
6use reqwest::{Client, Response, Url};
7use std::{collections::BTreeSet, time::Duration};
8use tokio::time::{sleep, timeout, Instant};
9
10use crate::{
11    config::SchedulerConfig,
12    file::FileContent,
13    io::{save_file, Writer},
14    middle::{spawn_process, spawn_request, Conclusion},
15    state::SchedulerState,
16    urls::Record,
17};
18
/// Default connect timeout for the HTTP client.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// The overall request timeout is the connect timeout times this multiplier.
pub const TIMEOUT_MULTIPLIER: u32 = 5;
/// Persist the record every this-many observed record changes (see `recursion`).
pub const WRITE_FREQUENCY: usize = 8;
/// File name of the persisted record inside `cfg.log_dir`.
/// NOTE(review): named `_DIR` but holds a file name — consider renaming.
pub const RECORD_DIR: &str = "record.toml";
23
24pub fn client_with_timeout(timeout: Duration) -> Client {
25    Client::builder()
26        .connect_timeout(timeout)
27        .timeout(timeout * TIMEOUT_MULTIPLIER)
28        .build()
29        .expect("Failed to build the client.")
30}
31
/// Client built with the default connect timeout (`DEFAULT_TIMEOUT`).
pub fn default_client() -> Client {
    client_with_timeout(DEFAULT_TIMEOUT)
}
35
/// Drives the recursive scrape: pairs static configuration and an HTTP
/// client with the URL record and the mutable in-flight crawl state.
#[derive(Debug)]
pub struct Scheduler {
    // Static crawl configuration: delay, blacklist/filter, output dirs, optional ring.
    cfg: SchedulerConfig,
    // HTTP client shared by all requests.
    client: Client,
    // URL bookkeeping: id mapping, failures, serialized to TOML by `write`.
    rec: Record,
    // In-flight state: pending queue, request/process futures, conclusions, writer.
    s: SchedulerState,
}
43
/// A default scheduler uses the default configuration and default client.
impl Default for Scheduler {
    fn default() -> Self {
        Self::new(SchedulerConfig::default())
    }
}
49
50impl Scheduler {
51    pub fn from_client(client: Client, cfg: SchedulerConfig) -> Self {
52        Self {
53            cfg,
54            client,
55            rec: Record::default(),
56            s: SchedulerState::default(),
57        }
58    }
59
    /// Build a scheduler with the default client (see `default_client`).
    pub fn new(cfg: SchedulerConfig) -> Self {
        Self::from_client(default_client(), cfg)
    }
63
    /// True while the configured delay since the last virtual-time mark
    /// (`s.time`) has not yet elapsed, i.e. no new request should start.
    pub fn delaying_requests(&self) -> bool {
        self.s.time.elapsed() < self.cfg.delay
    }
67
68    pub fn add_pending(&mut self, url: Url) {
69        if let Ok(index) = self.rec.check_add_url(url) {
70            self.s.pending.push_back(index);
71        }
72    }
73
74    pub fn add_next_pending(&mut self, url: Url) {
75        if let Some(ref mut ring) = self.cfg.ring {
76            if let Ok(index) = self.rec.check_add_url(url) {
77                ring.next.push_back(index);
78            }
79        }
80    }
81
82    pub async fn spawn_one_request(&mut self) -> bool {
83        let url_id = match self.s.pending.pop_front() {
84            Some(url_id) => url_id,
85            None => return false,
86        };
87        let url = self.rec.url_ids.get(&url_id).unwrap().to_owned();
88        info!("Requesting {url_id} | {url}.");
89        self.s
90            .requests
91            .push(spawn_request(url_id, self.client.get(url)).await);
92        true
93    }
94
    /// Drain completed requests while still inside the delay window
    /// (nothing new can be spawned during the delay anyway).
    pub async fn check_requests(&mut self) {
        while self.check_one_request().await && self.delaying_requests() {}
    }
98
99    pub async fn check_one_request(&mut self) -> bool {
100        let result = match timeout(Duration::ZERO, self.s.requests.next()).await {
101            Ok(r) => r,
102            Err(_) => return false,
103        };
104        let result = match result {
105            Some(r) => r,
106            None => return false,
107        };
108        match result {
109            Ok((url_id, response_result)) => match response_result {
110                Ok(response) => self.process_response(url_id, response).await,
111                Err(err) => {
112                    error!("{url_id}: {err}.");
113                    self.fail(url_id)
114                }
115            },
116            Err(err) => error!("Request: {}", err),
117        }
118        true
119    }
120
    /// Drain completed process tasks while still inside the delay window.
    pub async fn check_processes(&mut self) {
        while self.check_one_process().await && self.delaying_requests() {}
    }
124
125    pub async fn check_one_process(&mut self) -> bool {
126        let result = match timeout(Duration::ZERO, self.s.processes.next()).await {
127            Ok(r) => r,
128            Err(_) => return false,
129        };
130        let result = match result {
131            Some(r) => r,
132            None => return false,
133        };
134        match result {
135            Ok((url_id, process_result)) => match process_result {
136                Ok(content) => self.s.conclusions.push_back(Conclusion { url_id, content }),
137                Err(err) => {
138                    error!("{url_id}: {err}.");
139                    self.fail(url_id)
140                }
141            },
142            Err(err) => error!("Request: {}", err),
143        }
144        true
145    }
146
147    async fn process_response(&mut self, url_id: usize, response: Response) {
148        let final_url_id = match self.rec.check_final_url(url_id, &response).await {
149            Some(id) => id,
150            None => return,
151        };
152        debug!("Processing {final_url_id}.");
153        self.s
154            .processes
155            .push(spawn_process(final_url_id, response).await);
156    }
157
    /// Handle queued conclusions while still inside the delay window.
    pub async fn process_conclusions(&mut self) {
        while self.process_one_conclusion().await && self.delaying_requests() {}
    }
161
162    pub async fn process_one_conclusion(&mut self) -> bool {
163        let Conclusion { url_id, content } = match self.s.conclusions.pop_front() {
164            Some(conclusion) => conclusion,
165            None => return false, // No conclusions pending.
166        };
167        match content {
168            FileContent::Html(text, hrefs, imgs) => {
169                self.process_html(url_id, text, hrefs, imgs).await
170            }
171            FileContent::Other(extension, bytes) => {
172                self.process_other(url_id, &extension, bytes).await
173            }
174        }
175        .unwrap_or_else(|err| {
176            error!("{url_id}: {err}.");
177            self.fail(url_id);
178        });
179        true
180    }
181
182    async fn process_html(
183        &mut self,
184        url_id: usize,
185        text: String,
186        hrefs: BTreeSet<Url>,
187        imgs: BTreeSet<Url>,
188    ) -> Result<()> {
189        for href in hrefs {
190            let href_str = href.as_str();
191            if !self.cfg.blacklist.is_match(href_str) {
192                if self.cfg.filter.is_match(href_str) {
193                    self.add_pending(href);
194                } else {
195                    self.add_next_pending(href);
196                }
197            } else {
198                _ = self.rec.check_add_url(href)
199            }
200        }
201        if !self.cfg.disregard_other {
202            for img in imgs {
203                // Not filtering images.
204                self.add_pending(img);
205            }
206        }
207        if !self.cfg.disregard_html {
208            save_file(
209                &format!("{}/{url_id}.html", self.cfg.html_dir),
210                text.as_bytes(),
211            )
212            .await?;
213        }
214        Ok(())
215    }
216
217    async fn process_other(&mut self, url_id: usize, extension: &str, bytes: Bytes) -> Result<()> {
218        if self.cfg.disregard_other {
219            return Ok(());
220        }
221        save_file(
222            &format!("{}/{url_id}{extension}", self.cfg.other_dir),
223            &bytes,
224        )
225        .await?;
226        Ok(())
227    }
228
229    /// Recursively scrape until there are no more pending URLs.
230    pub async fn recursion(&mut self) {
231        self.s.time = Instant::now();
232        let mut state_lens = self.s.lens();
233        let mut record_lens = self.rec.lens();
234        let mut changes: usize = 0;
235        while self.s.has_more_tasks() || self.increment_ring() {
236            self.one_cycle().await;
237            if state_lens != self.s.lens() {
238                state_lens = self.s.lens();
239                debug!(
240                    "{} pending, {} requests, {} processes, {} conclusions.",
241                    state_lens.0, state_lens.1, state_lens.2, state_lens.3
242                );
243            }
244            if record_lens != self.rec.lens() {
245                changes += 1;
246                record_lens = self.rec.lens();
247                if changes % WRITE_FREQUENCY == 0 {
248                    self.write().await;
249                }
250            }
251        }
252
253        self.write_all().await;
254    }
255
256    fn increment_ring(&mut self) -> bool {
257        if let Some(ref mut ring) = self.cfg.ring {
258            if let Some(pending) = ring.increment() {
259                self.s.pending = pending;
260                return true;
261            }
262        }
263        false
264    }
265
    /// One pass of the main loop: interleave spawning new requests with
    /// draining completed requests, processes, and conclusions, then sleep
    /// out whatever remains of the current delay slot.
    async fn one_cycle(&mut self) {
        self.check_spawn_request().await;
        self.check_requests().await;
        self.check_spawn_request().await;
        self.check_processes().await;
        self.check_spawn_request().await;
        self.process_conclusions().await;
        self.check_spawn_request().await;
        // Sleep only for the part of the delay that has not already elapsed.
        sleep(self.cfg.delay.saturating_sub(self.s.time.elapsed())).await;
    }
276
277    async fn check_spawn_request(&mut self) {
278        if !self.delaying_requests() && self.spawn_one_request().await {
279            self.s.time += self.cfg.delay;
280        }
281    }
282
    /// Tell the scheduler to finish whatever is already started
    /// and do not initiate any more requests.
    pub async fn finish(&mut self) {
        // Restart the virtual clock so pacing below begins from "now".
        self.s.time = Instant::now();
        while self.s.has_processing() {
            self.check_requests().await;
            self.check_processes().await;
            self.process_one_conclusion().await;
            // Pace each drain iteration to one configured delay slot.
            sleep(self.cfg.delay.saturating_sub(self.s.time.elapsed())).await;
            self.s.time += self.cfg.delay;
        }
        // Persist the record once everything in flight has settled.
        self.write_all().await;
    }
296
297    fn fail(&mut self, url_id: usize) {
298        if self.rec.fails.contains(&url_id) {
299            return;
300        }
301        self.rec.fails.insert(url_id);
302        self.s.pending.push_back(url_id);
303    }
304
305    async fn write(&mut self) {
306        {
307            let _ = self.s.writer.take();
308        }
309        self.s.writer = Some(
310            Writer::spawn(
311                format!("{}/{RECORD_DIR}", self.cfg.log_dir),
312                toml::to_string_pretty(&self.rec).unwrap(),
313            )
314            .await,
315        );
316    }
317
318    async fn write_all(&mut self) {
319        for _ in 0..8 {
320            self.write().await;
321            let writer = self.s.writer.take().unwrap();
322            if let Err(err) = writer.wait().await {
323                error!("Write all: {err}.");
324                sleep(Duration::from_secs(1)).await;
325            } else {
326                return;
327            }
328        }
329        error!("Fatal! Write all: all eight attempts failed!");
330    }
331}