uni_plugin_host/
shutdown.rs1use std::sync::{Arc, Mutex, RwLock};
2use std::time::Duration;
3use tokio::sync::broadcast;
4use tokio::task::JoinHandle;
5
6pub struct ShutdownHandle {
8 tx: broadcast::Sender<()>,
9 task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
10 shutdown_initiated: Arc<RwLock<bool>>,
11 timeout: Duration,
12}
13
14impl ShutdownHandle {
15 pub fn new(timeout: Duration) -> Self {
16 let (tx, _) = broadcast::channel(16);
17 Self {
18 tx,
19 task_handles: Arc::new(Mutex::new(Vec::new())),
20 shutdown_initiated: Arc::new(RwLock::new(false)),
21 timeout,
22 }
23 }
24
25 pub fn subscribe(&self) -> broadcast::Receiver<()> {
26 self.tx.subscribe()
27 }
28
29 pub fn track_task(&self, handle: JoinHandle<()>) {
30 self.task_handles.lock().unwrap().push(handle);
31 }
32
33 pub async fn shutdown_async(&self) -> anyhow::Result<()> {
34 {
35 let mut initiated = self.shutdown_initiated.write().unwrap();
36 if *initiated {
37 return Ok(());
38 }
39 *initiated = true;
40 }
41
42 let _ = self.tx.send(());
43
44 let handles = {
45 let mut tasks = self.task_handles.lock().unwrap();
46 std::mem::take(&mut *tasks)
47 };
48
49 if !handles.is_empty() {
50 tracing::info!("Waiting for {} background tasks", handles.len());
51
52 let wait_future = async {
53 for handle in handles {
54 let _ = handle.await;
55 }
56 };
57
58 match tokio::time::timeout(self.timeout, wait_future).await {
59 Ok(_) => tracing::info!("Background tasks completed gracefully"),
60 Err(_) => tracing::warn!("Shutdown timeout reached"),
61 }
62 }
63
64 Ok(())
65 }
66
67 pub fn shutdown_blocking(&self) {
68 let _ = self.tx.send(());
69 }
70}