tokio_interactive/lib.rs
1#![doc = include_str!("../README.md")]
2use anyhow::{Result, anyhow};
3use log::*;
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, VecDeque};
6use std::fmt::{Debug, Formatter};
7use std::path::PathBuf;
8use std::sync::{Arc, OnceLock};
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::process::Command;
11use tokio::sync::Mutex;
12use tokio::sync::mpsc::Sender;
13use tokio::sync::broadcast;
14// Change this line
15
16/// A static, lazily-initialized process pool used to manage asynchronous interactive processes.
17///
18/// This global variable utilizes the `OnceLock` to ensure it is initialized only once during
19/// the program's lifetime. It holds an `Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>`
20/// to allow safe concurrent access and modification of the process pool.
21///
22/// - `OnceLock`: Provides thread-safe, one-time initialization of the process pool.
23/// - `Arc`: Ensures the `Mutex<HashMap<...>>` can be shared across threads safely.
24/// - `Mutex`: Protects the `HashMap` from concurrent modification, ensuring thread safety.
25/// - `HashMap<u32, AsynchronousInteractiveProcess>`: Maps process IDs (`u32`) to
26/// corresponding `AsynchronousInteractiveProcess` instances.
27///
28/// Usage:
29///
30/// The process pool is intended to store and manage asynchronous interactive processes by their IDs.
31/// Access needs to ensure proper locking with the `Mutex` guard to guarantee thread safety.
32///
33/// Example initialization:
34/// ```rust
35/// PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
36/// ```
37///
38/// Example access:
39/// ```rust
40/// if let Some(pool) = PROCESS_POOL.get() {
41/// let mut guard = pool.lock().unwrap();
42/// guard.insert(process_id, new_process);
43/// }
44/// ```
45/// Ensure that any operations on the pool respect the locking mechanism provided by the `Mutex`.
46///
47/// Once initialized, `PROCESS_POOL` cannot be re-initialized or re-assigned.
48static PROCESS_POOL: OnceLock<Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>> = OnceLock::new();
49
50/// Represents a handle to a process identified by its process ID (PID).
51///
52/// The `ProcessHandle` struct is used to encapsulate the process identifier (PID)
53/// of a running process. It is designed to be lightweight and can be cloned
54/// or debugged as required.
55///
56/// # Fields
57/// - `pid`: A `u32` value that represents the process ID of the targeted process.
58///
59/// # Traits
60/// - `Debug`: Allows instances of `ProcessHandle` to be formatted using the `{:?}` formatter
61/// for debugging purposes.
62/// - `Clone`: Allows the `ProcessHandle` to be cloned, creating a new instance with the
63/// same `pid`.
64///
65/// # Example
66/// ```
67/// let handle = ProcessHandle { pid: 1234 };
68/// println!("{:?}", handle); // Outputs: ProcessHandle { pid: 1234 }
69/// let cloned_handle = handle.clone();
70/// println!("{:?}", cloned_handle); // Outputs: ProcessHandle { pid: 1234 }
71/// ```
72#[derive(Debug)]
73pub struct ProcessHandle {
74 pid: u32,
75 receiver: broadcast::Receiver<String>,
76}
77
78/// `AsynchronousInteractiveProcess` is a structure that represents a non-blocking,
79/// interactive process. It provides metadata and mechanisms to interact with a
80/// process asynchronously using sender and receiver channels.
81///
82/// # Fields
83///
84/// * `pid` - An optional process ID (`pid`) of the interactive process. If the process
85/// is not yet started, this field will remain `None`.
86///
87/// * `filename` - The name of the executable file that represents the interactive process.
88/// This is required to identify and initiate the process.
89///
90/// * `arguments` - A vector containing the arguments to pass to the executable file
91/// when launching the process.
92///
93/// * `working_directory` - The directory where the interactive process will be executed.
94/// This is represented as a `PathBuf`.
95///
96/// * `sender` - An optional `Sender` channel used to send messages or data to the
97/// interactive process. This field is skipped during serialization and deserialization
98/// because it holds runtime-related data.
99///
100/// * `receiver` - An optional `Receiver` channel used to receive messages or data
101/// from the interactive process. This field is also skipped during serialization
102/// and deserialization for the same reasons as `sender`.
103///
104/// * `input_queue` - A deque (double-ended queue) that maintains an in-memory buffer
105/// of strings representing input for the interactive process. This field is skipped
106/// during serialization as it only contains transient runtime-related data.
107///
108/// # Trait Implementations
109///
110/// - `Debug`: Allows instances of this struct to be formatted and logged for debugging purposes.
111/// - `Serialize`: Makes the struct serializable, excluding fields marked with `#[serde(skip)]`.
112/// - `Deserialize`: Allows deserialization to create instances of this struct from serialized data.
113/// - `Default`: Provides a default implementation where optional fields are set to `None`,
114/// collections are empty, and `filename` is an empty string.
115///
116/// # Example
117/// ```rust
118/// use std::path::PathBuf;
119/// use std::collections::VecDeque;
120/// use serde::{Serialize, Deserialize};
121/// use crossbeam_channel::{Sender, Receiver};
122///
123/// let process = AsynchronousInteractiveProcess {
124/// pid: None,
125/// filename: "my_program".to_string(),
126/// arguments: vec!["--help".to_string()],
127/// working_directory: PathBuf::from("/path/to/dir"),
128/// sender: None,
129/// receiver: None,
130/// input_queue: VecDeque::new(),
131/// };
132///
133/// println!("{:?}", process);
134/// ```
135///
136/// This structure is ideal for scenarios where processes need to be controlled
137/// asynchronously with multiple input or output channels.
138#[derive(Serialize, Deserialize, Default)]
139pub struct AsynchronousInteractiveProcess {
140 pub pid: Option<u32>,
141 pub filename: String,
142 pub arguments: Vec<String>,
143 pub working_directory: PathBuf,
144 #[serde(skip)]
145 sender: Option<Sender<String>>,
146 #[serde(skip)]
147 output_broadcaster: Option<broadcast::Sender<String>>,
148 #[serde(skip)]
149 _keep_alive_receiver: Option<broadcast::Receiver<String>>,
150 #[serde(skip)]
151 input_queue: VecDeque<String>,
152 #[serde(skip)]
153 exit_callback: Option<Arc<dyn Fn(i32) + Send + Sync>>,
154}
155
156impl Debug for AsynchronousInteractiveProcess {
157 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
158 write!(
159 f,
160 "AsynchronousInteractiveProcess {{ pid: {:?}, filename: {:?}, arguments: {:?}, working_directory: {:?} }}",
161 self.pid, self.filename, self.arguments, self.working_directory
162 )
163 }
164}
165
166impl ProcessHandle {
167 /// Receives an output message asynchronously from the process associated with the instance's `pid`
168 /// if available.
169 ///
170 /// This function attempts to fetch a message by accessing the process pool and checking for a
171 /// message in the associated process's receiver.
172 ///
173 /// The process follows these steps:
174 /// 1. Check if the global process pool (`PROCESS_POOL`) is initialized.
175 /// 2. Attempt to acquire the lock on the pool asynchronously.
176 /// 3. Look for the process associated with `self.pid`.
177 /// 4. If the process has a receiver channel, attempt to get a message using `try_recv`.
178 /// 5. If a message is found, return it as `Ok(Some(String))`.
179 /// 6. If no message is received, retry up to `MAX_RETRIES` times, with a delay of `RETRY_DELAY_MS`
180 /// milliseconds between each retry.
181 ///
182 /// If no message is received after all retries, the function returns `Ok(None)`.
183 ///
184 /// # Returns
185 ///
186 /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
187 /// * `Ok(None)` - If no message is received after all retries.
188 /// * `Err(Error)` - If an error occurs during the process, such as:
189 /// - The process pool is not initialized
190 /// - The process with the specified PID is not found
191 /// - The receiver channel is not available
192 /// - The receiver channel is disconnected
193 ///
194 /// # Behavior
195 ///
196 /// - This function uses `tokio::time::sleep` for introducing delays between retries and works
197 /// asynchronously.
198 /// - The function uses constants `MAX_RETRIES` (10) and `RETRY_DELAY_MS` (10) to control the
199 /// retry behavior.
200 /// - The function properly propagates errors that occur during the process.
201 ///
202 /// # Example
203 ///
204 /// ```rust
205 /// match instance.receive_output().await {
206 /// Ok(Some(output)) => println!("Received output: {}", output),
207 /// Ok(None) => println!("No output received."),
208 /// Err(e) => eprintln!("Error receiving output: {}", e),
209 /// }
210 /// ```
211 ///
212 /// # Note
213 ///
214 /// - The function assumes the existence of a global process pool (`PROCESS_POOL`) which is safe
215 /// to access concurrently using mutex-based locking.
216 /// - The function makes use of `tokio::sync::Mutex` to handle concurrent executions across
217 /// asynchronous tasks.
218 /// Default timeout for receive_output in milliseconds
219 const DEFAULT_TIMEOUT_MS: u64 = 100;
220
221
222 pub async fn receive_output(&mut self) -> Result<Option<String>> {
223 // Use the default timeout
224 self.receive_output_with_timeout(std::time::Duration::from_millis(Self::DEFAULT_TIMEOUT_MS)).await
225 }
226
227 /// Receives an output message asynchronously from the process associated with the instance's `pid`
228 /// if available, with a specified timeout.
229 ///
230 /// This function is similar to `receive_output()` but allows specifying a custom timeout duration
231 /// instead of using the default timeout.
232 ///
233 /// # Arguments
234 ///
235 /// * `timeout` - A `std::time::Duration` specifying how long to wait for a message before giving up.
236 ///
237 /// # Returns
238 ///
239 /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
240 /// * `Ok(None)` - If no message is received before the timeout expires.
241 /// * `Err(Error)` - If an error occurs during the process, such as:
242 /// - The process pool is not initialized
243 /// - The process with the specified PID is not found
244 /// - The receiver channel is not available
245 /// - The receiver channel is disconnected
246 ///
247 /// # Example
248 ///
249 /// ```rust
250 /// // Wait for up to 5 seconds for a message
251 /// match instance.receive_output_with_timeout(std::time::Duration::from_secs(5)).await {
252 /// Ok(Some(output)) => println!("Received output: {}", output),
253 /// Ok(None) => println!("No output received within timeout."),
254 /// Err(e) => eprintln!("Error receiving output: {}", e),
255 /// }
256 /// ```
257 pub async fn receive_output_with_timeout(&mut self, timeout: std::time::Duration) -> Result<Option<String>> {
258 // Define constants for retry parameters
259 const RETRY_DELAY_MS: u64 = 10;
260
261 // Try to receive a message immediately
262 match self.receiver.try_recv() {
263 Ok(msg) => return Ok(Some(msg)),
264 Err(broadcast::error::TryRecvError::Empty) => {}
265 Err(broadcast::error::TryRecvError::Closed) => {
266 return Err(anyhow!("Broadcast channel closed for process {}", self.pid));
267 }
268 Err(broadcast::error::TryRecvError::Lagged(_)) => {
269 // Messages were dropped due to lag, but we can continue
270 }
271 }
272
273 // If no message is available, retry with delay until timeout is reached
274 let start_time = std::time::Instant::now();
275 while start_time.elapsed() < timeout {
276 tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_DELAY_MS)).await;
277
278 match self.receiver.try_recv() {
279 Ok(msg) => return Ok(Some(msg)),
280 Err(broadcast::error::TryRecvError::Empty) => {}
281 Err(broadcast::error::TryRecvError::Closed) => {
282 return Err(anyhow!("Broadcast channel closed for process {}", self.pid));
283 }
284 Err(broadcast::error::TryRecvError::Lagged(_)) => {
285 // Messages were dropped due to lag, but we can continue
286 }
287 }
288 }
289
290 // No message after timeout
291 Ok(None)
292 }
293
294 /// Sends the provided input to a process associated with this instance's PID asynchronously.
295 ///
296 /// # Arguments
297 ///
298 /// * `input` - An input of any type that can be converted into a `String`. This is the data
299 /// that will be sent to the respective process's input queue.
300 ///
301 /// # Returns
302 ///
303 /// * `Ok(())` if the input was successfully sent or queued for sending.
304 /// * `Err` - Returns an error in the following cases:
305 /// - If the process pool is not initialized.
306 /// - If the process associated with this PID is not found.
307 /// - If the process was not started or its sender channel is not available.
308 /// - If the sender channel is closed and input cannot be sent.
309 ///
310 /// # Details
311 ///
312 /// This function retrieves the process associated with the current instance's `pid`
313 /// from a global process pool. If a valid process is found:
314 /// - The input is sent via an asynchronous channel to the process.
315 /// - If the channel is full, the input is queued for later sending.
316 /// - If the channel is closed, an error is returned.
317 ///
318 /// The function also ensures thread safety by acquiring a lock on the process pool before attempting
319 /// any operations related to the process.
320 ///
321 /// # Errors
322 ///
323 /// This function propagates several potential issues as errors:
324 /// - If the process pool is uninitialized (`PROCESS_POOL.get()` returns `None`).
325 /// - If the process associated with the PID is missing in the pool.
326 /// - If the sender channel was never initialized or is unavailable.
327 /// - If the sender channel is closed and no new messages can be sent.
328 ///
329 /// # Example
330 ///
331 /// ```rust
332 /// use anyhow::Result;
333 ///
334 /// #[tokio::main]
335 /// async fn main() -> Result<()> {
336 /// let manager = ProcessManager::new(1); // Assumes a struct is managing process with ID 1.
337 /// manager.send_input("Some input").await?;
338 /// Ok(())
339 /// }
340 /// ```
341 ///
342 /// # Note
343 ///
344 /// This function expects a global process pool (`PROCESS_POOL`) to be properly initialized before being called.
345 /// Additionally, the associated process must have a valid `sender` channel to accept input.
346 pub async fn send_input(&self, input: impl Into<String>) -> Result<()> {
347 let input_str = input.into();
348 if let Some(process_pool) = PROCESS_POOL.get() {
349 let mut pool = process_pool.lock().await;
350 if let Some(process) = pool.get_mut(&self.pid) {
351 if let Some(sender) = &process.sender {
352 match sender.try_send(input_str.clone()) {
353 Ok(_) => Ok(()),
354 Err(e) => match e {
355 tokio::sync::mpsc::error::TrySendError::Full(_) => {
356 process.input_queue.push_back(input_str);
357 Ok(())
358 }
359 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
360 Err(anyhow!("Failed to send input: channel closed"))
361 }
362 },
363 }
364 } else {
365 Err(anyhow!("Process not started or sender not available"))
366 }
367 } else {
368 Err(anyhow!("Process not found"))
369 }
370 } else {
371 Err(anyhow!("Process pool not initialized"))
372 }
373 }
374
375 /// Checks if the process associated with the current instance is running.
376 ///
377 /// This method attempts to determine if a process with the `pid` of the current instance
378 /// exists in the global `PROCESS_POOL`.
379 ///
380 /// # Returns
381 /// * `true` - If the process with the associated `pid` is currently present in the global process pool.
382 /// * `false` - If the process is not found in the global process pool or if the pool is not initialized.
383 ///
384 /// # Async Behavior
385 /// This method is asynchronous because it acquires a lock on the `PROCESS_POOL`.
386 ///
387 /// # Notes
388 /// - The `PROCESS_POOL` must be initialized before calling this function.
389 /// If `PROCESS_POOL` is not set, the function will return `false`.
390 /// - The `PROCESS_POOL` is expected to be a globally accessible, asynchronous, and thread-safe
391 /// data structure that tracks active processes.
392 ///
393 /// # Example
394 /// ```rust
395 /// let result = instance.is_process_running().await;
396 /// if result {
397 /// println!("Process is running.");
398 /// } else {
399 /// println!("Process is not running.");
400 /// }
401 /// ```
402 pub async fn is_process_running(&self) -> bool {
403 if let Some(process_pool) = PROCESS_POOL.get() {
404 let pool = process_pool.lock().await;
405 return pool.contains_key(&self.pid);
406 }
407 false
408 }
409
410 /// Asynchronously terminates a process identified by its `pid`.
411 ///
412 /// This method performs the following operations tailored to the target operating system:
413 ///
414 /// - **Windows**: Opens a handle to the process using its process ID (`pid`) and forcefully terminates it
415 /// using the `TerminateProcess` function from the WinAPI.
416 /// - **Linux**: Utilizes the `kill` system call with the `SIGKILL` signal to forcefully terminate the process.
417 ///
418 /// ## Platform-specific Notes:
419 ///
420 /// - On **Windows**, the process is identified and terminated using the `OpenProcess` and `TerminateProcess`
421 /// functions from the WinAPI.
422 /// - On **Linux**, the `kill` system call is used with the signal `SIGKILL` (9) to ensure the process is terminated.
423 ///
424 /// # Errors
425 ///
426 /// - Returns an error if the process termination fails (on Linux) due to system call errors or invalid process IDs.
427 /// On failure, the error contains details about the `pid` and the last OS error encountered.
428 ///
429 /// # Safety
430 ///
431 /// This method uses unsafe code blocks to interact with system APIs (`libc` on Linux, WinAPI on Windows). Ensure
432 /// that the provided `pid` corresponds to a valid process, and consider the implications of forcefully
433 /// terminating processes.
434 ///
435 /// # Example
436 ///
437 /// ```rust
438 /// let process_manager = SomeProcessManager::new(pid); // Example struct containing the pid
439 /// if let Err(e) = process_manager.kill().await {
440 /// eprintln!("Failed to terminate process: {}", e);
441 /// }
442 /// ```
443 /// Attempts to gracefully shut down the process, falling back to forceful termination if needed.
444 ///
445 /// This method first tries to gracefully shut down the process by:
446 /// - On Linux: Sending a SIGTERM signal
447 /// - On Windows: Sending a WM_CLOSE message to the main window
448 ///
449 /// If the process doesn't exit within the specified timeout, it will forcefully terminate
450 /// the process using the `kill()` method.
451 ///
452 /// # Arguments
453 ///
454 /// * `timeout` - A `std::time::Duration` specifying how long to wait for the process to exit
455 /// gracefully before forcefully terminating it.
456 ///
457 /// # Returns
458 ///
459 /// * `Ok(())` - If the process was successfully shut down (either gracefully or forcefully).
460 /// * `Err(Error)` - If an error occurred during the shutdown process.
461 ///
462 /// # Example
463 ///
464 /// ```rust
465 /// // Try to shut down gracefully, waiting up to 5 seconds before force killing
466 /// if let Err(e) = process.shutdown(std::time::Duration::from_secs(5)).await {
467 /// eprintln!("Failed to shut down process: {}", e);
468 /// }
469 /// ```
470 pub async fn shutdown(&self, timeout: std::time::Duration) -> Result<()> {
471 // First, try to gracefully shut down the process
472 let graceful_shutdown_result = self.graceful_shutdown().await;
473
474 // If graceful shutdown failed or isn't implemented for this platform, log it but continue
475 if let Err(e) = &graceful_shutdown_result {
476 debug!("Graceful shutdown attempt failed: {}", e);
477 return self.kill().await;
478 }
479
480 // Wait for the process to exit for the specified timeout
481 let start_time = std::time::Instant::now();
482 while start_time.elapsed() < timeout {
483 if !self.is_process_running().await {
484 return Ok(());
485 }
486 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
487 }
488
489 // If we're here, the process didn't exit gracefully within the timeout
490 // Fall back to forceful termination
491 debug!("Process did not exit gracefully within timeout, forcing termination");
492 self.kill().await
493 }
494
495 /// Attempts to gracefully shut down the process without forceful termination.
496 ///
497 /// This method is platform-specific:
498 /// - On Linux: Sends a SIGTERM signal to request graceful termination
499 /// - On Windows: First tries to send a Ctrl+C event to the process using GenerateConsoleCtrlEvent.
500 /// If that fails, it attempts to send an "exit" command to the process via stdin.
501 ///
502 /// # Returns
503 ///
504 /// * `Ok(())` - If the graceful shutdown signal was successfully sent.
505 /// * `Err(Error)` - If an error occurred while sending the graceful shutdown signal.
506 ///
507 /// # Notes
508 ///
509 /// - A successful return doesn't guarantee the process will actually exit.
510 /// Use `shutdown()` with a timeout to ensure the process exits.
511 /// - On Windows, the GenerateConsoleCtrlEvent function works with process groups, not individual
512 /// processes, so it may not work for all processes. The fallback "exit" command approach
513 /// works for many command-line applications that accept such commands.
514 /// - For GUI applications on Windows, neither approach may work. In such cases, the `shutdown()`
515 /// method will fall back to forceful termination after the timeout.
516 async fn graceful_shutdown(&self) -> Result<()> {
517 #[cfg(target_os = "windows")]
518 {
519 unsafe {
520 // First, try to send Ctrl+C event to the process
521 // CTRL_C_EVENT = 0, CTRL_BREAK_EVENT = 1
522 // Note: GenerateConsoleCtrlEvent works with process groups, not individual processes
523 // For console applications, we can try sending Ctrl+C to the process
524 let result = winapi::um::wincon::GenerateConsoleCtrlEvent(0, self.pid);
525 if result == 0 {
526 return Err(anyhow!(
527 "Failed to send Ctrl+C to process {}: {}",
528 self.pid,
529 std::io::Error::last_os_error()
530 ));
531 }
532 }
533 return Ok(());
534 }
535
536 #[cfg(target_os = "linux")]
537 {
538 unsafe {
539 // Use the kill system call with SIGTERM (15) to request graceful termination
540 let result = libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
541 if result != 0 {
542 return Err(anyhow!(
543 "Failed to send SIGTERM to process {}: {}",
544 self.pid,
545 std::io::Error::last_os_error()
546 ));
547 }
548 }
549 return Ok(());
550 }
551
552 #[cfg(not(any(target_os = "windows", target_os = "linux")))]
553 {
554 return Err(anyhow!("Graceful shutdown not implemented for this platform"));
555 }
556 }
557
558 /// Forcefully terminates the process immediately.
559 ///
560 /// This method should be used as a last resort when a process needs to be terminated
561 /// immediately. For a more graceful approach, consider using `shutdown()` first.
562 ///
563 /// # Returns
564 ///
565 /// * `Ok(())` - If the process was successfully terminated.
566 /// * `Err(Error)` - If an error occurred during the termination process.
567 ///
568 /// # Example
569 ///
570 /// ```rust
571 /// if let Err(e) = process.kill().await {
572 /// eprintln!("Failed to terminate process: {}", e);
573 /// }
574 /// ```
575 pub async fn kill(&self) -> Result<()> {
576 #[cfg(target_os = "windows")]
577 {
578 unsafe {
579 // PROCESS_TERMINATE (0x00010000) access right is required to terminate a process
580 let handle = winapi::um::processthreadsapi::OpenProcess(0x00010000, 0, self.pid);
581 if handle.is_null() {
582 return Err(anyhow!("Failed to open process {}: {}", self.pid, std::io::Error::last_os_error()));
583 }
584
585 // Attempt to terminate the process
586 let result = winapi::um::processthreadsapi::TerminateProcess(handle, 0);
587
588 // Always close the handle to prevent resource leaks
589 let close_result = winapi::um::handleapi::CloseHandle(handle);
590
591 // Check if termination was successful
592 if result == 0 {
593 return Err(anyhow!(
594 "Failed to terminate process {}: {}",
595 self.pid,
596 std::io::Error::last_os_error()
597 ));
598 }
599
600 // Check if handle was closed successfully
601 if close_result == 0 {
602 warn!(
603 "Failed to close process handle for process {}: {}",
604 self.pid,
605 std::io::Error::last_os_error()
606 );
607 // We don't return an error here as the process was terminated successfully
608 }
609 }
610 }
611 #[cfg(target_os = "linux")]
612 {
613 unsafe {
614 // Use the kill system call with SIGKILL (9) to forcefully terminate the process
615 let result = libc::kill(self.pid as libc::pid_t, libc::SIGKILL);
616 if result != 0 {
617 return Err(anyhow!("Failed to kill process {}: {}", self.pid, std::io::Error::last_os_error()));
618 }
619 }
620 }
621
622 Ok(())
623 }
624}
625
626impl AsynchronousInteractiveProcess {
627 /// Creates a new instance of the struct with default values.
628 ///
629 /// # Arguments
630 ///
631 /// * `filename` - A value that can be converted into a `String`. This typically represents the
632 /// name or path of the file associated with the instance.
633 ///
634 /// # Returns
635 ///
636 /// Returns a new instance of the struct populated with default fields:
637 /// - `pid`: `None` (indicating no process ID is associated yet).
638 /// - `filename`: The provided `filename` converted into a `String`.
639 /// - `arguments`: An empty vector, representing no initial arguments.
640 /// - `working_directory`: Set to the current directory (`"./"`).
641 /// - `sender`: `None`, indicating no sender is associated initially.
642 /// - `receiver`: `None`, indicating no receiver is associated initially.
643 /// - `input_queue`: An empty `VecDeque`, representing no items in the input queue.
644 ///
645 /// # Example
646 ///
647 /// ```
648 /// let instance = MyStruct::new("example.txt");
649 /// assert_eq!(instance.filename, "example.txt");
650 /// assert!(instance.pid.is_none());
651 /// assert!(instance.arguments.is_empty());
652 /// assert_eq!(instance.working_directory, PathBuf::from("./"));
653 /// assert!(instance.sender.is_none());
654 /// assert!(instance.receiver.is_none());
655 /// assert!(instance.input_queue.is_empty());
656 /// ```
657 pub fn new(filename: impl Into<String>) -> Self {
658 Self {
659 pid: None,
660 filename: filename.into(),
661 arguments: Vec::new(),
662 working_directory: PathBuf::from("./"),
663 sender: None,
664 output_broadcaster: None,
665 _keep_alive_receiver: None,
666 input_queue: VecDeque::new(),
667 exit_callback: None,
668 }
669 }
670
671 /// Sets the arguments for the current instance by converting a vector of items
672 /// implementing `Into<String>` into a `Vec<String>`.
673 ///
674 /// # Parameters
675 /// - `args`: A `Vec` containing items that implement the `Into<String>` trait. These
676 /// items will be converted into `String` and used to set the `arguments` field
677 /// of the instance.
678 ///
679 /// # Returns
680 /// - `Self`: The current instance (`self`) after updating its `arguments` field.
681 ///
682 /// # Example
683 /// ```rust
684 /// let instance = MyStruct::new()
685 /// .with_arguments(vec!["arg1", "arg2", String::from("arg3")]);
686 /// ```
687 ///
688 /// This method allows chaining, as it returns the updated instance
689 /// after setting the `arguments` field.
690 pub fn with_arguments(mut self, args: Vec<impl Into<String>>) -> Self {
691 self.arguments = args.into_iter().map(|arg| arg.into()).collect();
692 self
693 }
694
695 /// Adds an argument to the `arguments` vector and returns the modified instance.
696 ///
697 /// # Parameters
698 ///
699 /// * `arg` - A value that implements the `Into<String>` trait, which will be converted into a `String`
700 /// and added to the `arguments` vector.
701 ///
702 /// # Returns
703 ///
704 /// Returns the modified instance of the implementing struct (`Self`) with the new argument added.
705 ///
706 /// # Example
707 ///
708 /// ```rust
709 /// let instance = SomeStruct::new().with_argument("example");
710 /// ```
711 ///
712 /// This adds the string `"example"` to the `arguments` vector of `SomeStruct`.
713 pub fn with_argument(mut self, arg: impl Into<String>) -> Self {
714 self.arguments.push(arg.into());
715 self
716 }
717
718 /// Sets the working directory for the instance and returns the modified instance.
719 ///
720 /// # Arguments
721 ///
722 /// * `dir` - A value that can be converted into a `PathBuf`, representing the desired working directory.
723 ///
724 /// # Returns
725 ///
726 /// The instance of the struct with the updated working directory.
727 ///
728 /// # Example
729 ///
730 /// ```rust
731 /// let instance = MyStruct::new()
732 /// .with_working_directory("/path/to/directory");
733 /// ```
734 pub fn with_working_directory(mut self, dir: impl Into<PathBuf>) -> Self {
735 self.working_directory = dir.into();
736 self
737 }
738
739 pub fn process_exit_callback<F>(mut self, callback: F) -> Self
740 where
741 F: Fn(i32) + Send + Sync + 'static,
742 {
743 self.exit_callback = Some(Arc::new(callback));
744 self
745 }
746
747 /// Starts a new process based on the configuration stored in the struct, manages
748 /// its I/O streams asynchronously, and tracks its lifecycle in a shared process pool.
749 ///
750 /// # Returns
751 /// - `Ok<u32>`: Returns the process ID (PID) if the process starts successfully.
752 /// - `Err(std::io::Error)`: Returns an error if any part of the process start operation fails.
753 ///
754 /// # Process Configuration
755 /// - The process is launched using the executable specified in `self.filename`.
756 /// - Command-line arguments are passed via `self.arguments`.
757 /// - The process will run in the directory specified in `self.working_directory`.
758 ///
759 /// # Standard I/O Management
760 /// - The process's `stdin` is assigned a `tokio::sync::mpsc::channel` for communication.
761 /// - `stdout` and `stderr` are read asynchronously. Each line from these streams is sent
762 /// to an `mpsc::channel`, where `stdout`/`stderr` data can be consumed.
763 ///
764 /// # Shared Process Pool
765 /// - The process metadata, including its PID and I/O channels, is stored in a global,
766 /// thread-safe `PROCESS_POOL`.
767 /// - The process pool is managed using a `tokio::sync::Mutex` and stores processes in a
768 /// `HashMap` with their PID as the key.
769 ///
770 /// # Asynchronous I/O
771 /// - A background task is spawned to write data from an internal input queue to the process's
772 /// `stdin`.
773 /// - Another background task continuously reads and forwards `stdout` and `stderr` messages
774 /// from the process.
775 /// - If the process exits or encounters an error, its PID and metadata are removed from the
776 /// process pool.
777 ///
778 /// # Example Use Case
779 /// ```no_run
780 /// let mut my_process = MyProcess {
781 /// filename: "my_program".to_string(),
782 /// arguments: vec!["--arg1", "value1".to_string()],
783 /// working_directory: "/path/to/dir".to_string(),
784 /// pid: None,
785 /// sender: None,
786 /// receiver: None,
787 /// input_queue: VecDeque::new(),
788 /// };
789 ///
790 /// let pid = my_process.start().await.unwrap();
791 /// println!("Started process with PID: {}", pid);
792 /// ```
793 ///
794 /// # Notes
795 /// - The process's `stdin`, `stdout`, and `stderr` are piped to capture and manage
796 /// communication asynchronously.
797 /// - Each spawned background task is independently responsible for managing a specific
798 /// stream or part of the process lifecycle.
799 /// - Errors during I/O operations or spawning the process are logged using the `log` crate
800 /// macros (e.g., `error!`, `debug!`).
801 ///
802 /// # Potential Errors
803 /// - Failure to spawn the process (e.g., if `self.filename` is invalid or inaccessible).
804 /// - Errors during communication with the process's I/O streams (e.g., writing to a closed `stdin`).
805 /// - Only a spawn failure is returned as `Err`; I/O errors in the background tasks are logged, and locking the pool's `tokio::sync::Mutex` cannot fail.
806 pub async fn start(&mut self) -> Result<u32> {
807 let mut command = Command::new(&self.filename);
808 command.args(&self.arguments);
809
810 // Convert UNC path to regular Windows path if needed
811 let working_dir = if cfg!(windows) {
812 // Remove UNC prefix if present
813 let path_str = self.working_directory.to_string_lossy();
814 if path_str.starts_with(r"\\?\") {
815 PathBuf::from(&path_str[4..]) // Remove the \\?\ prefix
816 } else {
817 self.working_directory.clone()
818 }
819 } else {
820 self.working_directory.clone()
821 };
822
823 // Debug the working directory
824 debug!("tokio-interactive: filename = {}", self.filename);
825 debug!("tokio-interactive: arguments = {:?}", self.arguments);
826 debug!("tokio-interactive: working_directory = {:?}", working_dir);
827 debug!("tokio-interactive: working_directory exists = {}", working_dir.exists());
828 debug!("tokio-interactive: working_directory is_dir = {}", working_dir.is_dir());
829
830 // Check if working directory is absolute
831 debug!("tokio-interactive: working_directory is_absolute = {}", working_dir.is_absolute());
832
833 // Get current directory before setting
834 if let Ok(current_dir) = std::env::current_dir() {
835 debug!("tokio-interactive: current working directory before setting = {:?}", current_dir);
836 }
837
838 command.current_dir(working_dir);
839
840 command.stdin(std::process::Stdio::piped());
841 command.stdout(std::process::Stdio::piped());
842 command.stderr(std::process::Stdio::piped());
843
844 // Debug the final command that will be executed
845 debug!("tokio-interactive: Final command = {:?}", command);
846
847 let mut child = command.spawn()?;
848 let pid = child.id().unwrap_or(0);
849
850 debug!("tokio-interactive: Process spawned with PID = {}", pid);
851
852 let (stdin_sender, mut stdin_receiver) = tokio::sync::mpsc::channel::<String>(100);
853 let (stdout_broadcaster, _keep_alive_receiver) = broadcast::channel::<String>(100);
854
855 if let Some(mut stdin) = child.stdin.take() {
856 tokio::spawn(async move {
857 while let Some(input) = stdin_receiver.recv().await {
858 if let Err(e) = stdin.write_all(input.as_bytes()).await {
859 error!("Failed to write to process stdin: {}", e);
860 break;
861 }
862 if let Err(e) = stdin.write_all(b"\n").await {
863 error!("Failed to write newline to process stdin: {}", e);
864 break;
865 }
866 if let Err(e) = stdin.flush().await {
867 error!("Failed to flush process stdin: {}", e);
868 break;
869 }
870 }
871 });
872 }
873
874 if let Some(stdout) = child.stdout.take() {
875 let stdout_broadcaster_clone = stdout_broadcaster.clone();
876 tokio::spawn(async move {
877 let mut reader = BufReader::new(stdout);
878 let mut line = String::new();
879 while let Ok(bytes_read) = reader.read_line(&mut line).await {
880 if bytes_read == 0 {
881 break;
882 }
883 if let Err(e) = stdout_broadcaster_clone.send(line.trim_end().to_string()) {
884 error!("Failed to broadcast stdout message: {}", e);
885 break;
886 }
887 line.clear();
888 }
889 });
890 }
891
892 if let Some(stderr) = child.stderr.take() {
893 let stderr_broadcaster = stdout_broadcaster.clone();
894 tokio::spawn(async move {
895 let mut reader = BufReader::new(stderr);
896 let mut line = String::new();
897 while let Ok(bytes_read) = reader.read_line(&mut line).await {
898 if bytes_read == 0 {
899 break;
900 }
901 if let Err(e) = stderr_broadcaster.send(format!("STDERR: {}", line.trim_end())) {
902 error!("Failed to broadcast stderr message: {}", e);
903 break;
904 }
905 line.clear();
906 }
907 });
908 }
909
910 let queue_pid = pid;
911 tokio::spawn(async move {
912 loop {
913 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
914
915 let process_pool = match PROCESS_POOL.get() {
916 Some(pool) => pool,
917 None => break,
918 };
919
920 let mut pool = process_pool.lock().await;
921
922 let process = match pool.get_mut(&queue_pid) {
923 Some(p) => p,
924 None => break,
925 };
926
927 while let Some(input) = process.input_queue.pop_front() {
928 if let Some(sender) = &process.sender {
929 if let Err(_) = sender.try_send(input.clone()) {
930 process.input_queue.push_front(input);
931 break;
932 }
933 } else {
934 process.input_queue.clear();
935 break;
936 }
937 }
938
939 drop(pool);
940 }
941 });
942
943 let exit_callback = self.exit_callback.clone();
944 tokio::spawn(async move {
945 match child.wait().await {
946 Ok(exit_status) => {
947 let exit_code = exit_status.code().unwrap_or(-1); // Use -1 for unknown exit codes
948 debug!("Process {} exited with code: {}", pid, exit_code);
949
950 if let Some(exit_callback) = exit_callback {
951 exit_callback(exit_code);
952 }
953 }
954 Err(e) => {
955 error!("Process {} exited with error: {}", pid, e);
956
957 // Still trigger callback with error code
958 if let Some(exit_callback) = exit_callback {
959 exit_callback(-1); // or some other error indicator
960 }
961 }
962 }
963
964 // Cleanup
965 if let Some(process_pool) = PROCESS_POOL.get() {
966 let mut pool = process_pool.lock().await;
967 pool.remove(&pid);
968 debug!("Process {} has exited and been removed from the pool", pid);
969 }
970 });
971
972 let process_pool = PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
973
974 let mut pool = process_pool.lock().await;
975 pool.insert(
976 pid,
977 Self {
978 pid: Some(pid),
979 filename: self.filename.clone(),
980 arguments: self.arguments.clone(),
981 working_directory: self.working_directory.clone(),
982 sender: Some(stdin_sender),
983 output_broadcaster: Some(stdout_broadcaster.clone()),
984 _keep_alive_receiver: Some(_keep_alive_receiver),
985 input_queue: VecDeque::new(),
986 exit_callback: self.exit_callback.clone(),
987 },
988 );
989
990 self.pid = Some(pid);
991
992 Ok(pid)
993 }
994
995 /// Asynchronously retrieves a handle to a process for a given process ID (PID).
996 ///
997 /// This function checks if a process with the specified PID exists in a shared
998 /// process pool. If the process is found, it returns an `Option` containing a
999 /// `ProcessHandle` for the process. Otherwise, it returns `None`.
1000 ///
1001 /// # Arguments
1002 ///
1003 /// * `pid` - A 32-bit unsigned integer representing the process ID of the
1004 /// desired process.
1005 ///
1006 /// # Returns
1007 ///
1008 /// * `Some(ProcessHandle)` - If a process with the given PID is found in the
1009 /// process pool.
1010 /// * `None` - If the process does not exist in the process pool or if the
1011 /// process pool is uninitialized.
1012 ///
1013 /// # Example
1014 ///
1015 /// ```rust
1016 /// if let Some(handle) = get_process_by_pid(12345).await {
1017 /// println!("Process found with PID: {}", handle.pid);
1018 /// } else {
1019 /// println!("Process not found.");
1020 /// }
1021 /// ```
1022 ///
1023 /// # Errors
1024 ///
1025 /// This function will return `None` if the global process pool (`PROCESS_POOL`)
1026 /// has not been initialized or is unavailable.
1027 ///
1028 /// # Note
1029 ///
1030 /// This is an asynchronous function and must be awaited to complete its operation.
1031 pub async fn get_process_by_pid(pid: u32) -> Option<ProcessHandle> {
1032 let process_pool = PROCESS_POOL.get()?;
1033 let pool = process_pool.lock().await;
1034 if let Some(process) = pool.get(&pid) {
1035 if let Some(broadcaster) = &process.output_broadcaster {
1036 let receiver = broadcaster.subscribe();
1037 Some(ProcessHandle { pid, receiver })
1038 } else {
1039 None
1040 }
1041 } else {
1042 None
1043 }
1044 }
1045
1046 /// Asynchronously checks if a process identified by its `pid` is currently running.
1047 ///
1048 /// # Returns
1049 /// - `true` if the process with the stored `pid` is found in the `PROCESS_POOL`.
1050 /// - `false` if the stored `pid` is `None`, the `PROCESS_POOL` is not initialized,
1051 /// or the `pid` does not exist in the `PROCESS_POOL`.
1052 ///
1053 /// The function first checks if the `pid` instance variable is set. If so, it attempts
1054 /// to access the global `PROCESS_POOL`. If `PROCESS_POOL` is initialized, it acquires
1055 /// a lock and checks if the `pid` exists in the pool.
1056 ///
1057 /// # Examples
1058 /// ```rust
1059 /// let result = my_instance.is_process_running().await;
1060 /// if result {
1061 /// println!("The process is running.");
1062 /// } else {
1063 /// println!("The process is not running.");
1064 /// }
1065 /// ```
1066 ///
1067 /// # Async Behavior
1068 /// This function acquires an asynchronous lock on the `PROCESS_POOL` to safely access
1069 /// its contents and will yield if the lock is currently held elsewhere.
1070 ///
1071 /// # Panics
1072 /// This function may panic if the async lock on the `PROCESS_POOL` fails unexpectedly.
1073 ///
1074 /// # Dependencies
1075 /// - `PROCESS_POOL` should be a globally accessible and lazily initialized structure
1076 /// (e.g., using `once_cell` or similar patterns) that maintains a mapping of currently
1077 /// active processes.
1078 pub async fn is_process_running(&self) -> bool {
1079 if let Some(pid) = self.pid {
1080 if let Some(process_pool) = PROCESS_POOL.get() {
1081 let pool = process_pool.lock().await;
1082 return pool.contains_key(&pid);
1083 }
1084 }
1085 false
1086 }
1087}