pipeworks_tasks/
task_spawn.rs1use std::{
2 future::Future,
3 time::{Duration, Instant},
4};
5
6use tokio::time::sleep;
7use tracing::info;
8
9use crate::{
10 task_state::TaskState,
11 task_tree::{TaskId, TaskTree},
12};
13
14#[derive(Clone, Copy)]
15pub enum RestartStrategy {
16 Immediate,
17 SleepBetween(Duration),
18}
19
20pub fn spawn_supervised<T, Fut>(
24 name: impl Into<String>,
25 restart_strategy: RestartStrategy,
26 handler: T,
27) where
28 T: Fn() -> Fut + Send + Sync + 'static,
29 Fut: Future<Output = ()> + Send + Sync + 'static,
30{
31 let name: String = name.into();
32 spawn_infallible(format!("{}-supervisor", name), async move {
33 loop {
34 let future = handler();
35 let task_id = TaskId::next();
36 let parent_task_id = TaskId::try_current();
37 let future = TaskState::scope(name.clone(), task_id, parent_task_id, future);
38 let (task_tree, _task_id) = TaskTree::spawn_isolated_subtree(task_id, future);
39
40 task_tree.tasks.wait().await;
42
43 match restart_strategy {
44 RestartStrategy::Immediate => {}
45 RestartStrategy::SleepBetween(duration) => {
46 sleep(duration).await;
47 }
48 }
49 }
50 });
51}
52
53pub fn spawn_infallible<F>(name: impl Into<String>, future: F)
54where
55 F: Future<Output = ()> + Send + Sync + 'static,
56{
57 let name: String = name.into();
58 let task_id = TaskId::next();
59 let parent_task_id = TaskId::try_current();
60 let future = TaskState::scope(name.clone(), task_id, parent_task_id, future);
61 TaskTree::spawn_current_tree(task_id, future);
62}
63
64pub fn spawn_fallible<F>(name: impl Into<String>, future: F)
67where
68 F: Future<Output = ()> + Send + Sync + 'static,
69{
70 let name: String = name.into();
71 let task_id = TaskId::next();
72 let parent_task_id = TaskId::try_current();
73 let future = TaskState::scope(name.clone(), task_id, parent_task_id, future);
74 TaskTree::spawn_isolated_subtree(task_id, future);
75}
76
77pub async fn sleep_forever() {
78 sleep(Duration::MAX).await;
79}
80
81pub struct DropMessage(Instant);
82
83impl DropMessage {
84 pub fn new() -> Self {
85 info!("Staring");
86 Self(Instant::now())
87 }
88
89 pub fn will_return(self) {
90 info!("Returning after {}ms", self.0.elapsed().as_millis());
91 }
92}
93
94impl Drop for DropMessage {
95 fn drop(&mut self) {
96 info!("Dropped after {}ms", self.0.elapsed().as_millis());
97 }
98}