tcrm_task/tasks/tokio/
executor.rs

1use std::{process::Stdio, sync::Arc};
2
3use tokio::{
4    process::{ChildStdin, Command},
5    sync::{mpsc, oneshot},
6};
7
8use crate::tasks::{
9    config::TaskConfig,
10    error::TaskError,
11    event::{TaskEvent, TaskStopReason, TaskTerminateReason},
12    state::TaskState,
13    tokio::context::TaskExecutorContext,
14};
15
16/// Task executor for managing process lifecycle
17///
18/// `TaskExecutor` is the main entry point for executing system processes with real-time
19/// event monitoring, timeout management, and cross-platform process control.
20/// It coordinates process spawning, I/O handling, and termination through an event-driven
21/// architecture built on tokio.
22///
23/// # Examples
24///
25/// ## Basic Process Execution
26/// ```rust
27/// use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
28/// use tokio::sync::mpsc;
29///
30/// #[tokio::main]
31/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
32///     #[cfg(windows)]
33///     let config = TaskConfig::new("cmd").args(["/C", "echo", "Hello, World!"]);
34///     #[cfg(unix)]
35///     let config = TaskConfig::new("echo").args(["Hello, World!"]);
36///     
37///     config.validate()?;
38///     
39///     let (tx, mut rx) = mpsc::channel(100);
40///     let mut executor = TaskExecutor::new(config, tx);
41///     
42///     executor.coordinate_start().await?;
43///     
44///     while let Some(event) = rx.recv().await {
45///         match event {
46///             tcrm_task::tasks::event::TaskEvent::Output { line, .. } => {
47///                 println!("Output: {}", line);
48///             }
49///             tcrm_task::tasks::event::TaskEvent::Stopped { .. } => break,
50///             _ => {}
51///         }
52///     }
53///     
54///     Ok(())
55/// }
56/// ```
57///
58/// ## Process with Timeout and Termination
59/// ```rust
60/// use tcrm_task::tasks::{
61///     config::TaskConfig,
62///     tokio::executor::TaskExecutor,
63///     control::TaskControl,
64///     event::TaskTerminateReason
65/// };
66/// use tokio::sync::mpsc;
67///
68/// #[tokio::main]
69/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
70///     #[cfg(windows)]
71///     let config = TaskConfig::new("cmd")
72///         .args(["/C", "timeout", "/t", "10"])
73///         .timeout_ms(5000);
74///     #[cfg(unix)]
75///     let config = TaskConfig::new("sleep")
76///         .args(["10"])
77///         .timeout_ms(5000);
78///     
79///     let (tx, mut rx) = mpsc::channel(100);
80///     let mut executor = TaskExecutor::new(config, tx);
81///     
82///     executor.coordinate_start().await?;
83///     
84///     // Terminate after 2 seconds
85///     tokio::spawn(async move {
86///         tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
87///         let _ = executor.terminate_task(TaskTerminateReason::UserRequested);
88///     });
89///     
90///     while let Some(event) = rx.recv().await {
91///         if matches!(event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
92///             break;
93///         }
94///     }
95///     
96///     Ok(())
97/// }
98/// ```
99#[derive(Debug)]
100pub struct TaskExecutor {
101    pub(crate) shared_context: Arc<TaskExecutorContext>,
102    pub(crate) stdin: Option<ChildStdin>,
103    pub(crate) terminate_tx: Option<oneshot::Sender<TaskTerminateReason>>,
104}
105
106impl TaskExecutor {
107    /// Create a new task executor with the given configuration
108    ///
109    /// # Arguments
110    ///
111    /// * `config` - Validated task configuration containing command, arguments, and options
112    /// * `event_tx` - Channel for sending task events
113    ///
114    /// # Examples
115    ///
116    /// ```rust
117    /// use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
118    /// use tokio::sync::mpsc;
119    ///
120    /// #[cfg(windows)]
121    /// let config = TaskConfig::new("cmd").args(["/C", "dir"]);
122    /// #[cfg(unix)]
123    /// let config = TaskConfig::new("ls").args(["-la"]);
124    ///
125    /// let (tx, _rx) = mpsc::channel(100);
126    /// let executor = TaskExecutor::new(config, tx);
127    /// ```
128    pub fn new(config: TaskConfig, event_tx: mpsc::Sender<TaskEvent>) -> Self {
129        Self {
130            shared_context: Arc::new(TaskExecutorContext::new(config, event_tx)),
131            stdin: None,
132            terminate_tx: None,
133        }
134    }
135
136    /// Validates the task configuration before execution.
137    ///
138    /// Checks if the task configuration is valid and sends appropriate
139    /// events if validation fails.
140    ///
141    /// # Arguments
142    ///
143    /// * `event_tx` - Channel for sending task events
144    ///
145    /// # Returns
146    ///
147    /// * `Ok(())` - If configuration is valid
148    /// * `Err(TaskError)` - If configuration validation fails
149    ///
150    /// # Errors
151    ///
152    /// Returns [`TaskError::InvalidConfiguration`] if configuration is invalid
153    pub(crate) async fn validate_config(
154        &mut self,
155        event_tx: &mpsc::Sender<TaskEvent>,
156    ) -> Result<(), TaskError> {
157        match self.shared_context.config.validate() {
158            Ok(()) => Ok(()),
159            Err(e) => {
160                #[cfg(feature = "tracing")]
161                tracing::error!(error = %e, "Invalid task configuration");
162
163                let time = Self::update_state(&self.shared_context, TaskState::Finished);
164                let error_event = TaskEvent::Error { error: e.clone() };
165                Self::send_event(event_tx, error_event).await;
166
167                let finish_event = TaskEvent::Stopped {
168                    exit_code: None,
169                    finished_at: time,
170                    reason: TaskStopReason::Error(e.clone()),
171                    #[cfg(unix)]
172                    signal: None,
173                };
174                Self::send_event(event_tx, finish_event).await;
175
176                return Err(e);
177            }
178        }
179    }
180    /// Setup a command for execution based on the task configuration.
181    ///
182    /// Creates a tokio Command with all the configured parameters in TaskConfig.
183    ///
184    /// # Returns
185    ///
186    /// A configured `tokio::process::Command` ready for spawning
187    pub(crate) fn setup_command(&self) -> Command {
188        let mut cmd = Command::new(&self.shared_context.config.command);
189
190        cmd.kill_on_drop(true);
191
192        // Setup additional arguments
193        if let Some(args) = &self.shared_context.config.args {
194            cmd.args(args);
195        }
196
197        // Setup working directory with validation
198        if let Some(dir) = &self.shared_context.config.working_dir {
199            cmd.current_dir(dir);
200        }
201
202        // Setup environment variables
203        if let Some(envs) = &self.shared_context.config.env {
204            cmd.envs(envs);
205        }
206
207        // Setup stdio
208        cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(
209            if self.shared_context.config.enable_stdin.unwrap_or_default() {
210                Stdio::piped()
211            } else {
212                Stdio::null()
213            },
214        );
215        cmd
216    }
217
218    /// Configures whether to drop (close) the event channel when the task finishes.
219    ///
220    /// By default, the event channel (`event_tx`) is dropped when the task finishes,
221    /// signaling to receivers that no more events will be sent.
222    /// This method allows you to override that behavior,
223    /// which is useful if you want to keep the event channel open for multiple tasks
224    /// or for manual control.
225    ///
226    /// # Arguments
227    ///
228    /// * `drop` - If `true`, the event channel will be dropped when the task finishes (default behavior).
229    ///            If `false`, the event channel will remain open after task completion.
230    ///
231    /// # Example
232    ///
233    /// ```rust
234    /// # use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
235    /// # use tokio::sync::mpsc;
236    /// # let config = TaskConfig::new("echo");
237    /// # let (tx, _rx) = mpsc::channel(100);
238    /// # let executor = TaskExecutor::new(config, tx);
239    /// executor.set_drop_event_tx_on_finished(false); // Keep event channel open after task finishes
240    /// ```
241    pub fn set_drop_event_tx_on_finished(&self, drop: bool) {
242        self.shared_context.set_drop_event_tx_on_finished(drop);
243    }
244}