Skip to main content

eureka_mmanager/download/
cover.rs

1pub mod messages;
2pub mod task;
3
4use std::{collections::HashMap, sync::Arc, time::Duration};
5
6use actix::{prelude::*, WeakAddr};
7use shrink_fit_wrapper::ShrinkFitWrapper;
8use tokio::sync::Notify;
9use uuid::Uuid;
10
11use crate::{download::messages::StopTask, prelude::AsyncState};
12
13use self::task::CoverDownloadTask;
14
15use super::{
16    messages::{DropSingleTaskMessage, GetTaskMessage, StartDownload},
17    state::{DownloadManagerState, DownloadMessageState},
18    traits::managers::TaskManager,
19};
20
21#[derive(Debug)]
22pub struct CoverDownloadManager {
23    state: Addr<DownloadManagerState>,
24    tasks: ShrinkFitWrapper<HashMap<Uuid, WeakAddr<CoverDownloadTask>>>,
25    notify: Arc<Notify>,
26}
27
28impl CoverDownloadManager {
29    pub fn new(state: Addr<DownloadManagerState>) -> Self {
30        Self {
31            state,
32            tasks: ShrinkFitWrapper::new(HashMap::new())
33                .set_shrink_duration_cycle(Duration::from_secs(10 * 60)),
34            notify: Arc::new(Notify::new()),
35        }
36    }
37}
38
39impl Actor for CoverDownloadManager {
40    type Context = Context<Self>;
41}
42
43#[derive(Debug, Clone, Copy)]
44pub struct CoverDownloadMessage {
45    id: Uuid,
46    state: DownloadMessageState,
47}
48
49impl CoverDownloadMessage {
50    pub fn new(id: Uuid) -> Self {
51        Self {
52            id,
53            state: DownloadMessageState::Pending,
54        }
55    }
56    pub fn state(self, state: DownloadMessageState) -> Self {
57        Self { state, ..self }
58    }
59}
60
61impl From<Uuid> for CoverDownloadMessage {
62    fn from(value: Uuid) -> Self {
63        Self::new(value)
64    }
65}
66
67impl From<CoverDownloadMessage> for Uuid {
68    fn from(value: CoverDownloadMessage) -> Self {
69        value.id
70    }
71}
72
73impl Message for CoverDownloadMessage {
74    type Result = Addr<CoverDownloadTask>;
75}
76
77impl TaskManager for CoverDownloadManager {
78    type Task = CoverDownloadTask;
79    type DownloadMessage = CoverDownloadMessage;
80
81    fn state(&self) -> Addr<DownloadManagerState> {
82        self.state.clone()
83    }
84
85    fn notify(&self) -> Arc<Notify> {
86        self.notify.clone()
87    }
88
89    fn tasks(&self) -> Vec<Addr<Self::Task>> {
90        self.tasks
91            .values()
92            .flat_map(|task| task.upgrade())
93            .collect()
94    }
95    fn tasks_id(&self) -> Vec<Uuid> {
96        self.tasks
97            .iter()
98            .flat_map(|(id, tasks)| {
99                if tasks.upgrade().is_some() {
100                    Some(id)
101                } else {
102                    None
103                }
104            })
105            .copied()
106            .collect()
107    }
108
109    fn new_task(
110        &mut self,
111        msg: Self::DownloadMessage,
112        ctx: &mut Self::Context,
113    ) -> Addr<Self::Task> {
114        let task = {
115            match self.tasks.as_mut().entry(msg.id) {
116                std::collections::hash_map::Entry::Occupied(mut occupied_entry) => {
117                    let weak = occupied_entry.get_mut();
118                    if let Some(tsk) = weak.upgrade() {
119                        tsk
120                    } else {
121                        let tsk = Self::Task::new(msg.id, ctx.address()).start();
122                        let _weak = std::mem::replace(weak, tsk.downgrade());
123                        tsk
124                    }
125                }
126                std::collections::hash_map::Entry::Vacant(vacant_entry) => {
127                    let tsk = Self::Task::new(msg.id, ctx.address()).start();
128                    vacant_entry.insert(tsk.downgrade());
129                    tsk
130                }
131            }
132        };
133        let re_task = task.clone();
134        self.notify.notify_waiters();
135
136        if let DownloadMessageState::Downloading = msg.state {
137            let fut = async move {
138                let state = re_task.state().await?;
139                if !state.is_loading() {
140                    re_task.send(StartDownload).await?;
141                }
142                Ok::<_, actix::MailboxError>(())
143            }
144            .into_actor(self)
145            .map(|s, _, _| {
146                if let Err(err) = s {
147                    log::error!("{err}");
148                }
149            });
150            ctx.wait(fut)
151        }
152        task
153    }
154
155    fn drop_task(&mut self, id: Uuid) {
156        if let Some(task) = self.tasks.get(&id) {
157            if task.upgrade().is_none() {
158                self.tasks.as_mut().remove(&id);
159            }
160        }
161        self.notify.notify_waiters();
162    }
163    fn get_task(&self, id: Uuid) -> Option<Addr<Self::Task>> {
164        self.tasks.get(&id).and_then(WeakAddr::upgrade)
165    }
166}
167
168impl Handler<CoverDownloadMessage> for CoverDownloadManager {
169    type Result = <CoverDownloadMessage as Message>::Result;
170    fn handle(&mut self, msg: CoverDownloadMessage, ctx: &mut Self::Context) -> Self::Result {
171        self.new_task(msg, ctx)
172    }
173}
174
175impl Handler<DropSingleTaskMessage> for CoverDownloadManager {
176    type Result = <DropSingleTaskMessage as Message>::Result;
177    fn handle(&mut self, msg: DropSingleTaskMessage, _ctx: &mut Self::Context) -> Self::Result {
178        self.drop_task(msg.0);
179    }
180}
181
182impl Handler<GetTaskMessage<CoverDownloadTask>> for CoverDownloadManager {
183    type Result = <GetTaskMessage<CoverDownloadTask> as Message>::Result;
184    fn handle(
185        &mut self,
186        msg: GetTaskMessage<CoverDownloadTask>,
187        _ctx: &mut Self::Context,
188    ) -> Self::Result {
189        self.get_task(msg.into())
190    }
191}
192
193impl Drop for CoverDownloadManager {
194    fn drop(&mut self) {
195        self.tasks
196            .values()
197            .flat_map(|maybe_task| maybe_task.upgrade())
198            .for_each(|task| task.do_send(StopTask));
199    }
200}