eureka_mmanager/download/
chapter.rs1pub mod messages;
2pub mod task;
3
4use std::{collections::HashMap, sync::Arc, time::Duration};
5
6use actix::{prelude::*, WeakAddr};
7use shrink_fit_wrapper::ShrinkFitWrapper;
8use task::DownloadMode;
9use tokio::sync::Notify;
10use uuid::Uuid;
11
12use crate::download::messages::StopTask;
13
14use self::task::ChapterDownloadTask;
15
16use super::{
17 messages::{DropSingleTaskMessage, GetTaskMessage, StartDownload},
18 state::{DownloadManagerState, DownloadMessageState},
19 traits::{managers::TaskManager, task::AsyncState},
20};
21
22#[derive(Debug)]
23pub struct ChapterDownloadManager {
24 state: Addr<DownloadManagerState>,
25 tasks: ShrinkFitWrapper<HashMap<Uuid, WeakAddr<ChapterDownloadTask>>>,
26 notify: Arc<Notify>,
27}
28
29#[derive(Debug, Clone, Copy)]
30pub struct ChapterDownloadMessage {
31 id: Uuid,
32 state: DownloadMessageState,
33 mode: DownloadMode,
34 force_port_443: bool,
35}
36
37impl ChapterDownloadMessage {
38 pub fn new(id: Uuid) -> Self {
39 Self {
40 id,
41 state: DownloadMessageState::Pending,
42 mode: DownloadMode::Normal,
43 force_port_443: false,
44 }
45 }
46 pub fn state(self, state: DownloadMessageState) -> Self {
47 Self { state, ..self }
48 }
49 pub fn mode<M: Into<DownloadMode>>(self, mode: M) -> Self {
50 Self {
51 mode: mode.into(),
52 ..self
53 }
54 }
55 pub fn force_port_443(self, force_port_443: bool) -> Self {
56 Self {
57 force_port_443,
58 ..self
59 }
60 }
61}
62
63impl From<Uuid> for ChapterDownloadMessage {
64 fn from(value: Uuid) -> Self {
65 Self::new(value)
66 }
67}
68
69impl From<ChapterDownloadMessage> for Uuid {
70 fn from(value: ChapterDownloadMessage) -> Self {
71 value.id
72 }
73}
74
75impl Message for ChapterDownloadMessage {
76 type Result = Addr<<ChapterDownloadManager as TaskManager>::Task>;
77}
78
79impl TaskManager for ChapterDownloadManager {
80 type Task = ChapterDownloadTask;
81 type DownloadMessage = ChapterDownloadMessage;
82 fn state(&self) -> Addr<DownloadManagerState> {
83 self.state.clone()
84 }
85 fn notify(&self) -> Arc<Notify> {
86 self.notify.clone()
87 }
88 fn tasks(&self) -> Vec<Addr<Self::Task>> {
89 self.tasks
90 .values()
91 .flat_map(|task| task.upgrade())
92 .collect()
93 }
94 fn tasks_id(&self) -> Vec<Uuid> {
95 self.tasks
96 .iter()
97 .flat_map(|(id, tasks)| {
98 if tasks.upgrade().is_some() {
99 Some(id)
100 } else {
101 None
102 }
103 })
104 .copied()
105 .collect()
106 }
107 fn new_task(
108 &mut self,
109 msg: Self::DownloadMessage,
110 ctx: &mut Self::Context,
111 ) -> Addr<Self::Task> {
112 let task = {
113 match self.tasks.as_mut().entry(msg.id) {
114 std::collections::hash_map::Entry::Occupied(mut occupied_entry) => {
115 let weak = occupied_entry.get_mut();
116 if let Some(tsk) = weak.upgrade() {
117 tsk
118 } else {
119 let tsk =
120 Self::Task::new(msg.id, msg.mode, msg.force_port_443, ctx.address())
121 .start();
122 let _weak = std::mem::replace(weak, tsk.downgrade());
123 tsk
124 }
125 }
126 std::collections::hash_map::Entry::Vacant(vacant_entry) => {
127 let tsk = Self::Task::new(msg.id, msg.mode, msg.force_port_443, ctx.address())
128 .start();
129 vacant_entry.insert(tsk.downgrade());
130 tsk
131 }
132 }
133 };
134 let re_task = task.clone();
135 self.notify.notify_waiters();
136
137 if let DownloadMessageState::Downloading = msg.state {
138 let fut = async move {
139 let state = re_task.state().await?;
140 if !state.is_loading() {
141 re_task.send(msg.mode).await?;
142 re_task.send(StartDownload).await?;
143 }
144 Ok::<_, actix::MailboxError>(())
145 }
146 .into_actor(self)
147 .map(|s, _, _| {
148 if let Err(err) = s {
149 log::error!("{err}");
150 }
151 });
152 ctx.wait(fut)
153 }
154 task
155 }
156
157 fn drop_task(&mut self, id: Uuid) {
158 if let Some(task) = self.tasks.get(&id) {
159 if task.upgrade().is_none() {
160 self.tasks.as_mut().remove(&id);
161 }
162 }
163 self.notify.notify_waiters();
164 }
165 fn get_task(&self, id: Uuid) -> Option<Addr<Self::Task>> {
166 self.tasks.get(&id).and_then(WeakAddr::upgrade)
167 }
168}
169
170impl Handler<ChapterDownloadMessage> for ChapterDownloadManager {
171 type Result = <ChapterDownloadMessage as Message>::Result;
172 fn handle(&mut self, msg: ChapterDownloadMessage, ctx: &mut Self::Context) -> Self::Result {
173 self.new_task(msg, ctx)
174 }
175}
176
177impl Drop for ChapterDownloadManager {
178 fn drop(&mut self) {
179 self.tasks
180 .values()
181 .flat_map(|maybe_task| maybe_task.upgrade())
182 .for_each(|task| task.do_send(StopTask));
183 }
184}
185
186impl ChapterDownloadManager {
187 pub fn new(state: Addr<DownloadManagerState>) -> Self {
188 Self {
189 state,
190 tasks: ShrinkFitWrapper::new(HashMap::new())
191 .set_shrink_duration_cycle(Duration::from_secs(60 * 5)),
192 notify: Arc::new(Notify::new()),
193 }
194 }
195}
196
197impl Actor for ChapterDownloadManager {
198 type Context = Context<Self>;
199}
200
201impl Handler<DropSingleTaskMessage> for ChapterDownloadManager {
202 type Result = <DropSingleTaskMessage as Message>::Result;
203 fn handle(&mut self, msg: DropSingleTaskMessage, _ctx: &mut Self::Context) -> Self::Result {
204 self.drop_task(msg.0);
205 }
206}
207
208impl Handler<GetTaskMessage<ChapterDownloadTask>> for ChapterDownloadManager {
209 type Result = <GetTaskMessage<ChapterDownloadTask> as Message>::Result;
210 fn handle(
211 &mut self,
212 msg: GetTaskMessage<ChapterDownloadTask>,
213 _ctx: &mut Self::Context,
214 ) -> Self::Result {
215 self.get_task(msg.into())
216 }
217}