eureka_mmanager/download/chapter/task/messages/download.rs
1use std::{
2 collections::HashMap,
3 path::Path,
4 sync::{
5 atomic::{AtomicBool, Ordering as AtomicOrd},
6 Arc,
7 },
8};
9
10use actix::prelude::*;
11use bytes::{Buf, Bytes};
12use futures_util::FutureExt;
13use mangadex_api_schema_rust::v5::ChapterObject as Object;
14use mangadex_api_types_rust::RelationshipType;
15use tokio_stream::StreamExt;
16
17use crate::{
18 data_push::chapter::{image::ChapterImagePushEntry, ChapterRequiredRelationship},
19 download::{
20 chapter::task::{
21 ChapterDownloadTask as Task, ChapterDownloadTaskState, ChapterDownloadingState as State,
22 },
23 messages::StartDownload,
24 state::{messages::get::GetManagerStateData, DownloadTaskState, TaskState},
25 traits::task::{Download, State as TaskStateTrait},
26 },
27 history::{
28 history_w_file::traits::{AsyncAutoCommitRollbackInsert, AsyncAutoCommitRollbackRemove},
29 HistoryEntry,
30 },
31 prelude::{ChapterDataPullAsyncTrait, DeleteDataAsyncTrait, PushActorAddr},
32 ManagerCoreResult,
33};
34
35impl Task {
36 fn preloading(&self) {
37 *self.state.write() = DownloadTaskState::Loading(State::Preloading);
38 self.sync_state_subscribers();
39 }
40 fn send_to_subscrbers(&self) -> Arc<dyn Fn(ChapterDownloadTaskState) + Send + Sync + 'static> {
41 let state = self.state.clone();
42 let subs = self.subscribers.clone();
43 Arc::new({
44 move |state_to_send: ChapterDownloadTaskState| {
45 *state.write() = state_to_send.clone();
46 subs.do_send(crate::download::messages::TaskSubscriberMessages::State(
47 state_to_send,
48 ));
49 }
50 })
51 }
52}
53
54impl Download for Task {
55 fn download(&mut self, ctx: &mut Self::Context) {
56 if self.state() != TaskState::Loading {
57 self.preloading();
58 let manager = self.manager.clone();
59 let mode = self.mode;
60 let id = self.id;
61 let force_port_443 = self.force_port_443;
62
63 let entry = HistoryEntry::new(id, RelationshipType::Chapter);
64 let send_to_subscrbers = self.send_to_subscrbers();
65 let send_to_subs_map = send_to_subscrbers.clone();
66 if let Some(t) = self.handle.replace(
67 ctx.spawn(
68 async move {
69 // Getting manager state data
70
71 let client = manager.get_client().await?;
72 let mut history = manager.get_history().await?;
73 // fetching chapter data
74 send_to_subscrbers(DownloadTaskState::Loading(State::FetchingData));
75 // insert data in history
76 history.insert_and_commit(entry).await?;
77 let res = client
78 .chapter()
79 .id(id)
80 .get()
81 .includes(ChapterRequiredRelationship::get_includes())
82 .send()
83 .await?;
84 // push chapter data to the dirs_option actor
85 manager.verify_and_push(res.data.clone()).await?;
86 // Getting fetching AtHome data
87 send_to_subscrbers(DownloadTaskState::Loading(State::FetchingAtHomeData));
88 let current_images =
89 manager.get_chapter_images(id).await.unwrap_or_default();
90 let (images, is_new) = {
91 let mut images: HashMap<String, usize> = Default::default();
92 // getting current images size
93 match mode {
94 crate::download::chapter::task::DownloadMode::Normal => {
95 for image in ¤t_images.data {
96 if let Ok(b) =
97 manager.get_chapter_image(id, image.clone()).await
98 {
99 if let Ok(len) =
100 b.metadata().map(|met| met.len() as usize)
101 {
102 images.insert(image.clone(), len);
103 }
104 }
105 }
106 }
107 crate::download::chapter::task::DownloadMode::DataSaver => {
108 for image in ¤t_images.data_saver {
109 if let Ok(b) = manager
110 .get_chapter_image_data_saver(id, image.clone())
111 .await
112 {
113 if let Ok(len) =
114 b.metadata().map(|met| met.len() as usize)
115 {
116 images.insert(image.clone(), len);
117 }
118 }
119 }
120 }
121 };
122 let is_new = AtomicBool::new(match mode {
123 crate::download::chapter::task::DownloadMode::Normal => {
124 current_images.data.is_empty()
125 }
126 crate::download::chapter::task::DownloadMode::DataSaver => {
127 current_images.data_saver.is_empty()
128 }
129 });
130 (images, is_new)
131 };
132
133 let is_first_loading = AtomicBool::new(true);
134 let stream = client
135 .download()
136 .chapter(id)
137 .report(true)
138 .mode(mode)
139 .force_port_443(force_port_443)
140 .build()?
141 .download_stream_with_checker(|at_home, resp| {
142 if !is_new.load(AtomicOrd::Relaxed)
143 && is_first_loading.load(AtomicOrd::Relaxed)
144 {
145 match mode {
146 crate::download::chapter::task::DownloadMode::Normal => {
147 is_new.swap(
148 at_home
149 .at_home
150 .chapter
151 .data
152 .partial_cmp(¤t_images.data)
153 .map(|cm| cm.is_ne())
154 .unwrap_or(true),
155 AtomicOrd::Relaxed,
156 );
157 }
158 crate::download::chapter::task::DownloadMode::DataSaver => {
159 is_new.swap(
160 at_home
161 .at_home
162 .chapter
163 .data_saver
164 .partial_cmp(¤t_images.data_saver)
165 .map(|cm| cm.is_ne())
166 .unwrap_or(true),
167 AtomicOrd::Relaxed,
168 );
169 }
170 };
171 is_first_loading.swap(false, AtomicOrd::Relaxed);
172 }
173 // log::debug!("{}", is_new.load(AtomicOrd::Relaxed));
174 if is_new.load(AtomicOrd::Relaxed) {
175 false
176 } else {
177 resp.content_length()
178 .and_then(|cl| {
179 let filename = Path::new(resp.url().path())
180 .file_name()?
181 .to_str()?;
182 images
183 .get(filename)?
184 .partial_cmp(&TryInto::<usize>::try_into(cl).ok()?)
185 })
186 .map(|o| o.is_eq())
187 .unwrap_or_default()
188 }
189 })
190 .await?;
191 // Delete if the chapter data is new
192 if is_new.load(AtomicOrd::Relaxed) {
193 manager
194 .delete_chapter_images_ignore_conflict(id, mode)
195 .await?;
196 }
197 // Fetches each images and stores it
198 let mut have_error = false;
199 let have_error_ref = &mut have_error;
200 let mut mark_have_error = move || {
201 if !*have_error_ref {
202 *have_error_ref = true;
203 }
204 };
205 let mut stream = Box::pin(stream);
206 while let Some(((filename, res_bytes), index, len)) = stream.next().await {
207 send_to_subscrbers(DownloadTaskState::Loading(State::FetchingImage {
208 filename: filename.clone(),
209 index,
210 len,
211 }));
212 match res_bytes {
213 Ok(b) => {
214 if let Err(e) = manager
215 .push(
216 ChapterImagePushEntry::new(
217 id,
218 filename.clone(),
219 b.reader(),
220 )
221 .mode(mode),
222 )
223 .await
224 {
225 mark_have_error();
226 log::error!("[chapter|{id}|{filename}]>write - {e}");
227 }
228 }
229 Err(e) => {
230 if let mangadex_api_types_rust::error::Error::SkippedDownload(
231 _,
232 ) = &e
233 {
234 } else {
235 mark_have_error();
236 log::error!("[chapter|{id}|{filename}]>write - {e}");
237 if let Err(e) = manager
238 .push(
239 ChapterImagePushEntry::new(
240 id,
241 filename.clone(),
242 Bytes::new().reader(),
243 )
244 .mode(mode),
245 )
246 .await
247 {
248 mark_have_error();
249 log::error!("[chapter|{id}|{filename}]>write - {e}");
250 }
251 }
252 }
253 }
254 }
255 if !have_error {
256 history.remove_and_commit(entry).await?;
257 }
258 Ok(res.data)
259 }
260 .map(move |res: ManagerCoreResult<Object>| {
261 send_to_subs_map(res.into());
262 })
263 .into_actor(self),
264 ),
265 ) {
266 ctx.cancel_future(t);
267 }
268 }
269 }
270}
271
272impl Handler<StartDownload> for Task {
273 type Result = ();
274 fn handle(&mut self, _msg: StartDownload, ctx: &mut Self::Context) -> Self::Result {
275 self.download(ctx);
276 }
277}