scion_sdk_utils/
task_handler.rs1use tokio::{process::Child, task::JoinSet};
17use tokio_util::sync::CancellationToken;
18
19pub struct InProcess {
21 pub task_set: CancelTaskSet,
23}
24
25impl InProcess {
26 pub fn new(task_set: CancelTaskSet) -> Self {
28 Self { task_set }
29 }
30}
31
32impl Drop for InProcess {
33 fn drop(&mut self) {
34 self.task_set.cancellation_token().cancel();
35 }
36}
37
38pub struct Subprocess {
40 pub child: Child,
42}
43
44impl Subprocess {
45 pub fn new(child: Child) -> Self {
47 Self { child }
48 }
49}
50
51impl Drop for Subprocess {
52 fn drop(&mut self) {
53 let _ = self.child.start_kill();
54 let _ = self.child.try_wait();
55 }
56}
57
58pub struct CancelTaskSet {
64 pub join_set: JoinSet<Result<(), std::io::Error>>,
66 cancellation_token: CancellationToken,
67}
68
69impl CancelTaskSet {
70 #[allow(clippy::new_without_default)]
72 pub fn new() -> Self {
73 let cancellation_token = CancellationToken::new();
74 Self::from_cancel_token(false, cancellation_token)
75 }
76
77 pub fn new_with_signal_handler() -> Self {
80 let cancellation_token = CancellationToken::new();
81 Self::from_cancel_token(true, cancellation_token)
82 }
83
84 pub fn from_cancel_token(
91 register_signal_handler: bool,
92 cancellation_token: CancellationToken,
93 ) -> Self {
94 let mut join_set = JoinSet::new();
95 if register_signal_handler {
96 Self::spawn_shutdown_handler(&mut join_set, cancellation_token.clone());
97 }
98 CancelTaskSet {
99 join_set,
100 cancellation_token,
101 }
102 }
103
104 pub fn cancellation_token(&self) -> CancellationToken {
106 self.cancellation_token.clone()
107 }
108
109 fn spawn_shutdown_handler(
110 join_set: &mut JoinSet<Result<(), std::io::Error>>,
111 cancellation_token: CancellationToken,
112 ) {
113 join_set.spawn(async move {
114 #[cfg(target_family = "unix")]
115 {
116 use tokio::signal::unix::{SignalKind, signal};
117
118 let mut sigint =
119 signal(SignalKind::interrupt()).expect("failed to register SIGINT handler");
120 let mut sigterm =
121 signal(SignalKind::terminate()).expect("failed to register SIGTERM handler");
122 tokio::select! {
123 _ = sigint.recv() => {
124 tracing::debug!("Received SIGINT, cancelling token");
125 cancellation_token.cancel();
126 },
127 _ = sigterm.recv() => {
128 tracing::debug!("Received SIGTERM, cancelling token");
129 cancellation_token.cancel();
130 },
131 _ = cancellation_token.cancelled() => {
132 tracing::debug!("Cancellation token cancelled, exiting shutdown handler");
133 },
134 }
135 }
136
137 #[cfg(target_family = "windows")]
138 {
139 use tokio::signal::windows;
140
141 let mut ctrl_c = windows::ctrl_c().expect("failed to register CTRL-C handler");
142 let mut ctrl_break =
143 windows::ctrl_break().expect("failed to register CTRL-BREAK handler");
144
145 tokio::select! {
146 _ = ctrl_c.recv() => {
147 tracing::debug!("Received CTRL-C, cancelling token");
148 cancellation_token.cancel();
149 },
150 _ = ctrl_break.recv() => {
151 tracing::debug!("Received CTRL-BREAK, cancelling token");
152 cancellation_token.cancel();
153 },
154 _ = cancellation_token.cancelled() => {
155 tracing::debug!("Cancellation token cancelled, exiting shutdown handler");
156 },
157 }
158 }
159
160 Ok(())
161 });
162 }
163
164 pub fn spawn_cancellable_task<Fut>(&mut self, task: Fut)
166 where
167 Fut: Future<Output = Result<(), std::io::Error>> + Send + 'static,
168 {
169 let token = self.cancellation_token();
170 self.join_set.spawn(async move {
171 match token.run_until_cancelled(task).await {
172 Some(Ok(_)) => Ok(()), Some(Err(e)) => Err(e), None => Ok(()), }
176 });
177 }
178
179 pub async fn join_all(&mut self) {
182 while let Some(result) = self.join_set.join_next().await {
183 match result {
184 Ok(Ok(())) => {} Ok(Err(e)) => {
186 tracing::error!(error=%e, "Task failed");
187 self.cancellation_token.cancel();
188 }
189 Err(e) => {
190 tracing::error!(error=%e, "Task join failed");
191 self.cancellation_token.cancel();
192 }
193 }
194 }
195 }
196}
197
198impl Drop for CancelTaskSet {
199 fn drop(&mut self) {
200 self.cancellation_token.cancel();
201 self.join_set.abort_all();
202 }
203}