eureka_mmanager/download/state/
task.rs1use std::{future::Future, ops::Deref, task::Poll};
2
3use actix::prelude::*;
4use tokio::sync::watch::{self, Receiver, Sender};
5use tokio_util::sync::ReusableBoxFuture;
6
7use crate::{download::messages::TaskSubscriberMessages, ManagerCoreResult, OwnedError};
8
9#[derive(Debug, Clone)]
10pub enum DownloadTaskState<T, L> {
11 Pending,
12 Loading(L),
13 Error(OwnedError),
14 Done(T),
15 Canceled,
16}
17
18impl<T, L> Default for DownloadTaskState<T, L> {
19 fn default() -> Self {
20 Self::Pending
21 }
22}
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, MessageResponse)]
25pub enum TaskState {
26 Pending,
27 Loading,
28 Error,
29 Done,
30 Canceled,
31}
32
33impl Default for TaskState {
34 fn default() -> Self {
35 Self::Pending
36 }
37}
38
39impl TaskState {
40 pub fn is_finished(&self) -> bool {
41 *self == Self::Done || *self == Self::Canceled || *self == Self::Error
42 }
43 pub fn is_pending(&self) -> bool {
44 matches!(*self, Self::Pending)
45 }
46 pub fn is_loading(&self) -> bool {
47 matches!(*self, Self::Loading)
48 }
49}
50
51impl<T, L> From<DownloadTaskState<T, L>> for TaskState {
52 fn from(value: DownloadTaskState<T, L>) -> Self {
53 (&value).into()
54 }
55}
56
57impl<T, L> From<&DownloadTaskState<T, L>> for TaskState {
58 fn from(value: &DownloadTaskState<T, L>) -> Self {
59 match value {
60 DownloadTaskState::Pending => Self::Pending,
61 DownloadTaskState::Loading(_) => Self::Loading,
62 DownloadTaskState::Error(_) => Self::Error,
63 DownloadTaskState::Done(_) => Self::Done,
64 DownloadTaskState::Canceled => Self::Canceled,
65 }
66 }
67}
68
69impl<T, L> From<ManagerCoreResult<T>> for DownloadTaskState<T, L> {
70 fn from(value: ManagerCoreResult<T>) -> Self {
71 match value {
72 Ok(v) => Self::Done(v),
73 Err(v) => Self::Error(v.into()),
74 }
75 }
76}
77
78struct WaitForFinishedActor<T, L> {
79 state: Sender<DownloadTaskState<T, L>>,
80}
81
82impl<T, L> Actor for WaitForFinishedActor<T, L>
83where
84 T: 'static,
85 L: 'static,
86{
87 type Context = Context<Self>;
88}
89
90impl<T, L> Handler<TaskSubscriberMessages<DownloadTaskState<T, L>>> for WaitForFinishedActor<T, L>
91where
92 T: 'static,
93 L: 'static,
94{
95 type Result = ();
96 fn handle(
97 &mut self,
98 msg: TaskSubscriberMessages<DownloadTaskState<T, L>>,
99 ctx: &mut Self::Context,
100 ) -> Self::Result {
101 if self.state.is_closed() {
102 ctx.stop();
103 return;
104 }
105 match msg {
106 TaskSubscriberMessages::State(s) => {
107 let _ = self.state.send_replace(s);
108 }
109 TaskSubscriberMessages::ID(_) => {
110 let _ = self.state.send_replace(DownloadTaskState::Pending);
111 }
112 TaskSubscriberMessages::Dropped => {
113 let state = Into::<TaskState>::into(self.state.borrow().deref());
114 if !state.is_finished() {
115 let _ = self.state.send_replace(DownloadTaskState::Canceled);
116 }
117 }
118 };
119 }
120}
121
122type WaitForFinishedCouple<T, L> = (
123 Recipient<TaskSubscriberMessages<DownloadTaskState<T, L>>>,
124 WaitForFinished<T, L>,
125);
126
127pub(crate) fn make_wait_for_finish_couple<T, L>() -> WaitForFinishedCouple<T, L>
128where
129 T: 'static + Send + Clone + Sync,
130 L: 'static + Send + Sync,
131{
132 let (tx, rx) = watch::channel(DownloadTaskState::Pending);
133 (
134 WaitForFinishedActor { state: tx }.start().recipient(),
135 WaitForFinished::new(rx),
136 )
137}
138
139#[derive(Debug, MessageResponse)]
140pub struct WaitForFinished<T, L> {
141 state: Receiver<DownloadTaskState<T, L>>,
142
143 fut: ReusableBoxFuture<'static, Result<T, WaitForFinishedError>>,
144}
145
146async fn make_future<T: Clone + Send + Sync, L: Send + Sync>(
147 mut rx: Receiver<DownloadTaskState<T, L>>,
148) -> Result<T, WaitForFinishedError> {
149 loop {
150 rx.changed()
151 .await
152 .map_err(WaitForFinishedError::RecvError)?;
153 match rx.borrow().deref() {
154 DownloadTaskState::Error(e) => {
155 return Err(WaitForFinishedError::Error(e.clone()));
156 }
157 DownloadTaskState::Done(d) => return Ok(d.clone()),
158 DownloadTaskState::Canceled => return Err(WaitForFinishedError::Canceled),
159 _ => {}
160 }
161 }
162}
163
164impl<T, L> WaitForFinished<T, L>
165where
166 T: Clone + Send + Sync + 'static,
167 L: Send + Sync + 'static,
168{
169 pub fn new(state: Receiver<DownloadTaskState<T, L>>) -> Self {
170 let mut rx = state.clone();
171 rx.mark_changed();
172 Self {
173 state,
174 fut: ReusableBoxFuture::new(make_future(rx)),
175 }
176 }
177}
178
179impl<T, L> Clone for WaitForFinished<T, L>
180where
181 T: Clone + Send + Sync + 'static,
182 L: Send + Sync + 'static,
183{
184 fn clone(&self) -> Self {
185 Self::new(self.state.clone())
186 }
187}
188
189#[derive(Debug, Clone, thiserror::Error)]
190pub enum WaitForFinishedError {
191 #[error("The task was been canceled")]
192 Canceled,
193 #[error("{0}")]
194 Error(OwnedError),
195 #[error(transparent)]
196 RecvError(#[from] tokio::sync::watch::error::RecvError),
197}
198
199impl<T, L> Future for WaitForFinished<T, L>
200where
201 T: Send + Sync,
202{
203 type Output = Result<T, WaitForFinishedError>;
204 fn poll(
205 mut self: std::pin::Pin<&mut Self>,
206 cx: &mut std::task::Context<'_>,
207 ) -> Poll<Self::Output> {
208 self.fut.poll(cx)
209 }
210}
211
212#[derive(Debug, Clone, Copy)]
213pub enum DownloadMessageState {
214 Pending,
215 Downloading,
216}
217
218impl Default for DownloadMessageState {
219 fn default() -> Self {
220 Self::Pending
221 }
222}