1use anyhow::Result;
2use bytes::Bytes;
3use futures::StreamExt;
4use log::{debug, error, info};
5
6use reqwest::{Client, Response, Url};
7use std::{collections::BTreeSet, time::Duration};
8use tokio::time::{sleep, timeout, Instant};
9
10use crate::{
11 config::SchedulerConfig,
12 file::FileContent,
13 io::{save_file, Writer},
14 middle::{spawn_process, spawn_request, Conclusion},
15 state::SchedulerState,
16 urls::Record,
17};
18
/// Connect timeout handed to [`client_with_timeout`] by [`default_client`].
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// Factor by which [`client_with_timeout`] scales its connect timeout to obtain the
/// total per-request timeout.
pub const TIMEOUT_MULTIPLIER: u32 = 5;
/// The record is written on every `WRITE_FREQUENCY`-th observed change of the
/// record's lengths while crawling.
pub const WRITE_FREQUENCY: usize = 8;
/// File name of the serialized record, joined onto the configured log directory.
/// Note: despite the `_DIR` suffix this is a file name, not a directory.
pub const RECORD_DIR: &str = "record.toml";
23
24pub fn client_with_timeout(timeout: Duration) -> Client {
25 Client::builder()
26 .connect_timeout(timeout)
27 .timeout(timeout * TIMEOUT_MULTIPLIER)
28 .build()
29 .expect("Failed to build the client.")
30}
31
32pub fn default_client() -> Client {
33 client_with_timeout(DEFAULT_TIMEOUT)
34}
35
/// Crawl scheduler: bundles the configuration, the HTTP client, the URL record and
/// the runtime state that the scheduling methods operate on.
#[derive(Debug)]
pub struct Scheduler {
    /// Settings consulted while scheduling (request delay, blacklist and filter
    /// patterns, output directories, optional ring of follow-up URLs).
    cfg: SchedulerConfig,
    /// HTTP client used to issue every request.
    client: Client,
    /// URL bookkeeping (known URL ids and failed ids); this is what gets serialized
    /// to the record file.
    rec: Record,
    /// Runtime state: pending ids, in-flight requests and processes, finished
    /// conclusions, the pacing timestamp and the current record writer.
    s: SchedulerState,
}
43
44impl Default for Scheduler {
45 fn default() -> Self {
46 Self::new(SchedulerConfig::default())
47 }
48}
49
50impl Scheduler {
51 pub fn from_client(client: Client, cfg: SchedulerConfig) -> Self {
52 Self {
53 cfg,
54 client,
55 rec: Record::default(),
56 s: SchedulerState::default(),
57 }
58 }
59
60 pub fn new(cfg: SchedulerConfig) -> Self {
61 Self::from_client(default_client(), cfg)
62 }
63
64 pub fn delaying_requests(&self) -> bool {
65 self.s.time.elapsed() < self.cfg.delay
66 }
67
68 pub fn add_pending(&mut self, url: Url) {
69 if let Ok(index) = self.rec.check_add_url(url) {
70 self.s.pending.push_back(index);
71 }
72 }
73
74 pub fn add_next_pending(&mut self, url: Url) {
75 if let Some(ref mut ring) = self.cfg.ring {
76 if let Ok(index) = self.rec.check_add_url(url) {
77 ring.next.push_back(index);
78 }
79 }
80 }
81
82 pub async fn spawn_one_request(&mut self) -> bool {
83 let url_id = match self.s.pending.pop_front() {
84 Some(url_id) => url_id,
85 None => return false,
86 };
87 let url = self.rec.url_ids.get(&url_id).unwrap().to_owned();
88 info!("Requesting {url_id} | {url}.");
89 self.s
90 .requests
91 .push(spawn_request(url_id, self.client.get(url)).await);
92 true
93 }
94
95 pub async fn check_requests(&mut self) {
96 while self.check_one_request().await && self.delaying_requests() {}
97 }
98
99 pub async fn check_one_request(&mut self) -> bool {
100 let result = match timeout(Duration::ZERO, self.s.requests.next()).await {
101 Ok(r) => r,
102 Err(_) => return false,
103 };
104 let result = match result {
105 Some(r) => r,
106 None => return false,
107 };
108 match result {
109 Ok((url_id, response_result)) => match response_result {
110 Ok(response) => self.process_response(url_id, response).await,
111 Err(err) => {
112 error!("{url_id}: {err}.");
113 self.fail(url_id)
114 }
115 },
116 Err(err) => error!("Request: {}", err),
117 }
118 true
119 }
120
121 pub async fn check_processes(&mut self) {
122 while self.check_one_process().await && self.delaying_requests() {}
123 }
124
125 pub async fn check_one_process(&mut self) -> bool {
126 let result = match timeout(Duration::ZERO, self.s.processes.next()).await {
127 Ok(r) => r,
128 Err(_) => return false,
129 };
130 let result = match result {
131 Some(r) => r,
132 None => return false,
133 };
134 match result {
135 Ok((url_id, process_result)) => match process_result {
136 Ok(content) => self.s.conclusions.push_back(Conclusion { url_id, content }),
137 Err(err) => {
138 error!("{url_id}: {err}.");
139 self.fail(url_id)
140 }
141 },
142 Err(err) => error!("Request: {}", err),
143 }
144 true
145 }
146
147 async fn process_response(&mut self, url_id: usize, response: Response) {
148 let final_url_id = match self.rec.check_final_url(url_id, &response).await {
149 Some(id) => id,
150 None => return,
151 };
152 debug!("Processing {final_url_id}.");
153 self.s
154 .processes
155 .push(spawn_process(final_url_id, response).await);
156 }
157
158 pub async fn process_conclusions(&mut self) {
159 while self.process_one_conclusion().await && self.delaying_requests() {}
160 }
161
162 pub async fn process_one_conclusion(&mut self) -> bool {
163 let Conclusion { url_id, content } = match self.s.conclusions.pop_front() {
164 Some(conclusion) => conclusion,
165 None => return false, };
167 match content {
168 FileContent::Html(text, hrefs, imgs) => {
169 self.process_html(url_id, text, hrefs, imgs).await
170 }
171 FileContent::Other(extension, bytes) => {
172 self.process_other(url_id, &extension, bytes).await
173 }
174 }
175 .unwrap_or_else(|err| {
176 error!("{url_id}: {err}.");
177 self.fail(url_id);
178 });
179 true
180 }
181
182 async fn process_html(
183 &mut self,
184 url_id: usize,
185 text: String,
186 hrefs: BTreeSet<Url>,
187 imgs: BTreeSet<Url>,
188 ) -> Result<()> {
189 for href in hrefs {
190 let href_str = href.as_str();
191 if !self.cfg.blacklist.is_match(href_str) {
192 if self.cfg.filter.is_match(href_str) {
193 self.add_pending(href);
194 } else {
195 self.add_next_pending(href);
196 }
197 } else {
198 _ = self.rec.check_add_url(href)
199 }
200 }
201 if !self.cfg.disregard_other {
202 for img in imgs {
203 self.add_pending(img);
205 }
206 }
207 if !self.cfg.disregard_html {
208 save_file(
209 &format!("{}/{url_id}.html", self.cfg.html_dir),
210 text.as_bytes(),
211 )
212 .await?;
213 }
214 Ok(())
215 }
216
217 async fn process_other(&mut self, url_id: usize, extension: &str, bytes: Bytes) -> Result<()> {
218 if self.cfg.disregard_other {
219 return Ok(());
220 }
221 save_file(
222 &format!("{}/{url_id}{extension}", self.cfg.other_dir),
223 &bytes,
224 )
225 .await?;
226 Ok(())
227 }
228
229 pub async fn recursion(&mut self) {
231 self.s.time = Instant::now();
232 let mut state_lens = self.s.lens();
233 let mut record_lens = self.rec.lens();
234 let mut changes: usize = 0;
235 while self.s.has_more_tasks() || self.increment_ring() {
236 self.one_cycle().await;
237 if state_lens != self.s.lens() {
238 state_lens = self.s.lens();
239 debug!(
240 "{} pending, {} requests, {} processes, {} conclusions.",
241 state_lens.0, state_lens.1, state_lens.2, state_lens.3
242 );
243 }
244 if record_lens != self.rec.lens() {
245 changes += 1;
246 record_lens = self.rec.lens();
247 if changes % WRITE_FREQUENCY == 0 {
248 self.write().await;
249 }
250 }
251 }
252
253 self.write_all().await;
254 }
255
256 fn increment_ring(&mut self) -> bool {
257 if let Some(ref mut ring) = self.cfg.ring {
258 if let Some(pending) = ring.increment() {
259 self.s.pending = pending;
260 return true;
261 }
262 }
263 false
264 }
265
266 async fn one_cycle(&mut self) {
267 self.check_spawn_request().await;
268 self.check_requests().await;
269 self.check_spawn_request().await;
270 self.check_processes().await;
271 self.check_spawn_request().await;
272 self.process_conclusions().await;
273 self.check_spawn_request().await;
274 sleep(self.cfg.delay.saturating_sub(self.s.time.elapsed())).await;
275 }
276
277 async fn check_spawn_request(&mut self) {
278 if !self.delaying_requests() && self.spawn_one_request().await {
279 self.s.time += self.cfg.delay;
280 }
281 }
282
283 pub async fn finish(&mut self) {
286 self.s.time = Instant::now();
287 while self.s.has_processing() {
288 self.check_requests().await;
289 self.check_processes().await;
290 self.process_one_conclusion().await;
291 sleep(self.cfg.delay.saturating_sub(self.s.time.elapsed())).await;
292 self.s.time += self.cfg.delay;
293 }
294 self.write_all().await;
295 }
296
297 fn fail(&mut self, url_id: usize) {
298 if self.rec.fails.contains(&url_id) {
299 return;
300 }
301 self.rec.fails.insert(url_id);
302 self.s.pending.push_back(url_id);
303 }
304
305 async fn write(&mut self) {
306 {
307 let _ = self.s.writer.take();
308 }
309 self.s.writer = Some(
310 Writer::spawn(
311 format!("{}/{RECORD_DIR}", self.cfg.log_dir),
312 toml::to_string_pretty(&self.rec).unwrap(),
313 )
314 .await,
315 );
316 }
317
318 async fn write_all(&mut self) {
319 for _ in 0..8 {
320 self.write().await;
321 let writer = self.s.writer.take().unwrap();
322 if let Err(err) = writer.wait().await {
323 error!("Write all: {err}.");
324 sleep(Duration::from_secs(1)).await;
325 } else {
326 return;
327 }
328 }
329 error!("Fatal! Write all: all eight attempts failed!");
330 }
331}