tokio_interactive/
lib.rs

1#![doc = include_str!("../README.md")]
2use anyhow::{Result, anyhow};
3use log::*;
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, VecDeque};
6use std::path::PathBuf;
7use std::sync::{Arc, OnceLock};
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::process::Command;
10use tokio::sync::Mutex;
11use tokio::sync::mpsc::{Receiver, Sender}; // Change this line
12
13/// A static, lazily-initialized process pool used to manage asynchronous interactive processes.
14///
15/// This global variable utilizes the `OnceLock` to ensure it is initialized only once during
16/// the program's lifetime. It holds an `Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>`
17/// to allow safe concurrent access and modification of the process pool.
18///
19/// - `OnceLock`: Provides thread-safe, one-time initialization of the process pool.
20/// - `Arc`: Ensures the `Mutex<HashMap<...>>` can be shared across threads safely.
21/// - `Mutex`: Protects the `HashMap` from concurrent modification, ensuring thread safety.
22/// - `HashMap<u32, AsynchronousInteractiveProcess>`: Maps process IDs (`u32`) to
23///   corresponding `AsynchronousInteractiveProcess` instances.
24///
25/// Usage:
26///
27/// The process pool is intended to store and manage asynchronous interactive processes by their IDs.
28/// Access needs to ensure proper locking with the `Mutex` guard to guarantee thread safety.
29///
30/// Example initialization:
31/// ```rust
32/// PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
33/// ```
34///
35/// Example access:
36/// ```rust
37/// if let Some(pool) = PROCESS_POOL.get() {
38///     let mut guard = pool.lock().unwrap();
39///     guard.insert(process_id, new_process);
40/// }
41/// ```
42/// Ensure that any operations on the pool respect the locking mechanism provided by the `Mutex`.
43///
44/// Once initialized, `PROCESS_POOL` cannot be re-initialized or re-assigned.
45static PROCESS_POOL: OnceLock<Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>> = OnceLock::new();
46
47/// Represents a handle to a process identified by its process ID (PID).
48///
49/// The `ProcessHandle` struct is used to encapsulate the process identifier (PID)
50/// of a running process. It is designed to be lightweight and can be cloned
51/// or debugged as required.
52///
53/// # Fields
54/// - `pid`: A `u32` value that represents the process ID of the targeted process.
55///
56/// # Traits
57/// - `Debug`: Allows instances of `ProcessHandle` to be formatted using the `{:?}` formatter
58///   for debugging purposes.
59/// - `Clone`: Allows the `ProcessHandle` to be cloned, creating a new instance with the
60///   same `pid`.
61///
62/// # Example
63/// ```
64/// let handle = ProcessHandle { pid: 1234 };
65/// println!("{:?}", handle); // Outputs: ProcessHandle { pid: 1234 }
66/// let cloned_handle = handle.clone();
67/// println!("{:?}", cloned_handle); // Outputs: ProcessHandle { pid: 1234 }
68/// ```
69#[derive(Debug, Clone)]
70pub struct ProcessHandle {
71    pid: u32,
72}
73
74/// `AsynchronousInteractiveProcess` is a structure that represents a non-blocking,
75/// interactive process. It provides metadata and mechanisms to interact with a
76/// process asynchronously using sender and receiver channels.
77///
78/// # Fields
79///
80/// * `pid` - An optional process ID (`pid`) of the interactive process. If the process
81///   is not yet started, this field will remain `None`.
82///
83/// * `filename` - The name of the executable file that represents the interactive process.
84///   This is required to identify and initiate the process.
85///
86/// * `arguments` - A vector containing the arguments to pass to the executable file
87///   when launching the process.
88///
89/// * `working_directory` - The directory where the interactive process will be executed.
90///   This is represented as a `PathBuf`.
91///
92/// * `sender` - An optional `Sender` channel used to send messages or data to the
93///   interactive process. This field is skipped during serialization and deserialization
94///   because it holds runtime-related data.
95///
96/// * `receiver` - An optional `Receiver` channel used to receive messages or data
97///   from the interactive process. This field is also skipped during serialization
98///   and deserialization for the same reasons as `sender`.
99///
100/// * `input_queue` - A deque (double-ended queue) that maintains an in-memory buffer
101///   of strings representing input for the interactive process. This field is skipped
102///   during serialization as it only contains transient runtime-related data.
103///
104/// # Trait Implementations
105///
106/// - `Debug`: Allows instances of this struct to be formatted and logged for debugging purposes.
107/// - `Serialize`: Makes the struct serializable, excluding fields marked with `#[serde(skip)]`.
108/// - `Deserialize`: Allows deserialization to create instances of this struct from serialized data.
109/// - `Default`: Provides a default implementation where optional fields are set to `None`,
110///   collections are empty, and `filename` is an empty string.
111///
112/// # Example
113/// ```rust
114/// use std::path::PathBuf;
115/// use std::collections::VecDeque;
116/// use serde::{Serialize, Deserialize};
117/// use crossbeam_channel::{Sender, Receiver};
118///
119/// let process = AsynchronousInteractiveProcess {
120///     pid: None,
121///     filename: "my_program".to_string(),
122///     arguments: vec!["--help".to_string()],
123///     working_directory: PathBuf::from("/path/to/dir"),
124///     sender: None,
125///     receiver: None,
126///     input_queue: VecDeque::new(),
127/// };
128///
129/// println!("{:?}", process);
130/// ```
131///
132/// This structure is ideal for scenarios where processes need to be controlled
133/// asynchronously with multiple input or output channels.
134#[derive(Debug, Serialize, Deserialize, Default)]
135pub struct AsynchronousInteractiveProcess {
136    pub pid: Option<u32>,
137    pub filename: String,
138    pub arguments: Vec<String>,
139    pub working_directory: PathBuf,
140    #[serde(skip)]
141    sender: Option<Sender<String>>,
142    #[serde(skip)]
143    receiver: Option<Receiver<String>>,
144    #[serde(skip)]
145    input_queue: VecDeque<String>,
146}
147
148impl ProcessHandle {
149    /// Receives an output message asynchronously from the process associated with the instance's `pid`
150    /// if available.
151    ///
152    /// This function attempts to fetch a message by accessing the process pool and checking for a
153    /// message in the associated process's receiver.
154    ///
155    /// The process follows these steps:
156    /// 1. Check if the global process pool (`PROCESS_POOL`) is initialized.
157    /// 2. Attempt to acquire the lock on the pool asynchronously.
158    /// 3. Look for the process associated with `self.pid`.
159    /// 4. If the process has a receiver channel, attempt to get a message using `try_recv`.
160    /// 5. If a message is found, return it as `Ok(Some(String))`.
161    /// 6. If no message is received, retry up to `MAX_RETRIES` times, with a delay of `RETRY_DELAY_MS`
162    ///    milliseconds between each retry.
163    ///
164    /// If no message is received after all retries, the function returns `Ok(None)`.
165    ///
166    /// # Returns
167    ///
168    /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
169    /// * `Ok(None)` - If no message is received after all retries.
170    /// * `Err(Error)` - If an error occurs during the process, such as:
171    ///   - The process pool is not initialized
172    ///   - The process with the specified PID is not found
173    ///   - The receiver channel is not available
174    ///   - The receiver channel is disconnected
175    ///
176    /// # Behavior
177    ///
178    /// - This function uses `tokio::time::sleep` for introducing delays between retries and works
179    ///   asynchronously.
180    /// - The function uses constants `MAX_RETRIES` (10) and `RETRY_DELAY_MS` (10) to control the
181    ///   retry behavior.
182    /// - The function properly propagates errors that occur during the process.
183    ///
184    /// # Example
185    ///
186    /// ```rust
187    /// match instance.receive_output().await {
188    ///     Ok(Some(output)) => println!("Received output: {}", output),
189    ///     Ok(None) => println!("No output received."),
190    ///     Err(e) => eprintln!("Error receiving output: {}", e),
191    /// }
192    /// ```
193    ///
194    /// # Note
195    ///
196    /// - The function assumes the existence of a global process pool (`PROCESS_POOL`) which is safe
197    ///   to access concurrently using mutex-based locking.
198    /// - The function makes use of `tokio::sync::Mutex` to handle concurrent executions across
199    ///   asynchronous tasks.
200    /// Default timeout for receive_output in milliseconds
201    const DEFAULT_TIMEOUT_MS: u64 = 100;
202
203    /// Helper function to try receiving a message
204    async fn try_receive_message(
205        pid: u32,
206        pool: &mut tokio::sync::MutexGuard<'_, HashMap<u32, AsynchronousInteractiveProcess>>,
207    ) -> Result<Option<String>> {
208        if let Some(process) = pool.get_mut(&pid) {
209            if let Some(receiver) = &mut process.receiver {
210                match receiver.try_recv() {
211                    Ok(msg) => return Ok(Some(msg)),
212                    Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
213                    Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
214                        return Err(anyhow!("Channel disconnected for process {}", pid));
215                    }
216                }
217            } else {
218                return Err(anyhow!("Receiver not available for process {}", pid));
219            }
220        } else {
221            return Err(anyhow!("Process {} not found", pid));
222        }
223        Ok(None)
224    }
225
226    pub async fn receive_output(&self) -> Result<Option<String>> {
227        // Use the default timeout
228        self.receive_output_with_timeout(std::time::Duration::from_millis(Self::DEFAULT_TIMEOUT_MS)).await
229    }
230
231    /// Receives an output message asynchronously from the process associated with the instance's `pid`
232    /// if available, with a specified timeout.
233    ///
234    /// This function is similar to `receive_output()` but allows specifying a custom timeout duration
235    /// instead of using the default timeout.
236    ///
237    /// # Arguments
238    ///
239    /// * `timeout` - A `std::time::Duration` specifying how long to wait for a message before giving up.
240    ///
241    /// # Returns
242    ///
243    /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
244    /// * `Ok(None)` - If no message is received before the timeout expires.
245    /// * `Err(Error)` - If an error occurs during the process, such as:
246    ///   - The process pool is not initialized
247    ///   - The process with the specified PID is not found
248    ///   - The receiver channel is not available
249    ///   - The receiver channel is disconnected
250    ///
251    /// # Example
252    ///
253    /// ```rust
254    /// // Wait for up to 5 seconds for a message
255    /// match instance.receive_output_with_timeout(std::time::Duration::from_secs(5)).await {
256    ///     Ok(Some(output)) => println!("Received output: {}", output),
257    ///     Ok(None) => println!("No output received within timeout."),
258    ///     Err(e) => eprintln!("Error receiving output: {}", e),
259    /// }
260    /// ```
261    pub async fn receive_output_with_timeout(&self, timeout: std::time::Duration) -> Result<Option<String>> {
262        // Define constants for retry parameters
263        const RETRY_DELAY_MS: u64 = 10;
264
265        if let Some(process_pool) = PROCESS_POOL.get() {
266            // Try to receive a message immediately
267            {
268                let mut pool = process_pool.lock().await;
269                if let Some(msg) = Self::try_receive_message(self.pid, &mut pool).await? {
270                    return Ok(Some(msg));
271                }
272            }
273
274            // If no message is available, retry with delay until timeout is reached
275            let start_time = std::time::Instant::now();
276            while start_time.elapsed() < timeout {
277                tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_DELAY_MS)).await;
278
279                let mut pool = process_pool.lock().await;
280                if let Some(msg) = Self::try_receive_message(self.pid, &mut pool).await? {
281                    return Ok(Some(msg));
282                }
283            }
284
285            // No message after timeout
286            return Ok(None);
287        }
288
289        Err(anyhow!("Process pool not initialized"))
290    }
291
292    /// Sends the provided input to a process associated with this instance's PID asynchronously.
293    ///
294    /// # Arguments
295    ///
296    /// * `input` - An input of any type that can be converted into a `String`. This is the data
297    ///   that will be sent to the respective process's input queue.
298    ///
299    /// # Returns
300    ///
301    /// * `Ok(())` if the input was successfully sent or queued for sending.
302    /// * `Err` - Returns an error in the following cases:
303    ///     - If the process pool is not initialized.
304    ///     - If the process associated with this PID is not found.
305    ///     - If the process was not started or its sender channel is not available.
306    ///     - If the sender channel is closed and input cannot be sent.
307    ///
308    /// # Details
309    ///
310    /// This function retrieves the process associated with the current instance's `pid`
311    /// from a global process pool. If a valid process is found:
312    /// - The input is sent via an asynchronous channel to the process.
313    /// - If the channel is full, the input is queued for later sending.
314    /// - If the channel is closed, an error is returned.
315    ///
316    /// The function also ensures thread safety by acquiring a lock on the process pool before attempting
317    /// any operations related to the process.
318    ///
319    /// # Errors
320    ///
321    /// This function propagates several potential issues as errors:
322    /// - If the process pool is uninitialized (`PROCESS_POOL.get()` returns `None`).
323    /// - If the process associated with the PID is missing in the pool.
324    /// - If the sender channel was never initialized or is unavailable.
325    /// - If the sender channel is closed and no new messages can be sent.
326    ///
327    /// # Example
328    ///
329    /// ```rust
330    /// use anyhow::Result;
331    ///
332    /// #[tokio::main]
333    /// async fn main() -> Result<()> {
334    ///     let manager = ProcessManager::new(1); // Assumes a struct is managing process with ID 1.
335    ///     manager.send_input("Some input").await?;
336    ///     Ok(())
337    /// }
338    /// ```
339    ///
340    /// # Note
341    ///
342    /// This function expects a global process pool (`PROCESS_POOL`) to be properly initialized before being called.
343    /// Additionally, the associated process must have a valid `sender` channel to accept input.
344    pub async fn send_input(&self, input: impl Into<String>) -> Result<()> {
345        let input_str = input.into();
346        if let Some(process_pool) = PROCESS_POOL.get() {
347            let mut pool = process_pool.lock().await;
348            if let Some(process) = pool.get_mut(&self.pid) {
349                if let Some(sender) = &process.sender {
350                    match sender.try_send(input_str.clone()) {
351                        Ok(_) => Ok(()),
352                        Err(e) => match e {
353                            tokio::sync::mpsc::error::TrySendError::Full(_) => {
354                                process.input_queue.push_back(input_str);
355                                Ok(())
356                            }
357                            tokio::sync::mpsc::error::TrySendError::Closed(_) => {
358                                Err(anyhow!("Failed to send input: channel closed"))
359                            }
360                        },
361                    }
362                } else {
363                    Err(anyhow!("Process not started or sender not available"))
364                }
365            } else {
366                Err(anyhow!("Process not found"))
367            }
368        } else {
369            Err(anyhow!("Process pool not initialized"))
370        }
371    }
372
373    /// Checks if the process associated with the current instance is running.
374    ///
375    /// This method attempts to determine if a process with the `pid` of the current instance
376    /// exists in the global `PROCESS_POOL`.
377    ///
378    /// # Returns
379    /// * `true` - If the process with the associated `pid` is currently present in the global process pool.
380    /// * `false` - If the process is not found in the global process pool or if the pool is not initialized.
381    ///
382    /// # Async Behavior
383    /// This method is asynchronous because it acquires a lock on the `PROCESS_POOL`.
384    ///
385    /// # Notes
386    /// - The `PROCESS_POOL` must be initialized before calling this function.
387    ///   If `PROCESS_POOL` is not set, the function will return `false`.
388    /// - The `PROCESS_POOL` is expected to be a globally accessible, asynchronous, and thread-safe
389    ///   data structure that tracks active processes.
390    ///
391    /// # Example
392    /// ```rust
393    /// let result = instance.is_process_running().await;
394    /// if result {
395    ///     println!("Process is running.");
396    /// } else {
397    ///     println!("Process is not running.");
398    /// }
399    /// ```
400    pub async fn is_process_running(&self) -> bool {
401        if let Some(process_pool) = PROCESS_POOL.get() {
402            let pool = process_pool.lock().await;
403            return pool.contains_key(&self.pid);
404        }
405        false
406    }
407
408    /// Asynchronously terminates a process identified by its `pid`.
409    ///
410    /// This method performs the following operations tailored to the target operating system:
411    ///
412    /// - **Windows**: Opens a handle to the process using its process ID (`pid`) and forcefully terminates it
413    ///   using the `TerminateProcess` function from the WinAPI.
414    /// - **Linux**: Utilizes the `kill` system call with the `SIGKILL` signal to forcefully terminate the process.
415    ///
416    /// ## Platform-specific Notes:
417    ///
418    /// - On **Windows**, the process is identified and terminated using the `OpenProcess` and `TerminateProcess`
419    ///   functions from the WinAPI.
420    /// - On **Linux**, the `kill` system call is used with the signal `SIGKILL` (9) to ensure the process is terminated.
421    ///
422    /// # Errors
423    ///
424    /// - Returns an error if the process termination fails (on Linux) due to system call errors or invalid process IDs.
425    ///   On failure, the error contains details about the `pid` and the last OS error encountered.
426    ///
427    /// # Safety
428    ///
429    /// This method uses unsafe code blocks to interact with system APIs (`libc` on Linux, WinAPI on Windows). Ensure
430    /// that the provided `pid` corresponds to a valid process, and consider the implications of forcefully
431    /// terminating processes.
432    ///
433    /// # Example
434    ///
435    /// ```rust
436    /// let process_manager = SomeProcessManager::new(pid); // Example struct containing the pid
437    /// if let Err(e) = process_manager.kill().await {
438    ///     eprintln!("Failed to terminate process: {}", e);
439    /// }
440    /// ```
441    /// Attempts to gracefully shut down the process, falling back to forceful termination if needed.
442    ///
443    /// This method first tries to gracefully shut down the process by:
444    /// - On Linux: Sending a SIGTERM signal
445    /// - On Windows: Sending a WM_CLOSE message to the main window
446    ///
447    /// If the process doesn't exit within the specified timeout, it will forcefully terminate
448    /// the process using the `kill()` method.
449    ///
450    /// # Arguments
451    ///
452    /// * `timeout` - A `std::time::Duration` specifying how long to wait for the process to exit
453    ///   gracefully before forcefully terminating it.
454    ///
455    /// # Returns
456    ///
457    /// * `Ok(())` - If the process was successfully shut down (either gracefully or forcefully).
458    /// * `Err(Error)` - If an error occurred during the shutdown process.
459    ///
460    /// # Example
461    ///
462    /// ```rust
463    /// // Try to shut down gracefully, waiting up to 5 seconds before force killing
464    /// if let Err(e) = process.shutdown(std::time::Duration::from_secs(5)).await {
465    ///     eprintln!("Failed to shut down process: {}", e);
466    /// }
467    /// ```
468    pub async fn shutdown(&self, timeout: std::time::Duration) -> Result<()> {
469        // First, try to gracefully shut down the process
470        let graceful_shutdown_result = self.graceful_shutdown().await;
471
472        // If graceful shutdown failed or isn't implemented for this platform, log it but continue
473        if let Err(e) = &graceful_shutdown_result {
474            debug!("Graceful shutdown attempt failed: {}", e);
475            return self.kill().await;
476        }
477
478        // Wait for the process to exit for the specified timeout
479        let start_time = std::time::Instant::now();
480        while start_time.elapsed() < timeout {
481            if !self.is_process_running().await {
482                return Ok(());
483            }
484            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
485        }
486
487        // If we're here, the process didn't exit gracefully within the timeout
488        // Fall back to forceful termination
489        debug!("Process did not exit gracefully within timeout, forcing termination");
490        self.kill().await
491    }
492
493    /// Attempts to gracefully shut down the process without forceful termination.
494    ///
495    /// This method is platform-specific:
496    /// - On Linux: Sends a SIGTERM signal to request graceful termination
497    /// - On Windows: First tries to send a Ctrl+C event to the process using GenerateConsoleCtrlEvent.
498    ///   If that fails, it attempts to send an "exit" command to the process via stdin.
499    ///
500    /// # Returns
501    ///
502    /// * `Ok(())` - If the graceful shutdown signal was successfully sent.
503    /// * `Err(Error)` - If an error occurred while sending the graceful shutdown signal.
504    ///
505    /// # Notes
506    ///
507    /// - A successful return doesn't guarantee the process will actually exit.
508    ///   Use `shutdown()` with a timeout to ensure the process exits.
509    /// - On Windows, the GenerateConsoleCtrlEvent function works with process groups, not individual
510    ///   processes, so it may not work for all processes. The fallback "exit" command approach
511    ///   works for many command-line applications that accept such commands.
512    /// - For GUI applications on Windows, neither approach may work. In such cases, the `shutdown()`
513    ///   method will fall back to forceful termination after the timeout.
514    async fn graceful_shutdown(&self) -> Result<()> {
515        #[cfg(target_os = "windows")]
516        {
517            unsafe {
518                // First, try to send Ctrl+C event to the process
519                // CTRL_C_EVENT = 0, CTRL_BREAK_EVENT = 1
520                // Note: GenerateConsoleCtrlEvent works with process groups, not individual processes
521                // For console applications, we can try sending Ctrl+C to the process
522                let result = winapi::um::wincon::GenerateConsoleCtrlEvent(0, self.pid);
523                if result == 0 {
524                    return Err(anyhow!(
525                        "Failed to send Ctrl+C to process {}: {}",
526                        self.pid,
527                        std::io::Error::last_os_error()
528                    ));
529                }
530            }
531            return Ok(());
532        }
533
534        #[cfg(target_os = "linux")]
535        {
536            unsafe {
537                // Use the kill system call with SIGTERM (15) to request graceful termination
538                let result = libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
539                if result != 0 {
540                    return Err(anyhow!(
541                        "Failed to send SIGTERM to process {}: {}",
542                        self.pid,
543                        std::io::Error::last_os_error()
544                    ));
545                }
546            }
547            return Ok(());
548        }
549
550        #[cfg(not(any(target_os = "windows", target_os = "linux")))]
551        {
552            return Err(anyhow!("Graceful shutdown not implemented for this platform"));
553        }
554    }
555
556    /// Forcefully terminates the process immediately.
557    ///
558    /// This method should be used as a last resort when a process needs to be terminated
559    /// immediately. For a more graceful approach, consider using `shutdown()` first.
560    ///
561    /// # Returns
562    ///
563    /// * `Ok(())` - If the process was successfully terminated.
564    /// * `Err(Error)` - If an error occurred during the termination process.
565    ///
566    /// # Example
567    ///
568    /// ```rust
569    /// if let Err(e) = process.kill().await {
570    ///     eprintln!("Failed to terminate process: {}", e);
571    /// }
572    /// ```
573    pub async fn kill(&self) -> Result<()> {
574        #[cfg(target_os = "windows")]
575        {
576            unsafe {
577                // PROCESS_TERMINATE (0x00010000) access right is required to terminate a process
578                let handle = winapi::um::processthreadsapi::OpenProcess(0x00010000, 0, self.pid);
579                if handle.is_null() {
580                    return Err(anyhow!("Failed to open process {}: {}", self.pid, std::io::Error::last_os_error()));
581                }
582
583                // Attempt to terminate the process
584                let result = winapi::um::processthreadsapi::TerminateProcess(handle, 0);
585
586                // Always close the handle to prevent resource leaks
587                let close_result = winapi::um::handleapi::CloseHandle(handle);
588
589                // Check if termination was successful
590                if result == 0 {
591                    return Err(anyhow!(
592                        "Failed to terminate process {}: {}",
593                        self.pid,
594                        std::io::Error::last_os_error()
595                    ));
596                }
597
598                // Check if handle was closed successfully
599                if close_result == 0 {
600                    warn!(
601                        "Failed to close process handle for process {}: {}",
602                        self.pid,
603                        std::io::Error::last_os_error()
604                    );
605                    // We don't return an error here as the process was terminated successfully
606                }
607            }
608        }
609        #[cfg(target_os = "linux")]
610        {
611            unsafe {
612                // Use the kill system call with SIGKILL (9) to forcefully terminate the process
613                let result = libc::kill(self.pid as libc::pid_t, libc::SIGKILL);
614                if result != 0 {
615                    return Err(anyhow!("Failed to kill process {}: {}", self.pid, std::io::Error::last_os_error()));
616                }
617            }
618        }
619
620        Ok(())
621    }
622}
623
624impl AsynchronousInteractiveProcess {
625    /// Creates a new instance of the struct with default values.
626    ///
627    /// # Arguments
628    ///
629    /// * `filename` - A value that can be converted into a `String`. This typically represents the
630    ///   name or path of the file associated with the instance.
631    ///
632    /// # Returns
633    ///
634    /// Returns a new instance of the struct populated with default fields:
635    /// - `pid`: `None` (indicating no process ID is associated yet).
636    /// - `filename`: The provided `filename` converted into a `String`.
637    /// - `arguments`: An empty vector, representing no initial arguments.
638    /// - `working_directory`: Set to the current directory (`"./"`).
639    /// - `sender`: `None`, indicating no sender is associated initially.
640    /// - `receiver`: `None`, indicating no receiver is associated initially.
641    /// - `input_queue`: An empty `VecDeque`, representing no items in the input queue.
642    ///
643    /// # Example
644    ///
645    /// ```
646    /// let instance = MyStruct::new("example.txt");
647    /// assert_eq!(instance.filename, "example.txt");
648    /// assert!(instance.pid.is_none());
649    /// assert!(instance.arguments.is_empty());
650    /// assert_eq!(instance.working_directory, PathBuf::from("./"));
651    /// assert!(instance.sender.is_none());
652    /// assert!(instance.receiver.is_none());
653    /// assert!(instance.input_queue.is_empty());
654    /// ```
655    pub fn new(filename: impl Into<String>) -> Self {
656        Self {
657            pid: None,
658            filename: filename.into(),
659            arguments: Vec::new(),
660            working_directory: PathBuf::from("./"),
661            sender: None,
662            receiver: None,
663            input_queue: VecDeque::new(),
664        }
665    }
666
667    /// Sets the arguments for the current instance by converting a vector of items
668    /// implementing `Into<String>` into a `Vec<String>`.
669    ///
670    /// # Parameters
671    /// - `args`: A `Vec` containing items that implement the `Into<String>` trait. These
672    ///   items will be converted into `String` and used to set the `arguments` field
673    ///   of the instance.
674    ///
675    /// # Returns
676    /// - `Self`: The current instance (`self`) after updating its `arguments` field.
677    ///
678    /// # Example
679    /// ```rust
680    /// let instance = MyStruct::new()
681    ///     .with_arguments(vec!["arg1", "arg2", String::from("arg3")]);
682    /// ```
683    ///
684    /// This method allows chaining, as it returns the updated instance
685    /// after setting the `arguments` field.
686    pub fn with_arguments(mut self, args: Vec<impl Into<String>>) -> Self {
687        self.arguments = args.into_iter().map(|arg| arg.into()).collect();
688        self
689    }
690
691    /// Adds an argument to the `arguments` vector and returns the modified instance.
692    ///
693    /// # Parameters
694    ///
695    /// * `arg` - A value that implements the `Into<String>` trait, which will be converted into a `String`
696    ///   and added to the `arguments` vector.
697    ///
698    /// # Returns
699    ///
700    /// Returns the modified instance of the implementing struct (`Self`) with the new argument added.
701    ///
702    /// # Example
703    ///
704    /// ```rust
705    /// let instance = SomeStruct::new().with_argument("example");
706    /// ```
707    ///
708    /// This adds the string `"example"` to the `arguments` vector of `SomeStruct`.
709    pub fn with_argument(mut self, arg: impl Into<String>) -> Self {
710        self.arguments.push(arg.into());
711        self
712    }
713
714    /// Sets the working directory for the instance and returns the modified instance.
715    ///
716    /// # Arguments
717    ///
718    /// * `dir` - A value that can be converted into a `PathBuf`, representing the desired working directory.
719    ///
720    /// # Returns
721    ///
722    /// The instance of the struct with the updated working directory.
723    ///
724    /// # Example
725    ///
726    /// ```rust
727    /// let instance = MyStruct::new()
728    ///     .with_working_directory("/path/to/directory");
729    /// ```
730    pub fn with_working_directory(mut self, dir: impl Into<PathBuf>) -> Self {
731        self.working_directory = dir.into();
732        self
733    }
734
735    /// Starts a new process based on the configuration stored in the struct, manages
736    /// its I/O streams asynchronously, and tracks its lifecycle in a shared process pool.
737    ///
738    /// # Returns
739    /// - `Ok<u32>`: Returns the process ID (PID) if the process starts successfully.
740    /// - `Err(std::io::Error)`: Returns an error if any part of the process start operation fails.
741    ///
742    /// # Process Configuration
743    /// - The process is launched using the executable specified in `self.filename`.
744    /// - Command-line arguments are passed via `self.arguments`.
745    /// - The process will run in the directory specified in `self.working_directory`.
746    ///
747    /// # Standard I/O Management
748    /// - The process's `stdin` is assigned a `tokio::sync::mpsc::channel` for communication.
749    /// - `stdout` and `stderr` are read asynchronously. Each line from these streams is sent
750    ///   to an `mpsc::channel`, where `stdout`/`stderr` data can be consumed.
751    ///
752    /// # Shared Process Pool
753    /// - The process metadata, including its PID and I/O channels, is stored in a global,
754    ///   thread-safe `PROCESS_POOL`.
755    /// - The process pool is managed using a `tokio::sync::Mutex` and stores processes in a
756    ///   `HashMap` with their PID as the key.
757    ///
758    /// # Asynchronous I/O
759    /// - A background task is spawned to write data from an internal input queue to the process's
760    ///   `stdin`.
761    /// - Another background task continuously reads and forwards `stdout` and `stderr` messages
762    ///   from the process.
763    /// - If the process exits or encounters an error, its PID and metadata are removed from the
764    ///   process pool.
765    ///
766    /// # Example Use Case
767    /// ```no_run
768    /// let mut my_process = MyProcess {
769    ///     filename: "my_program".to_string(),
770    ///     arguments: vec!["--arg1", "value1".to_string()],
771    ///     working_directory: "/path/to/dir".to_string(),
772    ///     pid: None,
773    ///     sender: None,
774    ///     receiver: None,
775    ///     input_queue: VecDeque::new(),
776    /// };
777    ///
778    /// let pid = my_process.start().await.unwrap();
779    /// println!("Started process with PID: {}", pid);
780    /// ```
781    ///
782    /// # Notes
783    /// - The process's `stdin`, `stdout`, and `stderr` are piped to capture and manage
784    ///   communication asynchronously.
785    /// - Each spawned background task is independently responsible for managing a specific
786    ///   stream or part of the process lifecycle.
787    /// - Errors during I/O operations or spawning the process are logged using the `log` crate
788    ///   macros (e.g., `error!`, `debug!`).
789    ///
790    /// # Potential Errors
791    /// - Failure to spawn the process (e.g., if `self.filename` is invalid or inaccessible).
792    /// - Errors during communication with the process's I/O streams (e.g., writing to a closed `stdin`).
793    /// - Mutex locking failure on the shared process pool due to an internal inconsistency.
794    pub async fn start(&mut self) -> Result<u32> {
795        let mut command = Command::new(&self.filename);
796        command.args(&self.arguments);
797        command.current_dir(&self.working_directory);
798
799        command.stdin(std::process::Stdio::piped());
800        command.stdout(std::process::Stdio::piped());
801        command.stderr(std::process::Stdio::piped());
802
803        let mut child = command.spawn()?;
804        let pid = child.id().unwrap_or(0);
805
806        let (stdin_sender, mut stdin_receiver) = tokio::sync::mpsc::channel::<String>(100);
807        let (stdout_sender, stdout_receiver) = tokio::sync::mpsc::channel::<String>(100);
808
809        if let Some(mut stdin) = child.stdin.take() {
810            tokio::spawn(async move {
811                while let Some(input) = stdin_receiver.recv().await {
812                    if let Err(e) = stdin.write_all(input.as_bytes()).await {
813                        error!("Failed to write to process stdin: {}", e);
814                        break;
815                    }
816                    if let Err(e) = stdin.write_all(b"\n").await {
817                        error!("Failed to write newline to process stdin: {}", e);
818                        break;
819                    }
820                    if let Err(e) = stdin.flush().await {
821                        error!("Failed to flush process stdin: {}", e);
822                        break;
823                    }
824                }
825            });
826        }
827
828        if let Some(stdout) = child.stdout.take() {
829            let stdout_sender_clone = stdout_sender.clone();
830            tokio::spawn(async move {
831                let mut reader = BufReader::new(stdout);
832                let mut line = String::new();
833                while let Ok(bytes_read) = reader.read_line(&mut line).await {
834                    if bytes_read == 0 {
835                        break;
836                    }
837                    if let Err(e) = stdout_sender_clone.send(line.trim_end().to_string()).await {
838                        error!("Failed to send stdout message: {}", e);
839                        break;
840                    }
841                    line.clear();
842                }
843            });
844        }
845
846        if let Some(stderr) = child.stderr.take() {
847            let stderr_sender = stdout_sender.clone();
848            tokio::spawn(async move {
849                let mut reader = BufReader::new(stderr);
850                let mut line = String::new();
851                while let Ok(bytes_read) = reader.read_line(&mut line).await {
852                    if bytes_read == 0 {
853                        break;
854                    }
855                    if let Err(e) = stderr_sender.send(format!("STDERR: {}", line.trim_end())).await {
856                        error!("Failed to send stderr message: {}", e);
857                        break;
858                    }
859                    line.clear();
860                }
861            });
862        }
863
864        let queue_pid = pid;
865        tokio::spawn(async move {
866            loop {
867                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
868
869                let process_pool = match PROCESS_POOL.get() {
870                    Some(pool) => pool,
871                    None => break,
872                };
873
874                let mut pool = process_pool.lock().await;
875
876                let process = match pool.get_mut(&queue_pid) {
877                    Some(p) => p,
878                    None => break,
879                };
880
881                while let Some(input) = process.input_queue.pop_front() {
882                    if let Some(sender) = &process.sender {
883                        if let Err(_) = sender.try_send(input.clone()) {
884                            process.input_queue.push_front(input);
885                            break;
886                        }
887                    } else {
888                        process.input_queue.clear();
889                        break;
890                    }
891                }
892
893                drop(pool);
894            }
895        });
896
897        tokio::spawn(async move {
898            if let Err(e) = child.wait().await {
899                error!("Process {} exited with error: {}", pid, e);
900            } else {
901                debug!("Process {} exited successfully", pid);
902            }
903
904            if let Some(process_pool) = PROCESS_POOL.get() {
905                let mut pool = process_pool.lock().await;
906                pool.remove(&pid);
907                debug!("Process {} has exited and been removed from the pool", pid);
908            }
909        });
910
911        let process_pool = PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
912
913        let mut pool = process_pool.lock().await;
914        pool.insert(
915            pid,
916            Self {
917                pid: Some(pid),
918                filename: self.filename.clone(),
919                arguments: self.arguments.clone(),
920                working_directory: self.working_directory.clone(),
921                sender: Some(stdin_sender),
922                receiver: Some(stdout_receiver),
923                input_queue: VecDeque::new(),
924            },
925        );
926
927        self.pid = Some(pid);
928
929        Ok(pid)
930    }
931
932    /// Asynchronously retrieves a handle to a process for a given process ID (PID).
933    ///
934    /// This function checks if a process with the specified PID exists in a shared
935    /// process pool. If the process is found, it returns an `Option` containing a
936    /// `ProcessHandle` for the process. Otherwise, it returns `None`.
937    ///
938    /// # Arguments
939    ///
940    /// * `pid` - A 32-bit unsigned integer representing the process ID of the
941    ///           desired process.
942    ///
943    /// # Returns
944    ///
945    /// * `Some(ProcessHandle)` - If a process with the given PID is found in the
946    ///                           process pool.
947    /// * `None` - If the process does not exist in the process pool or if the
948    ///            process pool is uninitialized.
949    ///
950    /// # Example
951    ///
952    /// ```rust
953    /// if let Some(handle) = get_process_by_pid(12345).await {
954    ///     println!("Process found with PID: {}", handle.pid);
955    /// } else {
956    ///     println!("Process not found.");
957    /// }
958    /// ```
959    ///
960    /// # Errors
961    ///
962    /// This function will return `None` if the global process pool (`PROCESS_POOL`)
963    /// has not been initialized or is unavailable.
964    ///
965    /// # Note
966    ///
967    /// This is an asynchronous function and must be awaited to complete its operation.
968    pub async fn get_process_by_pid(pid: u32) -> Option<ProcessHandle> {
969        let process_pool = PROCESS_POOL.get()?;
970        let pool = process_pool.lock().await;
971        if pool.contains_key(&pid) { Some(ProcessHandle { pid }) } else { None }
972    }
973
974    /// Asynchronously checks if a process identified by its `pid` is currently running.
975    ///
976    /// # Returns
977    /// - `true` if the process with the stored `pid` is found in the `PROCESS_POOL`.
978    /// - `false` if the stored `pid` is `None`, the `PROCESS_POOL` is not initialized,
979    ///   or the `pid` does not exist in the `PROCESS_POOL`.
980    ///
981    /// The function first checks if the `pid` instance variable is set. If so, it attempts
982    /// to access the global `PROCESS_POOL`. If `PROCESS_POOL` is initialized, it acquires
983    /// a lock and checks if the `pid` exists in the pool.
984    ///
985    /// # Examples
986    /// ```rust
987    /// let result = my_instance.is_process_running().await;
988    /// if result {
989    ///     println!("The process is running.");
990    /// } else {
991    ///     println!("The process is not running.");
992    /// }
993    /// ```
994    ///
995    /// # Async Behavior
996    /// This function acquires an asynchronous lock on the `PROCESS_POOL` to safely access
997    /// its contents and will yield if the lock is currently held elsewhere.
998    ///
999    /// # Panics
1000    /// This function may panic if the async lock on the `PROCESS_POOL` fails unexpectedly.
1001    ///
1002    /// # Dependencies
1003    /// - `PROCESS_POOL` should be a globally accessible and lazily initialized structure
1004    ///   (e.g., using `once_cell` or similar patterns) that maintains a mapping of currently
1005    ///   active processes.
1006    pub async fn is_process_running(&self) -> bool {
1007        if let Some(pid) = self.pid {
1008            if let Some(process_pool) = PROCESS_POOL.get() {
1009                let pool = process_pool.lock().await;
1010                return pool.contains_key(&pid);
1011            }
1012        }
1013        false
1014    }
1015}