cli/tunnels/
shutdown_signal.rs1use futures::{stream::FuturesUnordered, StreamExt};
7use std::{fmt, path::PathBuf};
8use sysinfo::Pid;
9
10use crate::util::{
11 machine::{wait_until_exe_deleted, wait_until_process_exits},
12 sync::{new_barrier, Barrier, Receivable},
13};
14
15#[derive(Copy, Clone)]
17pub enum ShutdownSignal {
18 CtrlC,
19 ParentProcessKilled(Pid),
20 ExeUninstalled,
21 ServiceStopped,
22 RpcShutdownRequested,
23 RpcRestartRequested,
24}
25
26impl fmt::Display for ShutdownSignal {
27 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
28 match self {
29 ShutdownSignal::CtrlC => write!(f, "Ctrl-C received"),
30 ShutdownSignal::ParentProcessKilled(p) => {
31 write!(f, "Parent process {} no longer exists", p)
32 }
33 ShutdownSignal::ExeUninstalled => {
34 write!(f, "Executable no longer exists")
35 }
36 ShutdownSignal::ServiceStopped => write!(f, "Service stopped"),
37 ShutdownSignal::RpcShutdownRequested => write!(f, "RPC client requested shutdown"),
38 ShutdownSignal::RpcRestartRequested => {
39 write!(f, "RPC client requested a tunnel restart")
40 }
41 }
42 }
43}
44
45pub enum ShutdownRequest {
46 CtrlC,
47 ParentProcessKilled(Pid),
48 ExeUninstalled(PathBuf),
49 Derived(Box<dyn Receivable<ShutdownSignal> + Send>),
50}
51
52impl ShutdownRequest {
53 async fn wait(self) -> Option<ShutdownSignal> {
54 match self {
55 ShutdownRequest::CtrlC => {
56 let ctrl_c = tokio::signal::ctrl_c();
57 ctrl_c.await.ok();
58 Some(ShutdownSignal::CtrlC)
59 }
60 ShutdownRequest::ParentProcessKilled(pid) => {
61 wait_until_process_exits(pid, 2000).await;
62 Some(ShutdownSignal::ParentProcessKilled(pid))
63 }
64 ShutdownRequest::ExeUninstalled(exe_path) => {
65 wait_until_exe_deleted(&exe_path, 2000).await;
66 Some(ShutdownSignal::ExeUninstalled)
67 }
68 ShutdownRequest::Derived(mut rx) => rx.recv_msg().await,
69 }
70 }
71 pub fn create_rx(
74 signals: impl IntoIterator<Item = ShutdownRequest>,
75 ) -> Barrier<ShutdownSignal> {
76 let (barrier, opener) = new_barrier();
77 let futures = signals
78 .into_iter()
79 .map(|s| s.wait())
80 .collect::<FuturesUnordered<_>>();
81
82 tokio::spawn(async move {
83 if let Some(s) = futures.filter_map(futures::future::ready).next().await {
84 opener.open(s);
85 }
86 });
87
88 barrier
89 }
90}