tokio_interactive/lib.rs
1#![doc = include_str!("../README.md")]
2use anyhow::{anyhow, Result};
3use log::*;
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, VecDeque};
6use std::fmt::{Debug, Formatter};
7use std::path::PathBuf;
8use std::sync::{Arc, OnceLock};
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::process::Command;
11use tokio::sync::mpsc::{Receiver, Sender};
12use tokio::sync::Mutex;
13// Change this line
14
/// Process-wide registry of interactive processes, keyed by process ID.
///
/// The registry is a `OnceLock` wrapping an
/// `Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>`:
///
/// - `OnceLock`: the pool is created at most once; once set it cannot be re-initialized
///   or re-assigned.
/// - `Arc`: allows the same map to be shared by cloning the pointer.
/// - `Mutex`: this is `tokio::sync::Mutex`, so the guard is obtained with `.lock().await`
///   (there is no lock poisoning and nothing to `unwrap`).
/// - `HashMap<u32, AsynchronousInteractiveProcess>`: maps a PID to its process entry.
///
/// [`ProcessHandle`] methods look processes up here by PID and report
/// "Process pool not initialized" (or `false`) when the pool has not been created yet.
///
/// # Examples
///
/// Initialization:
/// ```ignore
/// PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
/// ```
///
/// Access (inside an `async` context):
/// ```ignore
/// if let Some(pool) = PROCESS_POOL.get() {
///     let mut guard = pool.lock().await;
///     guard.insert(process_id, new_process);
/// }
/// ```
static PROCESS_POOL: OnceLock<Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>> = OnceLock::new();
48
/// Lightweight handle to a process, identified by its process ID (PID).
///
/// A handle stores nothing but the PID; every operation on it looks the process up in the
/// global process pool by that PID. Cloning a handle therefore yields another handle to the
/// same process.
///
/// # Fields
/// - `pid`: the `u32` process ID of the targeted process (private).
///
/// # Traits
/// - `Debug`: formats as `ProcessHandle { pid: 1234 }`.
/// - `Clone`: produces a second handle with the same `pid`.
/// - `PartialEq` / `Eq` / `Hash`: two handles are equal when their PIDs are equal, so handles
///   can be compared and used as keys in hashed collections.
///
/// # Example
/// ```ignore
/// // `pid` is private, so this only compiles inside the crate.
/// let handle = ProcessHandle { pid: 1234 };
/// let cloned_handle = handle.clone();
/// assert_eq!(handle, cloned_handle);
/// println!("{:?}", cloned_handle); // Outputs: ProcessHandle { pid: 1234 }
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProcessHandle {
    pid: u32,
}
75
/// Configuration and runtime state of a non-blocking, interactive child process.
///
/// The public fields describe *what* to run; the private fields hold the channels and
/// buffers used to talk to the process once it is registered in the process pool.
///
/// # Serialization
///
/// `Serialize` / `Deserialize` cover only `pid`, `filename`, `arguments` and
/// `working_directory`. Every private field is marked `#[serde(skip)]`, so it is left out
/// when serializing and takes its `Default` value when deserializing.
///
/// # Trait Implementations
///
/// - `Debug`: implemented by hand; prints only the four public fields.
/// - `Serialize` / `Deserialize`: see above.
/// - `Default`: derived, so every field takes its type's default (`None`, empty collections,
///   empty `filename`, empty `working_directory`). Note that this differs from
///   [`AsynchronousInteractiveProcess::new`], which sets `working_directory` to `"./"`.
///
/// # Example
/// ```ignore
/// let process = AsynchronousInteractiveProcess::new("my_program")
///     .with_argument("--help")
///     .with_working_directory("/path/to/dir");
///
/// println!("{:?}", process);
/// ```
#[derive(Serialize, Deserialize, Default)]
pub struct AsynchronousInteractiveProcess {
    /// Process ID, or `None` for a value that has not been associated with a running process.
    pub pid: Option<u32>,
    /// Name or path of the executable to launch.
    pub filename: String,
    /// Command-line arguments passed to the executable.
    pub arguments: Vec<String>,
    /// Directory the process is executed in.
    pub working_directory: PathBuf,
    /// Tokio mpsc sender that `ProcessHandle::send_input` pushes input lines into.
    #[serde(skip)]
    sender: Option<Sender<String>>,
    /// Tokio mpsc receiver that `ProcessHandle::receive_output` polls for output lines.
    #[serde(skip)]
    receiver: Option<Receiver<String>>,
    /// Overflow buffer: `ProcessHandle::send_input` appends input here when `sender` is full.
    /// NOTE(review): the code that drains this queue is not visible in this part of the file.
    #[serde(skip)]
    input_queue: VecDeque<String>,
    /// Optional callback registered through `process_exit_callback`.
    /// NOTE(review): the `i32` argument is presumably the exit code; confirm at the call site.
    #[serde(skip)]
    exit_callback: Option<Arc<dyn Fn(i32) + Send + Sync>>,
}
151
152impl Debug for AsynchronousInteractiveProcess {
153 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154 write!(
155 f,
156 "AsynchronousInteractiveProcess {{ pid: {:?}, filename: {:?}, arguments: {:?}, working_directory: {:?} }}",
157 self.pid, self.filename, self.arguments, self.working_directory
158 )
159 }
160}
161
162impl ProcessHandle {
163 /// Receives an output message asynchronously from the process associated with the instance's `pid`
164 /// if available.
165 ///
166 /// This function attempts to fetch a message by accessing the process pool and checking for a
167 /// message in the associated process's receiver.
168 ///
169 /// The process follows these steps:
170 /// 1. Check if the global process pool (`PROCESS_POOL`) is initialized.
171 /// 2. Attempt to acquire the lock on the pool asynchronously.
172 /// 3. Look for the process associated with `self.pid`.
173 /// 4. If the process has a receiver channel, attempt to get a message using `try_recv`.
174 /// 5. If a message is found, return it as `Ok(Some(String))`.
175 /// 6. If no message is received, retry up to `MAX_RETRIES` times, with a delay of `RETRY_DELAY_MS`
176 /// milliseconds between each retry.
177 ///
178 /// If no message is received after all retries, the function returns `Ok(None)`.
179 ///
180 /// # Returns
181 ///
182 /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
183 /// * `Ok(None)` - If no message is received after all retries.
184 /// * `Err(Error)` - If an error occurs during the process, such as:
185 /// - The process pool is not initialized
186 /// - The process with the specified PID is not found
187 /// - The receiver channel is not available
188 /// - The receiver channel is disconnected
189 ///
190 /// # Behavior
191 ///
192 /// - This function uses `tokio::time::sleep` for introducing delays between retries and works
193 /// asynchronously.
194 /// - The function uses constants `MAX_RETRIES` (10) and `RETRY_DELAY_MS` (10) to control the
195 /// retry behavior.
196 /// - The function properly propagates errors that occur during the process.
197 ///
198 /// # Example
199 ///
200 /// ```rust
201 /// match instance.receive_output().await {
202 /// Ok(Some(output)) => println!("Received output: {}", output),
203 /// Ok(None) => println!("No output received."),
204 /// Err(e) => eprintln!("Error receiving output: {}", e),
205 /// }
206 /// ```
207 ///
208 /// # Note
209 ///
210 /// - The function assumes the existence of a global process pool (`PROCESS_POOL`) which is safe
211 /// to access concurrently using mutex-based locking.
212 /// - The function makes use of `tokio::sync::Mutex` to handle concurrent executions across
213 /// asynchronous tasks.
214 /// Default timeout for receive_output in milliseconds
215 const DEFAULT_TIMEOUT_MS: u64 = 100;
216
217 /// Helper function to try receiving a message
218 async fn try_receive_message(
219 pid: u32,
220 pool: &mut tokio::sync::MutexGuard<'_, HashMap<u32, AsynchronousInteractiveProcess>>,
221 ) -> Result<Option<String>> {
222 if let Some(process) = pool.get_mut(&pid) {
223 if let Some(receiver) = &mut process.receiver {
224 match receiver.try_recv() {
225 Ok(msg) => return Ok(Some(msg)),
226 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
227 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
228 return Err(anyhow!("Channel disconnected for process {}", pid));
229 }
230 }
231 } else {
232 return Err(anyhow!("Receiver not available for process {}", pid));
233 }
234 } else {
235 return Err(anyhow!("Process {} not found", pid));
236 }
237 Ok(None)
238 }
239
240 pub async fn receive_output(&self) -> Result<Option<String>> {
241 // Use the default timeout
242 self.receive_output_with_timeout(std::time::Duration::from_millis(Self::DEFAULT_TIMEOUT_MS)).await
243 }
244
245 /// Receives an output message asynchronously from the process associated with the instance's `pid`
246 /// if available, with a specified timeout.
247 ///
248 /// This function is similar to `receive_output()` but allows specifying a custom timeout duration
249 /// instead of using the default timeout.
250 ///
251 /// # Arguments
252 ///
253 /// * `timeout` - A `std::time::Duration` specifying how long to wait for a message before giving up.
254 ///
255 /// # Returns
256 ///
257 /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
258 /// * `Ok(None)` - If no message is received before the timeout expires.
259 /// * `Err(Error)` - If an error occurs during the process, such as:
260 /// - The process pool is not initialized
261 /// - The process with the specified PID is not found
262 /// - The receiver channel is not available
263 /// - The receiver channel is disconnected
264 ///
265 /// # Example
266 ///
267 /// ```rust
268 /// // Wait for up to 5 seconds for a message
269 /// match instance.receive_output_with_timeout(std::time::Duration::from_secs(5)).await {
270 /// Ok(Some(output)) => println!("Received output: {}", output),
271 /// Ok(None) => println!("No output received within timeout."),
272 /// Err(e) => eprintln!("Error receiving output: {}", e),
273 /// }
274 /// ```
275 pub async fn receive_output_with_timeout(&self, timeout: std::time::Duration) -> Result<Option<String>> {
276 // Define constants for retry parameters
277 const RETRY_DELAY_MS: u64 = 10;
278
279 if let Some(process_pool) = PROCESS_POOL.get() {
280 // Try to receive a message immediately
281 {
282 let mut pool = process_pool.lock().await;
283 if let Some(msg) = Self::try_receive_message(self.pid, &mut pool).await? {
284 return Ok(Some(msg));
285 }
286 }
287
288 // If no message is available, retry with delay until timeout is reached
289 let start_time = std::time::Instant::now();
290 while start_time.elapsed() < timeout {
291 tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_DELAY_MS)).await;
292
293 let mut pool = process_pool.lock().await;
294 if let Some(msg) = Self::try_receive_message(self.pid, &mut pool).await? {
295 return Ok(Some(msg));
296 }
297 }
298
299 // No message after timeout
300 return Ok(None);
301 }
302
303 Err(anyhow!("Process pool not initialized"))
304 }
305
306 /// Sends the provided input to a process associated with this instance's PID asynchronously.
307 ///
308 /// # Arguments
309 ///
310 /// * `input` - An input of any type that can be converted into a `String`. This is the data
311 /// that will be sent to the respective process's input queue.
312 ///
313 /// # Returns
314 ///
315 /// * `Ok(())` if the input was successfully sent or queued for sending.
316 /// * `Err` - Returns an error in the following cases:
317 /// - If the process pool is not initialized.
318 /// - If the process associated with this PID is not found.
319 /// - If the process was not started or its sender channel is not available.
320 /// - If the sender channel is closed and input cannot be sent.
321 ///
322 /// # Details
323 ///
324 /// This function retrieves the process associated with the current instance's `pid`
325 /// from a global process pool. If a valid process is found:
326 /// - The input is sent via an asynchronous channel to the process.
327 /// - If the channel is full, the input is queued for later sending.
328 /// - If the channel is closed, an error is returned.
329 ///
330 /// The function also ensures thread safety by acquiring a lock on the process pool before attempting
331 /// any operations related to the process.
332 ///
333 /// # Errors
334 ///
335 /// This function propagates several potential issues as errors:
336 /// - If the process pool is uninitialized (`PROCESS_POOL.get()` returns `None`).
337 /// - If the process associated with the PID is missing in the pool.
338 /// - If the sender channel was never initialized or is unavailable.
339 /// - If the sender channel is closed and no new messages can be sent.
340 ///
341 /// # Example
342 ///
343 /// ```rust
344 /// use anyhow::Result;
345 ///
346 /// #[tokio::main]
347 /// async fn main() -> Result<()> {
348 /// let manager = ProcessManager::new(1); // Assumes a struct is managing process with ID 1.
349 /// manager.send_input("Some input").await?;
350 /// Ok(())
351 /// }
352 /// ```
353 ///
354 /// # Note
355 ///
356 /// This function expects a global process pool (`PROCESS_POOL`) to be properly initialized before being called.
357 /// Additionally, the associated process must have a valid `sender` channel to accept input.
358 pub async fn send_input(&self, input: impl Into<String>) -> Result<()> {
359 let input_str = input.into();
360 if let Some(process_pool) = PROCESS_POOL.get() {
361 let mut pool = process_pool.lock().await;
362 if let Some(process) = pool.get_mut(&self.pid) {
363 if let Some(sender) = &process.sender {
364 match sender.try_send(input_str.clone()) {
365 Ok(_) => Ok(()),
366 Err(e) => match e {
367 tokio::sync::mpsc::error::TrySendError::Full(_) => {
368 process.input_queue.push_back(input_str);
369 Ok(())
370 }
371 tokio::sync::mpsc::error::TrySendError::Closed(_) => Err(anyhow!("Failed to send input: channel closed")),
372 },
373 }
374 } else {
375 Err(anyhow!("Process not started or sender not available"))
376 }
377 } else {
378 Err(anyhow!("Process not found"))
379 }
380 } else {
381 Err(anyhow!("Process pool not initialized"))
382 }
383 }
384
385 /// Checks if the process associated with the current instance is running.
386 ///
387 /// This method attempts to determine if a process with the `pid` of the current instance
388 /// exists in the global `PROCESS_POOL`.
389 ///
390 /// # Returns
391 /// * `true` - If the process with the associated `pid` is currently present in the global process pool.
392 /// * `false` - If the process is not found in the global process pool or if the pool is not initialized.
393 ///
394 /// # Async Behavior
395 /// This method is asynchronous because it acquires a lock on the `PROCESS_POOL`.
396 ///
397 /// # Notes
398 /// - The `PROCESS_POOL` must be initialized before calling this function.
399 /// If `PROCESS_POOL` is not set, the function will return `false`.
400 /// - The `PROCESS_POOL` is expected to be a globally accessible, asynchronous, and thread-safe
401 /// data structure that tracks active processes.
402 ///
403 /// # Example
404 /// ```rust
405 /// let result = instance.is_process_running().await;
406 /// if result {
407 /// println!("Process is running.");
408 /// } else {
409 /// println!("Process is not running.");
410 /// }
411 /// ```
412 pub async fn is_process_running(&self) -> bool {
413 if let Some(process_pool) = PROCESS_POOL.get() {
414 let pool = process_pool.lock().await;
415 return pool.contains_key(&self.pid);
416 }
417 false
418 }
419
420 /// Asynchronously terminates a process identified by its `pid`.
421 ///
422 /// This method performs the following operations tailored to the target operating system:
423 ///
424 /// - **Windows**: Opens a handle to the process using its process ID (`pid`) and forcefully terminates it
425 /// using the `TerminateProcess` function from the WinAPI.
426 /// - **Linux**: Utilizes the `kill` system call with the `SIGKILL` signal to forcefully terminate the process.
427 ///
428 /// ## Platform-specific Notes:
429 ///
430 /// - On **Windows**, the process is identified and terminated using the `OpenProcess` and `TerminateProcess`
431 /// functions from the WinAPI.
432 /// - On **Linux**, the `kill` system call is used with the signal `SIGKILL` (9) to ensure the process is terminated.
433 ///
434 /// # Errors
435 ///
436 /// - Returns an error if the process termination fails (on Linux) due to system call errors or invalid process IDs.
437 /// On failure, the error contains details about the `pid` and the last OS error encountered.
438 ///
439 /// # Safety
440 ///
441 /// This method uses unsafe code blocks to interact with system APIs (`libc` on Linux, WinAPI on Windows). Ensure
442 /// that the provided `pid` corresponds to a valid process, and consider the implications of forcefully
443 /// terminating processes.
444 ///
445 /// # Example
446 ///
447 /// ```rust
448 /// let process_manager = SomeProcessManager::new(pid); // Example struct containing the pid
449 /// if let Err(e) = process_manager.kill().await {
450 /// eprintln!("Failed to terminate process: {}", e);
451 /// }
452 /// ```
453 /// Attempts to gracefully shut down the process, falling back to forceful termination if needed.
454 ///
455 /// This method first tries to gracefully shut down the process by:
456 /// - On Linux: Sending a SIGTERM signal
457 /// - On Windows: Sending a WM_CLOSE message to the main window
458 ///
459 /// If the process doesn't exit within the specified timeout, it will forcefully terminate
460 /// the process using the `kill()` method.
461 ///
462 /// # Arguments
463 ///
464 /// * `timeout` - A `std::time::Duration` specifying how long to wait for the process to exit
465 /// gracefully before forcefully terminating it.
466 ///
467 /// # Returns
468 ///
469 /// * `Ok(())` - If the process was successfully shut down (either gracefully or forcefully).
470 /// * `Err(Error)` - If an error occurred during the shutdown process.
471 ///
472 /// # Example
473 ///
474 /// ```rust
475 /// // Try to shut down gracefully, waiting up to 5 seconds before force killing
476 /// if let Err(e) = process.shutdown(std::time::Duration::from_secs(5)).await {
477 /// eprintln!("Failed to shut down process: {}", e);
478 /// }
479 /// ```
480 pub async fn shutdown(&self, timeout: std::time::Duration) -> Result<()> {
481 // First, try to gracefully shut down the process
482 let graceful_shutdown_result = self.graceful_shutdown().await;
483
484 // If graceful shutdown failed or isn't implemented for this platform, log it but continue
485 if let Err(e) = &graceful_shutdown_result {
486 debug!("Graceful shutdown attempt failed: {}", e);
487 return self.kill().await;
488 }
489
490 // Wait for the process to exit for the specified timeout
491 let start_time = std::time::Instant::now();
492 while start_time.elapsed() < timeout {
493 if !self.is_process_running().await {
494 return Ok(());
495 }
496 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
497 }
498
499 // If we're here, the process didn't exit gracefully within the timeout
500 // Fall back to forceful termination
501 debug!("Process did not exit gracefully within timeout, forcing termination");
502 self.kill().await
503 }
504
505 /// Attempts to gracefully shut down the process without forceful termination.
506 ///
507 /// This method is platform-specific:
508 /// - On Linux: Sends a SIGTERM signal to request graceful termination
509 /// - On Windows: First tries to send a Ctrl+C event to the process using GenerateConsoleCtrlEvent.
510 /// If that fails, it attempts to send an "exit" command to the process via stdin.
511 ///
512 /// # Returns
513 ///
514 /// * `Ok(())` - If the graceful shutdown signal was successfully sent.
515 /// * `Err(Error)` - If an error occurred while sending the graceful shutdown signal.
516 ///
517 /// # Notes
518 ///
519 /// - A successful return doesn't guarantee the process will actually exit.
520 /// Use `shutdown()` with a timeout to ensure the process exits.
521 /// - On Windows, the GenerateConsoleCtrlEvent function works with process groups, not individual
522 /// processes, so it may not work for all processes. The fallback "exit" command approach
523 /// works for many command-line applications that accept such commands.
524 /// - For GUI applications on Windows, neither approach may work. In such cases, the `shutdown()`
525 /// method will fall back to forceful termination after the timeout.
526 async fn graceful_shutdown(&self) -> Result<()> {
527 #[cfg(target_os = "windows")]
528 {
529 unsafe {
530 // First, try to send Ctrl+C event to the process
531 // CTRL_C_EVENT = 0, CTRL_BREAK_EVENT = 1
532 // Note: GenerateConsoleCtrlEvent works with process groups, not individual processes
533 // For console applications, we can try sending Ctrl+C to the process
534 let result = winapi::um::wincon::GenerateConsoleCtrlEvent(0, self.pid);
535 if result == 0 {
536 return Err(anyhow!("Failed to send Ctrl+C to process {}: {}", self.pid, std::io::Error::last_os_error()));
537 }
538 }
539 return Ok(());
540 }
541
542 #[cfg(target_os = "linux")]
543 {
544 unsafe {
545 // Use the kill system call with SIGTERM (15) to request graceful termination
546 let result = libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
547 if result != 0 {
548 return Err(anyhow!("Failed to send SIGTERM to process {}: {}", self.pid, std::io::Error::last_os_error()));
549 }
550 }
551 return Ok(());
552 }
553
554 #[cfg(not(any(target_os = "windows", target_os = "linux")))]
555 {
556 return Err(anyhow!("Graceful shutdown not implemented for this platform"));
557 }
558 }
559
560 /// Forcefully terminates the process immediately.
561 ///
562 /// This method should be used as a last resort when a process needs to be terminated
563 /// immediately. For a more graceful approach, consider using `shutdown()` first.
564 ///
565 /// # Returns
566 ///
567 /// * `Ok(())` - If the process was successfully terminated.
568 /// * `Err(Error)` - If an error occurred during the termination process.
569 ///
570 /// # Example
571 ///
572 /// ```rust
573 /// if let Err(e) = process.kill().await {
574 /// eprintln!("Failed to terminate process: {}", e);
575 /// }
576 /// ```
577 pub async fn kill(&self) -> Result<()> {
578 #[cfg(target_os = "windows")]
579 {
580 unsafe {
581 // PROCESS_TERMINATE (0x00010000) access right is required to terminate a process
582 let handle = winapi::um::processthreadsapi::OpenProcess(0x00010000, 0, self.pid);
583 if handle.is_null() {
584 return Err(anyhow!("Failed to open process {}: {}", self.pid, std::io::Error::last_os_error()));
585 }
586
587 // Attempt to terminate the process
588 let result = winapi::um::processthreadsapi::TerminateProcess(handle, 0);
589
590 // Always close the handle to prevent resource leaks
591 let close_result = winapi::um::handleapi::CloseHandle(handle);
592
593 // Check if termination was successful
594 if result == 0 {
595 return Err(anyhow!("Failed to terminate process {}: {}", self.pid, std::io::Error::last_os_error()));
596 }
597
598 // Check if handle was closed successfully
599 if close_result == 0 {
600 warn!("Failed to close process handle for process {}: {}", self.pid, std::io::Error::last_os_error());
601 // We don't return an error here as the process was terminated successfully
602 }
603 }
604 }
605 #[cfg(target_os = "linux")]
606 {
607 unsafe {
608 // Use the kill system call with SIGKILL (9) to forcefully terminate the process
609 let result = libc::kill(self.pid as libc::pid_t, libc::SIGKILL);
610 if result != 0 {
611 return Err(anyhow!("Failed to kill process {}: {}", self.pid, std::io::Error::last_os_error()));
612 }
613 }
614 }
615
616 Ok(())
617 }
618}
619
620impl AsynchronousInteractiveProcess {
621 /// Creates a new instance of the struct with default values.
622 ///
623 /// # Arguments
624 ///
625 /// * `filename` - A value that can be converted into a `String`. This typically represents the
626 /// name or path of the file associated with the instance.
627 ///
628 /// # Returns
629 ///
630 /// Returns a new instance of the struct populated with default fields:
631 /// - `pid`: `None` (indicating no process ID is associated yet).
632 /// - `filename`: The provided `filename` converted into a `String`.
633 /// - `arguments`: An empty vector, representing no initial arguments.
634 /// - `working_directory`: Set to the current directory (`"./"`).
635 /// - `sender`: `None`, indicating no sender is associated initially.
636 /// - `receiver`: `None`, indicating no receiver is associated initially.
637 /// - `input_queue`: An empty `VecDeque`, representing no items in the input queue.
638 ///
639 /// # Example
640 ///
641 /// ```
642 /// let instance = MyStruct::new("example.txt");
643 /// assert_eq!(instance.filename, "example.txt");
644 /// assert!(instance.pid.is_none());
645 /// assert!(instance.arguments.is_empty());
646 /// assert_eq!(instance.working_directory, PathBuf::from("./"));
647 /// assert!(instance.sender.is_none());
648 /// assert!(instance.receiver.is_none());
649 /// assert!(instance.input_queue.is_empty());
650 /// ```
651 pub fn new(filename: impl Into<String>) -> Self {
652 Self {
653 pid: None,
654 filename: filename.into(),
655 arguments: Vec::new(),
656 working_directory: PathBuf::from("./"),
657 sender: None,
658 receiver: None,
659 input_queue: VecDeque::new(),
660 exit_callback: None,
661 }
662 }
663
664 /// Sets the arguments for the current instance by converting a vector of items
665 /// implementing `Into<String>` into a `Vec<String>`.
666 ///
667 /// # Parameters
668 /// - `args`: A `Vec` containing items that implement the `Into<String>` trait. These
669 /// items will be converted into `String` and used to set the `arguments` field
670 /// of the instance.
671 ///
672 /// # Returns
673 /// - `Self`: The current instance (`self`) after updating its `arguments` field.
674 ///
675 /// # Example
676 /// ```rust
677 /// let instance = MyStruct::new()
678 /// .with_arguments(vec!["arg1", "arg2", String::from("arg3")]);
679 /// ```
680 ///
681 /// This method allows chaining, as it returns the updated instance
682 /// after setting the `arguments` field.
683 pub fn with_arguments(mut self, args: Vec<impl Into<String>>) -> Self {
684 self.arguments = args.into_iter().map(|arg| arg.into()).collect();
685 self
686 }
687
688 /// Adds an argument to the `arguments` vector and returns the modified instance.
689 ///
690 /// # Parameters
691 ///
692 /// * `arg` - A value that implements the `Into<String>` trait, which will be converted into a `String`
693 /// and added to the `arguments` vector.
694 ///
695 /// # Returns
696 ///
697 /// Returns the modified instance of the implementing struct (`Self`) with the new argument added.
698 ///
699 /// # Example
700 ///
701 /// ```rust
702 /// let instance = SomeStruct::new().with_argument("example");
703 /// ```
704 ///
705 /// This adds the string `"example"` to the `arguments` vector of `SomeStruct`.
706 pub fn with_argument(mut self, arg: impl Into<String>) -> Self {
707 self.arguments.push(arg.into());
708 self
709 }
710
711 /// Sets the working directory for the instance and returns the modified instance.
712 ///
713 /// # Arguments
714 ///
715 /// * `dir` - A value that can be converted into a `PathBuf`, representing the desired working directory.
716 ///
717 /// # Returns
718 ///
719 /// The instance of the struct with the updated working directory.
720 ///
721 /// # Example
722 ///
723 /// ```rust
724 /// let instance = MyStruct::new()
725 /// .with_working_directory("/path/to/directory");
726 /// ```
727 pub fn with_working_directory(mut self, dir: impl Into<PathBuf>) -> Self {
728 self.working_directory = dir.into();
729 self
730 }
731
732 pub fn process_exit_callback<F>(mut self, callback: F) -> Self
733 where
734 F: Fn(i32) + Send + Sync + 'static,
735 {
736 self.exit_callback = Some(Arc::new(callback));
737 self
738 }
739
740 /// Starts a new process based on the configuration stored in the struct, manages
741 /// its I/O streams asynchronously, and tracks its lifecycle in a shared process pool.
742 ///
743 /// # Returns
744 /// - `Ok<u32>`: Returns the process ID (PID) if the process starts successfully.
745 /// - `Err(std::io::Error)`: Returns an error if any part of the process start operation fails.
746 ///
747 /// # Process Configuration
748 /// - The process is launched using the executable specified in `self.filename`.
749 /// - Command-line arguments are passed via `self.arguments`.
750 /// - The process will run in the directory specified in `self.working_directory`.
751 ///
752 /// # Standard I/O Management
753 /// - The process's `stdin` is assigned a `tokio::sync::mpsc::channel` for communication.
754 /// - `stdout` and `stderr` are read asynchronously. Each line from these streams is sent
755 /// to an `mpsc::channel`, where `stdout`/`stderr` data can be consumed.
756 ///
757 /// # Shared Process Pool
758 /// - The process metadata, including its PID and I/O channels, is stored in a global,
759 /// thread-safe `PROCESS_POOL`.
760 /// - The process pool is managed using a `tokio::sync::Mutex` and stores processes in a
761 /// `HashMap` with their PID as the key.
762 ///
763 /// # Asynchronous I/O
764 /// - A background task is spawned to write data from an internal input queue to the process's
765 /// `stdin`.
766 /// - Another background task continuously reads and forwards `stdout` and `stderr` messages
767 /// from the process.
768 /// - If the process exits or encounters an error, its PID and metadata are removed from the
769 /// process pool.
770 ///
771 /// # Example Use Case
772 /// ```no_run
773 /// let mut my_process = MyProcess {
774 /// filename: "my_program".to_string(),
775 /// arguments: vec!["--arg1", "value1".to_string()],
776 /// working_directory: "/path/to/dir".to_string(),
777 /// pid: None,
778 /// sender: None,
779 /// receiver: None,
780 /// input_queue: VecDeque::new(),
781 /// };
782 ///
783 /// let pid = my_process.start().await.unwrap();
784 /// println!("Started process with PID: {}", pid);
785 /// ```
786 ///
787 /// # Notes
788 /// - The process's `stdin`, `stdout`, and `stderr` are piped to capture and manage
789 /// communication asynchronously.
790 /// - Each spawned background task is independently responsible for managing a specific
791 /// stream or part of the process lifecycle.
792 /// - Errors during I/O operations or spawning the process are logged using the `log` crate
793 /// macros (e.g., `error!`, `debug!`).
794 ///
795 /// # Potential Errors
796 /// - Failure to spawn the process (e.g., if `self.filename` is invalid or inaccessible).
797 /// - Errors during communication with the process's I/O streams (e.g., writing to a closed `stdin`).
798 /// - Locking the shared process pool does not itself fail: awaiting the `tokio::sync::Mutex` simply waits until the lock becomes available.
799 pub async fn start(&mut self) -> Result<u32> {
800 let mut command = Command::new(&self.filename);
801 command.args(&self.arguments);
802
803 // Convert UNC path to regular Windows path if needed
804 let working_dir = if cfg!(windows) {
805 // Remove UNC prefix if present
806 let path_str = self.working_directory.to_string_lossy();
807 if path_str.starts_with(r"\\?\") {
808 PathBuf::from(&path_str[4..]) // Remove the \\?\ prefix
809 } else {
810 self.working_directory.clone()
811 }
812 } else {
813 self.working_directory.clone()
814 };
815
816 // Debug the working directory
817 debug!("tokio-interactive: filename = {}", self.filename);
818 debug!("tokio-interactive: arguments = {:?}", self.arguments);
819 debug!("tokio-interactive: working_directory = {:?}", working_dir);
820 debug!("tokio-interactive: working_directory exists = {}", working_dir.exists());
821 debug!("tokio-interactive: working_directory is_dir = {}", working_dir.is_dir());
822
823 // Check if working directory is absolute
824 debug!("tokio-interactive: working_directory is_absolute = {}", working_dir.is_absolute());
825
826 // Get current directory before setting
827 if let Ok(current_dir) = std::env::current_dir() {
828 debug!("tokio-interactive: current working directory before setting = {:?}", current_dir);
829 }
830
831 command.current_dir(working_dir);
832
833 command.stdin(std::process::Stdio::piped());
834 command.stdout(std::process::Stdio::piped());
835 command.stderr(std::process::Stdio::piped());
836
837 // Debug the final command that will be executed
838 debug!("tokio-interactive: Final command = {:?}", command);
839
840 let mut child = command.spawn()?;
841 let pid = child.id().unwrap_or(0);
842
843 debug!("tokio-interactive: Process spawned with PID = {}", pid);
844
845 let (stdin_sender, mut stdin_receiver) = tokio::sync::mpsc::channel::<String>(100);
846 let (stdout_sender, stdout_receiver) = tokio::sync::mpsc::channel::<String>(100);
847
848 if let Some(mut stdin) = child.stdin.take() {
849 tokio::spawn(async move {
850 while let Some(input) = stdin_receiver.recv().await {
851 if let Err(e) = stdin.write_all(input.as_bytes()).await {
852 error!("Failed to write to process stdin: {}", e);
853 break;
854 }
855 if let Err(e) = stdin.write_all(b"\n").await {
856 error!("Failed to write newline to process stdin: {}", e);
857 break;
858 }
859 if let Err(e) = stdin.flush().await {
860 error!("Failed to flush process stdin: {}", e);
861 break;
862 }
863 }
864 });
865 }
866
867 if let Some(stdout) = child.stdout.take() {
868 let stdout_sender_clone = stdout_sender.clone();
869 tokio::spawn(async move {
870 let mut reader = BufReader::new(stdout);
871 let mut line = String::new();
872 while let Ok(bytes_read) = reader.read_line(&mut line).await {
873 if bytes_read == 0 {
874 break;
875 }
876 if let Err(e) = stdout_sender_clone.send(line.trim_end().to_string()).await {
877 error!("Failed to send stdout message: {}", e);
878 break;
879 }
880 line.clear();
881 }
882 });
883 }
884
885 if let Some(stderr) = child.stderr.take() {
886 let stderr_sender = stdout_sender.clone();
887 tokio::spawn(async move {
888 let mut reader = BufReader::new(stderr);
889 let mut line = String::new();
890 while let Ok(bytes_read) = reader.read_line(&mut line).await {
891 if bytes_read == 0 {
892 break;
893 }
894 if let Err(e) = stderr_sender.send(format!("STDERR: {}", line.trim_end())).await {
895 error!("Failed to send stderr message: {}", e);
896 break;
897 }
898 line.clear();
899 }
900 });
901 }
902
903 let queue_pid = pid;
904 tokio::spawn(async move {
905 loop {
906 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
907
908 let process_pool = match PROCESS_POOL.get() {
909 Some(pool) => pool,
910 None => break,
911 };
912
913 let mut pool = process_pool.lock().await;
914
915 let process = match pool.get_mut(&queue_pid) {
916 Some(p) => p,
917 None => break,
918 };
919
920 while let Some(input) = process.input_queue.pop_front() {
921 if let Some(sender) = &process.sender {
922 if let Err(_) = sender.try_send(input.clone()) {
923 process.input_queue.push_front(input);
924 break;
925 }
926 } else {
927 process.input_queue.clear();
928 break;
929 }
930 }
931
932 drop(pool);
933 }
934 });
935
936 let exit_callback = self.exit_callback.clone();
937 tokio::spawn(async move {
938 match child.wait().await {
939 Ok(exit_status) => {
940 let exit_code = exit_status.code().unwrap_or(-1); // Use -1 for unknown exit codes
941 debug!("Process {} exited with code: {}", pid, exit_code);
942
943 if let Some(exit_callback) = exit_callback {
944 exit_callback(exit_code);
945 }
946 }
947 Err(e) => {
948 error!("Process {} exited with error: {}", pid, e);
949
950 // Still trigger callback with error code
951 if let Some(exit_callback) = exit_callback {
952 exit_callback(-1); // or some other error indicator
953 }
954 }
955 }
956
957 // Cleanup
958 if let Some(process_pool) = PROCESS_POOL.get() {
959 let mut pool = process_pool.lock().await;
960 pool.remove(&pid);
961 debug!("Process {} has exited and been removed from the pool", pid);
962 }
963 });
964
965 let process_pool = PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
966
967 let mut pool = process_pool.lock().await;
968 pool.insert(
969 pid,
970 Self {
971 pid: Some(pid),
972 filename: self.filename.clone(),
973 arguments: self.arguments.clone(),
974 working_directory: self.working_directory.clone(),
975 sender: Some(stdin_sender),
976 receiver: Some(stdout_receiver),
977 input_queue: VecDeque::new(),
978 exit_callback: self.exit_callback.clone(),
979 },
980 );
981
982 self.pid = Some(pid);
983
984 Ok(pid)
985 }
986
987 /// Asynchronously retrieves a handle to a process for a given process ID (PID).
988 ///
989 /// This function checks if a process with the specified PID exists in a shared
990 /// process pool. If the process is found, it returns an `Option` containing a
991 /// `ProcessHandle` for the process. Otherwise, it returns `None`.
992 ///
993 /// # Arguments
994 ///
995 /// * `pid` - A 32-bit unsigned integer representing the process ID of the
996 /// desired process.
997 ///
998 /// # Returns
999 ///
1000 /// * `Some(ProcessHandle)` - If a process with the given PID is found in the
1001 /// process pool.
1002 /// * `None` - If the process does not exist in the process pool or if the
1003 /// process pool is uninitialized.
1004 ///
1005 /// # Example
1006 ///
1007 /// ```rust
1008 /// if let Some(handle) = get_process_by_pid(12345).await {
1009 /// println!("Process found with PID: {}", handle.pid);
1010 /// } else {
1011 /// println!("Process not found.");
1012 /// }
1013 /// ```
1014 ///
1015 /// # Errors
1016 ///
1017 /// This function will return `None` if the global process pool (`PROCESS_POOL`)
1018 /// has not been initialized or is unavailable.
1019 ///
1020 /// # Note
1021 ///
1022 /// This is an asynchronous function and must be awaited to complete its operation.
1023 pub async fn get_process_by_pid(pid: u32) -> Option<ProcessHandle> {
1024 let process_pool = PROCESS_POOL.get()?;
1025 let pool = process_pool.lock().await;
1026 if pool.contains_key(&pid) { Some(ProcessHandle { pid }) } else { None }
1027 }
1028
1029 /// Asynchronously checks if a process identified by its `pid` is currently running.
1030 ///
1031 /// # Returns
1032 /// - `true` if the process with the stored `pid` is found in the `PROCESS_POOL`.
1033 /// - `false` if the stored `pid` is `None`, the `PROCESS_POOL` is not initialized,
1034 /// or the `pid` does not exist in the `PROCESS_POOL`.
1035 ///
1036 /// The function first checks if the `pid` instance variable is set. If so, it attempts
1037 /// to access the global `PROCESS_POOL`. If `PROCESS_POOL` is initialized, it acquires
1038 /// a lock and checks if the `pid` exists in the pool.
1039 ///
1040 /// # Examples
1041 /// ```rust
1042 /// let result = my_instance.is_process_running().await;
1043 /// if result {
1044 /// println!("The process is running.");
1045 /// } else {
1046 /// println!("The process is not running.");
1047 /// }
1048 /// ```
1049 ///
1050 /// # Async Behavior
1051 /// This function acquires an asynchronous lock on the `PROCESS_POOL` to safely access
1052 /// its contents and will yield if the lock is currently held elsewhere.
1053 ///
1054 /// # Panics
1055 /// This function may panic if the async lock on the `PROCESS_POOL` fails unexpectedly.
1056 ///
1057 /// # Dependencies
1058 /// - `PROCESS_POOL` should be a globally accessible and lazily initialized structure
1059 /// (e.g., using `once_cell` or similar patterns) that maintains a mapping of currently
1060 /// active processes.
1061 pub async fn is_process_running(&self) -> bool {
1062 if let Some(pid) = self.pid {
1063 if let Some(process_pool) = PROCESS_POOL.get() {
1064 let pool = process_pool.lock().await;
1065 return pool.contains_key(&pid);
1066 }
1067 }
1068 false
1069 }
1070}