Skip to main content

eureka_mmanager/download/state/
task.rs

1use std::{future::Future, ops::Deref, task::Poll};
2
3use actix::prelude::*;
4use tokio::sync::watch::{self, Receiver, Sender};
5use tokio_util::sync::ReusableBoxFuture;
6
7use crate::{download::messages::TaskSubscriberMessages, ManagerCoreResult, OwnedError};
8
9#[derive(Debug, Clone)]
10pub enum DownloadTaskState<T, L> {
11    Pending,
12    Loading(L),
13    Error(OwnedError),
14    Done(T),
15    Canceled,
16}
17
18impl<T, L> Default for DownloadTaskState<T, L> {
19    fn default() -> Self {
20        Self::Pending
21    }
22}
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, MessageResponse)]
25pub enum TaskState {
26    Pending,
27    Loading,
28    Error,
29    Done,
30    Canceled,
31}
32
33impl Default for TaskState {
34    fn default() -> Self {
35        Self::Pending
36    }
37}
38
39impl TaskState {
40    pub fn is_finished(&self) -> bool {
41        *self == Self::Done || *self == Self::Canceled || *self == Self::Error
42    }
43    pub fn is_pending(&self) -> bool {
44        matches!(*self, Self::Pending)
45    }
46    pub fn is_loading(&self) -> bool {
47        matches!(*self, Self::Loading)
48    }
49}
50
51impl<T, L> From<DownloadTaskState<T, L>> for TaskState {
52    fn from(value: DownloadTaskState<T, L>) -> Self {
53        (&value).into()
54    }
55}
56
57impl<T, L> From<&DownloadTaskState<T, L>> for TaskState {
58    fn from(value: &DownloadTaskState<T, L>) -> Self {
59        match value {
60            DownloadTaskState::Pending => Self::Pending,
61            DownloadTaskState::Loading(_) => Self::Loading,
62            DownloadTaskState::Error(_) => Self::Error,
63            DownloadTaskState::Done(_) => Self::Done,
64            DownloadTaskState::Canceled => Self::Canceled,
65        }
66    }
67}
68
69impl<T, L> From<ManagerCoreResult<T>> for DownloadTaskState<T, L> {
70    fn from(value: ManagerCoreResult<T>) -> Self {
71        match value {
72            Ok(v) => Self::Done(v),
73            Err(v) => Self::Error(v.into()),
74        }
75    }
76}
77
78struct WaitForFinishedActor<T, L> {
79    state: Sender<DownloadTaskState<T, L>>,
80}
81
82impl<T, L> Actor for WaitForFinishedActor<T, L>
83where
84    T: 'static,
85    L: 'static,
86{
87    type Context = Context<Self>;
88}
89
90impl<T, L> Handler<TaskSubscriberMessages<DownloadTaskState<T, L>>> for WaitForFinishedActor<T, L>
91where
92    T: 'static,
93    L: 'static,
94{
95    type Result = ();
96    fn handle(
97        &mut self,
98        msg: TaskSubscriberMessages<DownloadTaskState<T, L>>,
99        ctx: &mut Self::Context,
100    ) -> Self::Result {
101        if self.state.is_closed() {
102            ctx.stop();
103            return;
104        }
105        match msg {
106            TaskSubscriberMessages::State(s) => {
107                let _ = self.state.send_replace(s);
108            }
109            TaskSubscriberMessages::ID(_) => {
110                let _ = self.state.send_replace(DownloadTaskState::Pending);
111            }
112            TaskSubscriberMessages::Dropped => {
113                let state = Into::<TaskState>::into(self.state.borrow().deref());
114                if !state.is_finished() {
115                    let _ = self.state.send_replace(DownloadTaskState::Canceled);
116                }
117            }
118        };
119    }
120}
121
122type WaitForFinishedCouple<T, L> = (
123    Recipient<TaskSubscriberMessages<DownloadTaskState<T, L>>>,
124    WaitForFinished<T, L>,
125);
126
127pub(crate) fn make_wait_for_finish_couple<T, L>() -> WaitForFinishedCouple<T, L>
128where
129    T: 'static + Send + Clone + Sync,
130    L: 'static + Send + Sync,
131{
132    let (tx, rx) = watch::channel(DownloadTaskState::Pending);
133    (
134        WaitForFinishedActor { state: tx }.start().recipient(),
135        WaitForFinished::new(rx),
136    )
137}
138
139#[derive(Debug, MessageResponse)]
140pub struct WaitForFinished<T, L> {
141    state: Receiver<DownloadTaskState<T, L>>,
142
143    fut: ReusableBoxFuture<'static, Result<T, WaitForFinishedError>>,
144}
145
146async fn make_future<T: Clone + Send + Sync, L: Send + Sync>(
147    mut rx: Receiver<DownloadTaskState<T, L>>,
148) -> Result<T, WaitForFinishedError> {
149    loop {
150        rx.changed()
151            .await
152            .map_err(WaitForFinishedError::RecvError)?;
153        match rx.borrow().deref() {
154            DownloadTaskState::Error(e) => {
155                return Err(WaitForFinishedError::Error(e.clone()));
156            }
157            DownloadTaskState::Done(d) => return Ok(d.clone()),
158            DownloadTaskState::Canceled => return Err(WaitForFinishedError::Canceled),
159            _ => {}
160        }
161    }
162}
163
164impl<T, L> WaitForFinished<T, L>
165where
166    T: Clone + Send + Sync + 'static,
167    L: Send + Sync + 'static,
168{
169    pub fn new(state: Receiver<DownloadTaskState<T, L>>) -> Self {
170        let mut rx = state.clone();
171        rx.mark_changed();
172        Self {
173            state,
174            fut: ReusableBoxFuture::new(make_future(rx)),
175        }
176    }
177}
178
179impl<T, L> Clone for WaitForFinished<T, L>
180where
181    T: Clone + Send + Sync + 'static,
182    L: Send + Sync + 'static,
183{
184    fn clone(&self) -> Self {
185        Self::new(self.state.clone())
186    }
187}
188
189#[derive(Debug, Clone, thiserror::Error)]
190pub enum WaitForFinishedError {
191    #[error("The task was been canceled")]
192    Canceled,
193    #[error("{0}")]
194    Error(OwnedError),
195    #[error(transparent)]
196    RecvError(#[from] tokio::sync::watch::error::RecvError),
197}
198
199impl<T, L> Future for WaitForFinished<T, L>
200where
201    T: Send + Sync,
202{
203    type Output = Result<T, WaitForFinishedError>;
204    fn poll(
205        mut self: std::pin::Pin<&mut Self>,
206        cx: &mut std::task::Context<'_>,
207    ) -> Poll<Self::Output> {
208        self.fut.poll(cx)
209    }
210}
211
/// Two-phase download state: either still pending or currently downloading.
///
/// The default value is [`DownloadMessageState::Pending`].
#[derive(Debug, Clone, Copy, Default)]
pub enum DownloadMessageState {
    /// Nothing has started yet.
    #[default]
    Pending,
    /// The download is in progress.
    Downloading,
}