tokio_interactive/lib.rs
1#![doc = include_str!("../README.md")]
2use anyhow::{Result, anyhow};
3use log::*;
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, VecDeque};
6use std::path::PathBuf;
7use std::sync::{Arc, OnceLock};
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::process::Command;
10use tokio::sync::Mutex;
11use tokio::sync::mpsc::{Receiver, Sender}; // Change this line
12
13/// A static, lazily-initialized process pool used to manage asynchronous interactive processes.
14///
15/// This global variable utilizes the `OnceLock` to ensure it is initialized only once during
16/// the program's lifetime. It holds an `Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>`
17/// to allow safe concurrent access and modification of the process pool.
18///
19/// - `OnceLock`: Provides thread-safe, one-time initialization of the process pool.
20/// - `Arc`: Ensures the `Mutex<HashMap<...>>` can be shared across threads safely.
21/// - `Mutex`: Protects the `HashMap` from concurrent modification, ensuring thread safety.
22/// - `HashMap<u32, AsynchronousInteractiveProcess>`: Maps process IDs (`u32`) to
23/// corresponding `AsynchronousInteractiveProcess` instances.
24///
25/// Usage:
26///
27/// The process pool is intended to store and manage asynchronous interactive processes by their IDs.
28/// Access needs to ensure proper locking with the `Mutex` guard to guarantee thread safety.
29///
30/// Example initialization:
31/// ```rust
32/// PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
33/// ```
34///
35/// Example access:
36/// ```rust
37/// if let Some(pool) = PROCESS_POOL.get() {
38/// let mut guard = pool.lock().unwrap();
39/// guard.insert(process_id, new_process);
40/// }
41/// ```
42/// Ensure that any operations on the pool respect the locking mechanism provided by the `Mutex`.
43///
44/// Once initialized, `PROCESS_POOL` cannot be re-initialized or re-assigned.
45static PROCESS_POOL: OnceLock<Arc<Mutex<HashMap<u32, AsynchronousInteractiveProcess>>>> = OnceLock::new();
46
47/// Represents a handle to a process identified by its process ID (PID).
48///
49/// The `ProcessHandle` struct is used to encapsulate the process identifier (PID)
50/// of a running process. It is designed to be lightweight and can be cloned
51/// or debugged as required.
52///
53/// # Fields
54/// - `pid`: A `u32` value that represents the process ID of the targeted process.
55///
56/// # Traits
57/// - `Debug`: Allows instances of `ProcessHandle` to be formatted using the `{:?}` formatter
58/// for debugging purposes.
59/// - `Clone`: Allows the `ProcessHandle` to be cloned, creating a new instance with the
60/// same `pid`.
61///
62/// # Example
63/// ```
64/// let handle = ProcessHandle { pid: 1234 };
65/// println!("{:?}", handle); // Outputs: ProcessHandle { pid: 1234 }
66/// let cloned_handle = handle.clone();
67/// println!("{:?}", cloned_handle); // Outputs: ProcessHandle { pid: 1234 }
68/// ```
69#[derive(Debug, Clone)]
70pub struct ProcessHandle {
71 pid: u32,
72}
73
74/// `AsynchronousInteractiveProcess` is a structure that represents a non-blocking,
75/// interactive process. It provides metadata and mechanisms to interact with a
76/// process asynchronously using sender and receiver channels.
77///
78/// # Fields
79///
80/// * `pid` - An optional process ID (`pid`) of the interactive process. If the process
81/// is not yet started, this field will remain `None`.
82///
83/// * `filename` - The name of the executable file that represents the interactive process.
84/// This is required to identify and initiate the process.
85///
86/// * `arguments` - A vector containing the arguments to pass to the executable file
87/// when launching the process.
88///
89/// * `working_directory` - The directory where the interactive process will be executed.
90/// This is represented as a `PathBuf`.
91///
92/// * `sender` - An optional `Sender` channel used to send messages or data to the
93/// interactive process. This field is skipped during serialization and deserialization
94/// because it holds runtime-related data.
95///
96/// * `receiver` - An optional `Receiver` channel used to receive messages or data
97/// from the interactive process. This field is also skipped during serialization
98/// and deserialization for the same reasons as `sender`.
99///
100/// * `input_queue` - A deque (double-ended queue) that maintains an in-memory buffer
101/// of strings representing input for the interactive process. This field is skipped
102/// during serialization as it only contains transient runtime-related data.
103///
104/// # Trait Implementations
105///
106/// - `Debug`: Allows instances of this struct to be formatted and logged for debugging purposes.
107/// - `Serialize`: Makes the struct serializable, excluding fields marked with `#[serde(skip)]`.
108/// - `Deserialize`: Allows deserialization to create instances of this struct from serialized data.
109/// - `Default`: Provides a default implementation where optional fields are set to `None`,
110/// collections are empty, and `filename` is an empty string.
111///
112/// # Example
113/// ```rust
114/// use std::path::PathBuf;
115/// use std::collections::VecDeque;
116/// use serde::{Serialize, Deserialize};
117/// use crossbeam_channel::{Sender, Receiver};
118///
119/// let process = AsynchronousInteractiveProcess {
120/// pid: None,
121/// filename: "my_program".to_string(),
122/// arguments: vec!["--help".to_string()],
123/// working_directory: PathBuf::from("/path/to/dir"),
124/// sender: None,
125/// receiver: None,
126/// input_queue: VecDeque::new(),
127/// };
128///
129/// println!("{:?}", process);
130/// ```
131///
132/// This structure is ideal for scenarios where processes need to be controlled
133/// asynchronously with multiple input or output channels.
134#[derive(Debug, Serialize, Deserialize, Default)]
135pub struct AsynchronousInteractiveProcess {
136 pub pid: Option<u32>,
137 pub filename: String,
138 pub arguments: Vec<String>,
139 pub working_directory: PathBuf,
140 #[serde(skip)]
141 sender: Option<Sender<String>>,
142 #[serde(skip)]
143 receiver: Option<Receiver<String>>,
144 #[serde(skip)]
145 input_queue: VecDeque<String>,
146}
147
148impl ProcessHandle {
149 /// Receives an output message asynchronously from the process associated with the instance's `pid`
150 /// if available.
151 ///
152 /// This function attempts to fetch a message by accessing the process pool and checking for a
153 /// message in the associated process's receiver.
154 ///
155 /// The process follows these steps:
156 /// 1. Check if the global process pool (`PROCESS_POOL`) is initialized.
157 /// 2. Attempt to acquire the lock on the pool asynchronously.
158 /// 3. Look for the process associated with `self.pid`.
159 /// 4. If the process has a receiver channel, attempt to get a message using `try_recv`.
160 /// 5. If a message is found, return it as `Ok(Some(String))`.
161 /// 6. If no message is received, retry up to `MAX_RETRIES` times, with a delay of `RETRY_DELAY_MS`
162 /// milliseconds between each retry.
163 ///
164 /// If no message is received after all retries, the function returns `Ok(None)`.
165 ///
166 /// # Returns
167 ///
168 /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
169 /// * `Ok(None)` - If no message is received after all retries.
170 /// * `Err(Error)` - If an error occurs during the process, such as:
171 /// - The process pool is not initialized
172 /// - The process with the specified PID is not found
173 /// - The receiver channel is not available
174 /// - The receiver channel is disconnected
175 ///
176 /// # Behavior
177 ///
178 /// - This function uses `tokio::time::sleep` for introducing delays between retries and works
179 /// asynchronously.
180 /// - The function uses constants `MAX_RETRIES` (10) and `RETRY_DELAY_MS` (10) to control the
181 /// retry behavior.
182 /// - The function properly propagates errors that occur during the process.
183 ///
184 /// # Example
185 ///
186 /// ```rust
187 /// match instance.receive_output().await {
188 /// Ok(Some(output)) => println!("Received output: {}", output),
189 /// Ok(None) => println!("No output received."),
190 /// Err(e) => eprintln!("Error receiving output: {}", e),
191 /// }
192 /// ```
193 ///
194 /// # Note
195 ///
196 /// - The function assumes the existence of a global process pool (`PROCESS_POOL`) which is safe
197 /// to access concurrently using mutex-based locking.
198 /// - The function makes use of `tokio::sync::Mutex` to handle concurrent executions across
199 /// asynchronous tasks.
200 /// Default timeout for receive_output in milliseconds
201 const DEFAULT_TIMEOUT_MS: u64 = 100;
202
203 /// Helper function to try receiving a message
204 async fn try_receive_message(
205 pid: u32,
206 pool: &mut tokio::sync::MutexGuard<'_, HashMap<u32, AsynchronousInteractiveProcess>>,
207 ) -> Result<Option<String>> {
208 if let Some(process) = pool.get_mut(&pid) {
209 if let Some(receiver) = &mut process.receiver {
210 match receiver.try_recv() {
211 Ok(msg) => return Ok(Some(msg)),
212 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
213 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
214 return Err(anyhow!("Channel disconnected for process {}", pid));
215 }
216 }
217 } else {
218 return Err(anyhow!("Receiver not available for process {}", pid));
219 }
220 } else {
221 return Err(anyhow!("Process {} not found", pid));
222 }
223 Ok(None)
224 }
225
226 pub async fn receive_output(&self) -> Result<Option<String>> {
227 // Use the default timeout
228 self.receive_output_with_timeout(std::time::Duration::from_millis(Self::DEFAULT_TIMEOUT_MS)).await
229 }
230
231 /// Receives an output message asynchronously from the process associated with the instance's `pid`
232 /// if available, with a specified timeout.
233 ///
234 /// This function is similar to `receive_output()` but allows specifying a custom timeout duration
235 /// instead of using the default timeout.
236 ///
237 /// # Arguments
238 ///
239 /// * `timeout` - A `std::time::Duration` specifying how long to wait for a message before giving up.
240 ///
241 /// # Returns
242 ///
243 /// * `Ok(Some(String))` - If a message is successfully received from the receiver channel.
244 /// * `Ok(None)` - If no message is received before the timeout expires.
245 /// * `Err(Error)` - If an error occurs during the process, such as:
246 /// - The process pool is not initialized
247 /// - The process with the specified PID is not found
248 /// - The receiver channel is not available
249 /// - The receiver channel is disconnected
250 ///
251 /// # Example
252 ///
253 /// ```rust
254 /// // Wait for up to 5 seconds for a message
255 /// match instance.receive_output_with_timeout(std::time::Duration::from_secs(5)).await {
256 /// Ok(Some(output)) => println!("Received output: {}", output),
257 /// Ok(None) => println!("No output received within timeout."),
258 /// Err(e) => eprintln!("Error receiving output: {}", e),
259 /// }
260 /// ```
261 pub async fn receive_output_with_timeout(&self, timeout: std::time::Duration) -> Result<Option<String>> {
262 // Define constants for retry parameters
263 const RETRY_DELAY_MS: u64 = 10;
264
265 if let Some(process_pool) = PROCESS_POOL.get() {
266 // Try to receive a message immediately
267 {
268 let mut pool = process_pool.lock().await;
269 if let Some(msg) = Self::try_receive_message(self.pid, &mut pool).await? {
270 return Ok(Some(msg));
271 }
272 }
273
274 // If no message is available, retry with delay until timeout is reached
275 let start_time = std::time::Instant::now();
276 while start_time.elapsed() < timeout {
277 tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_DELAY_MS)).await;
278
279 let mut pool = process_pool.lock().await;
280 if let Some(msg) = Self::try_receive_message(self.pid, &mut pool).await? {
281 return Ok(Some(msg));
282 }
283 }
284
285 // No message after timeout
286 return Ok(None);
287 }
288
289 Err(anyhow!("Process pool not initialized"))
290 }
291
292 /// Sends the provided input to a process associated with this instance's PID asynchronously.
293 ///
294 /// # Arguments
295 ///
296 /// * `input` - An input of any type that can be converted into a `String`. This is the data
297 /// that will be sent to the respective process's input queue.
298 ///
299 /// # Returns
300 ///
301 /// * `Ok(())` if the input was successfully sent or queued for sending.
302 /// * `Err` - Returns an error in the following cases:
303 /// - If the process pool is not initialized.
304 /// - If the process associated with this PID is not found.
305 /// - If the process was not started or its sender channel is not available.
306 /// - If the sender channel is closed and input cannot be sent.
307 ///
308 /// # Details
309 ///
310 /// This function retrieves the process associated with the current instance's `pid`
311 /// from a global process pool. If a valid process is found:
312 /// - The input is sent via an asynchronous channel to the process.
313 /// - If the channel is full, the input is queued for later sending.
314 /// - If the channel is closed, an error is returned.
315 ///
316 /// The function also ensures thread safety by acquiring a lock on the process pool before attempting
317 /// any operations related to the process.
318 ///
319 /// # Errors
320 ///
321 /// This function propagates several potential issues as errors:
322 /// - If the process pool is uninitialized (`PROCESS_POOL.get()` returns `None`).
323 /// - If the process associated with the PID is missing in the pool.
324 /// - If the sender channel was never initialized or is unavailable.
325 /// - If the sender channel is closed and no new messages can be sent.
326 ///
327 /// # Example
328 ///
329 /// ```rust
330 /// use anyhow::Result;
331 ///
332 /// #[tokio::main]
333 /// async fn main() -> Result<()> {
334 /// let manager = ProcessManager::new(1); // Assumes a struct is managing process with ID 1.
335 /// manager.send_input("Some input").await?;
336 /// Ok(())
337 /// }
338 /// ```
339 ///
340 /// # Note
341 ///
342 /// This function expects a global process pool (`PROCESS_POOL`) to be properly initialized before being called.
343 /// Additionally, the associated process must have a valid `sender` channel to accept input.
344 pub async fn send_input(&self, input: impl Into<String>) -> Result<()> {
345 let input_str = input.into();
346 if let Some(process_pool) = PROCESS_POOL.get() {
347 let mut pool = process_pool.lock().await;
348 if let Some(process) = pool.get_mut(&self.pid) {
349 if let Some(sender) = &process.sender {
350 match sender.try_send(input_str.clone()) {
351 Ok(_) => Ok(()),
352 Err(e) => match e {
353 tokio::sync::mpsc::error::TrySendError::Full(_) => {
354 process.input_queue.push_back(input_str);
355 Ok(())
356 }
357 tokio::sync::mpsc::error::TrySendError::Closed(_) => {
358 Err(anyhow!("Failed to send input: channel closed"))
359 }
360 },
361 }
362 } else {
363 Err(anyhow!("Process not started or sender not available"))
364 }
365 } else {
366 Err(anyhow!("Process not found"))
367 }
368 } else {
369 Err(anyhow!("Process pool not initialized"))
370 }
371 }
372
373 /// Checks if the process associated with the current instance is running.
374 ///
375 /// This method attempts to determine if a process with the `pid` of the current instance
376 /// exists in the global `PROCESS_POOL`.
377 ///
378 /// # Returns
379 /// * `true` - If the process with the associated `pid` is currently present in the global process pool.
380 /// * `false` - If the process is not found in the global process pool or if the pool is not initialized.
381 ///
382 /// # Async Behavior
383 /// This method is asynchronous because it acquires a lock on the `PROCESS_POOL`.
384 ///
385 /// # Notes
386 /// - The `PROCESS_POOL` must be initialized before calling this function.
387 /// If `PROCESS_POOL` is not set, the function will return `false`.
388 /// - The `PROCESS_POOL` is expected to be a globally accessible, asynchronous, and thread-safe
389 /// data structure that tracks active processes.
390 ///
391 /// # Example
392 /// ```rust
393 /// let result = instance.is_process_running().await;
394 /// if result {
395 /// println!("Process is running.");
396 /// } else {
397 /// println!("Process is not running.");
398 /// }
399 /// ```
400 pub async fn is_process_running(&self) -> bool {
401 if let Some(process_pool) = PROCESS_POOL.get() {
402 let pool = process_pool.lock().await;
403 return pool.contains_key(&self.pid);
404 }
405 false
406 }
407
408 /// Asynchronously terminates a process identified by its `pid`.
409 ///
410 /// This method performs the following operations tailored to the target operating system:
411 ///
412 /// - **Windows**: Opens a handle to the process using its process ID (`pid`) and forcefully terminates it
413 /// using the `TerminateProcess` function from the WinAPI.
414 /// - **Linux**: Utilizes the `kill` system call with the `SIGKILL` signal to forcefully terminate the process.
415 ///
416 /// ## Platform-specific Notes:
417 ///
418 /// - On **Windows**, the process is identified and terminated using the `OpenProcess` and `TerminateProcess`
419 /// functions from the WinAPI.
420 /// - On **Linux**, the `kill` system call is used with the signal `SIGKILL` (9) to ensure the process is terminated.
421 ///
422 /// # Errors
423 ///
424 /// - Returns an error if the process termination fails (on Linux) due to system call errors or invalid process IDs.
425 /// On failure, the error contains details about the `pid` and the last OS error encountered.
426 ///
427 /// # Safety
428 ///
429 /// This method uses unsafe code blocks to interact with system APIs (`libc` on Linux, WinAPI on Windows). Ensure
430 /// that the provided `pid` corresponds to a valid process, and consider the implications of forcefully
431 /// terminating processes.
432 ///
433 /// # Example
434 ///
435 /// ```rust
436 /// let process_manager = SomeProcessManager::new(pid); // Example struct containing the pid
437 /// if let Err(e) = process_manager.kill().await {
438 /// eprintln!("Failed to terminate process: {}", e);
439 /// }
440 /// ```
441 /// Attempts to gracefully shut down the process, falling back to forceful termination if needed.
442 ///
443 /// This method first tries to gracefully shut down the process by:
444 /// - On Linux: Sending a SIGTERM signal
445 /// - On Windows: Sending a WM_CLOSE message to the main window
446 ///
447 /// If the process doesn't exit within the specified timeout, it will forcefully terminate
448 /// the process using the `kill()` method.
449 ///
450 /// # Arguments
451 ///
452 /// * `timeout` - A `std::time::Duration` specifying how long to wait for the process to exit
453 /// gracefully before forcefully terminating it.
454 ///
455 /// # Returns
456 ///
457 /// * `Ok(())` - If the process was successfully shut down (either gracefully or forcefully).
458 /// * `Err(Error)` - If an error occurred during the shutdown process.
459 ///
460 /// # Example
461 ///
462 /// ```rust
463 /// // Try to shut down gracefully, waiting up to 5 seconds before force killing
464 /// if let Err(e) = process.shutdown(std::time::Duration::from_secs(5)).await {
465 /// eprintln!("Failed to shut down process: {}", e);
466 /// }
467 /// ```
468 pub async fn shutdown(&self, timeout: std::time::Duration) -> Result<()> {
469 // First, try to gracefully shut down the process
470 let graceful_shutdown_result = self.graceful_shutdown().await;
471
472 // If graceful shutdown failed or isn't implemented for this platform, log it but continue
473 if let Err(e) = &graceful_shutdown_result {
474 debug!("Graceful shutdown attempt failed: {}", e);
475 return self.kill().await;
476 }
477
478 // Wait for the process to exit for the specified timeout
479 let start_time = std::time::Instant::now();
480 while start_time.elapsed() < timeout {
481 if !self.is_process_running().await {
482 return Ok(());
483 }
484 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
485 }
486
487 // If we're here, the process didn't exit gracefully within the timeout
488 // Fall back to forceful termination
489 debug!("Process did not exit gracefully within timeout, forcing termination");
490 self.kill().await
491 }
492
493 /// Attempts to gracefully shut down the process without forceful termination.
494 ///
495 /// This method is platform-specific:
496 /// - On Linux: Sends a SIGTERM signal to request graceful termination
497 /// - On Windows: First tries to send a Ctrl+C event to the process using GenerateConsoleCtrlEvent.
498 /// If that fails, it attempts to send an "exit" command to the process via stdin.
499 ///
500 /// # Returns
501 ///
502 /// * `Ok(())` - If the graceful shutdown signal was successfully sent.
503 /// * `Err(Error)` - If an error occurred while sending the graceful shutdown signal.
504 ///
505 /// # Notes
506 ///
507 /// - A successful return doesn't guarantee the process will actually exit.
508 /// Use `shutdown()` with a timeout to ensure the process exits.
509 /// - On Windows, the GenerateConsoleCtrlEvent function works with process groups, not individual
510 /// processes, so it may not work for all processes. The fallback "exit" command approach
511 /// works for many command-line applications that accept such commands.
512 /// - For GUI applications on Windows, neither approach may work. In such cases, the `shutdown()`
513 /// method will fall back to forceful termination after the timeout.
514 async fn graceful_shutdown(&self) -> Result<()> {
515 #[cfg(target_os = "windows")]
516 {
517 unsafe {
518 // First, try to send Ctrl+C event to the process
519 // CTRL_C_EVENT = 0, CTRL_BREAK_EVENT = 1
520 // Note: GenerateConsoleCtrlEvent works with process groups, not individual processes
521 // For console applications, we can try sending Ctrl+C to the process
522 let result = winapi::um::wincon::GenerateConsoleCtrlEvent(0, self.pid);
523 if result == 0 {
524 return Err(anyhow!(
525 "Failed to send Ctrl+C to process {}: {}",
526 self.pid,
527 std::io::Error::last_os_error()
528 ));
529 }
530 }
531 return Ok(());
532 }
533
534 #[cfg(target_os = "linux")]
535 {
536 unsafe {
537 // Use the kill system call with SIGTERM (15) to request graceful termination
538 let result = libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
539 if result != 0 {
540 return Err(anyhow!(
541 "Failed to send SIGTERM to process {}: {}",
542 self.pid,
543 std::io::Error::last_os_error()
544 ));
545 }
546 }
547 return Ok(());
548 }
549
550 #[cfg(not(any(target_os = "windows", target_os = "linux")))]
551 {
552 return Err(anyhow!("Graceful shutdown not implemented for this platform"));
553 }
554 }
555
556 /// Forcefully terminates the process immediately.
557 ///
558 /// This method should be used as a last resort when a process needs to be terminated
559 /// immediately. For a more graceful approach, consider using `shutdown()` first.
560 ///
561 /// # Returns
562 ///
563 /// * `Ok(())` - If the process was successfully terminated.
564 /// * `Err(Error)` - If an error occurred during the termination process.
565 ///
566 /// # Example
567 ///
568 /// ```rust
569 /// if let Err(e) = process.kill().await {
570 /// eprintln!("Failed to terminate process: {}", e);
571 /// }
572 /// ```
573 pub async fn kill(&self) -> Result<()> {
574 #[cfg(target_os = "windows")]
575 {
576 unsafe {
577 // PROCESS_TERMINATE (0x00010000) access right is required to terminate a process
578 let handle = winapi::um::processthreadsapi::OpenProcess(0x00010000, 0, self.pid);
579 if handle.is_null() {
580 return Err(anyhow!("Failed to open process {}: {}", self.pid, std::io::Error::last_os_error()));
581 }
582
583 // Attempt to terminate the process
584 let result = winapi::um::processthreadsapi::TerminateProcess(handle, 0);
585
586 // Always close the handle to prevent resource leaks
587 let close_result = winapi::um::handleapi::CloseHandle(handle);
588
589 // Check if termination was successful
590 if result == 0 {
591 return Err(anyhow!(
592 "Failed to terminate process {}: {}",
593 self.pid,
594 std::io::Error::last_os_error()
595 ));
596 }
597
598 // Check if handle was closed successfully
599 if close_result == 0 {
600 warn!(
601 "Failed to close process handle for process {}: {}",
602 self.pid,
603 std::io::Error::last_os_error()
604 );
605 // We don't return an error here as the process was terminated successfully
606 }
607 }
608 }
609 #[cfg(target_os = "linux")]
610 {
611 unsafe {
612 // Use the kill system call with SIGKILL (9) to forcefully terminate the process
613 let result = libc::kill(self.pid as libc::pid_t, libc::SIGKILL);
614 if result != 0 {
615 return Err(anyhow!("Failed to kill process {}: {}", self.pid, std::io::Error::last_os_error()));
616 }
617 }
618 }
619
620 Ok(())
621 }
622}
623
624impl AsynchronousInteractiveProcess {
625 /// Creates a new instance of the struct with default values.
626 ///
627 /// # Arguments
628 ///
629 /// * `filename` - A value that can be converted into a `String`. This typically represents the
630 /// name or path of the file associated with the instance.
631 ///
632 /// # Returns
633 ///
634 /// Returns a new instance of the struct populated with default fields:
635 /// - `pid`: `None` (indicating no process ID is associated yet).
636 /// - `filename`: The provided `filename` converted into a `String`.
637 /// - `arguments`: An empty vector, representing no initial arguments.
638 /// - `working_directory`: Set to the current directory (`"./"`).
639 /// - `sender`: `None`, indicating no sender is associated initially.
640 /// - `receiver`: `None`, indicating no receiver is associated initially.
641 /// - `input_queue`: An empty `VecDeque`, representing no items in the input queue.
642 ///
643 /// # Example
644 ///
645 /// ```
646 /// let instance = MyStruct::new("example.txt");
647 /// assert_eq!(instance.filename, "example.txt");
648 /// assert!(instance.pid.is_none());
649 /// assert!(instance.arguments.is_empty());
650 /// assert_eq!(instance.working_directory, PathBuf::from("./"));
651 /// assert!(instance.sender.is_none());
652 /// assert!(instance.receiver.is_none());
653 /// assert!(instance.input_queue.is_empty());
654 /// ```
655 pub fn new(filename: impl Into<String>) -> Self {
656 Self {
657 pid: None,
658 filename: filename.into(),
659 arguments: Vec::new(),
660 working_directory: PathBuf::from("./"),
661 sender: None,
662 receiver: None,
663 input_queue: VecDeque::new(),
664 }
665 }
666
667 /// Sets the arguments for the current instance by converting a vector of items
668 /// implementing `Into<String>` into a `Vec<String>`.
669 ///
670 /// # Parameters
671 /// - `args`: A `Vec` containing items that implement the `Into<String>` trait. These
672 /// items will be converted into `String` and used to set the `arguments` field
673 /// of the instance.
674 ///
675 /// # Returns
676 /// - `Self`: The current instance (`self`) after updating its `arguments` field.
677 ///
678 /// # Example
679 /// ```rust
680 /// let instance = MyStruct::new()
681 /// .with_arguments(vec!["arg1", "arg2", String::from("arg3")]);
682 /// ```
683 ///
684 /// This method allows chaining, as it returns the updated instance
685 /// after setting the `arguments` field.
686 pub fn with_arguments(mut self, args: Vec<impl Into<String>>) -> Self {
687 self.arguments = args.into_iter().map(|arg| arg.into()).collect();
688 self
689 }
690
691 /// Adds an argument to the `arguments` vector and returns the modified instance.
692 ///
693 /// # Parameters
694 ///
695 /// * `arg` - A value that implements the `Into<String>` trait, which will be converted into a `String`
696 /// and added to the `arguments` vector.
697 ///
698 /// # Returns
699 ///
700 /// Returns the modified instance of the implementing struct (`Self`) with the new argument added.
701 ///
702 /// # Example
703 ///
704 /// ```rust
705 /// let instance = SomeStruct::new().with_argument("example");
706 /// ```
707 ///
708 /// This adds the string `"example"` to the `arguments` vector of `SomeStruct`.
709 pub fn with_argument(mut self, arg: impl Into<String>) -> Self {
710 self.arguments.push(arg.into());
711 self
712 }
713
714 /// Sets the working directory for the instance and returns the modified instance.
715 ///
716 /// # Arguments
717 ///
718 /// * `dir` - A value that can be converted into a `PathBuf`, representing the desired working directory.
719 ///
720 /// # Returns
721 ///
722 /// The instance of the struct with the updated working directory.
723 ///
724 /// # Example
725 ///
726 /// ```rust
727 /// let instance = MyStruct::new()
728 /// .with_working_directory("/path/to/directory");
729 /// ```
730 pub fn with_working_directory(mut self, dir: impl Into<PathBuf>) -> Self {
731 self.working_directory = dir.into();
732 self
733 }
734
735 /// Starts a new process based on the configuration stored in the struct, manages
736 /// its I/O streams asynchronously, and tracks its lifecycle in a shared process pool.
737 ///
738 /// # Returns
739 /// - `Ok<u32>`: Returns the process ID (PID) if the process starts successfully.
740 /// - `Err(std::io::Error)`: Returns an error if any part of the process start operation fails.
741 ///
742 /// # Process Configuration
743 /// - The process is launched using the executable specified in `self.filename`.
744 /// - Command-line arguments are passed via `self.arguments`.
745 /// - The process will run in the directory specified in `self.working_directory`.
746 ///
747 /// # Standard I/O Management
748 /// - The process's `stdin` is assigned a `tokio::sync::mpsc::channel` for communication.
749 /// - `stdout` and `stderr` are read asynchronously. Each line from these streams is sent
750 /// to an `mpsc::channel`, where `stdout`/`stderr` data can be consumed.
751 ///
752 /// # Shared Process Pool
753 /// - The process metadata, including its PID and I/O channels, is stored in a global,
754 /// thread-safe `PROCESS_POOL`.
755 /// - The process pool is managed using a `tokio::sync::Mutex` and stores processes in a
756 /// `HashMap` with their PID as the key.
757 ///
758 /// # Asynchronous I/O
759 /// - A background task is spawned to write data from an internal input queue to the process's
760 /// `stdin`.
761 /// - Another background task continuously reads and forwards `stdout` and `stderr` messages
762 /// from the process.
763 /// - If the process exits or encounters an error, its PID and metadata are removed from the
764 /// process pool.
765 ///
766 /// # Example Use Case
767 /// ```no_run
768 /// let mut my_process = MyProcess {
769 /// filename: "my_program".to_string(),
770 /// arguments: vec!["--arg1", "value1".to_string()],
771 /// working_directory: "/path/to/dir".to_string(),
772 /// pid: None,
773 /// sender: None,
774 /// receiver: None,
775 /// input_queue: VecDeque::new(),
776 /// };
777 ///
778 /// let pid = my_process.start().await.unwrap();
779 /// println!("Started process with PID: {}", pid);
780 /// ```
781 ///
782 /// # Notes
783 /// - The process's `stdin`, `stdout`, and `stderr` are piped to capture and manage
784 /// communication asynchronously.
785 /// - Each spawned background task is independently responsible for managing a specific
786 /// stream or part of the process lifecycle.
787 /// - Errors during I/O operations or spawning the process are logged using the `log` crate
788 /// macros (e.g., `error!`, `debug!`).
789 ///
790 /// # Potential Errors
791 /// - Failure to spawn the process (e.g., if `self.filename` is invalid or inaccessible).
792 /// - Errors during communication with the process's I/O streams (e.g., writing to a closed `stdin`).
793 /// - Mutex locking failure on the shared process pool due to an internal inconsistency.
794 pub async fn start(&mut self) -> Result<u32> {
795 let mut command = Command::new(&self.filename);
796 command.args(&self.arguments);
797 command.current_dir(&self.working_directory);
798
799 command.stdin(std::process::Stdio::piped());
800 command.stdout(std::process::Stdio::piped());
801 command.stderr(std::process::Stdio::piped());
802
803 let mut child = command.spawn()?;
804 let pid = child.id().unwrap_or(0);
805
806 let (stdin_sender, mut stdin_receiver) = tokio::sync::mpsc::channel::<String>(100);
807 let (stdout_sender, stdout_receiver) = tokio::sync::mpsc::channel::<String>(100);
808
809 if let Some(mut stdin) = child.stdin.take() {
810 tokio::spawn(async move {
811 while let Some(input) = stdin_receiver.recv().await {
812 if let Err(e) = stdin.write_all(input.as_bytes()).await {
813 error!("Failed to write to process stdin: {}", e);
814 break;
815 }
816 if let Err(e) = stdin.write_all(b"\n").await {
817 error!("Failed to write newline to process stdin: {}", e);
818 break;
819 }
820 if let Err(e) = stdin.flush().await {
821 error!("Failed to flush process stdin: {}", e);
822 break;
823 }
824 }
825 });
826 }
827
828 if let Some(stdout) = child.stdout.take() {
829 let stdout_sender_clone = stdout_sender.clone();
830 tokio::spawn(async move {
831 let mut reader = BufReader::new(stdout);
832 let mut line = String::new();
833 while let Ok(bytes_read) = reader.read_line(&mut line).await {
834 if bytes_read == 0 {
835 break;
836 }
837 if let Err(e) = stdout_sender_clone.send(line.trim_end().to_string()).await {
838 error!("Failed to send stdout message: {}", e);
839 break;
840 }
841 line.clear();
842 }
843 });
844 }
845
846 if let Some(stderr) = child.stderr.take() {
847 let stderr_sender = stdout_sender.clone();
848 tokio::spawn(async move {
849 let mut reader = BufReader::new(stderr);
850 let mut line = String::new();
851 while let Ok(bytes_read) = reader.read_line(&mut line).await {
852 if bytes_read == 0 {
853 break;
854 }
855 if let Err(e) = stderr_sender.send(format!("STDERR: {}", line.trim_end())).await {
856 error!("Failed to send stderr message: {}", e);
857 break;
858 }
859 line.clear();
860 }
861 });
862 }
863
864 let queue_pid = pid;
865 tokio::spawn(async move {
866 loop {
867 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
868
869 let process_pool = match PROCESS_POOL.get() {
870 Some(pool) => pool,
871 None => break,
872 };
873
874 let mut pool = process_pool.lock().await;
875
876 let process = match pool.get_mut(&queue_pid) {
877 Some(p) => p,
878 None => break,
879 };
880
881 while let Some(input) = process.input_queue.pop_front() {
882 if let Some(sender) = &process.sender {
883 if let Err(_) = sender.try_send(input.clone()) {
884 process.input_queue.push_front(input);
885 break;
886 }
887 } else {
888 process.input_queue.clear();
889 break;
890 }
891 }
892
893 drop(pool);
894 }
895 });
896
897 tokio::spawn(async move {
898 if let Err(e) = child.wait().await {
899 error!("Process {} exited with error: {}", pid, e);
900 } else {
901 debug!("Process {} exited successfully", pid);
902 }
903
904 if let Some(process_pool) = PROCESS_POOL.get() {
905 let mut pool = process_pool.lock().await;
906 pool.remove(&pid);
907 debug!("Process {} has exited and been removed from the pool", pid);
908 }
909 });
910
911 let process_pool = PROCESS_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
912
913 let mut pool = process_pool.lock().await;
914 pool.insert(
915 pid,
916 Self {
917 pid: Some(pid),
918 filename: self.filename.clone(),
919 arguments: self.arguments.clone(),
920 working_directory: self.working_directory.clone(),
921 sender: Some(stdin_sender),
922 receiver: Some(stdout_receiver),
923 input_queue: VecDeque::new(),
924 },
925 );
926
927 self.pid = Some(pid);
928
929 Ok(pid)
930 }
931
932 /// Asynchronously retrieves a handle to a process for a given process ID (PID).
933 ///
934 /// This function checks if a process with the specified PID exists in a shared
935 /// process pool. If the process is found, it returns an `Option` containing a
936 /// `ProcessHandle` for the process. Otherwise, it returns `None`.
937 ///
938 /// # Arguments
939 ///
940 /// * `pid` - A 32-bit unsigned integer representing the process ID of the
941 /// desired process.
942 ///
943 /// # Returns
944 ///
945 /// * `Some(ProcessHandle)` - If a process with the given PID is found in the
946 /// process pool.
947 /// * `None` - If the process does not exist in the process pool or if the
948 /// process pool is uninitialized.
949 ///
950 /// # Example
951 ///
952 /// ```rust
953 /// if let Some(handle) = get_process_by_pid(12345).await {
954 /// println!("Process found with PID: {}", handle.pid);
955 /// } else {
956 /// println!("Process not found.");
957 /// }
958 /// ```
959 ///
960 /// # Errors
961 ///
962 /// This function will return `None` if the global process pool (`PROCESS_POOL`)
963 /// has not been initialized or is unavailable.
964 ///
965 /// # Note
966 ///
967 /// This is an asynchronous function and must be awaited to complete its operation.
968 pub async fn get_process_by_pid(pid: u32) -> Option<ProcessHandle> {
969 let process_pool = PROCESS_POOL.get()?;
970 let pool = process_pool.lock().await;
971 if pool.contains_key(&pid) { Some(ProcessHandle { pid }) } else { None }
972 }
973
974 /// Asynchronously checks if a process identified by its `pid` is currently running.
975 ///
976 /// # Returns
977 /// - `true` if the process with the stored `pid` is found in the `PROCESS_POOL`.
978 /// - `false` if the stored `pid` is `None`, the `PROCESS_POOL` is not initialized,
979 /// or the `pid` does not exist in the `PROCESS_POOL`.
980 ///
981 /// The function first checks if the `pid` instance variable is set. If so, it attempts
982 /// to access the global `PROCESS_POOL`. If `PROCESS_POOL` is initialized, it acquires
983 /// a lock and checks if the `pid` exists in the pool.
984 ///
985 /// # Examples
986 /// ```rust
987 /// let result = my_instance.is_process_running().await;
988 /// if result {
989 /// println!("The process is running.");
990 /// } else {
991 /// println!("The process is not running.");
992 /// }
993 /// ```
994 ///
995 /// # Async Behavior
996 /// This function acquires an asynchronous lock on the `PROCESS_POOL` to safely access
997 /// its contents and will yield if the lock is currently held elsewhere.
998 ///
999 /// # Panics
1000 /// This function may panic if the async lock on the `PROCESS_POOL` fails unexpectedly.
1001 ///
1002 /// # Dependencies
1003 /// - `PROCESS_POOL` should be a globally accessible and lazily initialized structure
1004 /// (e.g., using `once_cell` or similar patterns) that maintains a mapping of currently
1005 /// active processes.
1006 pub async fn is_process_running(&self) -> bool {
1007 if let Some(pid) = self.pid {
1008 if let Some(process_pool) = PROCESS_POOL.get() {
1009 let pool = process_pool.lock().await;
1010 return pool.contains_key(&pid);
1011 }
1012 }
1013 false
1014 }
1015}