tcrm_task/tasks/tokio/executor.rs
1use std::{process::Stdio, sync::Arc};
2
3use tokio::{
4 process::{ChildStdin, Command},
5 sync::{mpsc, oneshot},
6};
7
8use crate::tasks::{
9 config::TaskConfig,
10 error::TaskError,
11 event::{TaskEvent, TaskStopReason, TaskTerminateReason},
12 state::TaskState,
13 tokio::context::TaskExecutorContext,
14};
15
16/// Task executor for managing process lifecycle
17///
18/// `TaskExecutor` is the main entry point for executing system processes with real-time
19/// event monitoring, timeout management, and cross-platform process control.
20/// It coordinates process spawning, I/O handling, and termination through an event-driven
21/// architecture built on tokio.
22///
23/// # Examples
24///
25/// ## Basic Process Execution
26/// ```rust
27/// use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
28/// use tokio::sync::mpsc;
29///
30/// #[tokio::main]
31/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
32/// #[cfg(windows)]
33/// let config = TaskConfig::new("cmd").args(["/C", "echo", "Hello, World!"]);
34/// #[cfg(unix)]
35/// let config = TaskConfig::new("echo").args(["Hello, World!"]);
36///
37/// config.validate()?;
38///
39/// let (tx, mut rx) = mpsc::channel(100);
40/// let mut executor = TaskExecutor::new(config, tx);
41///
42/// executor.coordinate_start().await?;
43///
44/// while let Some(event) = rx.recv().await {
45/// match event {
46/// tcrm_task::tasks::event::TaskEvent::Output { line, .. } => {
47/// println!("Output: {}", line);
48/// }
49/// tcrm_task::tasks::event::TaskEvent::Stopped { .. } => break,
50/// _ => {}
51/// }
52/// }
53///
54/// Ok(())
55/// }
56/// ```
57///
58/// ## Process with Timeout and Termination
59/// ```rust
60/// use tcrm_task::tasks::{
61/// config::TaskConfig,
62/// tokio::executor::TaskExecutor,
63/// control::TaskControl,
64/// event::TaskTerminateReason
65/// };
66/// use tokio::sync::mpsc;
67///
68/// #[tokio::main]
69/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
70/// #[cfg(windows)]
71/// let config = TaskConfig::new("cmd")
72/// .args(["/C", "timeout", "/t", "10"])
73/// .timeout_ms(5000);
74/// #[cfg(unix)]
75/// let config = TaskConfig::new("sleep")
76/// .args(["10"])
77/// .timeout_ms(5000);
78///
79/// let (tx, mut rx) = mpsc::channel(100);
80/// let mut executor = TaskExecutor::new(config, tx);
81///
82/// executor.coordinate_start().await?;
83///
84/// // Terminate after 2 seconds
85/// tokio::spawn(async move {
86/// tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
87/// let _ = executor.terminate_task(TaskTerminateReason::UserRequested);
88/// });
89///
90/// while let Some(event) = rx.recv().await {
91/// if matches!(event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
92/// break;
93/// }
94/// }
95///
96/// Ok(())
97/// }
98/// ```
99#[derive(Debug)]
100pub struct TaskExecutor {
101 pub(crate) shared_context: Arc<TaskExecutorContext>,
102 pub(crate) stdin: Option<ChildStdin>,
103 pub(crate) terminate_tx: Option<oneshot::Sender<TaskTerminateReason>>,
104}
105
106impl TaskExecutor {
107 /// Create a new task executor with the given configuration
108 ///
109 /// # Arguments
110 ///
111 /// * `config` - Validated task configuration containing command, arguments, and options
112 /// * `event_tx` - Channel for sending task events
113 ///
114 /// # Examples
115 ///
116 /// ```rust
117 /// use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
118 /// use tokio::sync::mpsc;
119 ///
120 /// #[cfg(windows)]
121 /// let config = TaskConfig::new("cmd").args(["/C", "dir"]);
122 /// #[cfg(unix)]
123 /// let config = TaskConfig::new("ls").args(["-la"]);
124 ///
125 /// let (tx, _rx) = mpsc::channel(100);
126 /// let executor = TaskExecutor::new(config, tx);
127 /// ```
128 pub fn new(config: TaskConfig, event_tx: mpsc::Sender<TaskEvent>) -> Self {
129 Self {
130 shared_context: Arc::new(TaskExecutorContext::new(config, event_tx)),
131 stdin: None,
132 terminate_tx: None,
133 }
134 }
135
136 /// Validates the task configuration before execution.
137 ///
138 /// Checks if the task configuration is valid and sends appropriate
139 /// events if validation fails.
140 ///
141 /// # Arguments
142 ///
143 /// * `event_tx` - Channel for sending task events
144 ///
145 /// # Returns
146 ///
147 /// * `Ok(())` - If configuration is valid
148 /// * `Err(TaskError)` - If configuration validation fails
149 ///
150 /// # Errors
151 ///
152 /// Returns [`TaskError::InvalidConfiguration`] if configuration is invalid
153 pub(crate) async fn validate_config(
154 &mut self,
155 event_tx: &mpsc::Sender<TaskEvent>,
156 ) -> Result<(), TaskError> {
157 match self.shared_context.config.validate() {
158 Ok(()) => Ok(()),
159 Err(e) => {
160 #[cfg(feature = "tracing")]
161 tracing::error!(error = %e, "Invalid task configuration");
162
163 let time = Self::update_state(&self.shared_context, TaskState::Finished);
164 let error_event = TaskEvent::Error { error: e.clone() };
165 Self::send_event(event_tx, error_event).await;
166
167 let finish_event = TaskEvent::Stopped {
168 exit_code: None,
169 finished_at: time,
170 reason: TaskStopReason::Error(e.clone()),
171 #[cfg(unix)]
172 signal: None,
173 };
174 Self::send_event(event_tx, finish_event).await;
175
176 return Err(e);
177 }
178 }
179 }
180 /// Setup a command for execution based on the task configuration.
181 ///
182 /// Creates a tokio Command with all the configured parameters in TaskConfig.
183 ///
184 /// # Returns
185 ///
186 /// A configured `tokio::process::Command` ready for spawning
187 pub(crate) fn setup_command(&self) -> Command {
188 let mut cmd = Command::new(&self.shared_context.config.command);
189
190 cmd.kill_on_drop(true);
191
192 // Setup additional arguments
193 if let Some(args) = &self.shared_context.config.args {
194 cmd.args(args);
195 }
196
197 // Setup working directory with validation
198 if let Some(dir) = &self.shared_context.config.working_dir {
199 cmd.current_dir(dir);
200 }
201
202 // Setup environment variables
203 if let Some(envs) = &self.shared_context.config.env {
204 cmd.envs(envs);
205 }
206
207 // Setup stdio
208 cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(
209 if self.shared_context.config.enable_stdin.unwrap_or_default() {
210 Stdio::piped()
211 } else {
212 Stdio::null()
213 },
214 );
215 cmd
216 }
217
218 /// Configures whether to drop (close) the event channel when the task finishes.
219 ///
220 /// By default, the event channel (`event_tx`) is dropped when the task finishes,
221 /// signaling to receivers that no more events will be sent.
222 /// This method allows you to override that behavior,
223 /// which is useful if you want to keep the event channel open for multiple tasks
224 /// or for manual control.
225 ///
226 /// # Arguments
227 ///
228 /// * `drop` - If `true`, the event channel will be dropped when the task finishes (default behavior).
229 /// If `false`, the event channel will remain open after task completion.
230 ///
231 /// # Example
232 ///
233 /// ```rust
234 /// # use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
235 /// # use tokio::sync::mpsc;
236 /// # let config = TaskConfig::new("echo");
237 /// # let (tx, _rx) = mpsc::channel(100);
238 /// # let executor = TaskExecutor::new(config, tx);
239 /// executor.set_drop_event_tx_on_finished(false); // Keep event channel open after task finishes
240 /// ```
241 pub fn set_drop_event_tx_on_finished(&self, drop: bool) {
242 self.shared_context.set_drop_event_tx_on_finished(drop);
243 }
244}