tokio_interactive/
lib.rs

1#![doc = include_str!("../README.md")]
2use anyhow::{anyhow, Result};
3use log::*;
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, VecDeque};
6use std::fmt::{Debug, Formatter};
7use std::path::PathBuf;
8use std::sync::{Arc, OnceLock};
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::process::Command;
11use tokio::sync::mpsc::{Receiver, Sender};
12use tokio::sync::Mutex;
13// Change this line
14
/// A static, lazily-initialized process pool used to manage asynchronous interactive processes.
///
/// This global variable utilizes the `OnceLock` to ensure it is initialized only once during
/// the program's lifetime. It holds an `Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>`
/// to allow safe concurrent access and modification of the process pool.
///
/// - `OnceLock`: Provides thread-safe, one-time initialization of the process pool.
/// - `Arc`: Ensures the `Mutex<HashMap<...>>` can be shared across threads safely.
/// - `Mutex`: This is `tokio::sync::Mutex`, so `lock()` is awaited and yields the guard directly
///   (there is no poisoning and nothing to `unwrap`).
/// - `HashMap<u32, AsynchronousInteractiveProcess>`: Maps process IDs (`u32`) to
///   corresponding `AsynchronousInteractiveProcess` instances.
///
/// Usage:
///
/// The process pool is intended to store and manage asynchronous interactive processes by their IDs.
/// Access needs to ensure proper locking with the `Mutex` guard to guarantee thread safety.
///
/// Example initialization:
/// ```rust
/// PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
/// ```
///
/// Example access (inside an `async` context):
/// ```rust
/// if let Some(pool) = PROCESS_POOL.get() {
///     let mut guard = pool.lock().await;
///     guard.insert(process_id, new_process);
/// }
/// ```
/// Ensure that any operations on the pool respect the locking mechanism provided by the `Mutex`.
///
/// Once initialized, `PROCESS_POOL` cannot be re-initialized or re-assigned.
static PROCESS_POOL: OnceLock<Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>> = OnceLock::new();
48
49/// Represents a handle to a process identified by its process ID (PID).
50///
51/// The `ProcessHandle` struct is used to encapsulate the process identifier (PID)
52/// of a running process. It is designed to be lightweight and can be cloned
53/// or debugged as required.
54///
55/// # Fields
56/// - `pid`: A `u32` value that represents the process ID of the targeted process.
57///
58/// # Traits
59/// - `Debug`: Allows instances of `ProcessHandle` to be formatted using the `{:?}` formatter
60///   for debugging purposes.
61/// - `Clone`: Allows the `ProcessHandle` to be cloned, creating a new instance with the
62///   same `pid`.
63///
64/// # Example
65/// ```
66/// let handle = ProcessHandle { pid: 1234 };
67/// println!("{:?}", handle); // Outputs: ProcessHandle { pid: 1234 }
68/// let cloned_handle = handle.clone();
69/// println!("{:?}", cloned_handle); // Outputs: ProcessHandle { pid: 1234 }
70/// ```
#[derive(Debug, Clone)]
pub struct ProcessHandle {
    /// Process ID this handle refers to; also the key used to look the process up in
    /// `PROCESS_POOL` and the ID passed to the OS termination calls.
    pid: u32,
}
75
76/// `AsynchronousInteractiveProcess` is a structure that represents a non-blocking,
77/// interactive process. It provides metadata and mechanisms to interact with a
78/// process asynchronously using sender and receiver channels.
79///
80/// # Fields
81///
82/// * `pid` - An optional process ID (`pid`) of the interactive process. If the process
83///   is not yet started, this field will remain `None`.
84///
85/// * `filename` - The name of the executable file that represents the interactive process.
86///   This is required to identify and initiate the process.
87///
88/// * `arguments` - A vector containing the arguments to pass to the executable file
89///   when launching the process.
90///
91/// * `working_directory` - The directory where the interactive process will be executed.
92///   This is represented as a `PathBuf`.
93///
94/// * `sender` - An optional `Sender` channel used to send messages or data to the
95///   interactive process. This field is skipped during serialization and deserialization
96///   because it holds runtime-related data.
97///
98/// * `receiver` - An optional `Receiver` channel used to receive messages or data
99///   from the interactive process. This field is also skipped during serialization
100///   and deserialization for the same reasons as `sender`.
101///
102/// * `input_queue` - A deque (double-ended queue) that maintains an in-memory buffer
103///   of strings representing input for the interactive process. This field is skipped
104///   during serialization as it only contains transient runtime-related data.
105///
106/// # Trait Implementations
107///
108/// - `Debug`: Allows instances of this struct to be formatted and logged for debugging purposes.
109/// - `Serialize`: Makes the struct serializable, excluding fields marked with `#[serde(skip)]`.
110/// - `Deserialize`: Allows deserialization to create instances of this struct from serialized data.
111/// - `Default`: Provides a default implementation where optional fields are set to `None`,
112///   collections are empty, and `filename` is an empty string.
113///
114/// # Example
115/// ```rust
116/// use std::path::PathBuf;
117/// use std::collections::VecDeque;
118/// use serde::{Serialize, Deserialize};
119/// use crossbeam_channel::{Sender, Receiver};
120///
121/// let process = AsynchronousInteractiveProcess {
122///     pid: None,
123///     filename: "my_program".to_string(),
124///     arguments: vec!["--help".to_string()],
125///     working_directory: PathBuf::from("/path/to/dir"),
126///     sender: None,
127///     receiver: None,
128///     input_queue: VecDeque::new(),
129/// };
130///
131/// println!("{:?}", process);
132/// ```
133///
134/// This structure is ideal for scenarios where processes need to be controlled
135/// asynchronously with multiple input or output channels.
#[derive(Serialize, Deserialize, Default)]
pub struct AsynchronousInteractiveProcess {
    /// Process ID, `None` on a freshly constructed value.
    pub pid: Option<u32>,
    /// Executable name or path.
    pub filename: String,
    /// Command-line arguments, in order.
    pub arguments: Vec<String>,
    /// Directory the process is meant to run in.
    pub working_directory: PathBuf,
    /// Channel used by `ProcessHandle::send_input` (via `try_send`) to pass input on.
    /// Runtime-only: skipped by serde.
    #[serde(skip)]
    sender: Option<Sender<String>>,
    /// Channel polled by `ProcessHandle` (via `try_recv`) for output messages.
    /// Runtime-only: skipped by serde.
    #[serde(skip)]
    receiver: Option<Receiver<String>>,
    /// Overflow buffer: `send_input` pushes here when `sender` reports the channel is full.
    /// NOTE(review): the code that drains this queue is not visible in this part of the file.
    #[serde(skip)]
    input_queue: VecDeque<String>,
    /// Optional callback installed through `process_exit_callback`.
    /// NOTE(review): presumably invoked with the process exit code — confirm at the call site.
    #[serde(skip)]
    exit_callback: Option<Arc<dyn Fn(i32) + Send + Sync>>,
}
151
152impl Debug for AsynchronousInteractiveProcess {
153    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154        write!(
155            f,
156            "AsynchronousInteractiveProcess {{ pid: {:?}, filename: {:?}, arguments: {:?}, working_directory: {:?} }}",
157            self.pid, self.filename, self.arguments, self.working_directory
158        )
159    }
160}
161
162impl ProcessHandle {
163    /// Receives an output message asynchronously from the process associated with the instance's `pid`
164    /// if available.
165    ///
166    /// This function attempts to fetch a message by accessing the process pool and checking for a
167    /// message in the associated process's receiver.
168    ///
169    /// The process follows these steps:
170    /// 1. Check if the global process pool (`PROCESS_POOL`) is initialized.
171    /// 2. Attempt to acquire the lock on the pool asynchronously.
172    /// 3. Look for the process associated with `self.pid`.
173    /// 4. If the process has a receiver channel, attempt to get a message using `try_recv`.
174    /// 5. If a message is found, return it as `Ok(Some(String))`.
    /// 6. If no message is received, poll again every `RETRY_DELAY_MS` milliseconds until the
    ///    default timeout (`DEFAULT_TIMEOUT_MS` milliseconds) has elapsed.
    ///
    /// If no message is received before the timeout expires, the function returns `Ok(None)`.
179    ///
180    /// # Returns
181    ///
182    /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
183    /// * `Ok(None)` - If no message is received after all retries.
184    /// * `Err(Error)` - If an error occurs during the process, such as:
185    ///   - The process pool is not initialized
186    ///   - The process with the specified PID is not found
187    ///   - The receiver channel is not available
188    ///   - The receiver channel is disconnected
189    ///
190    /// # Behavior
191    ///
192    /// - This function uses `tokio::time::sleep` for introducing delays between retries and works
193    ///   asynchronously.
    /// - The function uses the constants `DEFAULT_TIMEOUT_MS` (100) and `RETRY_DELAY_MS` (10) to
    ///   control the retry behavior.
196    /// - The function properly propagates errors that occur during the process.
197    ///
198    /// # Example
199    ///
200    /// ```rust
201    /// match instance.receive_output().await {
202    ///     Ok(Some(output)) => println!("Received output: {}", output),
203    ///     Ok(None) => println!("No output received."),
204    ///     Err(e) => eprintln!("Error receiving output: {}", e),
205    /// }
206    /// ```
207    ///
208    /// # Note
209    ///
210    /// - The function assumes the existence of a global process pool (`PROCESS_POOL`) which is safe
211    ///   to access concurrently using mutex-based locking.
212    /// - The function makes use of `tokio::sync::Mutex` to handle concurrent executions across
213    ///   asynchronous tasks.
    ///
    /// # Default timeout
    ///
    /// Default timeout for `receive_output`, in milliseconds.
    const DEFAULT_TIMEOUT_MS: u64 = 100;
216
217    /// Helper function to try receiving a message
218    async fn try_receive_message(
219        pid: u32,
220        pool: &mut tokio::sync::MutexGuard<'_, HashMap<u32, AsynchronousInteractiveProcess>>,
221    ) -> Result<Option<String>> {
222        if let Some(process) = pool.get_mut(&pid) {
223            if let Some(receiver) = &mut process.receiver {
224                match receiver.try_recv() {
225                    Ok(msg) => return Ok(Some(msg)),
226                    Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
227                    Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
228                        return Err(anyhow!("Channel disconnected for process {}", pid));
229                    }
230                }
231            } else {
232                return Err(anyhow!("Receiver not available for process {}", pid));
233            }
234        } else {
235            return Err(anyhow!("Process {} not found", pid));
236        }
237        Ok(None)
238    }
239
240    pub async fn receive_output(&self) -> Result<Option<String>> {
241        // Use the default timeout
242        self.receive_output_with_timeout(std::time::Duration::from_millis(Self::DEFAULT_TIMEOUT_MS)).await
243    }
244
245    /// Receives an output message asynchronously from the process associated with the instance's `pid`
246    /// if available, with a specified timeout.
247    ///
248    /// This function is similar to `receive_output()` but allows specifying a custom timeout duration
249    /// instead of using the default timeout.
250    ///
251    /// # Arguments
252    ///
253    /// * `timeout` - A `std::time::Duration` specifying how long to wait for a message before giving up.
254    ///
255    /// # Returns
256    ///
257    /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
258    /// * `Ok(None)` - If no message is received before the timeout expires.
259    /// * `Err(Error)` - If an error occurs during the process, such as:
260    ///   - The process pool is not initialized
261    ///   - The process with the specified PID is not found
262    ///   - The receiver channel is not available
263    ///   - The receiver channel is disconnected
264    ///
265    /// # Example
266    ///
267    /// ```rust
268    /// // Wait for up to 5 seconds for a message
269    /// match instance.receive_output_with_timeout(std::time::Duration::from_secs(5)).await {
270    ///     Ok(Some(output)) => println!("Received output: {}", output),
271    ///     Ok(None) => println!("No output received within timeout."),
272    ///     Err(e) => eprintln!("Error receiving output: {}", e),
273    /// }
274    /// ```
275    pub async fn receive_output_with_timeout(&self, timeout: std::time::Duration) -> Result<Option<String>> {
276        // Define constants for retry parameters
277        const RETRY_DELAY_MS: u64 = 10;
278
279        if let Some(process_pool) = PROCESS_POOL.get() {
280            // Try to receive a message immediately
281            {
282                let mut pool = process_pool.lock().await;
283                if let Some(msg) = Self::try_receive_message(self.pid, &mut pool).await? {
284                    return Ok(Some(msg));
285                }
286            }
287
288            // If no message is available, retry with delay until timeout is reached
289            let start_time = std::time::Instant::now();
290            while start_time.elapsed() < timeout {
291                tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_DELAY_MS)).await;
292
293                let mut pool = process_pool.lock().await;
294                if let Some(msg) = Self::try_receive_message(self.pid, &mut pool).await? {
295                    return Ok(Some(msg));
296                }
297            }
298
299            // No message after timeout
300            return Ok(None);
301        }
302
303        Err(anyhow!("Process pool not initialized"))
304    }
305
306    /// Sends the provided input to a process associated with this instance's PID asynchronously.
307    ///
308    /// # Arguments
309    ///
310    /// * `input` - An input of any type that can be converted into a `String`. This is the data
311    ///   that will be sent to the respective process's input queue.
312    ///
313    /// # Returns
314    ///
315    /// * `Ok(())` if the input was successfully sent or queued for sending.
316    /// * `Err` - Returns an error in the following cases:
317    ///     - If the process pool is not initialized.
318    ///     - If the process associated with this PID is not found.
319    ///     - If the process was not started or its sender channel is not available.
320    ///     - If the sender channel is closed and input cannot be sent.
321    ///
322    /// # Details
323    ///
324    /// This function retrieves the process associated with the current instance's `pid`
325    /// from a global process pool. If a valid process is found:
326    /// - The input is sent via an asynchronous channel to the process.
327    /// - If the channel is full, the input is queued for later sending.
328    /// - If the channel is closed, an error is returned.
329    ///
330    /// The function also ensures thread safety by acquiring a lock on the process pool before attempting
331    /// any operations related to the process.
332    ///
333    /// # Errors
334    ///
335    /// This function propagates several potential issues as errors:
336    /// - If the process pool is uninitialized (`PROCESS_POOL.get()` returns `None`).
337    /// - If the process associated with the PID is missing in the pool.
338    /// - If the sender channel was never initialized or is unavailable.
339    /// - If the sender channel is closed and no new messages can be sent.
340    ///
341    /// # Example
342    ///
343    /// ```rust
344    /// use anyhow::Result;
345    ///
346    /// #[tokio::main]
347    /// async fn main() -> Result<()> {
348    ///     let manager = ProcessManager::new(1); // Assumes a struct is managing process with ID 1.
349    ///     manager.send_input("Some input").await?;
350    ///     Ok(())
351    /// }
352    /// ```
353    ///
354    /// # Note
355    ///
356    /// This function expects a global process pool (`PROCESS_POOL`) to be properly initialized before being called.
357    /// Additionally, the associated process must have a valid `sender` channel to accept input.
358    pub async fn send_input(&self, input: impl Into<String>) -> Result<()> {
359        let input_str = input.into();
360        if let Some(process_pool) = PROCESS_POOL.get() {
361            let mut pool = process_pool.lock().await;
362            if let Some(process) = pool.get_mut(&self.pid) {
363                if let Some(sender) = &process.sender {
364                    match sender.try_send(input_str.clone()) {
365                        Ok(_) => Ok(()),
366                        Err(e) => match e {
367                            tokio::sync::mpsc::error::TrySendError::Full(_) => {
368                                process.input_queue.push_back(input_str);
369                                Ok(())
370                            }
371                            tokio::sync::mpsc::error::TrySendError::Closed(_) => Err(anyhow!("Failed to send input: channel closed")),
372                        },
373                    }
374                } else {
375                    Err(anyhow!("Process not started or sender not available"))
376                }
377            } else {
378                Err(anyhow!("Process not found"))
379            }
380        } else {
381            Err(anyhow!("Process pool not initialized"))
382        }
383    }
384
385    /// Checks if the process associated with the current instance is running.
386    ///
387    /// This method attempts to determine if a process with the `pid` of the current instance
388    /// exists in the global `PROCESS_POOL`.
389    ///
390    /// # Returns
391    /// * `true` - If the process with the associated `pid` is currently present in the global process pool.
392    /// * `false` - If the process is not found in the global process pool or if the pool is not initialized.
393    ///
394    /// # Async Behavior
395    /// This method is asynchronous because it acquires a lock on the `PROCESS_POOL`.
396    ///
397    /// # Notes
398    /// - The `PROCESS_POOL` must be initialized before calling this function.
399    ///   If `PROCESS_POOL` is not set, the function will return `false`.
400    /// - The `PROCESS_POOL` is expected to be a globally accessible, asynchronous, and thread-safe
401    ///   data structure that tracks active processes.
402    ///
403    /// # Example
404    /// ```rust
405    /// let result = instance.is_process_running().await;
406    /// if result {
407    ///     println!("Process is running.");
408    /// } else {
409    ///     println!("Process is not running.");
410    /// }
411    /// ```
412    pub async fn is_process_running(&self) -> bool {
413        if let Some(process_pool) = PROCESS_POOL.get() {
414            let pool = process_pool.lock().await;
415            return pool.contains_key(&self.pid);
416        }
417        false
418    }
419
420    /// Asynchronously terminates a process identified by its `pid`.
421    ///
422    /// This method performs the following operations tailored to the target operating system:
423    ///
424    /// - **Windows**: Opens a handle to the process using its process ID (`pid`) and forcefully terminates it
425    ///   using the `TerminateProcess` function from the WinAPI.
426    /// - **Linux**: Utilizes the `kill` system call with the `SIGKILL` signal to forcefully terminate the process.
427    ///
428    /// ## Platform-specific Notes:
429    ///
430    /// - On **Windows**, the process is identified and terminated using the `OpenProcess` and `TerminateProcess`
431    ///   functions from the WinAPI.
432    /// - On **Linux**, the `kill` system call is used with the signal `SIGKILL` (9) to ensure the process is terminated.
433    ///
434    /// # Errors
435    ///
436    /// - Returns an error if the process termination fails (on Linux) due to system call errors or invalid process IDs.
437    ///   On failure, the error contains details about the `pid` and the last OS error encountered.
438    ///
439    /// # Safety
440    ///
441    /// This method uses unsafe code blocks to interact with system APIs (`libc` on Linux, WinAPI on Windows). Ensure
442    /// that the provided `pid` corresponds to a valid process, and consider the implications of forcefully
443    /// terminating processes.
444    ///
445    /// # Example
446    ///
447    /// ```rust
448    /// let process_manager = SomeProcessManager::new(pid); // Example struct containing the pid
449    /// if let Err(e) = process_manager.kill().await {
450    ///     eprintln!("Failed to terminate process: {}", e);
451    /// }
452    /// ```
453    /// Attempts to gracefully shut down the process, falling back to forceful termination if needed.
454    ///
455    /// This method first tries to gracefully shut down the process by:
456    /// - On Linux: Sending a SIGTERM signal
    /// - On Windows: Sending a Ctrl+C console event via `GenerateConsoleCtrlEvent`
458    ///
459    /// If the process doesn't exit within the specified timeout, it will forcefully terminate
460    /// the process using the `kill()` method.
461    ///
462    /// # Arguments
463    ///
464    /// * `timeout` - A `std::time::Duration` specifying how long to wait for the process to exit
465    ///   gracefully before forcefully terminating it.
466    ///
467    /// # Returns
468    ///
469    /// * `Ok(())` - If the process was successfully shut down (either gracefully or forcefully).
470    /// * `Err(Error)` - If an error occurred during the shutdown process.
471    ///
472    /// # Example
473    ///
474    /// ```rust
475    /// // Try to shut down gracefully, waiting up to 5 seconds before force killing
476    /// if let Err(e) = process.shutdown(std::time::Duration::from_secs(5)).await {
477    ///     eprintln!("Failed to shut down process: {}", e);
478    /// }
479    /// ```
480    pub async fn shutdown(&self, timeout: std::time::Duration) -> Result<()> {
481        // First, try to gracefully shut down the process
482        let graceful_shutdown_result = self.graceful_shutdown().await;
483
484        // If graceful shutdown failed or isn't implemented for this platform, log it but continue
485        if let Err(e) = &graceful_shutdown_result {
486            debug!("Graceful shutdown attempt failed: {}", e);
487            return self.kill().await;
488        }
489
490        // Wait for the process to exit for the specified timeout
491        let start_time = std::time::Instant::now();
492        while start_time.elapsed() < timeout {
493            if !self.is_process_running().await {
494                return Ok(());
495            }
496            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
497        }
498
499        // If we're here, the process didn't exit gracefully within the timeout
500        // Fall back to forceful termination
501        debug!("Process did not exit gracefully within timeout, forcing termination");
502        self.kill().await
503    }
504
505    /// Attempts to gracefully shut down the process without forceful termination.
506    ///
507    /// This method is platform-specific:
508    /// - On Linux: Sends a SIGTERM signal to request graceful termination
    /// - On Windows: Sends a Ctrl+C event to the process using GenerateConsoleCtrlEvent.
511    ///
512    /// # Returns
513    ///
514    /// * `Ok(())` - If the graceful shutdown signal was successfully sent.
515    /// * `Err(Error)` - If an error occurred while sending the graceful shutdown signal.
516    ///
517    /// # Notes
518    ///
519    /// - A successful return doesn't guarantee the process will actually exit.
520    ///   Use `shutdown()` with a timeout to ensure the process exits.
    /// - On Windows, the GenerateConsoleCtrlEvent function works with process groups, not individual
    ///   processes, so it may not work for all processes.
    /// - For GUI applications on Windows, this approach may not work. In such cases, the `shutdown()`
    ///   method will fall back to forceful termination after the timeout.
526    async fn graceful_shutdown(&self) -> Result<()> {
527        #[cfg(target_os = "windows")]
528        {
529            unsafe {
530                // First, try to send Ctrl+C event to the process
531                // CTRL_C_EVENT = 0, CTRL_BREAK_EVENT = 1
532                // Note: GenerateConsoleCtrlEvent works with process groups, not individual processes
533                // For console applications, we can try sending Ctrl+C to the process
534                let result = winapi::um::wincon::GenerateConsoleCtrlEvent(0, self.pid);
535                if result == 0 {
536                    return Err(anyhow!("Failed to send Ctrl+C to process {}: {}", self.pid, std::io::Error::last_os_error()));
537                }
538            }
539            return Ok(());
540        }
541
542        #[cfg(target_os = "linux")]
543        {
544            unsafe {
545                // Use the kill system call with SIGTERM (15) to request graceful termination
546                let result = libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
547                if result != 0 {
548                    return Err(anyhow!("Failed to send SIGTERM to process {}: {}", self.pid, std::io::Error::last_os_error()));
549                }
550            }
551            return Ok(());
552        }
553
554        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
555        {
556            return Err(anyhow!("Graceful shutdown not implemented for this platform"));
557        }
558    }
559
560    /// Forcefully terminates the process immediately.
561    ///
562    /// This method should be used as a last resort when a process needs to be terminated
563    /// immediately. For a more graceful approach, consider using `shutdown()` first.
564    ///
565    /// # Returns
566    ///
567    /// * `Ok(())` - If the process was successfully terminated.
568    /// * `Err(Error)` - If an error occurred during the termination process.
569    ///
570    /// # Example
571    ///
572    /// ```rust
573    /// if let Err(e) = process.kill().await {
574    ///     eprintln!("Failed to terminate process: {}", e);
575    /// }
576    /// ```
577    pub async fn kill(&self) -> Result<()> {
578        #[cfg(target_os = "windows")]
579        {
580            unsafe {
581                // PROCESS_TERMINATE (0x00010000) access right is required to terminate a process
582                let handle = winapi::um::processthreadsapi::OpenProcess(0x00010000, 0, self.pid);
583                if handle.is_null() {
584                    return Err(anyhow!("Failed to open process {}: {}", self.pid, std::io::Error::last_os_error()));
585                }
586
587                // Attempt to terminate the process
588                let result = winapi::um::processthreadsapi::TerminateProcess(handle, 0);
589
590                // Always close the handle to prevent resource leaks
591                let close_result = winapi::um::handleapi::CloseHandle(handle);
592
593                // Check if termination was successful
594                if result == 0 {
595                    return Err(anyhow!("Failed to terminate process {}: {}", self.pid, std::io::Error::last_os_error()));
596                }
597
598                // Check if handle was closed successfully
599                if close_result == 0 {
600                    warn!("Failed to close process handle for process {}: {}", self.pid, std::io::Error::last_os_error());
601                    // We don't return an error here as the process was terminated successfully
602                }
603            }
604        }
605        #[cfg(target_os = "linux")]
606        {
607            unsafe {
608                // Use the kill system call with SIGKILL (9) to forcefully terminate the process
609                let result = libc::kill(self.pid as libc::pid_t, libc::SIGKILL);
610                if result != 0 {
611                    return Err(anyhow!("Failed to kill process {}: {}", self.pid, std::io::Error::last_os_error()));
612                }
613            }
614        }
615
616        Ok(())
617    }
618}
619
620impl AsynchronousInteractiveProcess {
621    /// Creates a new instance of the struct with default values.
622    ///
623    /// # Arguments
624    ///
625    /// * `filename` - A value that can be converted into a `String`. This typically represents the
626    ///   name or path of the file associated with the instance.
627    ///
628    /// # Returns
629    ///
630    /// Returns a new instance of the struct populated with default fields:
631    /// - `pid`: `None` (indicating no process ID is associated yet).
632    /// - `filename`: The provided `filename` converted into a `String`.
633    /// - `arguments`: An empty vector, representing no initial arguments.
634    /// - `working_directory`: Set to the current directory (`"./"`).
635    /// - `sender`: `None`, indicating no sender is associated initially.
636    /// - `receiver`: `None`, indicating no receiver is associated initially.
637    /// - `input_queue`: An empty `VecDeque`, representing no items in the input queue.
638    ///
639    /// # Example
640    ///
641    /// ```
    /// let instance = AsynchronousInteractiveProcess::new("example.txt");
643    /// assert_eq!(instance.filename, "example.txt");
644    /// assert!(instance.pid.is_none());
645    /// assert!(instance.arguments.is_empty());
646    /// assert_eq!(instance.working_directory, PathBuf::from("./"));
647    /// assert!(instance.sender.is_none());
648    /// assert!(instance.receiver.is_none());
649    /// assert!(instance.input_queue.is_empty());
650    /// ```
651    pub fn new(filename: impl Into<String>) -> Self {
652        Self {
653            pid: None,
654            filename: filename.into(),
655            arguments: Vec::new(),
656            working_directory: PathBuf::from("./"),
657            sender: None,
658            receiver: None,
659            input_queue: VecDeque::new(),
660            exit_callback: None,
661        }
662    }
663
664    /// Sets the arguments for the current instance by converting a vector of items
665    /// implementing `Into<String>` into a `Vec<String>`.
666    ///
667    /// # Parameters
668    /// - `args`: A `Vec` containing items that implement the `Into<String>` trait. These
669    ///   items will be converted into `String` and used to set the `arguments` field
670    ///   of the instance.
671    ///
672    /// # Returns
673    /// - `Self`: The current instance (`self`) after updating its `arguments` field.
674    ///
675    /// # Example
676    /// ```rust
    /// let instance = AsynchronousInteractiveProcess::new("my_program")
    ///     .with_arguments(vec!["arg1", "arg2", "arg3"]);
679    /// ```
680    ///
681    /// This method allows chaining, as it returns the updated instance
682    /// after setting the `arguments` field.
683    pub fn with_arguments(mut self, args: Vec<impl Into<String>>) -> Self {
684        self.arguments = args.into_iter().map(|arg| arg.into()).collect();
685        self
686    }
687
688    /// Adds an argument to the `arguments` vector and returns the modified instance.
689    ///
690    /// # Parameters
691    ///
692    /// * `arg` - A value that implements the `Into<String>` trait, which will be converted into a `String`
693    ///   and added to the `arguments` vector.
694    ///
695    /// # Returns
696    ///
697    /// Returns the modified instance of the implementing struct (`Self`) with the new argument added.
698    ///
699    /// # Example
700    ///
701    /// ```rust
    /// let instance = AsynchronousInteractiveProcess::new("my_program").with_argument("example");
    /// ```
    ///
    /// This adds the string `"example"` to the `arguments` vector of the instance.
706    pub fn with_argument(mut self, arg: impl Into<String>) -> Self {
707        self.arguments.push(arg.into());
708        self
709    }
710
711    /// Sets the working directory for the instance and returns the modified instance.
712    ///
713    /// # Arguments
714    ///
715    /// * `dir` - A value that can be converted into a `PathBuf`, representing the desired working directory.
716    ///
717    /// # Returns
718    ///
719    /// The instance of the struct with the updated working directory.
720    ///
721    /// # Example
722    ///
723    /// ```rust
724    /// let instance = MyStruct::new()
725    ///     .with_working_directory("/path/to/directory");
726    /// ```
727    pub fn with_working_directory(mut self, dir: impl Into<PathBuf>) -> Self {
728        self.working_directory = dir.into();
729        self
730    }
731
732    pub fn process_exit_callback<F>(mut self, callback: F) -> Self
733    where
734        F: Fn(i32) + Send + Sync + 'static,
735    {
736        self.exit_callback = Some(Arc::new(callback));
737        self
738    }
739
740    /// Starts a new process based on the configuration stored in the struct, manages
741    /// its I/O streams asynchronously, and tracks its lifecycle in a shared process pool.
742    ///
743    /// # Returns
744    /// - `Ok<u32>`: Returns the process ID (PID) if the process starts successfully.
745    /// - `Err(std::io::Error)`: Returns an error if any part of the process start operation fails.
746    ///
747    /// # Process Configuration
748    /// - The process is launched using the executable specified in `self.filename`.
749    /// - Command-line arguments are passed via `self.arguments`.
750    /// - The process will run in the directory specified in `self.working_directory`.
751    ///
752    /// # Standard I/O Management
753    /// - The process's `stdin` is assigned a `tokio::sync::mpsc::channel` for communication.
754    /// - `stdout` and `stderr` are read asynchronously. Each line from these streams is sent
755    ///   to an `mpsc::channel`, where `stdout`/`stderr` data can be consumed.
756    ///
757    /// # Shared Process Pool
758    /// - The process metadata, including its PID and I/O channels, is stored in a global,
759    ///   thread-safe `PROCESS_POOL`.
760    /// - The process pool is managed using a `tokio::sync::Mutex` and stores processes in a
761    ///   `HashMap` with their PID as the key.
762    ///
763    /// # Asynchronous I/O
764    /// - A background task is spawned to write data from an internal input queue to the process's
765    ///   `stdin`.
766    /// - Another background task continuously reads and forwards `stdout` and `stderr` messages
767    ///   from the process.
768    /// - If the process exits or encounters an error, its PID and metadata are removed from the
769    ///   process pool.
770    ///
771    /// # Example Use Case
772    /// ```no_run
773    /// let mut my_process = MyProcess {
774    ///     filename: "my_program".to_string(),
775    ///     arguments: vec!["--arg1", "value1".to_string()],
776    ///     working_directory: "/path/to/dir".to_string(),
777    ///     pid: None,
778    ///     sender: None,
779    ///     receiver: None,
780    ///     input_queue: VecDeque::new(),
781    /// };
782    ///
783    /// let pid = my_process.start().await.unwrap();
784    /// println!("Started process with PID: {}", pid);
785    /// ```
786    ///
787    /// # Notes
788    /// - The process's `stdin`, `stdout`, and `stderr` are piped to capture and manage
789    ///   communication asynchronously.
790    /// - Each spawned background task is independently responsible for managing a specific
791    ///   stream or part of the process lifecycle.
792    /// - Errors during I/O operations or spawning the process are logged using the `log` crate
793    ///   macros (e.g., `error!`, `debug!`).
794    ///
795    /// # Potential Errors
796    /// - Failure to spawn the process (e.g., if `self.filename` is invalid or inaccessible).
797    /// - Errors during communication with the process's I/O streams (e.g., writing to a closed `stdin`).
798    /// - Mutex locking failure on the shared process pool due to an internal inconsistency.
799    pub async fn start(&mut self) -> Result<u32> {
800        let mut command = Command::new(&self.filename);
801        command.args(&self.arguments);
802
803        // Convert UNC path to regular Windows path if needed
804        let working_dir = if cfg!(windows) {
805            // Remove UNC prefix if present
806            let path_str = self.working_directory.to_string_lossy();
807            if path_str.starts_with(r"\\?\") {
808                PathBuf::from(&path_str[4..]) // Remove the \\?\ prefix
809            } else {
810                self.working_directory.clone()
811            }
812        } else {
813            self.working_directory.clone()
814        };
815
816        // Debug the working directory
817        debug!("tokio-interactive: filename = {}", self.filename);
818        debug!("tokio-interactive: arguments = {:?}", self.arguments);
819        debug!("tokio-interactive: working_directory = {:?}", working_dir);
820        debug!("tokio-interactive: working_directory exists = {}", working_dir.exists());
821        debug!("tokio-interactive: working_directory is_dir = {}", working_dir.is_dir());
822
823        // Check if working directory is absolute
824        debug!("tokio-interactive: working_directory is_absolute = {}", working_dir.is_absolute());
825
826        // Get current directory before setting
827        if let Ok(current_dir) = std::env::current_dir() {
828            debug!("tokio-interactive: current working directory before setting = {:?}", current_dir);
829        }
830
831        command.current_dir(working_dir);
832
833        command.stdin(std::process::Stdio::piped());
834        command.stdout(std::process::Stdio::piped());
835        command.stderr(std::process::Stdio::piped());
836
837        // Debug the final command that will be executed
838        debug!("tokio-interactive: Final command = {:?}", command);
839
840        let mut child = command.spawn()?;
841        let pid = child.id().unwrap_or(0);
842
843        debug!("tokio-interactive: Process spawned with PID = {}", pid);
844
845        let (stdin_sender, mut stdin_receiver) = tokio::sync::mpsc::channel::<String>(100);
846        let (stdout_sender, stdout_receiver) = tokio::sync::mpsc::channel::<String>(100);
847
848        if let Some(mut stdin) = child.stdin.take() {
849            tokio::spawn(async move {
850                while let Some(input) = stdin_receiver.recv().await {
851                    if let Err(e) = stdin.write_all(input.as_bytes()).await {
852                        error!("Failed to write to process stdin: {}", e);
853                        break;
854                    }
855                    if let Err(e) = stdin.write_all(b"\n").await {
856                        error!("Failed to write newline to process stdin: {}", e);
857                        break;
858                    }
859                    if let Err(e) = stdin.flush().await {
860                        error!("Failed to flush process stdin: {}", e);
861                        break;
862                    }
863                }
864            });
865        }
866
867        if let Some(stdout) = child.stdout.take() {
868            let stdout_sender_clone = stdout_sender.clone();
869            tokio::spawn(async move {
870                let mut reader = BufReader::new(stdout);
871                let mut line = String::new();
872                while let Ok(bytes_read) = reader.read_line(&mut line).await {
873                    if bytes_read == 0 {
874                        break;
875                    }
876                    if let Err(e) = stdout_sender_clone.send(line.trim_end().to_string()).await {
877                        error!("Failed to send stdout message: {}", e);
878                        break;
879                    }
880                    line.clear();
881                }
882            });
883        }
884
885        if let Some(stderr) = child.stderr.take() {
886            let stderr_sender = stdout_sender.clone();
887            tokio::spawn(async move {
888                let mut reader = BufReader::new(stderr);
889                let mut line = String::new();
890                while let Ok(bytes_read) = reader.read_line(&mut line).await {
891                    if bytes_read == 0 {
892                        break;
893                    }
894                    if let Err(e) = stderr_sender.send(format!("STDERR: {}", line.trim_end())).await {
895                        error!("Failed to send stderr message: {}", e);
896                        break;
897                    }
898                    line.clear();
899                }
900            });
901        }
902
903        let queue_pid = pid;
904        tokio::spawn(async move {
905            loop {
906                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
907
908                let process_pool = match PROCESS_POOL.get() {
909                    Some(pool) => pool,
910                    None => break,
911                };
912
913                let mut pool = process_pool.lock().await;
914
915                let process = match pool.get_mut(&queue_pid) {
916                    Some(p) => p,
917                    None => break,
918                };
919
920                while let Some(input) = process.input_queue.pop_front() {
921                    if let Some(sender) = &process.sender {
922                        if let Err(_) = sender.try_send(input.clone()) {
923                            process.input_queue.push_front(input);
924                            break;
925                        }
926                    } else {
927                        process.input_queue.clear();
928                        break;
929                    }
930                }
931
932                drop(pool);
933            }
934        });
935
936        let exit_callback = self.exit_callback.clone();
937        tokio::spawn(async move {
938            match child.wait().await {
939                Ok(exit_status) => {
940                    let exit_code = exit_status.code().unwrap_or(-1); // Use -1 for unknown exit codes
941                    debug!("Process {} exited with code: {}", pid, exit_code);
942
943                    if let Some(exit_callback) = exit_callback {
944                        exit_callback(exit_code);
945                    }
946                }
947                Err(e) => {
948                    error!("Process {} exited with error: {}", pid, e);
949
950                    // Still trigger callback with error code
951                    if let Some(exit_callback) = exit_callback {
952                        exit_callback(-1); // or some other error indicator
953                    }
954                }
955            }
956
957            // Cleanup
958            if let Some(process_pool) = PROCESS_POOL.get() {
959                let mut pool = process_pool.lock().await;
960                pool.remove(&pid);
961                debug!("Process {} has exited and been removed from the pool", pid);
962            }
963        });
964
965        let process_pool = PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
966
967        let mut pool = process_pool.lock().await;
968        pool.insert(
969            pid,
970            Self {
971                pid: Some(pid),
972                filename: self.filename.clone(),
973                arguments: self.arguments.clone(),
974                working_directory: self.working_directory.clone(),
975                sender: Some(stdin_sender),
976                receiver: Some(stdout_receiver),
977                input_queue: VecDeque::new(),
978                exit_callback: self.exit_callback.clone(),
979            },
980        );
981
982        self.pid = Some(pid);
983
984        Ok(pid)
985    }
986
987    /// Asynchronously retrieves a handle to a process for a given process ID (PID).
988    ///
989    /// This function checks if a process with the specified PID exists in a shared
990    /// process pool. If the process is found, it returns an `Option` containing a
991    /// `ProcessHandle` for the process. Otherwise, it returns `None`.
992    ///
993    /// # Arguments
994    ///
995    /// * `pid` - A 32-bit unsigned integer representing the process ID of the
996    ///           desired process.
997    ///
998    /// # Returns
999    ///
1000    /// * `Some(ProcessHandle)` - If a process with the given PID is found in the
1001    ///                           process pool.
1002    /// * `None` - If the process does not exist in the process pool or if the
1003    ///            process pool is uninitialized.
1004    ///
1005    /// # Example
1006    ///
1007    /// ```rust
1008    /// if let Some(handle) = get_process_by_pid(12345).await {
1009    ///     println!("Process found with PID: {}", handle.pid);
1010    /// } else {
1011    ///     println!("Process not found.");
1012    /// }
1013    /// ```
1014    ///
1015    /// # Errors
1016    ///
1017    /// This function will return `None` if the global process pool (`PROCESS_POOL`)
1018    /// has not been initialized or is unavailable.
1019    ///
1020    /// # Note
1021    ///
1022    /// This is an asynchronous function and must be awaited to complete its operation.
1023    pub async fn get_process_by_pid(pid: u32) -> Option<ProcessHandle> {
1024        let process_pool = PROCESS_POOL.get()?;
1025        let pool = process_pool.lock().await;
1026        if pool.contains_key(&pid) { Some(ProcessHandle { pid }) } else { None }
1027    }
1028
1029    /// Asynchronously checks if a process identified by its `pid` is currently running.
1030    ///
1031    /// # Returns
1032    /// - `true` if the process with the stored `pid` is found in the `PROCESS_POOL`.
1033    /// - `false` if the stored `pid` is `None`, the `PROCESS_POOL` is not initialized,
1034    ///   or the `pid` does not exist in the `PROCESS_POOL`.
1035    ///
1036    /// The function first checks if the `pid` instance variable is set. If so, it attempts
1037    /// to access the global `PROCESS_POOL`. If `PROCESS_POOL` is initialized, it acquires
1038    /// a lock and checks if the `pid` exists in the pool.
1039    ///
1040    /// # Examples
1041    /// ```rust
1042    /// let result = my_instance.is_process_running().await;
1043    /// if result {
1044    ///     println!("The process is running.");
1045    /// } else {
1046    ///     println!("The process is not running.");
1047    /// }
1048    /// ```
1049    ///
1050    /// # Async Behavior
1051    /// This function acquires an asynchronous lock on the `PROCESS_POOL` to safely access
1052    /// its contents and will yield if the lock is currently held elsewhere.
1053    ///
1054    /// # Panics
1055    /// This function may panic if the async lock on the `PROCESS_POOL` fails unexpectedly.
1056    ///
1057    /// # Dependencies
1058    /// - `PROCESS_POOL` should be a globally accessible and lazily initialized structure
1059    ///   (e.g., using `once_cell` or similar patterns) that maintains a mapping of currently
1060    ///   active processes.
1061    pub async fn is_process_running(&self) -> bool {
1062        if let Some(pid) = self.pid {
1063            if let Some(process_pool) = PROCESS_POOL.get() {
1064                let pool = process_pool.lock().await;
1065                return pool.contains_key(&pid);
1066            }
1067        }
1068        false
1069    }
1070}