Skip to main content

profile_bee/
spawn.rs

1use std::io::Error;
2// use std::process::{Child, Command, Stdio};
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use tokio::process::{Child, Command};
6
7use tokio::sync::mpsc::{self, Receiver, Sender};
8
9pub struct Nothing;
10
11#[derive(Clone)]
12pub struct StopHandler {
13    tx: Sender<Nothing>,
14}
15
16impl StopHandler {
17    fn stop(&self) {
18        println!("stopping...");
19        let _ = self.tx.try_send(Nothing);
20    }
21}
22
23impl Drop for StopHandler {
24    fn drop(&mut self) {
25        self.stop();
26    }
27}
28
29pub struct SpawnProcess {
30    pid: u32,
31    child: Child,
32    running: Arc<AtomicBool>,
33    stopper_rx: Receiver<Nothing>,
34}
35
36impl SpawnProcess {
37    pub fn spawn(program: &str, args: &[&str]) -> Result<(Self, StopHandler), Error> {
38        let running = Arc::new(AtomicBool::new(true));
39        let (tx, rx) = mpsc::channel::<Nothing>(1);
40
41        let child = Command::new(program)
42            .args(args)
43            // .stdout(Stdio::piped())
44            // .stderr(Stdio::piped())
45            .spawn()?;
46
47        let pid = child.id().expect("pid");
48
49        let stop = StopHandler { tx };
50
51        Ok((
52            Self {
53                pid,
54                child,
55                running,
56                stopper_rx: rx,
57            },
58            stop,
59        ))
60    }
61
62    pub fn pid(&self) -> u32 {
63        self.pid
64    }
65
66    fn running(&self) -> bool {
67        self.running.load(Ordering::SeqCst)
68    }
69
70    async fn kill(&mut self) -> Result<(), Error> {
71        if !self.running() {
72            println!("already stopped");
73            return Ok(());
74        }
75        self.running.store(false, Ordering::SeqCst);
76        println!("killing...");
77        let r = self.child.kill().await;
78        println!("done...");
79        r
80    }
81
82    pub async fn work_done(&mut self) {
83        tokio::select! {
84            _ = self.child.wait() => {
85                // Listen to when process stops
86                println!("Child process stopped");
87                self.running.store(false, Ordering::SeqCst);
88            },
89            stopper = self.stopper_rx.recv() => {
90                match stopper {
91                    // listen on stop signals from other applications
92                    Some(_) => {
93                        println!("close signal done...");
94                        let _ = self.kill().await;
95                    }
96                    None => {
97                        println!("Disconnected");
98                        let _ = self.kill().await;
99                    }
100                }
101            }
102        }
103    }
104
105    /// Spawn new thread to monitor output in real-time
106    // pub fn monitor(&mut self) {
107    //     if let Some(stdout) = self.child.stdout.take() {
108    //         std::thread::spawn(move || {
109    //             let mut reader = io::BufReader::new(stdout);
110    //             let mut buffer = String::new();
111    //             while let Ok(n) = reader.read_line(&mut buffer) {
112    //                 if n == 0 {
113    //                     break;
114    //                 }
115    //                 print!("{}", buffer);
116    //                 buffer.clear();
117    //             }
118    //         });
119    //     }
120    // }
121
122    // pub fn monitor_stderr(&mut self) {
123    //     if let Some(stderr) = self.child.stderr.take() {
124    //         std::thread::spawn(move || {
125    //             let mut reader = io::BufReader::new(stderr);
126    //             let mut buffer = String::new();
127    //             while let Ok(n) = reader.read_line(&mut buffer) {
128    //                 if n == 0 {
129    //                     break;
130    //                 }
131    //                 eprint!("{}", buffer);
132    //                 buffer.clear();
133    //             }
134    //         });
135    //     }
136    // }
137
138    pub async fn close_signal(&mut self) -> Result<(), Error> {
139        match self.stopper_rx.recv().await {
140            Some(_) => {
141                println!("close signal done...");
142                return self.kill().await;
143            }
144            None => {
145                println!("Disconnected");
146                return self.kill().await;
147            }
148        }
149    }
150
151    // Wait for the command to complete
152    pub async fn wait(&mut self) -> Result<(), Error> {
153        let _status = self.child.wait().await?;
154        self.running.store(false, Ordering::SeqCst);
155        Ok(())
156    }
157}
158
159impl Drop for SpawnProcess {
160    fn drop(&mut self) {
161        let _ = self.kill();
162    }
163}