tcrm_task/tasks/tokio/executor.rs
1use std::{process::Stdio, sync::Arc};
2
3use tokio::{
4 process::{ChildStdin, Command},
5 sync::{mpsc, oneshot},
6};
7
8use crate::tasks::{
9 config::TaskConfig,
10 error::TaskError,
11 event::{TaskEvent, TaskEventEnvelope, TaskStopReason, TaskTerminateReason},
12 state::TaskState,
13 tokio::context::TaskExecutorContext,
14};
15
16/// Task executor for managing process lifecycle
17///
18/// `TaskExecutor` is the main entry point for executing system processes with real-time
19/// event monitoring, timeout management, and cross-platform process control.
20/// It coordinates process spawning, I/O handling, and termination through an event-driven
21/// architecture built on tokio.
22///
23/// # Examples
24///
25/// ## Basic Process Execution
26/// ```rust
27/// use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
28/// use tokio::sync::mpsc;
29///
30/// #[tokio::main]
31/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
32/// #[cfg(windows)]
33/// let config = TaskConfig::new("cmd").args(["/C", "echo", "Hello, World!"]);
34/// #[cfg(unix)]
35/// let config = TaskConfig::new("echo").args(["Hello, World!"]);
36///
37/// config.validate()?;
38///
39/// let (tx, mut rx) = mpsc::channel(100);
40/// let mut executor = TaskExecutor::new(config, tx);
41///
42/// executor.coordinate_start().await?;
43///
44/// while let Some(envelope) = rx.recv().await {
45/// match envelope.event {
46/// tcrm_task::tasks::event::TaskEvent::Output { line, .. } => {
47/// println!("Output: {}", line);
48/// }
49/// tcrm_task::tasks::event::TaskEvent::Stopped { .. } => break,
50/// _ => {}
51/// }
52/// }
53///
54/// Ok(())
55/// }
56/// ```
57///
58/// ## Process with Timeout and Termination
59/// ```rust
60/// use tcrm_task::tasks::{
61/// config::TaskConfig,
62/// tokio::executor::TaskExecutor,
63/// control::TaskControl,
64/// event::TaskTerminateReason
65/// };
66/// use tokio::sync::mpsc;
67///
68/// #[tokio::main]
69/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
70/// #[cfg(windows)]
71/// let config = TaskConfig::new("cmd")
72/// .args(["/C", "timeout", "/t", "10"])
73/// .timeout_ms(5000);
74/// #[cfg(unix)]
75/// let config = TaskConfig::new("sleep")
76/// .args(["10"])
77/// .timeout_ms(5000);
78///
79/// let (tx, mut rx) = mpsc::channel(100);
80/// let mut executor = TaskExecutor::new(config, tx);
81///
82/// executor.coordinate_start().await?;
83///
84/// // Terminate after 2 seconds
85/// tokio::spawn(async move {
86/// tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
87/// let _ = executor.terminate_task(TaskTerminateReason::UserRequested);
88/// });
89///
90/// while let Some(envelope) = rx.recv().await {
91/// if matches!(envelope.event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
92/// break;
93/// }
94/// }
95///
96/// Ok(())
97/// }
98/// ```
99#[derive(Debug)]
100pub struct TaskExecutor {
101 pub(crate) shared_context: Arc<TaskExecutorContext>,
102 pub(crate) stdin: Option<ChildStdin>,
103 pub(crate) terminate_tx: Option<oneshot::Sender<TaskTerminateReason>>,
104}
105
106impl TaskExecutor {
107 /// Create a new task executor with the given configuration
108 ///
109 /// # Arguments
110 ///
111 /// * `config` - Validated task configuration containing command, arguments, and options
112 /// * `event_tx` - Channel for sending task events
113 ///
114 /// # Examples
115 ///
116 /// ```rust
117 /// use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
118 /// use tokio::sync::mpsc;
119 ///
120 /// #[cfg(windows)]
121 /// let config = TaskConfig::new("cmd").args(["/C", "dir"]);
122 /// #[cfg(unix)]
123 /// let config = TaskConfig::new("ls").args(["-la"]);
124 ///
125 /// let (tx, _rx) = mpsc::channel(100);
126 /// let executor = TaskExecutor::new(config, tx);
127 /// ```
128 pub fn new(config: TaskConfig, event_tx: mpsc::Sender<TaskEventEnvelope>) -> Self {
129 Self {
130 shared_context: Arc::new(TaskExecutorContext::new(config, event_tx)),
131 stdin: None,
132 terminate_tx: None,
133 }
134 }
135
136 /// Validates the task configuration before execution.
137 ///
138 /// Checks if the task configuration is valid and sends appropriate
139 /// events if validation fails.
140 ///
141 /// # Arguments
142 ///
143 /// * `event_tx` - Channel for sending task events
144 ///
145 /// # Returns
146 ///
147 /// * `Ok(())` - If configuration is valid
148 /// * `Err(TaskError)` - If configuration validation fails
149 ///
150 /// # Errors
151 ///
152 /// Returns [`TaskError::InvalidConfiguration`] if configuration is invalid
153 pub(crate) async fn validate_config(&mut self) -> Result<(), TaskError> {
154 match self.shared_context.config.validate() {
155 Ok(()) => Ok(()),
156 Err(e) => {
157 #[cfg(feature = "tracing")]
158 tracing::error!(error = %e, "Invalid task configuration");
159
160 let time = Self::update_state(&self.shared_context, TaskState::Finished);
161 let error_event = TaskEvent::Error { error: e.clone() };
162 let _ = self.shared_context.send_event(error_event).await;
163
164 let finish_event = TaskEvent::Stopped {
165 exit_code: None,
166 finished_at: time,
167 reason: TaskStopReason::Error(e.clone()),
168 #[cfg(unix)]
169 signal: None,
170 };
171 let _ = self.shared_context.send_event(finish_event).await;
172
173 return Err(e);
174 }
175 }
176 }
177 /// Setup a command for execution based on the task configuration.
178 ///
179 /// Creates a tokio Command with all the configured parameters in TaskConfig.
180 ///
181 /// # Returns
182 ///
183 /// A configured `tokio::process::Command` ready for spawning
184 pub(crate) fn setup_command(&self) -> Command {
185 let mut cmd = Command::new(&self.shared_context.config.command);
186
187 cmd.kill_on_drop(true);
188
189 // Setup additional arguments
190 if let Some(args) = &self.shared_context.config.args {
191 cmd.args(args);
192 }
193
194 // Setup working directory with validation
195 if let Some(dir) = &self.shared_context.config.working_dir {
196 cmd.current_dir(dir);
197 }
198
199 // Setup environment variables
200 if let Some(envs) = &self.shared_context.config.env {
201 cmd.envs(envs);
202 }
203
204 // Setup stdio
205 cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(
206 if self.shared_context.config.enable_stdin.unwrap_or_default() {
207 Stdio::piped()
208 } else {
209 Stdio::null()
210 },
211 );
212 cmd
213 }
214
215 /// Configures whether to drop (close) the event channel when the task finishes.
216 ///
217 /// By default, the event channel (`event_tx`) is dropped when the task finishes,
218 /// signaling to receivers that no more events will be sent.
219 /// This method allows you to override that behavior,
220 /// which is useful if you want to keep the event channel open for multiple tasks
221 /// or for manual control.
222 ///
223 /// # Arguments
224 ///
225 /// * `drop` - If `true`, the event channel will be dropped when the task finishes (default behavior).
226 /// If `false`, the event channel will remain open after task completion.
227 ///
228 /// # Example
229 ///
230 /// ```rust
231 /// # use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
232 /// # use tokio::sync::mpsc;
233 /// # let config = TaskConfig::new("echo");
234 /// # let (tx, _rx) = mpsc::channel(100);
235 /// # let executor = TaskExecutor::new(config, tx);
236 /// executor.set_drop_event_tx_on_finished(false); // Keep event channel open after task finishes
237 /// ```
238 pub fn set_drop_event_tx_on_finished(&self, drop: bool) {
239 self.shared_context.set_drop_event_tx_on_finished(drop);
240 }
241}