1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::{thread, time::Duration};
use pipeworks_tasks::{
task_spawn::{
DropMessage, RestartStrategy, sleep_forever, spawn_fallible, spawn_infallible,
spawn_supervised,
},
task_state::{EVENT_CHANNEL, TaskStateTrackingLayer},
};
use tokio::{sync::broadcast, time::sleep};
use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt};
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(TaskStateTrackingLayer)
.init();
let (tx, mut rx) = broadcast::channel(1024);
// Let's listen in on task events for the whole tree (up to and including "root")
tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
println!("{}", event);
}
});
// Spawn a "Supervised" task, which is an infallible task running an infinite loop that
// repeatedly creates a fallible task to execute the given closure. When/if the closure returns
// or panics, all children are killed, the supervisor waits for all children to be fully shut
// down, then executes the Restart Strategy. That is to say: even when immediately rebooting,
// the supervisor will not restart until all transitive child tasks are fully cleaned up and
// destructors have been run. This is VERY useful for ensuring resource locks (like TCP
// Sockets) have been fully closed before restarting the task without race conditions.
EVENT_CHANNEL
.scope(tx, async move {
spawn_infallible("Root", async move {
spawn_supervised(
"Restart-Immediate",
RestartStrategy::Immediate,
async || {
let guard = DropMessage::new();
// Spawn a child infallible task. If this tasks returns or panics, the parent task (and
// all it's dependents) will also be failed.
spawn_infallible("Infallible Never", async move {
let _slow_drop = SlowDrop;
let guard = DropMessage::new();
//sleep(Duration::MAX).await;
sleep_forever().await;
guard.will_return();
});
// This is the task that will cause the outer supervisor to reboot in this example, after
// 10ms it just returns (which is always considered a failure).
spawn_infallible("Infallible 10ms", async move {
let guard = DropMessage::new();
sleep(Duration::from_millis(10)).await;
guard.will_return();
});
// Spawns a fallible task. Another way to think of this is as an 'isolated infallible
// sub-tree'. That is, in fact, exactly how it's implemented. When the parent task (the
// outer supervisor) is failed this task will be killed like normal. But if a child of this
// task fails, the failure does not propagate up to the parent task. This is true of both
// returns and panics.
spawn_fallible("Fallable", async move {
let guard = DropMessage::new();
// This will cause the fallible task to fail.
spawn_infallible("Fallable->Infallable 5ms", async move {
let guard = DropMessage::new();
sleep(Duration::from_millis(5)).await;
guard.will_return();
});
// This will be killed when the above task returns after 5-ish ms.
spawn_infallible("Fallable->Infallable 9ms", async move {
let guard = DropMessage::new();
sleep(Duration::from_millis(9)).await;
guard.will_return();
});
// This waits until the current isolated sub-tree (in this case, the contents of our
// fallible task) are done running. It's a good way of saying "I don't have anything
// else to do, I've spawned children, but I don't want to return because that's always
// considered a failure".
sleep_forever().await;
guard.will_return();
});
// Spawn a child Supervised task. It's no different from the outer supervisor; it behaves
// just like a fallible task in this context, in that it will never fail the parent, but
// continually retries the child. Also like a fallible task, it too will be killed if the
// parent is killed (in this case, when the outer supervisor reboots).
spawn_supervised(
"Nested Supervisor",
RestartStrategy::SleepBetween(Duration::from_millis(1)),
async || {
let guard = DropMessage::new();
sleep(Duration::from_millis(2)).await;
guard.will_return();
},
);
sleep_forever().await;
guard.will_return();
},
);
sleep_forever().await;
});
})
.await;
sleep(Duration::from_millis(20)).await;
}
pub struct SlowDrop;
impl Drop for SlowDrop {
fn drop(&mut self) {
thread::sleep(Duration::from_millis(10));
}
}