tokio_interactive/
lib.rs

1#![doc = include_str!("../README.md")]
2use anyhow::{Result, anyhow};
3use log::*;
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, VecDeque};
6use std::fmt::{Debug, Formatter};
7use std::path::PathBuf;
8use std::sync::{Arc, OnceLock};
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::process::Command;
11use tokio::sync::Mutex;
12use tokio::sync::mpsc::Sender;
13use tokio::sync::broadcast;
14// Change this line
15
16/// A static, lazily-initialized process pool used to manage asynchronous interactive processes.
17///
18/// This global variable utilizes the `OnceLock` to ensure it is initialized only once during
19/// the program's lifetime. It holds an `Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>`
20/// to allow safe concurrent access and modification of the process pool.
21///
22/// - `OnceLock`: Provides thread-safe, one-time initialization of the process pool.
23/// - `Arc`: Ensures the `Mutex<HashMap<...>>` can be shared across threads safely.
24/// - `Mutex`: Protects the `HashMap` from concurrent modification, ensuring thread safety.
25/// - `HashMap<u32, AsynchronousInteractiveProcess>`: Maps process IDs (`u32`) to
26///   corresponding `AsynchronousInteractiveProcess` instances.
27///
28/// Usage:
29///
30/// The process pool is intended to store and manage asynchronous interactive processes by their IDs.
31/// Access needs to ensure proper locking with the `Mutex` guard to guarantee thread safety.
32///
33/// Example initialization:
34/// ```rust
35/// PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
36/// ```
37///
38/// Example access:
39/// ```rust
40/// if let Some(pool) = PROCESS_POOL.get() {
41///     let mut guard = pool.lock().unwrap();
42///     guard.insert(process_id, new_process);
43/// }
44/// ```
45/// Ensure that any operations on the pool respect the locking mechanism provided by the `Mutex`.
46///
47/// Once initialized, `PROCESS_POOL` cannot be re-initialized or re-assigned.
48static PROCESS_POOL: OnceLock<Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>> = OnceLock::new();
49
50/// Represents a handle to a process identified by its process ID (PID).
51///
52/// The `ProcessHandle` struct is used to encapsulate the process identifier (PID)
53/// of a running process. It is designed to be lightweight and can be cloned
54/// or debugged as required.
55///
56/// # Fields
57/// - `pid`: A `u32` value that represents the process ID of the targeted process.
58///
59/// # Traits
60/// - `Debug`: Allows instances of `ProcessHandle` to be formatted using the `{:?}` formatter
61///   for debugging purposes.
62/// - `Clone`: Allows the `ProcessHandle` to be cloned, creating a new instance with the
63///   same `pid`.
64///
65/// # Example
66/// ```
67/// let handle = ProcessHandle { pid: 1234 };
68/// println!("{:?}", handle); // Outputs: ProcessHandle { pid: 1234 }
69/// let cloned_handle = handle.clone();
70/// println!("{:?}", cloned_handle); // Outputs: ProcessHandle { pid: 1234 }
71/// ```
72#[derive(Debug)]
73pub struct ProcessHandle {
74    pid: u32,
75    receiver: broadcast::Receiver<String>,
76}
77
78/// `AsynchronousInteractiveProcess` is a structure that represents a non-blocking,
79/// interactive process. It provides metadata and mechanisms to interact with a
80/// process asynchronously using sender and receiver channels.
81///
82/// # Fields
83///
84/// * `pid` - An optional process ID (`pid`) of the interactive process. If the process
85///   is not yet started, this field will remain `None`.
86///
87/// * `filename` - The name of the executable file that represents the interactive process.
88///   This is required to identify and initiate the process.
89///
90/// * `arguments` - A vector containing the arguments to pass to the executable file
91///   when launching the process.
92///
93/// * `working_directory` - The directory where the interactive process will be executed.
94///   This is represented as a `PathBuf`.
95///
96/// * `sender` - An optional `Sender` channel used to send messages or data to the
97///   interactive process. This field is skipped during serialization and deserialization
98///   because it holds runtime-related data.
99///
100/// * `receiver` - An optional `Receiver` channel used to receive messages or data
101///   from the interactive process. This field is also skipped during serialization
102///   and deserialization for the same reasons as `sender`.
103///
104/// * `input_queue` - A deque (double-ended queue) that maintains an in-memory buffer
105///   of strings representing input for the interactive process. This field is skipped
106///   during serialization as it only contains transient runtime-related data.
107///
108/// # Trait Implementations
109///
110/// - `Debug`: Allows instances of this struct to be formatted and logged for debugging purposes.
111/// - `Serialize`: Makes the struct serializable, excluding fields marked with `#[serde(skip)]`.
112/// - `Deserialize`: Allows deserialization to create instances of this struct from serialized data.
113/// - `Default`: Provides a default implementation where optional fields are set to `None`,
114///   collections are empty, and `filename` is an empty string.
115///
116/// # Example
117/// ```rust
118/// use std::path::PathBuf;
119/// use std::collections::VecDeque;
120/// use serde::{Serialize, Deserialize};
121/// use crossbeam_channel::{Sender, Receiver};
122///
123/// let process = AsynchronousInteractiveProcess {
124///     pid: None,
125///     filename: "my_program".to_string(),
126///     arguments: vec!["--help".to_string()],
127///     working_directory: PathBuf::from("/path/to/dir"),
128///     sender: None,
129///     receiver: None,
130///     input_queue: VecDeque::new(),
131/// };
132///
133/// println!("{:?}", process);
134/// ```
135///
136/// This structure is ideal for scenarios where processes need to be controlled
137/// asynchronously with multiple input or output channels.
138#[derive(Serialize, Deserialize, Default)]
139pub struct AsynchronousInteractiveProcess {
140    pub pid: Option<u32>,
141    pub filename: String,
142    pub arguments: Vec<String>,
143    pub working_directory: PathBuf,
144    #[serde(skip)]
145    sender: Option<Sender<String>>,
146    #[serde(skip)]
147    output_broadcaster: Option<broadcast::Sender<String>>,
148    #[serde(skip)]
149    _keep_alive_receiver: Option<broadcast::Receiver<String>>,
150    #[serde(skip)]
151    input_queue: VecDeque<String>,
152    #[serde(skip)]
153    exit_callback: Option<Arc<dyn Fn(i32) + Send + Sync>>,
154}
155
156impl Debug for AsynchronousInteractiveProcess {
157    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
158        write!(
159            f,
160            "AsynchronousInteractiveProcess {{ pid: {:?}, filename: {:?}, arguments: {:?}, working_directory: {:?} }}",
161            self.pid, self.filename, self.arguments, self.working_directory
162        )
163    }
164}
165
166impl ProcessHandle {
167    /// Receives an output message asynchronously from the process associated with the instance's `pid`
168    /// if available.
169    ///
170    /// This function attempts to fetch a message by accessing the process pool and checking for a
171    /// message in the associated process's receiver.
172    ///
173    /// The process follows these steps:
174    /// 1. Check if the global process pool (`PROCESS_POOL`) is initialized.
175    /// 2. Attempt to acquire the lock on the pool asynchronously.
176    /// 3. Look for the process associated with `self.pid`.
177    /// 4. If the process has a receiver channel, attempt to get a message using `try_recv`.
178    /// 5. If a message is found, return it as `Ok(Some(String))`.
179    /// 6. If no message is received, retry up to `MAX_RETRIES` times, with a delay of `RETRY_DELAY_MS`
180    ///    milliseconds between each retry.
181    ///
182    /// If no message is received after all retries, the function returns `Ok(None)`.
183    ///
184    /// # Returns
185    ///
186    /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
187    /// * `Ok(None)` - If no message is received after all retries.
188    /// * `Err(Error)` - If an error occurs during the process, such as:
189    ///   - The process pool is not initialized
190    ///   - The process with the specified PID is not found
191    ///   - The receiver channel is not available
192    ///   - The receiver channel is disconnected
193    ///
194    /// # Behavior
195    ///
196    /// - This function uses `tokio::time::sleep` for introducing delays between retries and works
197    ///   asynchronously.
198    /// - The function uses constants `MAX_RETRIES` (10) and `RETRY_DELAY_MS` (10) to control the
199    ///   retry behavior.
200    /// - The function properly propagates errors that occur during the process.
201    ///
202    /// # Example
203    ///
204    /// ```rust
205    /// match instance.receive_output().await {
206    ///     Ok(Some(output)) => println!("Received output: {}", output),
207    ///     Ok(None) => println!("No output received."),
208    ///     Err(e) => eprintln!("Error receiving output: {}", e),
209    /// }
210    /// ```
211    ///
212    /// # Note
213    ///
214    /// - The function assumes the existence of a global process pool (`PROCESS_POOL`) which is safe
215    ///   to access concurrently using mutex-based locking.
216    /// - The function makes use of `tokio::sync::Mutex` to handle concurrent executions across
217    ///   asynchronous tasks.
218    /// Default timeout for receive_output in milliseconds
219    const DEFAULT_TIMEOUT_MS: u64 = 100;
220
221
222    pub async fn receive_output(&mut self) -> Result<Option<String>> {
223        // Use the default timeout
224        self.receive_output_with_timeout(std::time::Duration::from_millis(Self::DEFAULT_TIMEOUT_MS)).await
225    }
226
227    /// Receives an output message asynchronously from the process associated with the instance's `pid`
228    /// if available, with a specified timeout.
229    ///
230    /// This function is similar to `receive_output()` but allows specifying a custom timeout duration
231    /// instead of using the default timeout.
232    ///
233    /// # Arguments
234    ///
235    /// * `timeout` - A `std::time::Duration` specifying how long to wait for a message before giving up.
236    ///
237    /// # Returns
238    ///
239    /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
240    /// * `Ok(None)` - If no message is received before the timeout expires.
241    /// * `Err(Error)` - If an error occurs during the process, such as:
242    ///   - The process pool is not initialized
243    ///   - The process with the specified PID is not found
244    ///   - The receiver channel is not available
245    ///   - The receiver channel is disconnected
246    ///
247    /// # Example
248    ///
249    /// ```rust
250    /// // Wait for up to 5 seconds for a message
251    /// match instance.receive_output_with_timeout(std::time::Duration::from_secs(5)).await {
252    ///     Ok(Some(output)) => println!("Received output: {}", output),
253    ///     Ok(None) => println!("No output received within timeout."),
254    ///     Err(e) => eprintln!("Error receiving output: {}", e),
255    /// }
256    /// ```
257    pub async fn receive_output_with_timeout(&mut self, timeout: std::time::Duration) -> Result<Option<String>> {
258        // Define constants for retry parameters
259        const RETRY_DELAY_MS: u64 = 10;
260
261        // Try to receive a message immediately
262        match self.receiver.try_recv() {
263            Ok(msg) => return Ok(Some(msg)),
264            Err(broadcast::error::TryRecvError::Empty) => {}
265            Err(broadcast::error::TryRecvError::Closed) => {
266                return Err(anyhow!("Broadcast channel closed for process {}", self.pid));
267            }
268            Err(broadcast::error::TryRecvError::Lagged(_)) => {
269                // Messages were dropped due to lag, but we can continue
270            }
271        }
272
273        // If no message is available, retry with delay until timeout is reached
274        let start_time = std::time::Instant::now();
275        while start_time.elapsed() < timeout {
276            tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_DELAY_MS)).await;
277
278            match self.receiver.try_recv() {
279                Ok(msg) => return Ok(Some(msg)),
280                Err(broadcast::error::TryRecvError::Empty) => {}
281                Err(broadcast::error::TryRecvError::Closed) => {
282                    return Err(anyhow!("Broadcast channel closed for process {}", self.pid));
283                }
284                Err(broadcast::error::TryRecvError::Lagged(_)) => {
285                    // Messages were dropped due to lag, but we can continue
286                }
287            }
288        }
289
290        // No message after timeout
291        Ok(None)
292    }
293
294    /// Sends the provided input to a process associated with this instance's PID asynchronously.
295    ///
296    /// # Arguments
297    ///
298    /// * `input` - An input of any type that can be converted into a `String`. This is the data
299    ///   that will be sent to the respective process's input queue.
300    ///
301    /// # Returns
302    ///
303    /// * `Ok(())` if the input was successfully sent or queued for sending.
304    /// * `Err` - Returns an error in the following cases:
305    ///     - If the process pool is not initialized.
306    ///     - If the process associated with this PID is not found.
307    ///     - If the process was not started or its sender channel is not available.
308    ///     - If the sender channel is closed and input cannot be sent.
309    ///
310    /// # Details
311    ///
312    /// This function retrieves the process associated with the current instance's `pid`
313    /// from a global process pool. If a valid process is found:
314    /// - The input is sent via an asynchronous channel to the process.
315    /// - If the channel is full, the input is queued for later sending.
316    /// - If the channel is closed, an error is returned.
317    ///
318    /// The function also ensures thread safety by acquiring a lock on the process pool before attempting
319    /// any operations related to the process.
320    ///
321    /// # Errors
322    ///
323    /// This function propagates several potential issues as errors:
324    /// - If the process pool is uninitialized (`PROCESS_POOL.get()` returns `None`).
325    /// - If the process associated with the PID is missing in the pool.
326    /// - If the sender channel was never initialized or is unavailable.
327    /// - If the sender channel is closed and no new messages can be sent.
328    ///
329    /// # Example
330    ///
331    /// ```rust
332    /// use anyhow::Result;
333    ///
334    /// #[tokio::main]
335    /// async fn main() -> Result<()> {
336    ///     let manager = ProcessManager::new(1); // Assumes a struct is managing process with ID 1.
337    ///     manager.send_input("Some input").await?;
338    ///     Ok(())
339    /// }
340    /// ```
341    ///
342    /// # Note
343    ///
344    /// This function expects a global process pool (`PROCESS_POOL`) to be properly initialized before being called.
345    /// Additionally, the associated process must have a valid `sender` channel to accept input.
346    pub async fn send_input(&self, input: impl Into<String>) -> Result<()> {
347        let input_str = input.into();
348        if let Some(process_pool) = PROCESS_POOL.get() {
349            let mut pool = process_pool.lock().await;
350            if let Some(process) = pool.get_mut(&self.pid) {
351                if let Some(sender) = &process.sender {
352                    match sender.try_send(input_str.clone()) {
353                        Ok(_) => Ok(()),
354                        Err(e) => match e {
355                            tokio::sync::mpsc::error::TrySendError::Full(_) => {
356                                process.input_queue.push_back(input_str);
357                                Ok(())
358                            }
359                            tokio::sync::mpsc::error::TrySendError::Closed(_) => {
360                                Err(anyhow!("Failed to send input: channel closed"))
361                            }
362                        },
363                    }
364                } else {
365                    Err(anyhow!("Process not started or sender not available"))
366                }
367            } else {
368                Err(anyhow!("Process not found"))
369            }
370        } else {
371            Err(anyhow!("Process pool not initialized"))
372        }
373    }
374
375    /// Checks if the process associated with the current instance is running.
376    ///
377    /// This method attempts to determine if a process with the `pid` of the current instance
378    /// exists in the global `PROCESS_POOL`.
379    ///
380    /// # Returns
381    /// * `true` - If the process with the associated `pid` is currently present in the global process pool.
382    /// * `false` - If the process is not found in the global process pool or if the pool is not initialized.
383    ///
384    /// # Async Behavior
385    /// This method is asynchronous because it acquires a lock on the `PROCESS_POOL`.
386    ///
387    /// # Notes
388    /// - The `PROCESS_POOL` must be initialized before calling this function.
389    ///   If `PROCESS_POOL` is not set, the function will return `false`.
390    /// - The `PROCESS_POOL` is expected to be a globally accessible, asynchronous, and thread-safe
391    ///   data structure that tracks active processes.
392    ///
393    /// # Example
394    /// ```rust
395    /// let result = instance.is_process_running().await;
396    /// if result {
397    ///     println!("Process is running.");
398    /// } else {
399    ///     println!("Process is not running.");
400    /// }
401    /// ```
402    pub async fn is_process_running(&self) -> bool {
403        if let Some(process_pool) = PROCESS_POOL.get() {
404            let pool = process_pool.lock().await;
405            return pool.contains_key(&self.pid);
406        }
407        false
408    }
409
410    /// Asynchronously terminates a process identified by its `pid`.
411    ///
412    /// This method performs the following operations tailored to the target operating system:
413    ///
414    /// - **Windows**: Opens a handle to the process using its process ID (`pid`) and forcefully terminates it
415    ///   using the `TerminateProcess` function from the WinAPI.
416    /// - **Linux**: Utilizes the `kill` system call with the `SIGKILL` signal to forcefully terminate the process.
417    ///
418    /// ## Platform-specific Notes:
419    ///
420    /// - On **Windows**, the process is identified and terminated using the `OpenProcess` and `TerminateProcess`
421    ///   functions from the WinAPI.
422    /// - On **Linux**, the `kill` system call is used with the signal `SIGKILL` (9) to ensure the process is terminated.
423    ///
424    /// # Errors
425    ///
426    /// - Returns an error if the process termination fails (on Linux) due to system call errors or invalid process IDs.
427    ///   On failure, the error contains details about the `pid` and the last OS error encountered.
428    ///
429    /// # Safety
430    ///
431    /// This method uses unsafe code blocks to interact with system APIs (`libc` on Linux, WinAPI on Windows). Ensure
432    /// that the provided `pid` corresponds to a valid process, and consider the implications of forcefully
433    /// terminating processes.
434    ///
435    /// # Example
436    ///
437    /// ```rust
438    /// let process_manager = SomeProcessManager::new(pid); // Example struct containing the pid
439    /// if let Err(e) = process_manager.kill().await {
440    ///     eprintln!("Failed to terminate process: {}", e);
441    /// }
442    /// ```
443    /// Attempts to gracefully shut down the process, falling back to forceful termination if needed.
444    ///
445    /// This method first tries to gracefully shut down the process by:
446    /// - On Linux: Sending a SIGTERM signal
447    /// - On Windows: Sending a WM_CLOSE message to the main window
448    ///
449    /// If the process doesn't exit within the specified timeout, it will forcefully terminate
450    /// the process using the `kill()` method.
451    ///
452    /// # Arguments
453    ///
454    /// * `timeout` - A `std::time::Duration` specifying how long to wait for the process to exit
455    ///   gracefully before forcefully terminating it.
456    ///
457    /// # Returns
458    ///
459    /// * `Ok(())` - If the process was successfully shut down (either gracefully or forcefully).
460    /// * `Err(Error)` - If an error occurred during the shutdown process.
461    ///
462    /// # Example
463    ///
464    /// ```rust
465    /// // Try to shut down gracefully, waiting up to 5 seconds before force killing
466    /// if let Err(e) = process.shutdown(std::time::Duration::from_secs(5)).await {
467    ///     eprintln!("Failed to shut down process: {}", e);
468    /// }
469    /// ```
470    pub async fn shutdown(&self, timeout: std::time::Duration) -> Result<()> {
471        // First, try to gracefully shut down the process
472        let graceful_shutdown_result = self.graceful_shutdown().await;
473
474        // If graceful shutdown failed or isn't implemented for this platform, log it but continue
475        if let Err(e) = &graceful_shutdown_result {
476            debug!("Graceful shutdown attempt failed: {}", e);
477            return self.kill().await;
478        }
479
480        // Wait for the process to exit for the specified timeout
481        let start_time = std::time::Instant::now();
482        while start_time.elapsed() < timeout {
483            if !self.is_process_running().await {
484                return Ok(());
485            }
486            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
487        }
488
489        // If we're here, the process didn't exit gracefully within the timeout
490        // Fall back to forceful termination
491        debug!("Process did not exit gracefully within timeout, forcing termination");
492        self.kill().await
493    }
494
495    /// Attempts to gracefully shut down the process without forceful termination.
496    ///
497    /// This method is platform-specific:
498    /// - On Linux: Sends a SIGTERM signal to request graceful termination
499    /// - On Windows: First tries to send a Ctrl+C event to the process using GenerateConsoleCtrlEvent.
500    ///   If that fails, it attempts to send an "exit" command to the process via stdin.
501    ///
502    /// # Returns
503    ///
504    /// * `Ok(())` - If the graceful shutdown signal was successfully sent.
505    /// * `Err(Error)` - If an error occurred while sending the graceful shutdown signal.
506    ///
507    /// # Notes
508    ///
509    /// - A successful return doesn't guarantee the process will actually exit.
510    ///   Use `shutdown()` with a timeout to ensure the process exits.
511    /// - On Windows, the GenerateConsoleCtrlEvent function works with process groups, not individual
512    ///   processes, so it may not work for all processes. The fallback "exit" command approach
513    ///   works for many command-line applications that accept such commands.
514    /// - For GUI applications on Windows, neither approach may work. In such cases, the `shutdown()`
515    ///   method will fall back to forceful termination after the timeout.
516    async fn graceful_shutdown(&self) -> Result<()> {
517        #[cfg(target_os = "windows")]
518        {
519            unsafe {
520                // First, try to send Ctrl+C event to the process
521                // CTRL_C_EVENT = 0, CTRL_BREAK_EVENT = 1
522                // Note: GenerateConsoleCtrlEvent works with process groups, not individual processes
523                // For console applications, we can try sending Ctrl+C to the process
524                let result = winapi::um::wincon::GenerateConsoleCtrlEvent(0, self.pid);
525                if result == 0 {
526                    return Err(anyhow!(
527                        "Failed to send Ctrl+C to process {}: {}",
528                        self.pid,
529                        std::io::Error::last_os_error()
530                    ));
531                }
532            }
533            return Ok(());
534        }
535
536        #[cfg(target_os = "linux")]
537        {
538            unsafe {
539                // Use the kill system call with SIGTERM (15) to request graceful termination
540                let result = libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
541                if result != 0 {
542                    return Err(anyhow!(
543                        "Failed to send SIGTERM to process {}: {}",
544                        self.pid,
545                        std::io::Error::last_os_error()
546                    ));
547                }
548            }
549            return Ok(());
550        }
551
552        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
553        {
554            return Err(anyhow!("Graceful shutdown not implemented for this platform"));
555        }
556    }
557
558    /// Forcefully terminates the process immediately.
559    ///
560    /// This method should be used as a last resort when a process needs to be terminated
561    /// immediately. For a more graceful approach, consider using `shutdown()` first.
562    ///
563    /// # Returns
564    ///
565    /// * `Ok(())` - If the process was successfully terminated.
566    /// * `Err(Error)` - If an error occurred during the termination process.
567    ///
568    /// # Example
569    ///
570    /// ```rust
571    /// if let Err(e) = process.kill().await {
572    ///     eprintln!("Failed to terminate process: {}", e);
573    /// }
574    /// ```
575    pub async fn kill(&self) -> Result<()> {
576        #[cfg(target_os = "windows")]
577        {
578            unsafe {
579                // PROCESS_TERMINATE (0x00010000) access right is required to terminate a process
580                let handle = winapi::um::processthreadsapi::OpenProcess(0x00010000, 0, self.pid);
581                if handle.is_null() {
582                    return Err(anyhow!("Failed to open process {}: {}", self.pid, std::io::Error::last_os_error()));
583                }
584
585                // Attempt to terminate the process
586                let result = winapi::um::processthreadsapi::TerminateProcess(handle, 0);
587
588                // Always close the handle to prevent resource leaks
589                let close_result = winapi::um::handleapi::CloseHandle(handle);
590
591                // Check if termination was successful
592                if result == 0 {
593                    return Err(anyhow!(
594                        "Failed to terminate process {}: {}",
595                        self.pid,
596                        std::io::Error::last_os_error()
597                    ));
598                }
599
600                // Check if handle was closed successfully
601                if close_result == 0 {
602                    warn!(
603                        "Failed to close process handle for process {}: {}",
604                        self.pid,
605                        std::io::Error::last_os_error()
606                    );
607                    // We don't return an error here as the process was terminated successfully
608                }
609            }
610        }
611        #[cfg(target_os = "linux")]
612        {
613            unsafe {
614                // Use the kill system call with SIGKILL (9) to forcefully terminate the process
615                let result = libc::kill(self.pid as libc::pid_t, libc::SIGKILL);
616                if result != 0 {
617                    return Err(anyhow!("Failed to kill process {}: {}", self.pid, std::io::Error::last_os_error()));
618                }
619            }
620        }
621
622        Ok(())
623    }
624}
625
626impl AsynchronousInteractiveProcess {
627    /// Creates a new instance of the struct with default values.
628    ///
629    /// # Arguments
630    ///
631    /// * `filename` - A value that can be converted into a `String`. This typically represents the
632    ///   name or path of the file associated with the instance.
633    ///
634    /// # Returns
635    ///
636    /// Returns a new instance of the struct populated with default fields:
637    /// - `pid`: `None` (indicating no process ID is associated yet).
638    /// - `filename`: The provided `filename` converted into a `String`.
639    /// - `arguments`: An empty vector, representing no initial arguments.
640    /// - `working_directory`: Set to the current directory (`"./"`).
641    /// - `sender`: `None`, indicating no sender is associated initially.
642    /// - `receiver`: `None`, indicating no receiver is associated initially.
643    /// - `input_queue`: An empty `VecDeque`, representing no items in the input queue.
644    ///
645    /// # Example
646    ///
647    /// ```
648    /// let instance = MyStruct::new("example.txt");
649    /// assert_eq!(instance.filename, "example.txt");
650    /// assert!(instance.pid.is_none());
651    /// assert!(instance.arguments.is_empty());
652    /// assert_eq!(instance.working_directory, PathBuf::from("./"));
653    /// assert!(instance.sender.is_none());
654    /// assert!(instance.receiver.is_none());
655    /// assert!(instance.input_queue.is_empty());
656    /// ```
657    pub fn new(filename: impl Into<String>) -> Self {
658        Self {
659            pid: None,
660            filename: filename.into(),
661            arguments: Vec::new(),
662            working_directory: PathBuf::from("./"),
663            sender: None,
664            output_broadcaster: None,
665            _keep_alive_receiver: None,
666            input_queue: VecDeque::new(),
667            exit_callback: None,
668        }
669    }
670
671    /// Sets the arguments for the current instance by converting a vector of items
672    /// implementing `Into<String>` into a `Vec<String>`.
673    ///
674    /// # Parameters
675    /// - `args`: A `Vec` containing items that implement the `Into<String>` trait. These
676    ///   items will be converted into `String` and used to set the `arguments` field
677    ///   of the instance.
678    ///
679    /// # Returns
680    /// - `Self`: The current instance (`self`) after updating its `arguments` field.
681    ///
682    /// # Example
683    /// ```rust
684    /// let instance = MyStruct::new()
685    ///     .with_arguments(vec!["arg1", "arg2", String::from("arg3")]);
686    /// ```
687    ///
688    /// This method allows chaining, as it returns the updated instance
689    /// after setting the `arguments` field.
690    pub fn with_arguments(mut self, args: Vec<impl Into<String>>) -> Self {
691        self.arguments = args.into_iter().map(|arg| arg.into()).collect();
692        self
693    }
694
695    /// Adds an argument to the `arguments` vector and returns the modified instance.
696    ///
697    /// # Parameters
698    ///
699    /// * `arg` - A value that implements the `Into<String>` trait, which will be converted into a `String`
700    ///   and added to the `arguments` vector.
701    ///
702    /// # Returns
703    ///
704    /// Returns the modified instance of the implementing struct (`Self`) with the new argument added.
705    ///
706    /// # Example
707    ///
708    /// ```rust
709    /// let instance = SomeStruct::new().with_argument("example");
710    /// ```
711    ///
712    /// This adds the string `"example"` to the `arguments` vector of `SomeStruct`.
713    pub fn with_argument(mut self, arg: impl Into<String>) -> Self {
714        self.arguments.push(arg.into());
715        self
716    }
717
718    /// Sets the working directory for the instance and returns the modified instance.
719    ///
720    /// # Arguments
721    ///
722    /// * `dir` - A value that can be converted into a `PathBuf`, representing the desired working directory.
723    ///
724    /// # Returns
725    ///
726    /// The instance of the struct with the updated working directory.
727    ///
728    /// # Example
729    ///
730    /// ```rust
731    /// let instance = MyStruct::new()
732    ///     .with_working_directory("/path/to/directory");
733    /// ```
734    pub fn with_working_directory(mut self, dir: impl Into<PathBuf>) -> Self {
735        self.working_directory = dir.into();
736        self
737    }
738
739    pub fn process_exit_callback<F>(mut self, callback: F) -> Self
740    where
741        F: Fn(i32) + Send + Sync + 'static,
742    {
743        self.exit_callback = Some(Arc::new(callback));
744        self
745    }
746
747    /// Starts a new process based on the configuration stored in the struct, manages
748    /// its I/O streams asynchronously, and tracks its lifecycle in a shared process pool.
749    ///
750    /// # Returns
751    /// - `Ok<u32>`: Returns the process ID (PID) if the process starts successfully.
752    /// - `Err(std::io::Error)`: Returns an error if any part of the process start operation fails.
753    ///
754    /// # Process Configuration
755    /// - The process is launched using the executable specified in `self.filename`.
756    /// - Command-line arguments are passed via `self.arguments`.
757    /// - The process will run in the directory specified in `self.working_directory`.
758    ///
759    /// # Standard I/O Management
760    /// - The process's `stdin` is assigned a `tokio::sync::mpsc::channel` for communication.
761    /// - `stdout` and `stderr` are read asynchronously. Each line from these streams is sent
762    ///   to an `mpsc::channel`, where `stdout`/`stderr` data can be consumed.
763    ///
764    /// # Shared Process Pool
765    /// - The process metadata, including its PID and I/O channels, is stored in a global,
766    ///   thread-safe `PROCESS_POOL`.
767    /// - The process pool is managed using a `tokio::sync::Mutex` and stores processes in a
768    ///   `HashMap` with their PID as the key.
769    ///
770    /// # Asynchronous I/O
771    /// - A background task is spawned to write data from an internal input queue to the process's
772    ///   `stdin`.
773    /// - Another background task continuously reads and forwards `stdout` and `stderr` messages
774    ///   from the process.
775    /// - If the process exits or encounters an error, its PID and metadata are removed from the
776    ///   process pool.
777    ///
778    /// # Example Use Case
779    /// ```no_run
780    /// let mut my_process = MyProcess {
781    ///     filename: "my_program".to_string(),
782    ///     arguments: vec!["--arg1", "value1".to_string()],
783    ///     working_directory: "/path/to/dir".to_string(),
784    ///     pid: None,
785    ///     sender: None,
786    ///     receiver: None,
787    ///     input_queue: VecDeque::new(),
788    /// };
789    ///
790    /// let pid = my_process.start().await.unwrap();
791    /// println!("Started process with PID: {}", pid);
792    /// ```
793    ///
794    /// # Notes
795    /// - The process's `stdin`, `stdout`, and `stderr` are piped to capture and manage
796    ///   communication asynchronously.
797    /// - Each spawned background task is independently responsible for managing a specific
798    ///   stream or part of the process lifecycle.
799    /// - Errors during I/O operations or spawning the process are logged using the `log` crate
800    ///   macros (e.g., `error!`, `debug!`).
801    ///
802    /// # Potential Errors
803    /// - Failure to spawn the process (e.g., if `self.filename` is invalid or inaccessible).
804    /// - Errors during communication with the process's I/O streams (e.g., writing to a closed `stdin`).
805    /// - Mutex locking failure on the shared process pool due to an internal inconsistency.
806    pub async fn start(&mut self) -> Result<u32> {
807        let mut command = Command::new(&self.filename);
808        command.args(&self.arguments);
809
810        // Convert UNC path to regular Windows path if needed
811        let working_dir = if cfg!(windows) {
812            // Remove UNC prefix if present
813            let path_str = self.working_directory.to_string_lossy();
814            if path_str.starts_with(r"\\?\") {
815                PathBuf::from(&path_str[4..]) // Remove the \\?\ prefix
816            } else {
817                self.working_directory.clone()
818            }
819        } else {
820            self.working_directory.clone()
821        };
822
823        // Debug the working directory
824        debug!("tokio-interactive: filename = {}", self.filename);
825        debug!("tokio-interactive: arguments = {:?}", self.arguments);
826        debug!("tokio-interactive: working_directory = {:?}", working_dir);
827        debug!("tokio-interactive: working_directory exists = {}", working_dir.exists());
828        debug!("tokio-interactive: working_directory is_dir = {}", working_dir.is_dir());
829
830        // Check if working directory is absolute
831        debug!("tokio-interactive: working_directory is_absolute = {}", working_dir.is_absolute());
832
833        // Get current directory before setting
834        if let Ok(current_dir) = std::env::current_dir() {
835            debug!("tokio-interactive: current working directory before setting = {:?}", current_dir);
836        }
837
838        command.current_dir(working_dir);
839
840        command.stdin(std::process::Stdio::piped());
841        command.stdout(std::process::Stdio::piped());
842        command.stderr(std::process::Stdio::piped());
843
844        // Debug the final command that will be executed
845        debug!("tokio-interactive: Final command = {:?}", command);
846
847        let mut child = command.spawn()?;
848        let pid = child.id().unwrap_or(0);
849
850        debug!("tokio-interactive: Process spawned with PID = {}", pid);
851
852        let (stdin_sender, mut stdin_receiver) = tokio::sync::mpsc::channel::<String>(100);
853        let (stdout_broadcaster, _keep_alive_receiver) = broadcast::channel::<String>(100);
854
855        if let Some(mut stdin) = child.stdin.take() {
856            tokio::spawn(async move {
857                while let Some(input) = stdin_receiver.recv().await {
858                    if let Err(e) = stdin.write_all(input.as_bytes()).await {
859                        error!("Failed to write to process stdin: {}", e);
860                        break;
861                    }
862                    if let Err(e) = stdin.write_all(b"\n").await {
863                        error!("Failed to write newline to process stdin: {}", e);
864                        break;
865                    }
866                    if let Err(e) = stdin.flush().await {
867                        error!("Failed to flush process stdin: {}", e);
868                        break;
869                    }
870                }
871            });
872        }
873
874        if let Some(stdout) = child.stdout.take() {
875            let stdout_broadcaster_clone = stdout_broadcaster.clone();
876            tokio::spawn(async move {
877                let mut reader = BufReader::new(stdout);
878                let mut line = String::new();
879                while let Ok(bytes_read) = reader.read_line(&mut line).await {
880                    if bytes_read == 0 {
881                        break;
882                    }
883                    if let Err(e) = stdout_broadcaster_clone.send(line.trim_end().to_string()) {
884                        error!("Failed to broadcast stdout message: {}", e);
885                        break;
886                    }
887                    line.clear();
888                }
889            });
890        }
891
892        if let Some(stderr) = child.stderr.take() {
893            let stderr_broadcaster = stdout_broadcaster.clone();
894            tokio::spawn(async move {
895                let mut reader = BufReader::new(stderr);
896                let mut line = String::new();
897                while let Ok(bytes_read) = reader.read_line(&mut line).await {
898                    if bytes_read == 0 {
899                        break;
900                    }
901                    if let Err(e) = stderr_broadcaster.send(format!("STDERR: {}", line.trim_end())) {
902                        error!("Failed to broadcast stderr message: {}", e);
903                        break;
904                    }
905                    line.clear();
906                }
907            });
908        }
909
910        let queue_pid = pid;
911        tokio::spawn(async move {
912            loop {
913                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
914
915                let process_pool = match PROCESS_POOL.get() {
916                    Some(pool) => pool,
917                    None => break,
918                };
919
920                let mut pool = process_pool.lock().await;
921
922                let process = match pool.get_mut(&queue_pid) {
923                    Some(p) => p,
924                    None => break,
925                };
926
927                while let Some(input) = process.input_queue.pop_front() {
928                    if let Some(sender) = &process.sender {
929                        if let Err(_) = sender.try_send(input.clone()) {
930                            process.input_queue.push_front(input);
931                            break;
932                        }
933                    } else {
934                        process.input_queue.clear();
935                        break;
936                    }
937                }
938
939                drop(pool);
940            }
941        });
942
943        let exit_callback = self.exit_callback.clone();
944        tokio::spawn(async move {
945            match child.wait().await {
946                Ok(exit_status) => {
947                    let exit_code = exit_status.code().unwrap_or(-1); // Use -1 for unknown exit codes
948                    debug!("Process {} exited with code: {}", pid, exit_code);
949
950                    if let Some(exit_callback) = exit_callback {
951                        exit_callback(exit_code);
952                    }
953                }
954                Err(e) => {
955                    error!("Process {} exited with error: {}", pid, e);
956
957                    // Still trigger callback with error code
958                    if let Some(exit_callback) = exit_callback {
959                        exit_callback(-1); // or some other error indicator
960                    }
961                }
962            }
963
964            // Cleanup
965            if let Some(process_pool) = PROCESS_POOL.get() {
966                let mut pool = process_pool.lock().await;
967                pool.remove(&pid);
968                debug!("Process {} has exited and been removed from the pool", pid);
969            }
970        });
971
972        let process_pool = PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
973
974        let mut pool = process_pool.lock().await;
975        pool.insert(
976            pid,
977            Self {
978                pid: Some(pid),
979                filename: self.filename.clone(),
980                arguments: self.arguments.clone(),
981                working_directory: self.working_directory.clone(),
982                sender: Some(stdin_sender),
983                output_broadcaster: Some(stdout_broadcaster.clone()),
984                _keep_alive_receiver: Some(_keep_alive_receiver),
985                input_queue: VecDeque::new(),
986                exit_callback: self.exit_callback.clone(),
987            },
988        );
989
990        self.pid = Some(pid);
991
992        Ok(pid)
993    }
994
995    /// Asynchronously retrieves a handle to a process for a given process ID (PID).
996    ///
997    /// This function checks if a process with the specified PID exists in a shared
998    /// process pool. If the process is found, it returns an `Option` containing a
999    /// `ProcessHandle` for the process. Otherwise, it returns `None`.
1000    ///
1001    /// # Arguments
1002    ///
1003    /// * `pid` - A 32-bit unsigned integer representing the process ID of the
1004    ///           desired process.
1005    ///
1006    /// # Returns
1007    ///
1008    /// * `Some(ProcessHandle)` - If a process with the given PID is found in the
1009    ///                           process pool.
1010    /// * `None` - If the process does not exist in the process pool or if the
1011    ///            process pool is uninitialized.
1012    ///
1013    /// # Example
1014    ///
1015    /// ```rust
1016    /// if let Some(handle) = get_process_by_pid(12345).await {
1017    ///     println!("Process found with PID: {}", handle.pid);
1018    /// } else {
1019    ///     println!("Process not found.");
1020    /// }
1021    /// ```
1022    ///
1023    /// # Errors
1024    ///
1025    /// This function will return `None` if the global process pool (`PROCESS_POOL`)
1026    /// has not been initialized or is unavailable.
1027    ///
1028    /// # Note
1029    ///
1030    /// This is an asynchronous function and must be awaited to complete its operation.
1031    pub async fn get_process_by_pid(pid: u32) -> Option<ProcessHandle> {
1032        let process_pool = PROCESS_POOL.get()?;
1033        let pool = process_pool.lock().await;
1034        if let Some(process) = pool.get(&pid) {
1035            if let Some(broadcaster) = &process.output_broadcaster {
1036                let receiver = broadcaster.subscribe();
1037                Some(ProcessHandle { pid, receiver })
1038            } else {
1039                None
1040            }
1041        } else {
1042            None
1043        }
1044    }
1045
1046    /// Asynchronously checks if a process identified by its `pid` is currently running.
1047    ///
1048    /// # Returns
1049    /// - `true` if the process with the stored `pid` is found in the `PROCESS_POOL`.
1050    /// - `false` if the stored `pid` is `None`, the `PROCESS_POOL` is not initialized,
1051    ///   or the `pid` does not exist in the `PROCESS_POOL`.
1052    ///
1053    /// The function first checks if the `pid` instance variable is set. If so, it attempts
1054    /// to access the global `PROCESS_POOL`. If `PROCESS_POOL` is initialized, it acquires
1055    /// a lock and checks if the `pid` exists in the pool.
1056    ///
1057    /// # Examples
1058    /// ```rust
1059    /// let result = my_instance.is_process_running().await;
1060    /// if result {
1061    ///     println!("The process is running.");
1062    /// } else {
1063    ///     println!("The process is not running.");
1064    /// }
1065    /// ```
1066    ///
1067    /// # Async Behavior
1068    /// This function acquires an asynchronous lock on the `PROCESS_POOL` to safely access
1069    /// its contents and will yield if the lock is currently held elsewhere.
1070    ///
1071    /// # Panics
1072    /// This function may panic if the async lock on the `PROCESS_POOL` fails unexpectedly.
1073    ///
1074    /// # Dependencies
1075    /// - `PROCESS_POOL` should be a globally accessible and lazily initialized structure
1076    ///   (e.g., using `once_cell` or similar patterns) that maintains a mapping of currently
1077    ///   active processes.
1078    pub async fn is_process_running(&self) -> bool {
1079        if let Some(pid) = self.pid {
1080            if let Some(process_pool) = PROCESS_POOL.get() {
1081                let pool = process_pool.lock().await;
1082                return pool.contains_key(&pid);
1083            }
1084        }
1085        false
1086    }
1087}