tokio_interactive/lib.rs
1#![doc = include_str!("../README.md")]
2use anyhow::{anyhow, Result};
3use log::*;
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, VecDeque};
6use std::fmt::{Debug, Formatter};
7use std::path::PathBuf;
8use std::sync::{Arc, OnceLock};
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::process::Command;
11use tokio::sync::mpsc::{Receiver, Sender};
12use tokio::sync::Mutex;
13// Change this line
14
/// A static, lazily-initialized process pool used to manage asynchronous interactive processes.
///
/// This global uses `OnceLock` so that it is initialized at most once during the program's
/// lifetime. It holds an `Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>` so the
/// pool can be shared and modified by concurrent tasks.
///
/// - `OnceLock`: Provides thread-safe, one-time initialization of the process pool.
/// - `Arc`: Lets the `Mutex<HashMap<...>>` be shared between several owners.
/// - `Mutex`: This is `tokio::sync::Mutex` (see the imports at the top of the file), so the
///   guard is obtained with `.lock().await`, not `.lock().unwrap()`.
/// - `HashMap<u32, AsynchronousInteractiveProcess>`: Maps process IDs (`u32`) to the
///   corresponding `AsynchronousInteractiveProcess` instances.
///
/// Usage:
///
/// The process pool stores and manages asynchronous interactive processes by their IDs.
/// Every access must go through the `Mutex` guard.
///
/// Example initialization:
/// ```rust,ignore
/// PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
/// ```
///
/// Example access (inside an `async` context):
/// ```rust,ignore
/// if let Some(pool) = PROCESS_POOL.get() {
///     let mut guard = pool.lock().await;
///     guard.insert(process_id, new_process);
/// }
/// ```
///
/// Once initialized, `PROCESS_POOL` cannot be re-initialized or re-assigned.
static PROCESS_POOL: OnceLock<Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>> = OnceLock::new();
48
/// A lightweight handle to a process, identified by its process ID (PID).
///
/// The handle stores nothing but the PID; its methods look the process up in the global
/// `PROCESS_POOL` by that PID each time they are called.
///
/// # Fields
/// - `pid`: The process ID of the targeted process. The field is private, so a handle
///   cannot be built with a struct literal outside this module.
///
/// # Traits
/// - `Debug`: Formats as `ProcessHandle { pid: 1234 }`.
/// - `Clone`: Produces another handle referring to the same PID.
/// - `PartialEq` / `Eq` / `Hash`: Two handles are equal when their PIDs are equal, which
///   lets handles be compared and used as `HashMap` / `HashSet` keys.
///
/// # Example
/// ```rust,ignore
/// // `handle` is a `ProcessHandle` obtained from this crate.
/// let same = handle.clone();
/// assert_eq!(handle, same);
/// println!("{:?}", same); // e.g. ProcessHandle { pid: 1234 }
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProcessHandle {
    pid: u32,
}
75
/// `AsynchronousInteractiveProcess` describes an interactive child process and holds the
/// runtime channels used to talk to it asynchronously.
///
/// The four public fields are the serializable configuration. The remaining fields are
/// runtime state, are private, and are marked `#[serde(skip)]`, so they are neither
/// serialized nor deserialized (deserialization fills them with their `Default` values).
///
/// # Trait Implementations
///
/// - `Debug`: Implemented by hand further down; prints only the four public fields.
/// - `Serialize` / `Deserialize`: Derived; fields marked `#[serde(skip)]` are excluded.
/// - `Default`: Derived; options are `None`, collections are empty, `filename` is an empty
///   string and `working_directory` is an empty path.
///
/// # Example
/// ```rust,ignore
/// let process = AsynchronousInteractiveProcess::new("my_program")
///     .with_argument("--help")
///     .with_working_directory("/path/to/dir");
///
/// println!("{:?}", process);
/// ```
#[derive(Serialize, Deserialize, Default)]
pub struct AsynchronousInteractiveProcess {
    /// Process ID of the interactive process; `None` for a value freshly built with `new`.
    pub pid: Option<u32>,
    /// Name or path of the executable to launch.
    pub filename: String,
    /// Command-line arguments passed to the executable.
    pub arguments: Vec<String>,
    /// Directory in which the process is executed.
    pub working_directory: PathBuf,
    /// Channel on which `ProcessHandle::send_input` pushes input strings for the process.
    #[serde(skip)]
    sender: Option<Sender<String>>,
    /// Channel from which `ProcessHandle::receive_output` pulls output strings.
    #[serde(skip)]
    receiver: Option<Receiver<String>>,
    /// Inputs that could not be sent because the `sender` channel was full.
    #[serde(skip)]
    input_queue: VecDeque<String>,
    /// Callback registered through `process_exit_callback`.
    /// NOTE(review): the `i32` argument is presumably the exit code — the call site is not
    /// in this part of the file; confirm.
    #[serde(skip)]
    exit_callback: Option<Arc<dyn Fn(i32) + Send + Sync>>,
}
151
152impl Debug for AsynchronousInteractiveProcess{
153 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154 write!(f, "AsynchronousInteractiveProcess {{ pid: {:?}, filename: {:?}, arguments: {:?}, working_directory: {:?} }}", self.pid, self.filename, self.arguments, self.working_directory)
155 }
156}
157
158impl ProcessHandle {
    /// Default timeout, in milliseconds, used by `receive_output`.
    ///
    /// `receive_output` converts this value to a `Duration` and forwards it to
    /// `receive_output_with_timeout`, which polls the process's output channel until a
    /// message arrives or the timeout elapses. See `receive_output_with_timeout` for the
    /// return values and error cases.
    const DEFAULT_TIMEOUT_MS: u64 = 100;
212
213 /// Helper function to try receiving a message
214 async fn try_receive_message(
215 pid: u32,
216 pool: &mut tokio::sync::MutexGuard<'_, HashMap<u32, AsynchronousInteractiveProcess>>,
217 ) -> Result<Option<String>> {
218 if let Some(process) = pool.get_mut(&pid) {
219 if let Some(receiver) = &mut process.receiver {
220 match receiver.try_recv() {
221 Ok(msg) => return Ok(Some(msg)),
222 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
223 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
224 return Err(anyhow!("Channel disconnected for process {}", pid));
225 }
226 }
227 } else {
228 return Err(anyhow!("Receiver not available for process {}", pid));
229 }
230 } else {
231 return Err(anyhow!("Process {} not found", pid));
232 }
233 Ok(None)
234 }
235
236 pub async fn receive_output(&self) -> Result<Option<String>> {
237 // Use the default timeout
238 self.receive_output_with_timeout(std::time::Duration::from_millis(Self::DEFAULT_TIMEOUT_MS)).await
239 }
240
241 /// Receives an output message asynchronously from the process associated with the instance's `pid`
242 /// if available, with a specified timeout.
243 ///
244 /// This function is similar to `receive_output()` but allows specifying a custom timeout duration
245 /// instead of using the default timeout.
246 ///
247 /// # Arguments
248 ///
249 /// * `timeout` - A `std::time::Duration` specifying how long to wait for a message before giving up.
250 ///
251 /// # Returns
252 ///
253 /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
254 /// * `Ok(None)` - If no message is received before the timeout expires.
255 /// * `Err(Error)` - If an error occurs during the process, such as:
256 /// - The process pool is not initialized
257 /// - The process with the specified PID is not found
258 /// - The receiver channel is not available
259 /// - The receiver channel is disconnected
260 ///
261 /// # Example
262 ///
263 /// ```rust
264 /// // Wait for up to 5 seconds for a message
265 /// match instance.receive_output_with_timeout(std::time::Duration::from_secs(5)).await {
266 /// Ok(Some(output)) => println!("Received output: {}", output),
267 /// Ok(None) => println!("No output received within timeout."),
268 /// Err(e) => eprintln!("Error receiving output: {}", e),
269 /// }
270 /// ```
271 pub async fn receive_output_with_timeout(&self, timeout: std::time::Duration) -> Result<Option<String>> {
272 // Define constants for retry parameters
273 const RETRY_DELAY_MS: u64 = 10;
274
275 if let Some(process_pool) = PROCESS_POOL.get() {
276 // Try to receive a message immediately
277 {
278 let mut pool = process_pool.lock().await;
279 if let Some(msg) = Self::try_receive_message(self.pid, &mut pool).await? {
280 return Ok(Some(msg));
281 }
282 }
283
284 // If no message is available, retry with delay until timeout is reached
285 let start_time = std::time::Instant::now();
286 while start_time.elapsed() < timeout {
287 tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_DELAY_MS)).await;
288
289 let mut pool = process_pool.lock().await;
290 if let Some(msg) = Self::try_receive_message(self.pid, &mut pool).await? {
291 return Ok(Some(msg));
292 }
293 }
294
295 // No message after timeout
296 return Ok(None);
297 }
298
299 Err(anyhow!("Process pool not initialized"))
300 }
301
302 /// Sends the provided input to a process associated with this instance's PID asynchronously.
303 ///
304 /// # Arguments
305 ///
306 /// * `input` - An input of any type that can be converted into a `String`. This is the data
307 /// that will be sent to the respective process's input queue.
308 ///
309 /// # Returns
310 ///
311 /// * `Ok(())` if the input was successfully sent or queued for sending.
312 /// * `Err` - Returns an error in the following cases:
313 /// - If the process pool is not initialized.
314 /// - If the process associated with this PID is not found.
315 /// - If the process was not started or its sender channel is not available.
316 /// - If the sender channel is closed and input cannot be sent.
317 ///
318 /// # Details
319 ///
320 /// This function retrieves the process associated with the current instance's `pid`
321 /// from a global process pool. If a valid process is found:
322 /// - The input is sent via an asynchronous channel to the process.
323 /// - If the channel is full, the input is queued for later sending.
324 /// - If the channel is closed, an error is returned.
325 ///
326 /// The function also ensures thread safety by acquiring a lock on the process pool before attempting
327 /// any operations related to the process.
328 ///
329 /// # Errors
330 ///
331 /// This function propagates several potential issues as errors:
332 /// - If the process pool is uninitialized (`PROCESS_POOL.get()` returns `None`).
333 /// - If the process associated with the PID is missing in the pool.
334 /// - If the sender channel was never initialized or is unavailable.
335 /// - If the sender channel is closed and no new messages can be sent.
336 ///
337 /// # Example
338 ///
339 /// ```rust
340 /// use anyhow::Result;
341 ///
342 /// #[tokio::main]
343 /// async fn main() -> Result<()> {
344 /// let manager = ProcessManager::new(1); // Assumes a struct is managing process with ID 1.
345 /// manager.send_input("Some input").await?;
346 /// Ok(())
347 /// }
348 /// ```
349 ///
350 /// # Note
351 ///
352 /// This function expects a global process pool (`PROCESS_POOL`) to be properly initialized before being called.
353 /// Additionally, the associated process must have a valid `sender` channel to accept input.
354 pub async fn send_input(&self, input: impl Into<String>) -> Result<()> {
355 let input_str = input.into();
356 if let Some(process_pool) = PROCESS_POOL.get() {
357 let mut pool = process_pool.lock().await;
358 if let Some(process) = pool.get_mut(&self.pid) {
359 if let Some(sender) = &process.sender {
360 match sender.try_send(input_str.clone()) {
361 Ok(_) => Ok(()),
362 Err(e) => match e {
363 tokio::sync::mpsc::error::TrySendError::Full(_) => {
364 process.input_queue.push_back(input_str);
365 Ok(())
366 }
367 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
368 Err(anyhow!("Failed to send input: channel closed"))
369 }
370 },
371 }
372 } else {
373 Err(anyhow!("Process not started or sender not available"))
374 }
375 } else {
376 Err(anyhow!("Process not found"))
377 }
378 } else {
379 Err(anyhow!("Process pool not initialized"))
380 }
381 }
382
383 /// Checks if the process associated with the current instance is running.
384 ///
385 /// This method attempts to determine if a process with the `pid` of the current instance
386 /// exists in the global `PROCESS_POOL`.
387 ///
388 /// # Returns
389 /// * `true` - If the process with the associated `pid` is currently present in the global process pool.
390 /// * `false` - If the process is not found in the global process pool or if the pool is not initialized.
391 ///
392 /// # Async Behavior
393 /// This method is asynchronous because it acquires a lock on the `PROCESS_POOL`.
394 ///
395 /// # Notes
396 /// - The `PROCESS_POOL` must be initialized before calling this function.
397 /// If `PROCESS_POOL` is not set, the function will return `false`.
398 /// - The `PROCESS_POOL` is expected to be a globally accessible, asynchronous, and thread-safe
399 /// data structure that tracks active processes.
400 ///
401 /// # Example
402 /// ```rust
403 /// let result = instance.is_process_running().await;
404 /// if result {
405 /// println!("Process is running.");
406 /// } else {
407 /// println!("Process is not running.");
408 /// }
409 /// ```
410 pub async fn is_process_running(&self) -> bool {
411 if let Some(process_pool) = PROCESS_POOL.get() {
412 let pool = process_pool.lock().await;
413 return pool.contains_key(&self.pid);
414 }
415 false
416 }
417
418 /// Asynchronously terminates a process identified by its `pid`.
419 ///
420 /// This method performs the following operations tailored to the target operating system:
421 ///
422 /// - **Windows**: Opens a handle to the process using its process ID (`pid`) and forcefully terminates it
423 /// using the `TerminateProcess` function from the WinAPI.
424 /// - **Linux**: Utilizes the `kill` system call with the `SIGKILL` signal to forcefully terminate the process.
425 ///
426 /// ## Platform-specific Notes:
427 ///
428 /// - On **Windows**, the process is identified and terminated using the `OpenProcess` and `TerminateProcess`
429 /// functions from the WinAPI.
430 /// - On **Linux**, the `kill` system call is used with the signal `SIGKILL` (9) to ensure the process is terminated.
431 ///
432 /// # Errors
433 ///
434 /// - Returns an error if the process termination fails (on Linux) due to system call errors or invalid process IDs.
435 /// On failure, the error contains details about the `pid` and the last OS error encountered.
436 ///
437 /// # Safety
438 ///
439 /// This method uses unsafe code blocks to interact with system APIs (`libc` on Linux, WinAPI on Windows). Ensure
440 /// that the provided `pid` corresponds to a valid process, and consider the implications of forcefully
441 /// terminating processes.
442 ///
443 /// # Example
444 ///
445 /// ```rust
446 /// let process_manager = SomeProcessManager::new(pid); // Example struct containing the pid
447 /// if let Err(e) = process_manager.kill().await {
448 /// eprintln!("Failed to terminate process: {}", e);
449 /// }
450 /// ```
451 /// Attempts to gracefully shut down the process, falling back to forceful termination if needed.
452 ///
453 /// This method first tries to gracefully shut down the process by:
454 /// - On Linux: Sending a SIGTERM signal
455 /// - On Windows: Sending a WM_CLOSE message to the main window
456 ///
457 /// If the process doesn't exit within the specified timeout, it will forcefully terminate
458 /// the process using the `kill()` method.
459 ///
460 /// # Arguments
461 ///
462 /// * `timeout` - A `std::time::Duration` specifying how long to wait for the process to exit
463 /// gracefully before forcefully terminating it.
464 ///
465 /// # Returns
466 ///
467 /// * `Ok(())` - If the process was successfully shut down (either gracefully or forcefully).
468 /// * `Err(Error)` - If an error occurred during the shutdown process.
469 ///
470 /// # Example
471 ///
472 /// ```rust
473 /// // Try to shut down gracefully, waiting up to 5 seconds before force killing
474 /// if let Err(e) = process.shutdown(std::time::Duration::from_secs(5)).await {
475 /// eprintln!("Failed to shut down process: {}", e);
476 /// }
477 /// ```
478 pub async fn shutdown(&self, timeout: std::time::Duration) -> Result<()> {
479 // First, try to gracefully shut down the process
480 let graceful_shutdown_result = self.graceful_shutdown().await;
481
482 // If graceful shutdown failed or isn't implemented for this platform, log it but continue
483 if let Err(e) = &graceful_shutdown_result {
484 debug!("Graceful shutdown attempt failed: {}", e);
485 return self.kill().await;
486 }
487
488 // Wait for the process to exit for the specified timeout
489 let start_time = std::time::Instant::now();
490 while start_time.elapsed() < timeout {
491 if !self.is_process_running().await {
492 return Ok(());
493 }
494 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
495 }
496
497 // If we're here, the process didn't exit gracefully within the timeout
498 // Fall back to forceful termination
499 debug!("Process did not exit gracefully within timeout, forcing termination");
500 self.kill().await
501 }
502
503 /// Attempts to gracefully shut down the process without forceful termination.
504 ///
505 /// This method is platform-specific:
506 /// - On Linux: Sends a SIGTERM signal to request graceful termination
507 /// - On Windows: First tries to send a Ctrl+C event to the process using GenerateConsoleCtrlEvent.
508 /// If that fails, it attempts to send an "exit" command to the process via stdin.
509 ///
510 /// # Returns
511 ///
512 /// * `Ok(())` - If the graceful shutdown signal was successfully sent.
513 /// * `Err(Error)` - If an error occurred while sending the graceful shutdown signal.
514 ///
515 /// # Notes
516 ///
517 /// - A successful return doesn't guarantee the process will actually exit.
518 /// Use `shutdown()` with a timeout to ensure the process exits.
519 /// - On Windows, the GenerateConsoleCtrlEvent function works with process groups, not individual
520 /// processes, so it may not work for all processes. The fallback "exit" command approach
521 /// works for many command-line applications that accept such commands.
522 /// - For GUI applications on Windows, neither approach may work. In such cases, the `shutdown()`
523 /// method will fall back to forceful termination after the timeout.
524 async fn graceful_shutdown(&self) -> Result<()> {
525 #[cfg(target_os = "windows")]
526 {
527 unsafe {
528 // First, try to send Ctrl+C event to the process
529 // CTRL_C_EVENT = 0, CTRL_BREAK_EVENT = 1
530 // Note: GenerateConsoleCtrlEvent works with process groups, not individual processes
531 // For console applications, we can try sending Ctrl+C to the process
532 let result = winapi::um::wincon::GenerateConsoleCtrlEvent(0, self.pid);
533 if result == 0 {
534 return Err(anyhow!(
535 "Failed to send Ctrl+C to process {}: {}",
536 self.pid,
537 std::io::Error::last_os_error()
538 ));
539 }
540 }
541 return Ok(());
542 }
543
544 #[cfg(target_os = "linux")]
545 {
546 unsafe {
547 // Use the kill system call with SIGTERM (15) to request graceful termination
548 let result = libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
549 if result != 0 {
550 return Err(anyhow!(
551 "Failed to send SIGTERM to process {}: {}",
552 self.pid,
553 std::io::Error::last_os_error()
554 ));
555 }
556 }
557 return Ok(());
558 }
559
560 #[cfg(not(any(target_os = "windows", target_os = "linux")))]
561 {
562 return Err(anyhow!("Graceful shutdown not implemented for this platform"));
563 }
564 }
565
566 /// Forcefully terminates the process immediately.
567 ///
568 /// This method should be used as a last resort when a process needs to be terminated
569 /// immediately. For a more graceful approach, consider using `shutdown()` first.
570 ///
571 /// # Returns
572 ///
573 /// * `Ok(())` - If the process was successfully terminated.
574 /// * `Err(Error)` - If an error occurred during the termination process.
575 ///
576 /// # Example
577 ///
578 /// ```rust
579 /// if let Err(e) = process.kill().await {
580 /// eprintln!("Failed to terminate process: {}", e);
581 /// }
582 /// ```
583 pub async fn kill(&self) -> Result<()> {
584 #[cfg(target_os = "windows")]
585 {
586 unsafe {
587 // PROCESS_TERMINATE (0x00010000) access right is required to terminate a process
588 let handle = winapi::um::processthreadsapi::OpenProcess(0x00010000, 0, self.pid);
589 if handle.is_null() {
590 return Err(anyhow!("Failed to open process {}: {}", self.pid, std::io::Error::last_os_error()));
591 }
592
593 // Attempt to terminate the process
594 let result = winapi::um::processthreadsapi::TerminateProcess(handle, 0);
595
596 // Always close the handle to prevent resource leaks
597 let close_result = winapi::um::handleapi::CloseHandle(handle);
598
599 // Check if termination was successful
600 if result == 0 {
601 return Err(anyhow!(
602 "Failed to terminate process {}: {}",
603 self.pid,
604 std::io::Error::last_os_error()
605 ));
606 }
607
608 // Check if handle was closed successfully
609 if close_result == 0 {
610 warn!(
611 "Failed to close process handle for process {}: {}",
612 self.pid,
613 std::io::Error::last_os_error()
614 );
615 // We don't return an error here as the process was terminated successfully
616 }
617 }
618 }
619 #[cfg(target_os = "linux")]
620 {
621 unsafe {
622 // Use the kill system call with SIGKILL (9) to forcefully terminate the process
623 let result = libc::kill(self.pid as libc::pid_t, libc::SIGKILL);
624 if result != 0 {
625 return Err(anyhow!("Failed to kill process {}: {}", self.pid, std::io::Error::last_os_error()));
626 }
627 }
628 }
629
630 Ok(())
631 }
632}
633
634impl AsynchronousInteractiveProcess {
635 /// Creates a new instance of the struct with default values.
636 ///
637 /// # Arguments
638 ///
639 /// * `filename` - A value that can be converted into a `String`. This typically represents the
640 /// name or path of the file associated with the instance.
641 ///
642 /// # Returns
643 ///
644 /// Returns a new instance of the struct populated with default fields:
645 /// - `pid`: `None` (indicating no process ID is associated yet).
646 /// - `filename`: The provided `filename` converted into a `String`.
647 /// - `arguments`: An empty vector, representing no initial arguments.
648 /// - `working_directory`: Set to the current directory (`"./"`).
649 /// - `sender`: `None`, indicating no sender is associated initially.
650 /// - `receiver`: `None`, indicating no receiver is associated initially.
651 /// - `input_queue`: An empty `VecDeque`, representing no items in the input queue.
652 ///
653 /// # Example
654 ///
655 /// ```
656 /// let instance = MyStruct::new("example.txt");
657 /// assert_eq!(instance.filename, "example.txt");
658 /// assert!(instance.pid.is_none());
659 /// assert!(instance.arguments.is_empty());
660 /// assert_eq!(instance.working_directory, PathBuf::from("./"));
661 /// assert!(instance.sender.is_none());
662 /// assert!(instance.receiver.is_none());
663 /// assert!(instance.input_queue.is_empty());
664 /// ```
665 pub fn new(filename: impl Into<String>) -> Self {
666 Self {
667 pid: None,
668 filename: filename.into(),
669 arguments: Vec::new(),
670 working_directory: PathBuf::from("./"),
671 sender: None,
672 receiver: None,
673 input_queue: VecDeque::new(),
674 exit_callback: None,
675 }
676 }
677
678 /// Sets the arguments for the current instance by converting a vector of items
679 /// implementing `Into<String>` into a `Vec<String>`.
680 ///
681 /// # Parameters
682 /// - `args`: A `Vec` containing items that implement the `Into<String>` trait. These
683 /// items will be converted into `String` and used to set the `arguments` field
684 /// of the instance.
685 ///
686 /// # Returns
687 /// - `Self`: The current instance (`self`) after updating its `arguments` field.
688 ///
689 /// # Example
690 /// ```rust
691 /// let instance = MyStruct::new()
692 /// .with_arguments(vec!["arg1", "arg2", String::from("arg3")]);
693 /// ```
694 ///
695 /// This method allows chaining, as it returns the updated instance
696 /// after setting the `arguments` field.
697 pub fn with_arguments(mut self, args: Vec<impl Into<String>>) -> Self {
698 self.arguments = args.into_iter().map(|arg| arg.into()).collect();
699 self
700 }
701
702 /// Adds an argument to the `arguments` vector and returns the modified instance.
703 ///
704 /// # Parameters
705 ///
706 /// * `arg` - A value that implements the `Into<String>` trait, which will be converted into a `String`
707 /// and added to the `arguments` vector.
708 ///
709 /// # Returns
710 ///
711 /// Returns the modified instance of the implementing struct (`Self`) with the new argument added.
712 ///
713 /// # Example
714 ///
715 /// ```rust
716 /// let instance = SomeStruct::new().with_argument("example");
717 /// ```
718 ///
719 /// This adds the string `"example"` to the `arguments` vector of `SomeStruct`.
720 pub fn with_argument(mut self, arg: impl Into<String>) -> Self {
721 self.arguments.push(arg.into());
722 self
723 }
724
725 /// Sets the working directory for the instance and returns the modified instance.
726 ///
727 /// # Arguments
728 ///
729 /// * `dir` - A value that can be converted into a `PathBuf`, representing the desired working directory.
730 ///
731 /// # Returns
732 ///
733 /// The instance of the struct with the updated working directory.
734 ///
735 /// # Example
736 ///
737 /// ```rust
738 /// let instance = MyStruct::new()
739 /// .with_working_directory("/path/to/directory");
740 /// ```
741 pub fn with_working_directory(mut self, dir: impl Into<PathBuf>) -> Self {
742 self.working_directory = dir.into();
743 self
744 }
745
746 pub fn process_exit_callback<F>(mut self, callback: F) -> Self
747 where
748 F: Fn(i32) + Send + Sync + 'static,
749 {
750 self.exit_callback = Some(Arc::new(callback));
751 self
752 }
753
754 /// Starts a new process based on the configuration stored in the struct, manages
755 /// its I/O streams asynchronously, and tracks its lifecycle in a shared process pool.
756 ///
757 /// # Returns
758 /// - `Ok<u32>`: Returns the process ID (PID) if the process starts successfully.
759 /// - `Err(std::io::Error)`: Returns an error if any part of the process start operation fails.
760 ///
761 /// # Process Configuration
762 /// - The process is launched using the executable specified in `self.filename`.
763 /// - Command-line arguments are passed via `self.arguments`.
764 /// - The process will run in the directory specified in `self.working_directory`.
765 ///
766 /// # Standard I/O Management
767 /// - The process's `stdin` is assigned a `tokio::sync::mpsc::channel` for communication.
768 /// - `stdout` and `stderr` are read asynchronously. Each line from these streams is sent
769 /// to an `mpsc::channel`, where `stdout`/`stderr` data can be consumed.
770 ///
771 /// # Shared Process Pool
772 /// - The process metadata, including its PID and I/O channels, is stored in a global,
773 /// thread-safe `PROCESS_POOL`.
774 /// - The process pool is managed using a `tokio::sync::Mutex` and stores processes in a
775 /// `HashMap` with their PID as the key.
776 ///
777 /// # Asynchronous I/O
778 /// - A background task is spawned to write data from an internal input queue to the process's
779 /// `stdin`.
780 /// - Another background task continuously reads and forwards `stdout` and `stderr` messages
781 /// from the process.
782 /// - If the process exits or encounters an error, its PID and metadata are removed from the
783 /// process pool.
784 ///
785 /// # Example Use Case
786 /// ```no_run
787 /// let mut my_process = MyProcess {
788 /// filename: "my_program".to_string(),
789 /// arguments: vec!["--arg1", "value1".to_string()],
790 /// working_directory: "/path/to/dir".to_string(),
791 /// pid: None,
792 /// sender: None,
793 /// receiver: None,
794 /// input_queue: VecDeque::new(),
795 /// };
796 ///
797 /// let pid = my_process.start().await.unwrap();
798 /// println!("Started process with PID: {}", pid);
799 /// ```
800 ///
801 /// # Notes
802 /// - The process's `stdin`, `stdout`, and `stderr` are piped to capture and manage
803 /// communication asynchronously.
804 /// - Each spawned background task is independently responsible for managing a specific
805 /// stream or part of the process lifecycle.
806 /// - Errors during I/O operations or spawning the process are logged using the `log` crate
807 /// macros (e.g., `error!`, `debug!`).
808 ///
809 /// # Potential Errors
810 /// - Failure to spawn the process (e.g., if `self.filename` is invalid or inaccessible).
811 /// - Errors during communication with the process's I/O streams (e.g., writing to a closed `stdin`).
812 /// - Locking the shared process pool itself cannot fail: it is a `tokio::sync::Mutex`, which has no lock poisoning, so the call only waits until the lock is free.
813 pub async fn start(&mut self) -> Result<u32> {
814 let mut command = Command::new(&self.filename);
815 command.args(&self.arguments);
816 command.current_dir(&self.working_directory);
817
818 command.stdin(std::process::Stdio::piped());
819 command.stdout(std::process::Stdio::piped());
820 command.stderr(std::process::Stdio::piped());
821
822 let mut child = command.spawn()?;
823 let pid = child.id().unwrap_or(0);
824
825 let (stdin_sender, mut stdin_receiver) = tokio::sync::mpsc::channel::<String>(100);
826 let (stdout_sender, stdout_receiver) = tokio::sync::mpsc::channel::<String>(100);
827
828 if let Some(mut stdin) = child.stdin.take() {
829 tokio::spawn(async move {
830 while let Some(input) = stdin_receiver.recv().await {
831 if let Err(e) = stdin.write_all(input.as_bytes()).await {
832 error!("Failed to write to process stdin: {}", e);
833 break;
834 }
835 if let Err(e) = stdin.write_all(b"\n").await {
836 error!("Failed to write newline to process stdin: {}", e);
837 break;
838 }
839 if let Err(e) = stdin.flush().await {
840 error!("Failed to flush process stdin: {}", e);
841 break;
842 }
843 }
844 });
845 }
846
847 if let Some(stdout) = child.stdout.take() {
848 let stdout_sender_clone = stdout_sender.clone();
849 tokio::spawn(async move {
850 let mut reader = BufReader::new(stdout);
851 let mut line = String::new();
852 while let Ok(bytes_read) = reader.read_line(&mut line).await {
853 if bytes_read == 0 {
854 break;
855 }
856 if let Err(e) = stdout_sender_clone.send(line.trim_end().to_string()).await {
857 error!("Failed to send stdout message: {}", e);
858 break;
859 }
860 line.clear();
861 }
862 });
863 }
864
865 if let Some(stderr) = child.stderr.take() {
866 let stderr_sender = stdout_sender.clone();
867 tokio::spawn(async move {
868 let mut reader = BufReader::new(stderr);
869 let mut line = String::new();
870 while let Ok(bytes_read) = reader.read_line(&mut line).await {
871 if bytes_read == 0 {
872 break;
873 }
874 if let Err(e) = stderr_sender.send(format!("STDERR: {}", line.trim_end())).await {
875 error!("Failed to send stderr message: {}", e);
876 break;
877 }
878 line.clear();
879 }
880 });
881 }
882
883 let queue_pid = pid;
884 tokio::spawn(async move {
885 loop {
886 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
887
888 let process_pool = match PROCESS_POOL.get() {
889 Some(pool) => pool,
890 None => break,
891 };
892
893 let mut pool = process_pool.lock().await;
894
895 let process = match pool.get_mut(&queue_pid) {
896 Some(p) => p,
897 None => break,
898 };
899
900 while let Some(input) = process.input_queue.pop_front() {
901 if let Some(sender) = &process.sender {
902 if let Err(_) = sender.try_send(input.clone()) {
903 process.input_queue.push_front(input);
904 break;
905 }
906 } else {
907 process.input_queue.clear();
908 break;
909 }
910 }
911
912 drop(pool);
913 }
914 });
915
916 let exit_callback = self.exit_callback.clone();
917 tokio::spawn(async move {
918 match child.wait().await {
919 Ok(exit_status) => {
920 let exit_code = exit_status.code().unwrap_or(-1); // Use -1 for unknown exit codes
921 debug!("Process {} exited with code: {}", pid, exit_code);
922
923 if let Some(exit_callback) = exit_callback {
924 exit_callback(exit_code);
925 }
926 }
927 Err(e) => {
928 error!("Process {} exited with error: {}", pid, e);
929
930 // Still trigger callback with error code
931 if let Some(exit_callback) = exit_callback {
932 exit_callback(-1); // or some other error indicator
933 }
934 }
935 }
936
937 // Cleanup
938 if let Some(process_pool) = PROCESS_POOL.get() {
939 let mut pool = process_pool.lock().await;
940 pool.remove(&pid);
941 debug!("Process {} has exited and been removed from the pool", pid);
942 }
943 });
944
945 let process_pool = PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
946
947 let mut pool = process_pool.lock().await;
948 pool.insert(
949 pid,
950 Self {
951 pid: Some(pid),
952 filename: self.filename.clone(),
953 arguments: self.arguments.clone(),
954 working_directory: self.working_directory.clone(),
955 sender: Some(stdin_sender),
956 receiver: Some(stdout_receiver),
957 input_queue: VecDeque::new(),
958 exit_callback: self.exit_callback.clone(),
959 },
960 );
961
962 self.pid = Some(pid);
963
964 Ok(pid)
965 }
966
967 /// Asynchronously retrieves a handle to a process for a given process ID (PID).
968 ///
969 /// This function checks if a process with the specified PID exists in a shared
970 /// process pool. If the process is found, it returns an `Option` containing a
971 /// `ProcessHandle` for the process. Otherwise, it returns `None`.
972 ///
973 /// # Arguments
974 ///
975 /// * `pid` - A 32-bit unsigned integer representing the process ID of the
976 /// desired process.
977 ///
978 /// # Returns
979 ///
980 /// * `Some(ProcessHandle)` - If a process with the given PID is found in the
981 /// process pool.
982 /// * `None` - If the process does not exist in the process pool or if the
983 /// process pool is uninitialized.
984 ///
985 /// # Example
986 ///
987 /// ```rust
988 /// if let Some(handle) = get_process_by_pid(12345).await {
989 /// println!("Process found with PID: {}", handle.pid);
990 /// } else {
991 /// println!("Process not found.");
992 /// }
993 /// ```
994 ///
995 /// # Errors
996 ///
997 /// This function will return `None` if the global process pool (`PROCESS_POOL`)
998 /// has not been initialized or is unavailable.
999 ///
1000 /// # Note
1001 ///
1002 /// This is an asynchronous function and must be awaited to complete its operation.
1003 pub async fn get_process_by_pid(pid: u32) -> Option<ProcessHandle> {
1004 let process_pool = PROCESS_POOL.get()?;
1005 let pool = process_pool.lock().await;
1006 if pool.contains_key(&pid) { Some(ProcessHandle { pid }) } else { None }
1007 }
1008
1009 /// Asynchronously checks if a process identified by its `pid` is currently running.
1010 ///
1011 /// # Returns
1012 /// - `true` if the process with the stored `pid` is found in the `PROCESS_POOL`.
1013 /// - `false` if the stored `pid` is `None`, the `PROCESS_POOL` is not initialized,
1014 /// or the `pid` does not exist in the `PROCESS_POOL`.
1015 ///
1016 /// The function first checks if the `pid` instance variable is set. If so, it attempts
1017 /// to access the global `PROCESS_POOL`. If `PROCESS_POOL` is initialized, it acquires
1018 /// a lock and checks if the `pid` exists in the pool.
1019 ///
1020 /// # Examples
1021 /// ```rust
1022 /// let result = my_instance.is_process_running().await;
1023 /// if result {
1024 /// println!("The process is running.");
1025 /// } else {
1026 /// println!("The process is not running.");
1027 /// }
1028 /// ```
1029 ///
1030 /// # Async Behavior
1031 /// This function acquires an asynchronous lock on the `PROCESS_POOL` to safely access
1032 /// its contents and will yield if the lock is currently held elsewhere.
1033 ///
1034 /// # Panics
1035 /// This function may panic if the async lock on the `PROCESS_POOL` fails unexpectedly.
1036 ///
1037 /// # Dependencies
1038 /// - `PROCESS_POOL` should be a globally accessible and lazily initialized structure
1039 /// (e.g., using `once_cell` or similar patterns) that maintains a mapping of currently
1040 /// active processes.
1041 pub async fn is_process_running(&self) -> bool {
1042 if let Some(pid) = self.pid {
1043 if let Some(process_pool) = PROCESS_POOL.get() {
1044 let pool = process_pool.lock().await;
1045 return pool.contains_key(&pid);
1046 }
1047 }
1048 false
1049 }
1050}