mango_api/
viewer.rs

//! Home of most of the internal structs and functions related to the [ChapterViewer]
2
3use crate::requests::chapter::ChapterDownloadMeta;
4use crate::requests::{Error, Result};
5use crate::MangoClient;
6
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use parking_lot::Mutex;
11
12use tokio::io::AsyncWriteExt as _;
13use tokio::sync::mpsc::{self, Sender};
14use tokio::task::{self, JoinSet};
15use tokio_stream::wrappers::ReceiverStream;
16use tokio_stream::StreamExt as _;
17
18use bon::bon;
19use bytes::Bytes;
20use reqwest::Response;
21use tracing::instrument::Instrument as _;
22
/// Download state of a single chapter page.
#[derive(Debug, Clone)]
pub(crate) enum PageStatus {
    /// The page is completely downloaded; the payload is the path of the saved file.
    Loaded(PathBuf),
    /// The page is being downloaded; the payload is the progress value kept by the
    /// downloading task (a download starts at `0`).
    Loading(usize),
    /// No download has been started for this page yet.
    Idle,
}
29
/// Messages consumed by the downloadings manager task.
///
/// Page numbers are 1-based.
#[derive(Debug, Clone)]
pub(crate) enum ManagerCommand {
    /// The viewer opened `page_num`; the manager remembers it as the currently opened page.
    SwitchPage { page_num: usize },
    /// A downloading task finished saving `page_num`.
    DownloadedSuccessfully { page_num: usize },
    /// A downloading task failed to fetch `page_num`.
    DownloadError { page_num: usize },
}
36
/// Messages consumed by the downloadings spawner task.
#[derive(Debug, Clone, Copy)]
pub(crate) enum DownloadingsSpawnerCommand {
    /// Stop all running downloads and terminate the spawner.
    Shutdown,
    /// Start a downloading task for the (1-based) page `page_num`.
    NewDownload { page_num: usize },
}
42
43/// The struct used for getting access to chapter pages that are being downloaded via
44/// [MangoClient::chapter_viewer]
45#[derive(Debug)]
46pub struct ChapterViewer {
47    pub(crate) statuses: Arc<Mutex<Vec<PageStatus>>>,
48    pub(crate) downloadings: ReceiverStream<usize>,
49    pub(crate) submit_switch: Sender<ManagerCommand>,
50}
51
52impl ChapterViewer {
53    #[tracing::instrument(skip(self))]
54    /// Returns the path to the desired page
55    pub async fn open_page(&mut self, page_num: usize) -> PathBuf {
56        tracing::trace!("entered");
57
58        tracing::trace!("acquiring mutex lock");
59
60        let status = {
61            let statuses = self.statuses.lock();
62            statuses[page_num - 1].clone()
63        };
64
65        tracing::trace!("released mutex lock");
66
67        let sender = self.submit_switch.clone();
68
69        task::spawn(async move {
70            // NOTE: if the receiver is dropped, then all pages are downloaded and we don't care in
71            // delivering this message
72            let _ = sender.send(ManagerCommand::SwitchPage { page_num }).await;
73        });
74
75        match status {
76            PageStatus::Loaded(path) => path,
77            _ => {
78                tracing::trace!("starting waiting for page {page_num}");
79
80                let mut res = None;
81                while let Some(downloaded_page_num) = self.downloadings.next().await {
82                    tracing::trace!("got signal that page {downloaded_page_num} loaded");
83
84                    if downloaded_page_num == page_num {
85                        tracing::trace!("acquiring mutex lock");
86
87                        let status = {
88                            let statuses = self.statuses.lock();
89                            statuses[page_num - 1].clone()
90                        };
91
92                        tracing::trace!("returned mutex lock");
93
94                        match status {
95                            PageStatus::Loaded(path) => {
96                                res = Some(path);
97                                break;
98                            }
99                            _ => panic!(
100                                "got message that page was downloaded while it actually was not"
101                            ),
102                        }
103                    }
104                }
105                res.expect("mananager task shutdowned before desired page was downloaded")
106            }
107        }
108    }
109}
110
111#[bon]
112impl MangoClient {
113    pub(crate) fn determine_next_download_page(
114        statuses: &mut parking_lot::MutexGuard<'_, Vec<PageStatus>>,
115        opened_page: usize,
116    ) -> (Option<usize>, bool) {
117        let mut found_loading_pages = false;
118
119        match statuses[opened_page - 1] {
120            PageStatus::Idle => (Some(opened_page), found_loading_pages),
121            _ => {
122                let mut page = opened_page;
123
124                while page != opened_page - 1 {
125                    if page == statuses.len() {
126                        page = 0;
127
128                        if page == opened_page - 1 {
129                            break;
130                        }
131                    }
132
133                    match statuses[page] {
134                        PageStatus::Idle => {
135                            break;
136                        }
137                        PageStatus::Loading(_) => {
138                            found_loading_pages = true;
139                        }
140                        _ => {}
141                    };
142
143                    page += 1;
144                }
145
146                if page == opened_page - 1 {
147                    if let PageStatus::Loading(_) = statuses[page] {
148                        found_loading_pages = true;
149                    }
150
151                    (None, found_loading_pages)
152                } else {
153                    statuses[page] = PageStatus::Loading(0);
154                    (Some(page + 1), found_loading_pages)
155                }
156            }
157        }
158    }
159
160    #[builder]
161    #[tracing::instrument(
162        skip(
163            statuses,
164            spawner_command_sender,
165            downloadings_sender,
166            command_receiver
167        ),
168        name = "downloadings_manager"
169    )]
170    pub(crate) async fn downloadings_manager(
171        mut opened_page: usize,
172        statuses: Arc<Mutex<Vec<PageStatus>>>,
173        spawner_command_sender: mpsc::Sender<DownloadingsSpawnerCommand>,
174        downloadings_sender: Option<mpsc::Sender<usize>>,
175        mut command_receiver: ReceiverStream<ManagerCommand>,
176        max_concurrent_downloads: usize,
177        chapter_size: usize,
178    ) {
179        {
180            let mut statuses = statuses.lock();
181
182            for i in 0..chapter_size.min(max_concurrent_downloads) {
183                statuses[i] = PageStatus::Loading(0);
184            }
185        }
186
187        for i in 0..chapter_size.min(max_concurrent_downloads) {
188            spawner_command_sender
189                .send(DownloadingsSpawnerCommand::NewDownload { page_num: i + 1 })
190                .await
191                .expect("task spawner shutdowned before downloading all pages");
192        }
193
194        while let Some(command) = command_receiver.next().await {
195            tracing::debug!("got command: \n{command:#?}\n");
196            tracing::trace!("\nstatuses: {statuses:#?}\n");
197
198            match command {
199                ManagerCommand::SwitchPage { page_num } => {
200                    opened_page = page_num;
201                }
202                ManagerCommand::DownloadError { page_num } => {
203                    let new_download_command = DownloadingsSpawnerCommand::NewDownload { page_num };
204                    spawner_command_sender
205                        .send(new_download_command)
206                        .await
207                        .expect("downloadings_spawner shutdowned before downloading all pages");
208
209                    tracing::trace!("manager sent {new_download_command:#?} command to set");
210                }
211                ManagerCommand::DownloadedSuccessfully { page_num } => {
212                    // NOTE: if the receiver is dropped, than nobody is interested in downloading this
213                    // chapter at the moment, so we can shutdown
214                    if let Some(sender) = &downloadings_sender {
215                        if let Err(e) = sender.send(page_num).await {
216                            tracing::debug!(
217                                "got error {e}: downloadings receiver dropped, shutting down downloadings manager"
218                            );
219
220                            let command = DownloadingsSpawnerCommand::Shutdown;
221                            spawner_command_sender.send(command).await.expect(
222                                "downloadings_spawner shutdowned before the respectful command",
223                            );
224
225                            tracing::trace!("manager sent {command:#?} command to set");
226
227                            break;
228                        }
229                    };
230
231                    let (next_download_page, found_loading_pages) =
232                        Self::determine_next_download_page(&mut statuses.lock(), opened_page);
233
234                    match next_download_page {
235                        Some(page) => {
236                            let command =
237                                DownloadingsSpawnerCommand::NewDownload { page_num: page };
238
239                            spawner_command_sender.send(command).await.expect(
240                                "downloadings_spawner shutdowned before downloading all pages",
241                            );
242
243                            tracing::trace!("manager sent {command:#?} command to set");
244                        }
245                        None => {
246                            if !found_loading_pages {
247                                let command = DownloadingsSpawnerCommand::Shutdown;
248                                spawner_command_sender.send(command).await.expect(
249                                    "join_set task shutdowned before the respectful command",
250                                );
251
252                                tracing::trace!("manager sent {command:#?} command to set");
253
254                                break;
255                            }
256                        }
257                    };
258                }
259            }
260        }
261        tracing::debug!("shutdowned");
262    }
263
264    pub(crate) async fn handle_downloading_page_error(
265        error: Error,
266        manager_command_sender: &mpsc::Sender<ManagerCommand>,
267        page_num: usize,
268    ) {
269        tracing::warn!("got response from the server: {error:#?}");
270
271        manager_command_sender
272            .send(ManagerCommand::DownloadError { page_num })
273            .await
274            .expect("manager shutdowned before getting the signal from last downloading task");
275    }
276
277    /// Queries next chunk, handles possible errors and returnes this chunk if no errors
278    /// encountered or None if errors were met or the source is drained along with the boolean
279    /// value that represents if errores were emitted
280    pub(crate) async fn get_next_chunk(
281        resp: &mut Response,
282        manager_command_sender: &mpsc::Sender<ManagerCommand>,
283        page_num: usize,
284    ) -> (Option<Bytes>, bool) {
285        match resp.chunk().in_current_span().await {
286            Ok(chunk) => {
287                let with_errors = false;
288                (chunk, with_errors)
289            }
290            Err(e) => {
291                Self::handle_downloading_page_error(
292                    Error::ReqwestError(e),
293                    manager_command_sender,
294                    page_num,
295                )
296                .in_current_span()
297                .await;
298
299                let with_errors = true;
300                (None, with_errors)
301            }
302        }
303    }
304
305    #[builder]
306    #[tracing::instrument(
307        skip(client, statuses, manager_command_sender),
308        name = "downloading_page"
309    )]
310    pub(crate) async fn downloading_page(
311        client: MangoClient,
312        chapter_hash: String,
313        page_num: usize,
314        url: String,
315        statuses: Arc<Mutex<Vec<PageStatus>>>,
316        manager_command_sender: mpsc::Sender<ManagerCommand>,
317    ) {
318        let out_page_filename = format!("tmp/{}/{}.png", chapter_hash, page_num);
319
320        let mut out_page = tokio::fs::File::create(&out_page_filename)
321            .await
322            .expect("failed to open file to save page");
323
324        let resp_res = client.get_page_chunks(&url).in_current_span().await;
325
326        match resp_res {
327            Ok(mut resp) => {
328                let total_size = resp
329                    .content_length()
330                    .expect("could not get the content_length of page");
331
332                while let (Some(chunk), with_error) =
333                    Self::get_next_chunk(&mut resp, &manager_command_sender, page_num)
334                        .in_current_span()
335                        .await
336                {
337                    if with_error {
338                        return;
339                    }
340
341                    let cur_size = chunk.len();
342
343                    out_page
344                        .write_all(chunk.as_ref())
345                        .await
346                        .expect("failed to save page");
347
348                    {
349                        let mut statuses = statuses.lock();
350                        let already_loaded = match statuses[page_num - 1] {
351                            PageStatus::Loading(percent) => percent,
352                            _ => unreachable!(),
353                        };
354
355                        statuses[page_num - 1] = PageStatus::Loading(
356                            already_loaded + 100 * cur_size / total_size as usize,
357                        );
358                    }
359                }
360
361                {
362                    let mut statuses = statuses.lock();
363                    statuses[page_num - 1] = PageStatus::Loaded(out_page_filename.into());
364                }
365
366                manager_command_sender
367                    .send(ManagerCommand::DownloadedSuccessfully { page_num })
368                    .await
369                    .expect(
370                        "manager shutdowned before getting the signal from last downloading task",
371                    );
372            }
373
374            Err(e) => {
375                Self::handle_downloading_page_error(e, &manager_command_sender, page_num)
376                    .in_current_span()
377                    .await;
378            }
379        }
380    }
381
382    #[builder]
383    #[tracing::instrument(skip_all, name = "downloadings_spawner")]
384    pub(crate) async fn downloadings_spawner(
385        client: MangoClient,
386        meta: ChapterDownloadMeta,
387        mut command_receiver: ReceiverStream<DownloadingsSpawnerCommand>,
388        manager_command_sender: mpsc::Sender<ManagerCommand>,
389        statuses: Arc<Mutex<Vec<PageStatus>>>,
390    ) {
391        let mut set = JoinSet::new();
392
393        while let Some(command) = command_receiver.next().await {
394            tracing::debug!("got command: \n{command:#?}\n");
395
396            match command {
397                DownloadingsSpawnerCommand::Shutdown => {
398                    set.shutdown().await;
399
400                    break;
401                }
402                DownloadingsSpawnerCommand::NewDownload { page_num } => {
403                    // HACK: we don't care about the result of the computations, this is simply
404                    // for the sake of buffer not overflowing
405                    let _ = set.try_join_next();
406
407                    let download_url = format!(
408                        "{}/data/{}/{}",
409                        &meta.base_url,
410                        &meta.chapter.hash,
411                        &meta.chapter.data[page_num - 1]
412                    );
413
414                    let downloading_page = Self::downloading_page()
415                        .client(client.clone())
416                        .chapter_hash(meta.chapter.hash.clone())
417                        .page_num(page_num)
418                        .url(download_url)
419                        .statuses(Arc::clone(&statuses))
420                        .manager_command_sender(manager_command_sender.clone())
421                        .call();
422
423                    set.spawn(downloading_page);
424
425                    tracing::trace!("spawned new download task");
426                }
427            }
428        }
429
430        tracing::debug!("shutdowned");
431    }
432
433    /// Setups new [ChapterViewer] instance.
434    /// The gist of how this works is as following.
435    ///
436    /// This function creates two working green threads:
437    /// - downloadings_manager
438    /// - downloadings_spawner
439    ///
440    /// These threads interact with each other via channels.
441    /// Manager is responsible for deciding what to download next and an overall process status.
442    /// Spawner is responsible for creating small green thread downloaders for each page.
443    /// The maximum number of concurrently downloading pages is specified by `max_concurrent_downloads` .
444    /// The function returnes an instance of [ChapterViewer] that can be used to interact with
445    /// the manager task by means of the [`open_page`](ChapterViewer::open_page) function.
446    /// Working threads shutdown when the [ChapterViewer] is dropped or when all pages are downloaded
447    #[tracing::instrument(skip(self))]
448    pub async fn chapter_viewer(
449        &self,
450        chapter_id: &str,
451        mut max_concurrent_downloads: usize,
452    ) -> Result<ChapterViewer> {
453        max_concurrent_downloads = max_concurrent_downloads.max(1);
454
455        let download_meta = self
456            .get_chapter_download_meta(chapter_id)
457            .in_current_span()
458            .await?;
459
460        let chapter_size = download_meta.chapter.data.len();
461        let buf = Arc::new(Mutex::new(vec![PageStatus::Idle; chapter_size]));
462
463        let (downloadings_sender, downloading_receiver) = mpsc::channel(chapter_size);
464        let downloading_receiver = ReceiverStream::new(downloading_receiver);
465
466        let (manager_command_sender, manager_command_receiver) = mpsc::channel(10);
467        let manager_command_receiver = ReceiverStream::new(manager_command_receiver);
468
469        let (downloadings_spawner_command_sender, downloadings_spawner_command_receiver) =
470            mpsc::channel(10);
471        let downloadings_spawner_command_receiver =
472            ReceiverStream::new(downloadings_spawner_command_receiver);
473
474        let res = ChapterViewer {
475            downloadings: downloading_receiver,
476            statuses: buf,
477            submit_switch: manager_command_sender.clone(),
478        };
479
480        let chapter_download_dir = format!("tmp/{}", download_meta.chapter.hash);
481        if !std::fs::exists(&chapter_download_dir).expect("failed to get info about tmp directory")
482        {
483            std::fs::create_dir_all(&chapter_download_dir)
484                .expect("failed to create directory for storing pages");
485        } else {
486            let dir_meta = std::fs::metadata(&chapter_download_dir)
487                .expect("failed to create directory for storing pages");
488
489            if !dir_meta.is_dir() {
490                panic!("failed to create directory for storing pages: file already exists, not a directory");
491            }
492        }
493
494        let manager = Self::downloadings_manager()
495            .opened_page(1)
496            .statuses(Arc::clone(&res.statuses))
497            .spawner_command_sender(downloadings_spawner_command_sender.clone())
498            .downloadings_sender(downloadings_sender)
499            .command_receiver(manager_command_receiver)
500            .max_concurrent_downloads(max_concurrent_downloads)
501            .chapter_size(chapter_size)
502            .call();
503
504        task::spawn(manager);
505
506        let spawner = Self::downloadings_spawner()
507            .client(self.clone())
508            .meta(download_meta)
509            .command_receiver(downloadings_spawner_command_receiver)
510            .manager_command_sender(manager_command_sender)
511            .statuses(Arc::clone(&res.statuses))
512            .call();
513
514        task::spawn(spawner);
515
516        return Ok(res);
517    }
518}