1use crate::requests::chapter::ChapterDownloadMeta;
4use crate::requests::{Error, Result};
5use crate::MangoClient;
6
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use parking_lot::Mutex;
11
12use tokio::io::AsyncWriteExt as _;
13use tokio::sync::mpsc::{self, Sender};
14use tokio::task::{self, JoinSet};
15use tokio_stream::wrappers::ReceiverStream;
16use tokio_stream::StreamExt as _;
17
18use bon::bon;
19use bytes::Bytes;
20use reqwest::Response;
21use tracing::instrument::Instrument as _;
22
/// Download state of a single chapter page.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum PageStatus {
    /// The page is fully downloaded; the payload is the path of the saved file.
    Loaded(PathBuf),
    /// The page is scheduled or being downloaded; the payload is the progress
    /// in percent.
    Loading(usize),
    /// The page has not been scheduled for download yet.
    Idle,
}
29
/// Messages handled by the downloadings manager task.
///
/// Every variant only carries a 1-based page number, so the type is cheap to
/// copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ManagerCommand {
    /// The viewer switched to page `page_num`.
    SwitchPage { page_num: usize },
    /// Page `page_num` was downloaded and saved.
    DownloadedSuccessfully { page_num: usize },
    /// Downloading page `page_num` failed.
    DownloadError { page_num: usize },
}
36
/// Messages handled by the task that spawns page downloads.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DownloadingsSpawnerCommand {
    /// Stop all running downloads and terminate the spawner.
    Shutdown,
    /// Start downloading the 1-based page `page_num`.
    NewDownload { page_num: usize },
}
42
43#[derive(Debug)]
46pub struct ChapterViewer {
47 pub(crate) statuses: Arc<Mutex<Vec<PageStatus>>>,
48 pub(crate) downloadings: ReceiverStream<usize>,
49 pub(crate) submit_switch: Sender<ManagerCommand>,
50}
51
52impl ChapterViewer {
53 #[tracing::instrument(skip(self))]
54 pub async fn open_page(&mut self, page_num: usize) -> PathBuf {
56 tracing::trace!("entered");
57
58 tracing::trace!("acquiring mutex lock");
59
60 let status = {
61 let statuses = self.statuses.lock();
62 statuses[page_num - 1].clone()
63 };
64
65 tracing::trace!("released mutex lock");
66
67 let sender = self.submit_switch.clone();
68
69 task::spawn(async move {
70 let _ = sender.send(ManagerCommand::SwitchPage { page_num }).await;
73 });
74
75 match status {
76 PageStatus::Loaded(path) => path,
77 _ => {
78 tracing::trace!("starting waiting for page {page_num}");
79
80 let mut res = None;
81 while let Some(downloaded_page_num) = self.downloadings.next().await {
82 tracing::trace!("got signal that page {downloaded_page_num} loaded");
83
84 if downloaded_page_num == page_num {
85 tracing::trace!("acquiring mutex lock");
86
87 let status = {
88 let statuses = self.statuses.lock();
89 statuses[page_num - 1].clone()
90 };
91
92 tracing::trace!("returned mutex lock");
93
94 match status {
95 PageStatus::Loaded(path) => {
96 res = Some(path);
97 break;
98 }
99 _ => panic!(
100 "got message that page was downloaded while it actually was not"
101 ),
102 }
103 }
104 }
105 res.expect("mananager task shutdowned before desired page was downloaded")
106 }
107 }
108 }
109}
110
111#[bon]
112impl MangoClient {
113 pub(crate) fn determine_next_download_page(
114 statuses: &mut parking_lot::MutexGuard<'_, Vec<PageStatus>>,
115 opened_page: usize,
116 ) -> (Option<usize>, bool) {
117 let mut found_loading_pages = false;
118
119 match statuses[opened_page - 1] {
120 PageStatus::Idle => (Some(opened_page), found_loading_pages),
121 _ => {
122 let mut page = opened_page;
123
124 while page != opened_page - 1 {
125 if page == statuses.len() {
126 page = 0;
127
128 if page == opened_page - 1 {
129 break;
130 }
131 }
132
133 match statuses[page] {
134 PageStatus::Idle => {
135 break;
136 }
137 PageStatus::Loading(_) => {
138 found_loading_pages = true;
139 }
140 _ => {}
141 };
142
143 page += 1;
144 }
145
146 if page == opened_page - 1 {
147 if let PageStatus::Loading(_) = statuses[page] {
148 found_loading_pages = true;
149 }
150
151 (None, found_loading_pages)
152 } else {
153 statuses[page] = PageStatus::Loading(0);
154 (Some(page + 1), found_loading_pages)
155 }
156 }
157 }
158 }
159
160 #[builder]
161 #[tracing::instrument(
162 skip(
163 statuses,
164 spawner_command_sender,
165 downloadings_sender,
166 command_receiver
167 ),
168 name = "downloadings_manager"
169 )]
170 pub(crate) async fn downloadings_manager(
171 mut opened_page: usize,
172 statuses: Arc<Mutex<Vec<PageStatus>>>,
173 spawner_command_sender: mpsc::Sender<DownloadingsSpawnerCommand>,
174 downloadings_sender: Option<mpsc::Sender<usize>>,
175 mut command_receiver: ReceiverStream<ManagerCommand>,
176 max_concurrent_downloads: usize,
177 chapter_size: usize,
178 ) {
179 {
180 let mut statuses = statuses.lock();
181
182 for i in 0..chapter_size.min(max_concurrent_downloads) {
183 statuses[i] = PageStatus::Loading(0);
184 }
185 }
186
187 for i in 0..chapter_size.min(max_concurrent_downloads) {
188 spawner_command_sender
189 .send(DownloadingsSpawnerCommand::NewDownload { page_num: i + 1 })
190 .await
191 .expect("task spawner shutdowned before downloading all pages");
192 }
193
194 while let Some(command) = command_receiver.next().await {
195 tracing::debug!("got command: \n{command:#?}\n");
196 tracing::trace!("\nstatuses: {statuses:#?}\n");
197
198 match command {
199 ManagerCommand::SwitchPage { page_num } => {
200 opened_page = page_num;
201 }
202 ManagerCommand::DownloadError { page_num } => {
203 let new_download_command = DownloadingsSpawnerCommand::NewDownload { page_num };
204 spawner_command_sender
205 .send(new_download_command)
206 .await
207 .expect("downloadings_spawner shutdowned before downloading all pages");
208
209 tracing::trace!("manager sent {new_download_command:#?} command to set");
210 }
211 ManagerCommand::DownloadedSuccessfully { page_num } => {
212 if let Some(sender) = &downloadings_sender {
215 if let Err(e) = sender.send(page_num).await {
216 tracing::debug!(
217 "got error {e}: downloadings receiver dropped, shutting down downloadings manager"
218 );
219
220 let command = DownloadingsSpawnerCommand::Shutdown;
221 spawner_command_sender.send(command).await.expect(
222 "downloadings_spawner shutdowned before the respectful command",
223 );
224
225 tracing::trace!("manager sent {command:#?} command to set");
226
227 break;
228 }
229 };
230
231 let (next_download_page, found_loading_pages) =
232 Self::determine_next_download_page(&mut statuses.lock(), opened_page);
233
234 match next_download_page {
235 Some(page) => {
236 let command =
237 DownloadingsSpawnerCommand::NewDownload { page_num: page };
238
239 spawner_command_sender.send(command).await.expect(
240 "downloadings_spawner shutdowned before downloading all pages",
241 );
242
243 tracing::trace!("manager sent {command:#?} command to set");
244 }
245 None => {
246 if !found_loading_pages {
247 let command = DownloadingsSpawnerCommand::Shutdown;
248 spawner_command_sender.send(command).await.expect(
249 "join_set task shutdowned before the respectful command",
250 );
251
252 tracing::trace!("manager sent {command:#?} command to set");
253
254 break;
255 }
256 }
257 };
258 }
259 }
260 }
261 tracing::debug!("shutdowned");
262 }
263
264 pub(crate) async fn handle_downloading_page_error(
265 error: Error,
266 manager_command_sender: &mpsc::Sender<ManagerCommand>,
267 page_num: usize,
268 ) {
269 tracing::warn!("got response from the server: {error:#?}");
270
271 manager_command_sender
272 .send(ManagerCommand::DownloadError { page_num })
273 .await
274 .expect("manager shutdowned before getting the signal from last downloading task");
275 }
276
277 pub(crate) async fn get_next_chunk(
281 resp: &mut Response,
282 manager_command_sender: &mpsc::Sender<ManagerCommand>,
283 page_num: usize,
284 ) -> (Option<Bytes>, bool) {
285 match resp.chunk().in_current_span().await {
286 Ok(chunk) => {
287 let with_errors = false;
288 (chunk, with_errors)
289 }
290 Err(e) => {
291 Self::handle_downloading_page_error(
292 Error::ReqwestError(e),
293 manager_command_sender,
294 page_num,
295 )
296 .in_current_span()
297 .await;
298
299 let with_errors = true;
300 (None, with_errors)
301 }
302 }
303 }
304
305 #[builder]
306 #[tracing::instrument(
307 skip(client, statuses, manager_command_sender),
308 name = "downloading_page"
309 )]
310 pub(crate) async fn downloading_page(
311 client: MangoClient,
312 chapter_hash: String,
313 page_num: usize,
314 url: String,
315 statuses: Arc<Mutex<Vec<PageStatus>>>,
316 manager_command_sender: mpsc::Sender<ManagerCommand>,
317 ) {
318 let out_page_filename = format!("tmp/{}/{}.png", chapter_hash, page_num);
319
320 let mut out_page = tokio::fs::File::create(&out_page_filename)
321 .await
322 .expect("failed to open file to save page");
323
324 let resp_res = client.get_page_chunks(&url).in_current_span().await;
325
326 match resp_res {
327 Ok(mut resp) => {
328 let total_size = resp
329 .content_length()
330 .expect("could not get the content_length of page");
331
332 while let (Some(chunk), with_error) =
333 Self::get_next_chunk(&mut resp, &manager_command_sender, page_num)
334 .in_current_span()
335 .await
336 {
337 if with_error {
338 return;
339 }
340
341 let cur_size = chunk.len();
342
343 out_page
344 .write_all(chunk.as_ref())
345 .await
346 .expect("failed to save page");
347
348 {
349 let mut statuses = statuses.lock();
350 let already_loaded = match statuses[page_num - 1] {
351 PageStatus::Loading(percent) => percent,
352 _ => unreachable!(),
353 };
354
355 statuses[page_num - 1] = PageStatus::Loading(
356 already_loaded + 100 * cur_size / total_size as usize,
357 );
358 }
359 }
360
361 {
362 let mut statuses = statuses.lock();
363 statuses[page_num - 1] = PageStatus::Loaded(out_page_filename.into());
364 }
365
366 manager_command_sender
367 .send(ManagerCommand::DownloadedSuccessfully { page_num })
368 .await
369 .expect(
370 "manager shutdowned before getting the signal from last downloading task",
371 );
372 }
373
374 Err(e) => {
375 Self::handle_downloading_page_error(e, &manager_command_sender, page_num)
376 .in_current_span()
377 .await;
378 }
379 }
380 }
381
382 #[builder]
383 #[tracing::instrument(skip_all, name = "downloadings_spawner")]
384 pub(crate) async fn downloadings_spawner(
385 client: MangoClient,
386 meta: ChapterDownloadMeta,
387 mut command_receiver: ReceiverStream<DownloadingsSpawnerCommand>,
388 manager_command_sender: mpsc::Sender<ManagerCommand>,
389 statuses: Arc<Mutex<Vec<PageStatus>>>,
390 ) {
391 let mut set = JoinSet::new();
392
393 while let Some(command) = command_receiver.next().await {
394 tracing::debug!("got command: \n{command:#?}\n");
395
396 match command {
397 DownloadingsSpawnerCommand::Shutdown => {
398 set.shutdown().await;
399
400 break;
401 }
402 DownloadingsSpawnerCommand::NewDownload { page_num } => {
403 let _ = set.try_join_next();
406
407 let download_url = format!(
408 "{}/data/{}/{}",
409 &meta.base_url,
410 &meta.chapter.hash,
411 &meta.chapter.data[page_num - 1]
412 );
413
414 let downloading_page = Self::downloading_page()
415 .client(client.clone())
416 .chapter_hash(meta.chapter.hash.clone())
417 .page_num(page_num)
418 .url(download_url)
419 .statuses(Arc::clone(&statuses))
420 .manager_command_sender(manager_command_sender.clone())
421 .call();
422
423 set.spawn(downloading_page);
424
425 tracing::trace!("spawned new download task");
426 }
427 }
428 }
429
430 tracing::debug!("shutdowned");
431 }
432
433 #[tracing::instrument(skip(self))]
448 pub async fn chapter_viewer(
449 &self,
450 chapter_id: &str,
451 mut max_concurrent_downloads: usize,
452 ) -> Result<ChapterViewer> {
453 max_concurrent_downloads = max_concurrent_downloads.max(1);
454
455 let download_meta = self
456 .get_chapter_download_meta(chapter_id)
457 .in_current_span()
458 .await?;
459
460 let chapter_size = download_meta.chapter.data.len();
461 let buf = Arc::new(Mutex::new(vec![PageStatus::Idle; chapter_size]));
462
463 let (downloadings_sender, downloading_receiver) = mpsc::channel(chapter_size);
464 let downloading_receiver = ReceiverStream::new(downloading_receiver);
465
466 let (manager_command_sender, manager_command_receiver) = mpsc::channel(10);
467 let manager_command_receiver = ReceiverStream::new(manager_command_receiver);
468
469 let (downloadings_spawner_command_sender, downloadings_spawner_command_receiver) =
470 mpsc::channel(10);
471 let downloadings_spawner_command_receiver =
472 ReceiverStream::new(downloadings_spawner_command_receiver);
473
474 let res = ChapterViewer {
475 downloadings: downloading_receiver,
476 statuses: buf,
477 submit_switch: manager_command_sender.clone(),
478 };
479
480 let chapter_download_dir = format!("tmp/{}", download_meta.chapter.hash);
481 if !std::fs::exists(&chapter_download_dir).expect("failed to get info about tmp directory")
482 {
483 std::fs::create_dir_all(&chapter_download_dir)
484 .expect("failed to create directory for storing pages");
485 } else {
486 let dir_meta = std::fs::metadata(&chapter_download_dir)
487 .expect("failed to create directory for storing pages");
488
489 if !dir_meta.is_dir() {
490 panic!("failed to create directory for storing pages: file already exists, not a directory");
491 }
492 }
493
494 let manager = Self::downloadings_manager()
495 .opened_page(1)
496 .statuses(Arc::clone(&res.statuses))
497 .spawner_command_sender(downloadings_spawner_command_sender.clone())
498 .downloadings_sender(downloadings_sender)
499 .command_receiver(manager_command_receiver)
500 .max_concurrent_downloads(max_concurrent_downloads)
501 .chapter_size(chapter_size)
502 .call();
503
504 task::spawn(manager);
505
506 let spawner = Self::downloadings_spawner()
507 .client(self.clone())
508 .meta(download_meta)
509 .command_receiver(downloadings_spawner_command_receiver)
510 .manager_command_sender(manager_command_sender)
511 .statuses(Arc::clone(&res.statuses))
512 .call();
513
514 task::spawn(spawner);
515
516 return Ok(res);
517 }
518}