1use crate::actor_runtime::{Actor, Status};
2use crate::compat::watch;
3use crate::handlers::{Operation, TaskEliminated};
4use crate::ids::{Id, IdOf};
5use crate::lifecycle::{LifecycleNotifier, TaskDone};
6use crate::linkage::Address;
7use anyhow::Error;
8use async_trait::async_trait;
9use futures::{
10 future::{select, Either, FusedFuture},
11 Future, FutureExt,
12};
13use std::hash::{Hash, Hasher};
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use thiserror::Error;
18
19pub trait Tag: Send + 'static {}
22
23impl Tag for () {}
24
25#[async_trait]
29pub trait LiteTask: Sized + Send + 'static {
30 type Output: Send;
32
33 fn log_target(&self) -> &str;
35
36 async fn routine(self, mut stop: StopReceiver) -> Result<Self::Output, Error> {
42 stop.or(self.interruptable_routine())
43 .await
44 .map_err(Error::from)
45 .and_then(std::convert::identity)
47 }
48
49 async fn interruptable_routine(mut self) -> Result<Self::Output, Error> {
51 self.pre_repeatable_routine().await?;
52 loop {
53 let last_attempt = Instant::now();
54 let routine_result = self.repeatable_routine().await;
55 match routine_result {
56 Ok(Some(output)) => {
57 break Ok(output);
58 }
59 Ok(None) => {
60 self.routine_wait(last_attempt, true).await;
62 }
63 Err(err) => {
64 log::error!(target: self.log_target(), "Routine failed: {}", err);
65 self.routine_wait(last_attempt, false).await;
66 }
67 }
68 }
69 }
70
71 async fn pre_repeatable_routine(&mut self) -> Result<(), Error> {
74 Ok(())
75 }
76
77 async fn repeatable_routine(&mut self) -> Result<Option<Self::Output>, Error> {
81 Ok(None)
82 }
83
84 async fn routine_wait(&mut self, _last_attempt: Instant, _succeed: bool) {
86 let duration = Duration::from_secs(5);
87 crate::compat::delay(duration).await
88 }
89}
90
91pub(crate) fn spawn<T, S, M>(task: T, tag: M, supervisor: Option<Address<S>>) -> TaskAddress<T>
92where
93 T: LiteTask,
94 S: Actor + TaskEliminated<T, M>,
95 M: Tag,
96{
97 let id = Id::unique();
98 let (stop_sender, stop_receiver) = make_stop_channel(id.clone());
99 let id_of = IdOf::<T>::new(id.clone());
100 let done_notifier = {
101 match supervisor {
102 None => <dyn LifecycleNotifier<_>>::ignore(),
103 Some(super_addr) => {
104 let op = Operation::Done { id };
106 <dyn LifecycleNotifier<_>>::once(super_addr, op)
107 }
108 }
109 };
110 let runtime = LiteRuntime {
111 id: id_of,
112 task,
113 done_notifier,
114 stop_receiver,
115 tag,
116 };
117 crate::compat::spawn_async(runtime.entrypoint());
118 stop_sender
119}
120
121pub trait StopSignal: Future<Output = ()> + FusedFuture + Send {}
123
124impl<T> StopSignal for T where T: Future<Output = ()> + FusedFuture + Send {}
125
126#[derive(Debug, Error)]
127#[error("task interrupted by a signal")]
128pub struct TaskStopped;
129
130fn make_stop_channel<T>(id: Id) -> (TaskAddress<T>, StopReceiver) {
131 let (tx, rx) = watch::channel(Status::Alive);
132 let stop_sender = StopSender { tx: Arc::new(tx) };
133 let address = TaskAddress {
134 id: IdOf::new(id),
135 stop_sender,
136 };
137 let receiver = StopReceiver { status: rx };
138 (address, receiver)
139}
140
141#[derive(Debug, Clone)]
143pub struct StopSender {
144 tx: Arc<watch::Sender<Status>>,
145}
146
147impl StopSender {
148 pub fn stop(&self) -> Result<(), Error> {
150 self.tx.send(Status::Stop).map_err(Error::from)
151 }
152}
153
154impl<T> From<TaskAddress<T>> for StopSender {
155 fn from(addr: TaskAddress<T>) -> Self {
156 addr.stop_sender
157 }
158}
159
160#[derive(Debug)]
164pub struct TaskAddress<T> {
165 id: IdOf<T>,
166 stop_sender: StopSender,
167}
168
169impl<T> Clone for TaskAddress<T> {
170 fn clone(&self) -> Self {
171 Self {
172 id: self.id(),
173 stop_sender: self.stop_sender.clone(),
174 }
175 }
176}
177
178impl<T> TaskAddress<T> {
179 pub fn id(&self) -> IdOf<T> {
181 self.id.clone()
182 }
183
184 pub fn stop(&self) -> Result<(), Error> {
186 self.stop_sender.stop()
187 }
188}
189
190impl<T: LiteTask> PartialEq for TaskAddress<T> {
191 fn eq(&self, other: &Self) -> bool {
192 self.id.eq(&other.id)
193 }
194}
195
196impl<T: LiteTask> Eq for TaskAddress<T> {}
197
198impl<T: LiteTask> Hash for TaskAddress<T> {
199 fn hash<H: Hasher>(&self, state: &mut H) {
200 self.id.hash(state);
201 }
202}
203
204#[derive(Debug, Clone)]
206pub struct StopReceiver {
207 status: watch::Receiver<Status>,
208}
209
210impl StopReceiver {
211 pub fn is_alive(&self) -> bool {
213 *self.status.borrow() == Status::Alive
214 }
215
216 pub fn into_future(self) -> Pin<Box<dyn StopSignal>> {
218 Box::pin(just_done(self.status).fuse())
219 }
220
221 pub async fn or<Fut>(&mut self, fut: Fut) -> Result<Fut::Output, TaskStopped>
224 where
225 Fut: Future,
226 {
227 let fut = Box::pin(fut);
228 let either = select(self.clone().into_future(), fut).await;
229 match either {
230 Either::Left((_done, _rem_fut)) => Err(TaskStopped),
231 Either::Right((output, _rem_fut)) => Ok(output),
232 }
233 }
234}
235
236async fn just_done(mut status: watch::Receiver<Status>) {
237 while status.changed().await.is_ok() {
238 if status.borrow().is_done() {
239 break;
240 }
241 }
242}
243
244#[derive(Debug, Error)]
246pub enum TaskError {
247 #[error("task was interrupted")]
249 Interrupted,
250 #[error("task failed: {0}")]
252 Other(Error),
253}
254
255impl TaskError {
256 pub fn is_interrupted(&self) -> bool {
258 matches!(self, Self::Interrupted)
259 }
260
261 pub fn into_other(self) -> Option<Error> {
264 match self {
265 Self::Interrupted => None,
266 Self::Other(err) => Some(err),
267 }
268 }
269
270 #[deprecated(since = "0.86.2", note = "Use ordinary `match` for checking.")]
271 pub fn swap<T>(result: Result<T, Self>) -> Result<Option<T>, Error> {
273 match result {
274 Ok(value) => Ok(Some(value)),
275 Err(Self::Interrupted) => Ok(None),
276 Err(Self::Other(err)) => Err(err),
277 }
278 }
279}
280
281impl From<Error> for TaskError {
282 fn from(error: Error) -> Self {
283 match error.downcast::<TaskStopped>() {
284 Ok(_stopped) => Self::Interrupted,
285 Err(other) => Self::Other(other),
286 }
287 }
288}
289
290struct LiteRuntime<T: LiteTask, M: Tag> {
291 id: IdOf<T>,
292 task: T,
293 done_notifier: Box<dyn LifecycleNotifier<TaskDone<T, M>>>,
295 stop_receiver: StopReceiver,
296 tag: M,
297}
298
299impl<T: LiteTask, M: Tag> LiteRuntime<T, M> {
300 async fn entrypoint(mut self) {
301 let log_target = self.task.log_target().to_owned();
302 log::info!(target: &log_target, "Task started: {}", self.id);
303 let res = self
304 .task
305 .routine(self.stop_receiver)
306 .await
307 .map_err(TaskError::from);
308 if let Err(err) = res.as_ref() {
309 if !err.is_interrupted() {
310 log::error!(target: &log_target, "Task failed: {}: {}", self.id, err);
312 }
313 }
314 log::info!(target: &log_target, "Task finished: {}", self.id);
315 let task_done = TaskDone::new(self.id.clone(), self.tag, res);
317 if let Err(err) = self.done_notifier.notify(task_done) {
318 log::error!(
319 target: &log_target,
320 "Can't send done notification from the task {}: {}",
321 self.id,
322 err
323 );
324 }
325 }
326}