tcrm_task/tasks/tokio/
executor.rs

1use std::{process::Stdio, sync::Arc};
2
3use tokio::{
4    process::{ChildStdin, Command},
5    sync::{mpsc, oneshot},
6};
7
8use crate::tasks::{
9    config::TaskConfig,
10    error::TaskError,
11    event::{TaskEvent, TaskEventEnvelope, TaskStopReason, TaskTerminateReason},
12    state::TaskState,
13    tokio::context::TaskExecutorContext,
14};
15
16/// Task executor for managing process lifecycle
17///
18/// `TaskExecutor` is the main entry point for executing system processes with real-time
19/// event monitoring, timeout management, and cross-platform process control.
20/// It coordinates process spawning, I/O handling, and termination through an event-driven
21/// architecture built on tokio.
22///
23/// # Examples
24///
25/// ## Basic Process Execution
26/// ```rust
27/// use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
28/// use tokio::sync::mpsc;
29///
30/// #[tokio::main]
31/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
32///     #[cfg(windows)]
33///     let config = TaskConfig::new("cmd").args(["/C", "echo", "Hello, World!"]);
34///     #[cfg(unix)]
35///     let config = TaskConfig::new("echo").args(["Hello, World!"]);
36///     
37///     config.validate()?;
38///     
39///     let (tx, mut rx) = mpsc::channel(100);
40///     let mut executor = TaskExecutor::new(config, tx);
41///     
42///     executor.coordinate_start().await?;
43///     
44///     while let Some(envelope) = rx.recv().await {
45///         match envelope.event {
46///             tcrm_task::tasks::event::TaskEvent::Output { line, .. } => {
47///                 println!("Output: {}", line);
48///             }
49///             tcrm_task::tasks::event::TaskEvent::Stopped { .. } => break,
50///             _ => {}
51///         }
52///     }
53///     
54///     Ok(())
55/// }
56/// ```
57///
58/// ## Process with Timeout and Termination
59/// ```rust
60/// use tcrm_task::tasks::{
61///     config::TaskConfig,
62///     tokio::executor::TaskExecutor,
63///     control::TaskControl,
64///     event::TaskTerminateReason
65/// };
66/// use tokio::sync::mpsc;
67///
68/// #[tokio::main]
69/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
70///     #[cfg(windows)]
71///     let config = TaskConfig::new("cmd")
72///         .args(["/C", "timeout", "/t", "10"])
73///         .timeout_ms(5000);
74///     #[cfg(unix)]
75///     let config = TaskConfig::new("sleep")
76///         .args(["10"])
77///         .timeout_ms(5000);
78///     
79///     let (tx, mut rx) = mpsc::channel(100);
80///     let mut executor = TaskExecutor::new(config, tx);
81///     
82///     executor.coordinate_start().await?;
83///     
84///     // Terminate after 2 seconds
85///     tokio::spawn(async move {
86///         tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
87///         let _ = executor.terminate_task(TaskTerminateReason::UserRequested);
88///     });
89///     
90///     while let Some(envelope) = rx.recv().await {
91///         if matches!(envelope.event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
92///             break;
93///         }
94///     }
95///     
96///     Ok(())
97/// }
98/// ```
99#[derive(Debug)]
100pub struct TaskExecutor {
101    pub(crate) shared_context: Arc<TaskExecutorContext>,
102    pub(crate) stdin: Option<ChildStdin>,
103    pub(crate) terminate_tx: Option<oneshot::Sender<TaskTerminateReason>>,
104}
105
106impl TaskExecutor {
107    /// Create a new task executor with the given configuration
108    ///
109    /// # Arguments
110    ///
111    /// * `config` - Validated task configuration containing command, arguments, and options
112    /// * `event_tx` - Channel for sending task events
113    ///
114    /// # Examples
115    ///
116    /// ```rust
117    /// use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
118    /// use tokio::sync::mpsc;
119    ///
120    /// #[cfg(windows)]
121    /// let config = TaskConfig::new("cmd").args(["/C", "dir"]);
122    /// #[cfg(unix)]
123    /// let config = TaskConfig::new("ls").args(["-la"]);
124    ///
125    /// let (tx, _rx) = mpsc::channel(100);
126    /// let executor = TaskExecutor::new(config, tx);
127    /// ```
128    pub fn new(config: TaskConfig, event_tx: mpsc::Sender<TaskEventEnvelope>) -> Self {
129        Self {
130            shared_context: Arc::new(TaskExecutorContext::new(config, event_tx)),
131            stdin: None,
132            terminate_tx: None,
133        }
134    }
135
136    /// Validates the task configuration before execution.
137    ///
138    /// Checks if the task configuration is valid and sends appropriate
139    /// events if validation fails.
140    ///
141    /// # Arguments
142    ///
143    /// * `event_tx` - Channel for sending task events
144    ///
145    /// # Returns
146    ///
147    /// * `Ok(())` - If configuration is valid
148    /// * `Err(TaskError)` - If configuration validation fails
149    ///
150    /// # Errors
151    ///
152    /// Returns [`TaskError::InvalidConfiguration`] if configuration is invalid
153    pub(crate) async fn validate_config(&mut self) -> Result<(), TaskError> {
154        match self.shared_context.config.validate() {
155            Ok(()) => Ok(()),
156            Err(e) => {
157                #[cfg(feature = "tracing")]
158                tracing::error!(error = %e, "Invalid task configuration");
159
160                let time = Self::update_state(&self.shared_context, TaskState::Finished);
161                let error_event = TaskEvent::Error { error: e.clone() };
162                let _ = self.shared_context.send_event(error_event).await;
163
164                let finish_event = TaskEvent::Stopped {
165                    exit_code: None,
166                    finished_at: time,
167                    reason: TaskStopReason::Error(e.clone()),
168                    #[cfg(unix)]
169                    signal: None,
170                };
171                let _ = self.shared_context.send_event(finish_event).await;
172
173                return Err(e);
174            }
175        }
176    }
177    /// Setup a command for execution based on the task configuration.
178    ///
179    /// Creates a tokio Command with all the configured parameters in TaskConfig.
180    ///
181    /// # Returns
182    ///
183    /// A configured `tokio::process::Command` ready for spawning
184    pub(crate) fn setup_command(&self) -> Command {
185        let mut cmd = Command::new(&self.shared_context.config.command);
186
187        cmd.kill_on_drop(true);
188
189        // Setup additional arguments
190        if let Some(args) = &self.shared_context.config.args {
191            cmd.args(args);
192        }
193
194        // Setup working directory with validation
195        if let Some(dir) = &self.shared_context.config.working_dir {
196            cmd.current_dir(dir);
197        }
198
199        // Setup environment variables
200        if let Some(envs) = &self.shared_context.config.env {
201            cmd.envs(envs);
202        }
203
204        // Setup stdio
205        cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(
206            if self.shared_context.config.enable_stdin.unwrap_or_default() {
207                Stdio::piped()
208            } else {
209                Stdio::null()
210            },
211        );
212        cmd
213    }
214
215    /// Configures whether to drop (close) the event channel when the task finishes.
216    ///
217    /// By default, the event channel (`event_tx`) is dropped when the task finishes,
218    /// signaling to receivers that no more events will be sent.
219    /// This method allows you to override that behavior,
220    /// which is useful if you want to keep the event channel open for multiple tasks
221    /// or for manual control.
222    ///
223    /// # Arguments
224    ///
225    /// * `drop` - If `true`, the event channel will be dropped when the task finishes (default behavior).
226    ///            If `false`, the event channel will remain open after task completion.
227    ///
228    /// # Example
229    ///
230    /// ```rust
231    /// # use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
232    /// # use tokio::sync::mpsc;
233    /// # let config = TaskConfig::new("echo");
234    /// # let (tx, _rx) = mpsc::channel(100);
235    /// # let executor = TaskExecutor::new(config, tx);
236    /// executor.set_drop_event_tx_on_finished(false); // Keep event channel open after task finishes
237    /// ```
238    pub fn set_drop_event_tx_on_finished(&self, drop: bool) {
239        self.shared_context.set_drop_event_tx_on_finished(drop);
240    }
241}