tokio_interactive/
lib.rs

1#![doc = include_str!("../README.md")]
2use anyhow::{anyhow, Result};
3use log::*;
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, VecDeque};
6use std::fmt::{Debug, Formatter};
7use std::path::PathBuf;
8use std::sync::{Arc, OnceLock};
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::process::Command;
11use tokio::sync::mpsc::{Receiver, Sender};
12use tokio::sync::Mutex;
13// Change this line
14
15/// A static, lazily-initialized process pool used to manage asynchronous interactive processes.
16///
17/// This global variable utilizes the `OnceLock` to ensure it is initialized only once during
18/// the program's lifetime. It holds an `Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>`
19/// to allow safe concurrent access and modification of the process pool.
20///
21/// - `OnceLock`: Provides thread-safe, one-time initialization of the process pool.
22/// - `Arc`: Ensures the `Mutex<HashMap<...>>` can be shared across threads safely.
23/// - `Mutex`: Protects the `HashMap` from concurrent modification, ensuring thread safety.
24/// - `HashMap<u32, AsynchronousInteractiveProcess>`: Maps process IDs (`u32`) to
25///   corresponding `AsynchronousInteractiveProcess` instances.
26///
27/// Usage:
28///
29/// The process pool is intended to store and manage asynchronous interactive processes by their IDs.
30/// Access needs to ensure proper locking with the `Mutex` guard to guarantee thread safety.
31///
32/// Example initialization:
33/// ```rust
34/// PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
35/// ```
36///
37/// Example access:
38/// ```rust
39/// if let Some(pool) = PROCESS_POOL.get() {
40///     let mut guard = pool.lock().unwrap();
41///     guard.insert(process_id, new_process);
42/// }
43/// ```
44/// Ensure that any operations on the pool respect the locking mechanism provided by the `Mutex`.
45///
46/// Once initialized, `PROCESS_POOL` cannot be re-initialized or re-assigned.
47static PROCESS_POOL: OnceLock<Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>> = OnceLock::new();
48
/// Lightweight handle to a process, identified by its process ID (PID).
///
/// The handle stores nothing but the PID; all process state is looked up
/// through that PID when a method is called. Cloning a handle yields a second
/// handle with the same PID.
///
/// # Traits
/// - `Debug`: formats as `ProcessHandle { pid: .. }`.
/// - `Clone`: creates another handle with the same PID.
/// - `PartialEq` / `Eq` / `Hash`: two handles are equal when their PIDs are
///   equal, so handles can be compared and used as set or map keys.
///
/// # Example
///
/// Marked `ignore` because `pid` is a private field and cannot be set from
/// outside this crate.
///
/// ```ignore
/// let handle = ProcessHandle { pid: 1234 };
/// println!("{:?}", handle); // ProcessHandle { pid: 1234 }
/// assert_eq!(handle.clone(), handle);
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProcessHandle {
    /// Process ID this handle refers to.
    pid: u32,
}
75
76/// `AsynchronousInteractiveProcess` is a structure that represents a non-blocking,
77/// interactive process. It provides metadata and mechanisms to interact with a
78/// process asynchronously using sender and receiver channels.
79///
80/// # Fields
81///
82/// * `pid` - An optional process ID (`pid`) of the interactive process. If the process
83///   is not yet started, this field will remain `None`.
84///
85/// * `filename` - The name of the executable file that represents the interactive process.
86///   This is required to identify and initiate the process.
87///
88/// * `arguments` - A vector containing the arguments to pass to the executable file
89///   when launching the process.
90///
91/// * `working_directory` - The directory where the interactive process will be executed.
92///   This is represented as a `PathBuf`.
93///
94/// * `sender` - An optional `Sender` channel used to send messages or data to the
95///   interactive process. This field is skipped during serialization and deserialization
96///   because it holds runtime-related data.
97///
98/// * `receiver` - An optional `Receiver` channel used to receive messages or data
99///   from the interactive process. This field is also skipped during serialization
100///   and deserialization for the same reasons as `sender`.
101///
102/// * `input_queue` - A deque (double-ended queue) that maintains an in-memory buffer
103///   of strings representing input for the interactive process. This field is skipped
104///   during serialization as it only contains transient runtime-related data.
105///
106/// # Trait Implementations
107///
108/// - `Debug`: Allows instances of this struct to be formatted and logged for debugging purposes.
109/// - `Serialize`: Makes the struct serializable, excluding fields marked with `#[serde(skip)]`.
110/// - `Deserialize`: Allows deserialization to create instances of this struct from serialized data.
111/// - `Default`: Provides a default implementation where optional fields are set to `None`,
112///   collections are empty, and `filename` is an empty string.
113///
114/// # Example
115/// ```rust
116/// use std::path::PathBuf;
117/// use std::collections::VecDeque;
118/// use serde::{Serialize, Deserialize};
119/// use crossbeam_channel::{Sender, Receiver};
120///
121/// let process = AsynchronousInteractiveProcess {
122///     pid: None,
123///     filename: "my_program".to_string(),
124///     arguments: vec!["--help".to_string()],
125///     working_directory: PathBuf::from("/path/to/dir"),
126///     sender: None,
127///     receiver: None,
128///     input_queue: VecDeque::new(),
129/// };
130///
131/// println!("{:?}", process);
132/// ```
133///
134/// This structure is ideal for scenarios where processes need to be controlled
135/// asynchronously with multiple input or output channels.
136#[derive(Serialize, Deserialize, Default)]
137pub struct AsynchronousInteractiveProcess {
138    pub pid: Option<u32>,
139    pub filename: String,
140    pub arguments: Vec<String>,
141    pub working_directory: PathBuf,
142    #[serde(skip)]
143    sender: Option<Sender<String>>,
144    #[serde(skip)]
145    receiver: Option<Receiver<String>>,
146    #[serde(skip)]
147    input_queue: VecDeque<String>,
148    #[serde(skip)]
149    exit_callback: Option<Arc<dyn Fn(i32) + Send + Sync>>,
150}
151
152impl Debug for AsynchronousInteractiveProcess{
153    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154        write!(f, "AsynchronousInteractiveProcess {{ pid: {:?}, filename: {:?}, arguments: {:?}, working_directory: {:?} }}", self.pid, self.filename, self.arguments, self.working_directory)
155    }
156}
157
158impl ProcessHandle {
159    /// Receives an output message asynchronously from the process associated with the instance's `pid`
160    /// if available.
161    ///
162    /// This function attempts to fetch a message by accessing the process pool and checking for a
163    /// message in the associated process's receiver.
164    ///
165    /// The process follows these steps:
166    /// 1. Check if the global process pool (`PROCESS_POOL`) is initialized.
167    /// 2. Attempt to acquire the lock on the pool asynchronously.
168    /// 3. Look for the process associated with `self.pid`.
169    /// 4. If the process has a receiver channel, attempt to get a message using `try_recv`.
170    /// 5. If a message is found, return it as `Ok(Some(String))`.
171    /// 6. If no message is received, retry up to `MAX_RETRIES` times, with a delay of `RETRY_DELAY_MS`
172    ///    milliseconds between each retry.
173    ///
174    /// If no message is received after all retries, the function returns `Ok(None)`.
175    ///
176    /// # Returns
177    ///
178    /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
179    /// * `Ok(None)` - If no message is received after all retries.
180    /// * `Err(Error)` - If an error occurs during the process, such as:
181    ///   - The process pool is not initialized
182    ///   - The process with the specified PID is not found
183    ///   - The receiver channel is not available
184    ///   - The receiver channel is disconnected
185    ///
186    /// # Behavior
187    ///
188    /// - This function uses `tokio::time::sleep` for introducing delays between retries and works
189    ///   asynchronously.
190    /// - The function uses constants `MAX_RETRIES` (10) and `RETRY_DELAY_MS` (10) to control the
191    ///   retry behavior.
192    /// - The function properly propagates errors that occur during the process.
193    ///
194    /// # Example
195    ///
196    /// ```rust
197    /// match instance.receive_output().await {
198    ///     Ok(Some(output)) => println!("Received output: {}", output),
199    ///     Ok(None) => println!("No output received."),
200    ///     Err(e) => eprintln!("Error receiving output: {}", e),
201    /// }
202    /// ```
203    ///
204    /// # Note
205    ///
206    /// - The function assumes the existence of a global process pool (`PROCESS_POOL`) which is safe
207    ///   to access concurrently using mutex-based locking.
208    /// - The function makes use of `tokio::sync::Mutex` to handle concurrent executions across
209    ///   asynchronous tasks.
210    /// Default timeout for receive_output in milliseconds
211    const DEFAULT_TIMEOUT_MS: u64 = 100;
212
213    /// Helper function to try receiving a message
214    async fn try_receive_message(
215        pid: u32,
216        pool: &mut tokio::sync::MutexGuard<'_, HashMap<u32, AsynchronousInteractiveProcess>>,
217    ) -> Result<Option<String>> {
218        if let Some(process) = pool.get_mut(&pid) {
219            if let Some(receiver) = &mut process.receiver {
220                match receiver.try_recv() {
221                    Ok(msg) => return Ok(Some(msg)),
222                    Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
223                    Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
224                        return Err(anyhow!("Channel disconnected for process {}", pid));
225                    }
226                }
227            } else {
228                return Err(anyhow!("Receiver not available for process {}", pid));
229            }
230        } else {
231            return Err(anyhow!("Process {} not found", pid));
232        }
233        Ok(None)
234    }
235
236    pub async fn receive_output(&self) -> Result<Option<String>> {
237        // Use the default timeout
238        self.receive_output_with_timeout(std::time::Duration::from_millis(Self::DEFAULT_TIMEOUT_MS)).await
239    }
240
241    /// Receives an output message asynchronously from the process associated with the instance's `pid`
242    /// if available, with a specified timeout.
243    ///
244    /// This function is similar to `receive_output()` but allows specifying a custom timeout duration
245    /// instead of using the default timeout.
246    ///
247    /// # Arguments
248    ///
249    /// * `timeout` - A `std::time::Duration` specifying how long to wait for a message before giving up.
250    ///
251    /// # Returns
252    ///
253    /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
254    /// * `Ok(None)` - If no message is received before the timeout expires.
255    /// * `Err(Error)` - If an error occurs during the process, such as:
256    ///   - The process pool is not initialized
257    ///   - The process with the specified PID is not found
258    ///   - The receiver channel is not available
259    ///   - The receiver channel is disconnected
260    ///
261    /// # Example
262    ///
263    /// ```rust
264    /// // Wait for up to 5 seconds for a message
265    /// match instance.receive_output_with_timeout(std::time::Duration::from_secs(5)).await {
266    ///     Ok(Some(output)) => println!("Received output: {}", output),
267    ///     Ok(None) => println!("No output received within timeout."),
268    ///     Err(e) => eprintln!("Error receiving output: {}", e),
269    /// }
270    /// ```
271    pub async fn receive_output_with_timeout(&self, timeout: std::time::Duration) -> Result<Option<String>> {
272        // Define constants for retry parameters
273        const RETRY_DELAY_MS: u64 = 10;
274
275        if let Some(process_pool) = PROCESS_POOL.get() {
276            // Try to receive a message immediately
277            {
278                let mut pool = process_pool.lock().await;
279                if let Some(msg) = Self::try_receive_message(self.pid, &mut pool).await? {
280                    return Ok(Some(msg));
281                }
282            }
283
284            // If no message is available, retry with delay until timeout is reached
285            let start_time = std::time::Instant::now();
286            while start_time.elapsed() < timeout {
287                tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_DELAY_MS)).await;
288
289                let mut pool = process_pool.lock().await;
290                if let Some(msg) = Self::try_receive_message(self.pid, &mut pool).await? {
291                    return Ok(Some(msg));
292                }
293            }
294
295            // No message after timeout
296            return Ok(None);
297        }
298
299        Err(anyhow!("Process pool not initialized"))
300    }
301
302    /// Sends the provided input to a process associated with this instance's PID asynchronously.
303    ///
304    /// # Arguments
305    ///
306    /// * `input` - An input of any type that can be converted into a `String`. This is the data
307    ///   that will be sent to the respective process's input queue.
308    ///
309    /// # Returns
310    ///
311    /// * `Ok(())` if the input was successfully sent or queued for sending.
312    /// * `Err` - Returns an error in the following cases:
313    ///     - If the process pool is not initialized.
314    ///     - If the process associated with this PID is not found.
315    ///     - If the process was not started or its sender channel is not available.
316    ///     - If the sender channel is closed and input cannot be sent.
317    ///
318    /// # Details
319    ///
320    /// This function retrieves the process associated with the current instance's `pid`
321    /// from a global process pool. If a valid process is found:
322    /// - The input is sent via an asynchronous channel to the process.
323    /// - If the channel is full, the input is queued for later sending.
324    /// - If the channel is closed, an error is returned.
325    ///
326    /// The function also ensures thread safety by acquiring a lock on the process pool before attempting
327    /// any operations related to the process.
328    ///
329    /// # Errors
330    ///
331    /// This function propagates several potential issues as errors:
332    /// - If the process pool is uninitialized (`PROCESS_POOL.get()` returns `None`).
333    /// - If the process associated with the PID is missing in the pool.
334    /// - If the sender channel was never initialized or is unavailable.
335    /// - If the sender channel is closed and no new messages can be sent.
336    ///
337    /// # Example
338    ///
339    /// ```rust
340    /// use anyhow::Result;
341    ///
342    /// #[tokio::main]
343    /// async fn main() -> Result<()> {
344    ///     let manager = ProcessManager::new(1); // Assumes a struct is managing process with ID 1.
345    ///     manager.send_input("Some input").await?;
346    ///     Ok(())
347    /// }
348    /// ```
349    ///
350    /// # Note
351    ///
352    /// This function expects a global process pool (`PROCESS_POOL`) to be properly initialized before being called.
353    /// Additionally, the associated process must have a valid `sender` channel to accept input.
354    pub async fn send_input(&self, input: impl Into<String>) -> Result<()> {
355        let input_str = input.into();
356        if let Some(process_pool) = PROCESS_POOL.get() {
357            let mut pool = process_pool.lock().await;
358            if let Some(process) = pool.get_mut(&self.pid) {
359                if let Some(sender) = &process.sender {
360                    match sender.try_send(input_str.clone()) {
361                        Ok(_) => Ok(()),
362                        Err(e) => match e {
363                            tokio::sync::mpsc::error::TrySendError::Full(_) => {
364                                process.input_queue.push_back(input_str);
365                                Ok(())
366                            }
367                            tokio::sync::mpsc::error::TrySendError::Closed(_) => {
368                                Err(anyhow!("Failed to send input: channel closed"))
369                            }
370                        },
371                    }
372                } else {
373                    Err(anyhow!("Process not started or sender not available"))
374                }
375            } else {
376                Err(anyhow!("Process not found"))
377            }
378        } else {
379            Err(anyhow!("Process pool not initialized"))
380        }
381    }
382
383    /// Checks if the process associated with the current instance is running.
384    ///
385    /// This method attempts to determine if a process with the `pid` of the current instance
386    /// exists in the global `PROCESS_POOL`.
387    ///
388    /// # Returns
389    /// * `true` - If the process with the associated `pid` is currently present in the global process pool.
390    /// * `false` - If the process is not found in the global process pool or if the pool is not initialized.
391    ///
392    /// # Async Behavior
393    /// This method is asynchronous because it acquires a lock on the `PROCESS_POOL`.
394    ///
395    /// # Notes
396    /// - The `PROCESS_POOL` must be initialized before calling this function.
397    ///   If `PROCESS_POOL` is not set, the function will return `false`.
398    /// - The `PROCESS_POOL` is expected to be a globally accessible, asynchronous, and thread-safe
399    ///   data structure that tracks active processes.
400    ///
401    /// # Example
402    /// ```rust
403    /// let result = instance.is_process_running().await;
404    /// if result {
405    ///     println!("Process is running.");
406    /// } else {
407    ///     println!("Process is not running.");
408    /// }
409    /// ```
410    pub async fn is_process_running(&self) -> bool {
411        if let Some(process_pool) = PROCESS_POOL.get() {
412            let pool = process_pool.lock().await;
413            return pool.contains_key(&self.pid);
414        }
415        false
416    }
417
418    /// Asynchronously terminates a process identified by its `pid`.
419    ///
420    /// This method performs the following operations tailored to the target operating system:
421    ///
422    /// - **Windows**: Opens a handle to the process using its process ID (`pid`) and forcefully terminates it
423    ///   using the `TerminateProcess` function from the WinAPI.
424    /// - **Linux**: Utilizes the `kill` system call with the `SIGKILL` signal to forcefully terminate the process.
425    ///
426    /// ## Platform-specific Notes:
427    ///
428    /// - On **Windows**, the process is identified and terminated using the `OpenProcess` and `TerminateProcess`
429    ///   functions from the WinAPI.
430    /// - On **Linux**, the `kill` system call is used with the signal `SIGKILL` (9) to ensure the process is terminated.
431    ///
432    /// # Errors
433    ///
434    /// - Returns an error if the process termination fails (on Linux) due to system call errors or invalid process IDs.
435    ///   On failure, the error contains details about the `pid` and the last OS error encountered.
436    ///
437    /// # Safety
438    ///
439    /// This method uses unsafe code blocks to interact with system APIs (`libc` on Linux, WinAPI on Windows). Ensure
440    /// that the provided `pid` corresponds to a valid process, and consider the implications of forcefully
441    /// terminating processes.
442    ///
443    /// # Example
444    ///
445    /// ```rust
446    /// let process_manager = SomeProcessManager::new(pid); // Example struct containing the pid
447    /// if let Err(e) = process_manager.kill().await {
448    ///     eprintln!("Failed to terminate process: {}", e);
449    /// }
450    /// ```
451    /// Attempts to gracefully shut down the process, falling back to forceful termination if needed.
452    ///
453    /// This method first tries to gracefully shut down the process by:
454    /// - On Linux: Sending a SIGTERM signal
455    /// - On Windows: Sending a WM_CLOSE message to the main window
456    ///
457    /// If the process doesn't exit within the specified timeout, it will forcefully terminate
458    /// the process using the `kill()` method.
459    ///
460    /// # Arguments
461    ///
462    /// * `timeout` - A `std::time::Duration` specifying how long to wait for the process to exit
463    ///   gracefully before forcefully terminating it.
464    ///
465    /// # Returns
466    ///
467    /// * `Ok(())` - If the process was successfully shut down (either gracefully or forcefully).
468    /// * `Err(Error)` - If an error occurred during the shutdown process.
469    ///
470    /// # Example
471    ///
472    /// ```rust
473    /// // Try to shut down gracefully, waiting up to 5 seconds before force killing
474    /// if let Err(e) = process.shutdown(std::time::Duration::from_secs(5)).await {
475    ///     eprintln!("Failed to shut down process: {}", e);
476    /// }
477    /// ```
478    pub async fn shutdown(&self, timeout: std::time::Duration) -> Result<()> {
479        // First, try to gracefully shut down the process
480        let graceful_shutdown_result = self.graceful_shutdown().await;
481
482        // If graceful shutdown failed or isn't implemented for this platform, log it but continue
483        if let Err(e) = &graceful_shutdown_result {
484            debug!("Graceful shutdown attempt failed: {}", e);
485            return self.kill().await;
486        }
487
488        // Wait for the process to exit for the specified timeout
489        let start_time = std::time::Instant::now();
490        while start_time.elapsed() < timeout {
491            if !self.is_process_running().await {
492                return Ok(());
493            }
494            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
495        }
496
497        // If we're here, the process didn't exit gracefully within the timeout
498        // Fall back to forceful termination
499        debug!("Process did not exit gracefully within timeout, forcing termination");
500        self.kill().await
501    }
502
503    /// Attempts to gracefully shut down the process without forceful termination.
504    ///
505    /// This method is platform-specific:
506    /// - On Linux: Sends a SIGTERM signal to request graceful termination
507    /// - On Windows: First tries to send a Ctrl+C event to the process using GenerateConsoleCtrlEvent.
508    ///   If that fails, it attempts to send an "exit" command to the process via stdin.
509    ///
510    /// # Returns
511    ///
512    /// * `Ok(())` - If the graceful shutdown signal was successfully sent.
513    /// * `Err(Error)` - If an error occurred while sending the graceful shutdown signal.
514    ///
515    /// # Notes
516    ///
517    /// - A successful return doesn't guarantee the process will actually exit.
518    ///   Use `shutdown()` with a timeout to ensure the process exits.
519    /// - On Windows, the GenerateConsoleCtrlEvent function works with process groups, not individual
520    ///   processes, so it may not work for all processes. The fallback "exit" command approach
521    ///   works for many command-line applications that accept such commands.
522    /// - For GUI applications on Windows, neither approach may work. In such cases, the `shutdown()`
523    ///   method will fall back to forceful termination after the timeout.
524    async fn graceful_shutdown(&self) -> Result<()> {
525        #[cfg(target_os = "windows")]
526        {
527            unsafe {
528                // First, try to send Ctrl+C event to the process
529                // CTRL_C_EVENT = 0, CTRL_BREAK_EVENT = 1
530                // Note: GenerateConsoleCtrlEvent works with process groups, not individual processes
531                // For console applications, we can try sending Ctrl+C to the process
532                let result = winapi::um::wincon::GenerateConsoleCtrlEvent(0, self.pid);
533                if result == 0 {
534                    return Err(anyhow!(
535                        "Failed to send Ctrl+C to process {}: {}",
536                        self.pid,
537                        std::io::Error::last_os_error()
538                    ));
539                }
540            }
541            return Ok(());
542        }
543
544        #[cfg(target_os = "linux")]
545        {
546            unsafe {
547                // Use the kill system call with SIGTERM (15) to request graceful termination
548                let result = libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
549                if result != 0 {
550                    return Err(anyhow!(
551                        "Failed to send SIGTERM to process {}: {}",
552                        self.pid,
553                        std::io::Error::last_os_error()
554                    ));
555                }
556            }
557            return Ok(());
558        }
559
560        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
561        {
562            return Err(anyhow!("Graceful shutdown not implemented for this platform"));
563        }
564    }
565
566    /// Forcefully terminates the process immediately.
567    ///
568    /// This method should be used as a last resort when a process needs to be terminated
569    /// immediately. For a more graceful approach, consider using `shutdown()` first.
570    ///
571    /// # Returns
572    ///
573    /// * `Ok(())` - If the process was successfully terminated.
574    /// * `Err(Error)` - If an error occurred during the termination process.
575    ///
576    /// # Example
577    ///
578    /// ```rust
579    /// if let Err(e) = process.kill().await {
580    ///     eprintln!("Failed to terminate process: {}", e);
581    /// }
582    /// ```
583    pub async fn kill(&self) -> Result<()> {
584        #[cfg(target_os = "windows")]
585        {
586            unsafe {
587                // PROCESS_TERMINATE (0x00010000) access right is required to terminate a process
588                let handle = winapi::um::processthreadsapi::OpenProcess(0x00010000, 0, self.pid);
589                if handle.is_null() {
590                    return Err(anyhow!("Failed to open process {}: {}", self.pid, std::io::Error::last_os_error()));
591                }
592
593                // Attempt to terminate the process
594                let result = winapi::um::processthreadsapi::TerminateProcess(handle, 0);
595
596                // Always close the handle to prevent resource leaks
597                let close_result = winapi::um::handleapi::CloseHandle(handle);
598
599                // Check if termination was successful
600                if result == 0 {
601                    return Err(anyhow!(
602                        "Failed to terminate process {}: {}",
603                        self.pid,
604                        std::io::Error::last_os_error()
605                    ));
606                }
607
608                // Check if handle was closed successfully
609                if close_result == 0 {
610                    warn!(
611                        "Failed to close process handle for process {}: {}",
612                        self.pid,
613                        std::io::Error::last_os_error()
614                    );
615                    // We don't return an error here as the process was terminated successfully
616                }
617            }
618        }
619        #[cfg(target_os = "linux")]
620        {
621            unsafe {
622                // Use the kill system call with SIGKILL (9) to forcefully terminate the process
623                let result = libc::kill(self.pid as libc::pid_t, libc::SIGKILL);
624                if result != 0 {
625                    return Err(anyhow!("Failed to kill process {}: {}", self.pid, std::io::Error::last_os_error()));
626                }
627            }
628        }
629
630        Ok(())
631    }
632}
633
634impl AsynchronousInteractiveProcess {
635    /// Creates a new instance of the struct with default values.
636    ///
637    /// # Arguments
638    ///
639    /// * `filename` - A value that can be converted into a `String`. This typically represents the
640    ///   name or path of the file associated with the instance.
641    ///
642    /// # Returns
643    ///
644    /// Returns a new instance of the struct populated with default fields:
645    /// - `pid`: `None` (indicating no process ID is associated yet).
646    /// - `filename`: The provided `filename` converted into a `String`.
647    /// - `arguments`: An empty vector, representing no initial arguments.
648    /// - `working_directory`: Set to the current directory (`"./"`).
649    /// - `sender`: `None`, indicating no sender is associated initially.
650    /// - `receiver`: `None`, indicating no receiver is associated initially.
651    /// - `input_queue`: An empty `VecDeque`, representing no items in the input queue.
652    ///
653    /// # Example
654    ///
655    /// ```
656    /// let instance = MyStruct::new("example.txt");
657    /// assert_eq!(instance.filename, "example.txt");
658    /// assert!(instance.pid.is_none());
659    /// assert!(instance.arguments.is_empty());
660    /// assert_eq!(instance.working_directory, PathBuf::from("./"));
661    /// assert!(instance.sender.is_none());
662    /// assert!(instance.receiver.is_none());
663    /// assert!(instance.input_queue.is_empty());
664    /// ```
665    pub fn new(filename: impl Into<String>) -> Self {
666        Self {
667            pid: None,
668            filename: filename.into(),
669            arguments: Vec::new(),
670            working_directory: PathBuf::from("./"),
671            sender: None,
672            receiver: None,
673            input_queue: VecDeque::new(),
674            exit_callback: None,
675        }
676    }
677
678    /// Sets the arguments for the current instance by converting a vector of items
679    /// implementing `Into<String>` into a `Vec<String>`.
680    ///
681    /// # Parameters
682    /// - `args`: A `Vec` containing items that implement the `Into<String>` trait. These
683    ///   items will be converted into `String` and used to set the `arguments` field
684    ///   of the instance.
685    ///
686    /// # Returns
687    /// - `Self`: The current instance (`self`) after updating its `arguments` field.
688    ///
689    /// # Example
690    /// ```rust
691    /// let instance = MyStruct::new()
692    ///     .with_arguments(vec!["arg1", "arg2", String::from("arg3")]);
693    /// ```
694    ///
695    /// This method allows chaining, as it returns the updated instance
696    /// after setting the `arguments` field.
697    pub fn with_arguments(mut self, args: Vec<impl Into<String>>) -> Self {
698        self.arguments = args.into_iter().map(|arg| arg.into()).collect();
699        self
700    }
701
702    /// Adds an argument to the `arguments` vector and returns the modified instance.
703    ///
704    /// # Parameters
705    ///
706    /// * `arg` - A value that implements the `Into<String>` trait, which will be converted into a `String`
707    ///   and added to the `arguments` vector.
708    ///
709    /// # Returns
710    ///
711    /// Returns the modified instance of the implementing struct (`Self`) with the new argument added.
712    ///
713    /// # Example
714    ///
715    /// ```rust
716    /// let instance = SomeStruct::new().with_argument("example");
717    /// ```
718    ///
719    /// This adds the string `"example"` to the `arguments` vector of `SomeStruct`.
720    pub fn with_argument(mut self, arg: impl Into<String>) -> Self {
721        self.arguments.push(arg.into());
722        self
723    }
724
725    /// Sets the working directory for the instance and returns the modified instance.
726    ///
727    /// # Arguments
728    ///
729    /// * `dir` - A value that can be converted into a `PathBuf`, representing the desired working directory.
730    ///
731    /// # Returns
732    ///
733    /// The instance of the struct with the updated working directory.
734    ///
735    /// # Example
736    ///
737    /// ```rust
738    /// let instance = MyStruct::new()
739    ///     .with_working_directory("/path/to/directory");
740    /// ```
741    pub fn with_working_directory(mut self, dir: impl Into<PathBuf>) -> Self {
742        self.working_directory = dir.into();
743        self
744    }
745
746    pub fn process_exit_callback<F>(mut self, callback: F) -> Self
747    where
748        F: Fn(i32) + Send + Sync + 'static,
749    {
750        self.exit_callback = Some(Arc::new(callback));
751        self
752    }
753
754    /// Starts a new process based on the configuration stored in the struct, manages
755    /// its I/O streams asynchronously, and tracks its lifecycle in a shared process pool.
756    ///
757    /// # Returns
758    /// - `Ok<u32>`: Returns the process ID (PID) if the process starts successfully.
759    /// - `Err(std::io::Error)`: Returns an error if any part of the process start operation fails.
760    ///
761    /// # Process Configuration
762    /// - The process is launched using the executable specified in `self.filename`.
763    /// - Command-line arguments are passed via `self.arguments`.
764    /// - The process will run in the directory specified in `self.working_directory`.
765    ///
766    /// # Standard I/O Management
767    /// - The process's `stdin` is assigned a `tokio::sync::mpsc::channel` for communication.
768    /// - `stdout` and `stderr` are read asynchronously. Each line from these streams is sent
769    ///   to an `mpsc::channel`, where `stdout`/`stderr` data can be consumed.
770    ///
771    /// # Shared Process Pool
772    /// - The process metadata, including its PID and I/O channels, is stored in a global,
773    ///   thread-safe `PROCESS_POOL`.
774    /// - The process pool is managed using a `tokio::sync::Mutex` and stores processes in a
775    ///   `HashMap` with their PID as the key.
776    ///
777    /// # Asynchronous I/O
778    /// - A background task is spawned to write data from an internal input queue to the process's
779    ///   `stdin`.
780    /// - Another background task continuously reads and forwards `stdout` and `stderr` messages
781    ///   from the process.
782    /// - If the process exits or encounters an error, its PID and metadata are removed from the
783    ///   process pool.
784    ///
785    /// # Example Use Case
786    /// ```no_run
787    /// let mut my_process = MyProcess {
788    ///     filename: "my_program".to_string(),
789    ///     arguments: vec!["--arg1", "value1".to_string()],
790    ///     working_directory: "/path/to/dir".to_string(),
791    ///     pid: None,
792    ///     sender: None,
793    ///     receiver: None,
794    ///     input_queue: VecDeque::new(),
795    /// };
796    ///
797    /// let pid = my_process.start().await.unwrap();
798    /// println!("Started process with PID: {}", pid);
799    /// ```
800    ///
801    /// # Notes
802    /// - The process's `stdin`, `stdout`, and `stderr` are piped to capture and manage
803    ///   communication asynchronously.
804    /// - Each spawned background task is independently responsible for managing a specific
805    ///   stream or part of the process lifecycle.
806    /// - Errors during I/O operations or spawning the process are logged using the `log` crate
807    ///   macros (e.g., `error!`, `debug!`).
808    ///
809    /// # Potential Errors
810    /// - Failure to spawn the process (e.g., if `self.filename` is invalid or inaccessible).
811    /// - Errors during communication with the process's I/O streams (e.g., writing to a closed `stdin`).
812    /// - Mutex locking failure on the shared process pool due to an internal inconsistency.
813    pub async fn start(&mut self) -> Result<u32> {
814        let mut command = Command::new(&self.filename);
815        command.args(&self.arguments);
816        command.current_dir(&self.working_directory);
817
818        command.stdin(std::process::Stdio::piped());
819        command.stdout(std::process::Stdio::piped());
820        command.stderr(std::process::Stdio::piped());
821
822        let mut child = command.spawn()?;
823        let pid = child.id().unwrap_or(0);
824
825        let (stdin_sender, mut stdin_receiver) = tokio::sync::mpsc::channel::<String>(100);
826        let (stdout_sender, stdout_receiver) = tokio::sync::mpsc::channel::<String>(100);
827
828        if let Some(mut stdin) = child.stdin.take() {
829            tokio::spawn(async move {
830                while let Some(input) = stdin_receiver.recv().await {
831                    if let Err(e) = stdin.write_all(input.as_bytes()).await {
832                        error!("Failed to write to process stdin: {}", e);
833                        break;
834                    }
835                    if let Err(e) = stdin.write_all(b"\n").await {
836                        error!("Failed to write newline to process stdin: {}", e);
837                        break;
838                    }
839                    if let Err(e) = stdin.flush().await {
840                        error!("Failed to flush process stdin: {}", e);
841                        break;
842                    }
843                }
844            });
845        }
846
847        if let Some(stdout) = child.stdout.take() {
848            let stdout_sender_clone = stdout_sender.clone();
849            tokio::spawn(async move {
850                let mut reader = BufReader::new(stdout);
851                let mut line = String::new();
852                while let Ok(bytes_read) = reader.read_line(&mut line).await {
853                    if bytes_read == 0 {
854                        break;
855                    }
856                    if let Err(e) = stdout_sender_clone.send(line.trim_end().to_string()).await {
857                        error!("Failed to send stdout message: {}", e);
858                        break;
859                    }
860                    line.clear();
861                }
862            });
863        }
864
865        if let Some(stderr) = child.stderr.take() {
866            let stderr_sender = stdout_sender.clone();
867            tokio::spawn(async move {
868                let mut reader = BufReader::new(stderr);
869                let mut line = String::new();
870                while let Ok(bytes_read) = reader.read_line(&mut line).await {
871                    if bytes_read == 0 {
872                        break;
873                    }
874                    if let Err(e) = stderr_sender.send(format!("STDERR: {}", line.trim_end())).await {
875                        error!("Failed to send stderr message: {}", e);
876                        break;
877                    }
878                    line.clear();
879                }
880            });
881        }
882
883        let queue_pid = pid;
884        tokio::spawn(async move {
885            loop {
886                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
887
888                let process_pool = match PROCESS_POOL.get() {
889                    Some(pool) => pool,
890                    None => break,
891                };
892
893                let mut pool = process_pool.lock().await;
894
895                let process = match pool.get_mut(&queue_pid) {
896                    Some(p) => p,
897                    None => break,
898                };
899
900                while let Some(input) = process.input_queue.pop_front() {
901                    if let Some(sender) = &process.sender {
902                        if let Err(_) = sender.try_send(input.clone()) {
903                            process.input_queue.push_front(input);
904                            break;
905                        }
906                    } else {
907                        process.input_queue.clear();
908                        break;
909                    }
910                }
911
912                drop(pool);
913            }
914        });
915
916        let exit_callback = self.exit_callback.clone();
917        tokio::spawn(async move {
918            match child.wait().await {
919                Ok(exit_status) => {
920                    let exit_code = exit_status.code().unwrap_or(-1); // Use -1 for unknown exit codes
921                    debug!("Process {} exited with code: {}", pid, exit_code);
922
923                    if let Some(exit_callback) = exit_callback {
924                        exit_callback(exit_code);
925                    }
926                }
927                Err(e) => {
928                    error!("Process {} exited with error: {}", pid, e);
929
930                    // Still trigger callback with error code
931                    if let Some(exit_callback) = exit_callback {
932                        exit_callback(-1); // or some other error indicator
933                    }
934                }
935            }
936
937            // Cleanup
938            if let Some(process_pool) = PROCESS_POOL.get() {
939                let mut pool = process_pool.lock().await;
940                pool.remove(&pid);
941                debug!("Process {} has exited and been removed from the pool", pid);
942            }
943        });
944
945        let process_pool = PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
946
947        let mut pool = process_pool.lock().await;
948        pool.insert(
949            pid,
950            Self {
951                pid: Some(pid),
952                filename: self.filename.clone(),
953                arguments: self.arguments.clone(),
954                working_directory: self.working_directory.clone(),
955                sender: Some(stdin_sender),
956                receiver: Some(stdout_receiver),
957                input_queue: VecDeque::new(),
958                exit_callback: self.exit_callback.clone(),
959            },
960        );
961
962        self.pid = Some(pid);
963
964        Ok(pid)
965    }
966
967    /// Asynchronously retrieves a handle to a process for a given process ID (PID).
968    ///
969    /// This function checks if a process with the specified PID exists in a shared
970    /// process pool. If the process is found, it returns an `Option` containing a
971    /// `ProcessHandle` for the process. Otherwise, it returns `None`.
972    ///
973    /// # Arguments
974    ///
975    /// * `pid` - A 32-bit unsigned integer representing the process ID of the
976    ///           desired process.
977    ///
978    /// # Returns
979    ///
980    /// * `Some(ProcessHandle)` - If a process with the given PID is found in the
981    ///                           process pool.
982    /// * `None` - If the process does not exist in the process pool or if the
983    ///            process pool is uninitialized.
984    ///
985    /// # Example
986    ///
987    /// ```rust
988    /// if let Some(handle) = get_process_by_pid(12345).await {
989    ///     println!("Process found with PID: {}", handle.pid);
990    /// } else {
991    ///     println!("Process not found.");
992    /// }
993    /// ```
994    ///
995    /// # Errors
996    ///
997    /// This function will return `None` if the global process pool (`PROCESS_POOL`)
998    /// has not been initialized or is unavailable.
999    ///
1000    /// # Note
1001    ///
1002    /// This is an asynchronous function and must be awaited to complete its operation.
1003    pub async fn get_process_by_pid(pid: u32) -> Option<ProcessHandle> {
1004        let process_pool = PROCESS_POOL.get()?;
1005        let pool = process_pool.lock().await;
1006        if pool.contains_key(&pid) { Some(ProcessHandle { pid }) } else { None }
1007    }
1008
1009    /// Asynchronously checks if a process identified by its `pid` is currently running.
1010    ///
1011    /// # Returns
1012    /// - `true` if the process with the stored `pid` is found in the `PROCESS_POOL`.
1013    /// - `false` if the stored `pid` is `None`, the `PROCESS_POOL` is not initialized,
1014    ///   or the `pid` does not exist in the `PROCESS_POOL`.
1015    ///
1016    /// The function first checks if the `pid` instance variable is set. If so, it attempts
1017    /// to access the global `PROCESS_POOL`. If `PROCESS_POOL` is initialized, it acquires
1018    /// a lock and checks if the `pid` exists in the pool.
1019    ///
1020    /// # Examples
1021    /// ```rust
1022    /// let result = my_instance.is_process_running().await;
1023    /// if result {
1024    ///     println!("The process is running.");
1025    /// } else {
1026    ///     println!("The process is not running.");
1027    /// }
1028    /// ```
1029    ///
1030    /// # Async Behavior
1031    /// This function acquires an asynchronous lock on the `PROCESS_POOL` to safely access
1032    /// its contents and will yield if the lock is currently held elsewhere.
1033    ///
1034    /// # Panics
1035    /// This function may panic if the async lock on the `PROCESS_POOL` fails unexpectedly.
1036    ///
1037    /// # Dependencies
1038    /// - `PROCESS_POOL` should be a globally accessible and lazily initialized structure
1039    ///   (e.g., using `once_cell` or similar patterns) that maintains a mapping of currently
1040    ///   active processes.
1041    pub async fn is_process_running(&self) -> bool {
1042        if let Some(pid) = self.pid {
1043            if let Some(process_pool) = PROCESS_POOL.get() {
1044                let pool = process_pool.lock().await;
1045                return pool.contains_key(&pid);
1046            }
1047        }
1048        false
1049    }
1050}