pipeworks_tasks/
task_spawn.rs

1use std::{
2    future::Future,
3    time::{Duration, Instant},
4};
5
6use tokio::time::sleep;
7use tracing::info;
8
9use crate::{
10    task_state::TaskState,
11    task_tree::{TaskId, TaskTree},
12};
13
14#[derive(Clone, Copy)]
15pub enum RestartStrategy {
16    Immediate,
17    SleepBetween(Duration),
18}
19
20// A supervised task is an infallible task that spawns a fallible sub-tree each (re)boot. Because
21// it need to reboot the task, it needs ownership of the task (aka it needs to be a fn/closure, not
22// a future).
23pub fn spawn_supervised<T, Fut>(
24    name: impl Into<String>,
25    restart_strategy: RestartStrategy,
26    handler: T,
27) where
28    T: Fn() -> Fut + Send + Sync + 'static,
29    Fut: Future<Output = ()> + Send + Sync + 'static,
30{
31    let name: String = name.into();
32    spawn_infallible(format!("{}-supervisor", name), async move {
33        loop {
34            let future = handler();
35            let task_id = TaskId::next();
36            let parent_task_id = TaskId::try_current();
37            let future = TaskState::scope(name.clone(), task_id, parent_task_id, future);
38            let (task_tree, _task_id) = TaskTree::spawn_isolated_subtree(task_id, future);
39
40            // Wait for the task subtree to fully exit before beginning restart.
41            task_tree.tasks.wait().await;
42
43            match restart_strategy {
44                RestartStrategy::Immediate => {}
45                RestartStrategy::SleepBetween(duration) => {
46                    sleep(duration).await;
47                }
48            }
49        }
50    });
51}
52
53pub fn spawn_infallible<F>(name: impl Into<String>, future: F)
54where
55    F: Future<Output = ()> + Send + Sync + 'static,
56{
57    let name: String = name.into();
58    let task_id = TaskId::next();
59    let parent_task_id = TaskId::try_current();
60    let future = TaskState::scope(name.clone(), task_id, parent_task_id, future);
61    TaskTree::spawn_current_tree(task_id, future);
62}
63
64// Fallible tasks are their own infallible sub-tree. The cancellation token is a child of the
65// parent, and the fallible task future is a child of the parent infallible task.
66pub fn spawn_fallible<F>(name: impl Into<String>, future: F)
67where
68    F: Future<Output = ()> + Send + Sync + 'static,
69{
70    let name: String = name.into();
71    let task_id = TaskId::next();
72    let parent_task_id = TaskId::try_current();
73    let future = TaskState::scope(name.clone(), task_id, parent_task_id, future);
74    TaskTree::spawn_isolated_subtree(task_id, future);
75}
76
77pub async fn sleep_forever() {
78    sleep(Duration::MAX).await;
79}
80
81pub struct DropMessage(Instant);
82
83impl DropMessage {
84    pub fn new() -> Self {
85        info!("Staring");
86        Self(Instant::now())
87    }
88
89    pub fn will_return(self) {
90        info!("Returning after {}ms", self.0.elapsed().as_millis());
91    }
92}
93
94impl Drop for DropMessage {
95    fn drop(&mut self) {
96        info!("Dropped after {}ms", self.0.elapsed().as_millis());
97    }
98}