tcrm_task/tasks/tokio/coordinate/start.rs
1use tokio::sync::oneshot;
2
3use crate::tasks::{
4 config::StreamSource, error::TaskError, event::TaskTerminateReason, state::TaskState,
5 tokio::executor::TaskExecutor,
6};
7
8impl TaskExecutor {
9 /// Start execution in a coordinated async event loop.
10 ///
11 /// This is the main execution method that spawns the process, sets up event monitoring,
12 /// and manages the complete process lifecycle. It handles stdout/stderr streaming,
13 /// timeout management, termination signals, and process cleanup in a coordinated
14 /// async event loop using `tokio::select!``.
15 ///
16 /// # Arguments
17 ///
18 /// * `event_tx` - Channel sender for emitting [`TaskEvent`]s during process execution
19 ///
20 /// # Returns
21 ///
22 /// * `Ok(())` - Process coordination started successfully
23 /// * `Err(TaskError)` - Configuration validation or process spawning failed
24 ///
25 /// # Errors
26 ///
27 /// Returns [`TaskError`] for:
28 /// - [`TaskError::InvalidConfiguration`] - Configuration validation failed
29 /// - [`TaskError::IO`] - Process spawning failed
30 /// - [`TaskError::Handle`] - Process handle or watcher setup failed
31 ///
32 /// # Events Emitted
33 ///
34 /// During execution, the following events are sent via `event_tx`:
35 /// - [`TaskEvent::Started`] - Process spawned successfully
36 /// - [`TaskEvent::Output`] - Lines from stdout/stderr
37 /// - [`TaskEvent::Ready`] - Ready indicator detected (if configured)
38 /// - [`TaskEvent::Stopped`] - Process completed with exit code
39 /// - [`TaskEvent::Error`] - Errors during execution
40 ///
41 /// # Examples
42 ///
43 /// ## Basic Usage
44 /// ```rust
45 /// use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
46 /// use tokio::sync::mpsc;
47 ///
48 /// #[tokio::main]
49 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
50 /// #[cfg(windows)]
51 /// let config = TaskConfig::new("cmd").args(["/C", "echo", "test"]);
52 /// #[cfg(unix)]
53 /// let config = TaskConfig::new("echo").args(["test"]);
54 ///
55 /// config.validate()?;
56 ///
57 /// let (tx, mut rx) = mpsc::channel(100);
58 /// let mut executor = TaskExecutor::new(config, tx);
59 ///
60 /// // Start coordination - returns immediately, process runs in background
61 /// executor.coordinate_start().await?;
62 ///
63 /// // Process events until completion
64 /// while let Some(event) = rx.recv().await {
65 /// println!("Event: {:?}", event);
66 /// if matches!(event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
67 /// break;
68 /// }
69 /// }
70 ///
71 /// Ok(())
72 /// }
73 /// ```
74 ///
75 /// ## With Ready Indicator
76 /// ```rust
77 /// use tcrm_task::tasks::{
78 /// config::{TaskConfig, StreamSource},
79 /// tokio::executor::TaskExecutor,
80 /// event::TaskEvent
81 /// };
82 /// use tokio::sync::mpsc;
83 ///
84 /// #[tokio::main]
85 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
86 /// #[cfg(windows)]
87 /// let config = TaskConfig::new("cmd")
88 /// .args(["/C", "echo", "Server ready"])
89 /// .ready_indicator("Server ready")
90 /// .ready_indicator_source(StreamSource::Stdout);
91 ///
92 /// #[cfg(unix)]
93 /// let config = TaskConfig::new("echo")
94 /// .args(["Server ready"])
95 /// .ready_indicator("Server ready")
96 /// .ready_indicator_source(StreamSource::Stdout);
97 ///
98 /// let (tx, mut rx) = mpsc::channel(100);
99 /// let mut executor = TaskExecutor::new(config, tx);
100 ///
101 /// executor.coordinate_start().await?;
102 ///
103 /// while let Some(event) = rx.recv().await {
104 /// match event {
105 /// TaskEvent::Ready => {
106 /// println!("Process is ready!");
107 /// // Can now interact with the running process
108 /// }
109 /// TaskEvent::Stopped { .. } => break,
110 /// _ => {}
111 /// }
112 /// }
113 ///
114 /// Ok(())
115 /// }
116 /// ```
117 pub async fn coordinate_start(&mut self) -> Result<(), TaskError> {
118 let event_tx = match self.shared_context.get_event_tx().await {
119 Some(tx) => tx,
120 None => {
121 let msg = "Event channel sender not available".to_string();
122 #[cfg(feature = "tracing")]
123 tracing::error!("{}", msg);
124 return Err(TaskError::Channel(msg));
125 }
126 };
127
128 Self::update_state(&self.shared_context, TaskState::Initiating);
129
130 self.validate_config(&event_tx).await?;
131
132 let cmd = self.setup_command();
133
134 #[cfg(feature = "process-group")]
135 let cmd = self.setup_process_group(cmd).await?;
136
137 let mut child = self.spawn_child(cmd, &event_tx).await?;
138 self.store_stdin(&mut child, &event_tx).await?;
139
140 let (mut stdout, mut stderr) = self.take_std_output_reader(&mut child, &event_tx).await?;
141 let (terminate_tx, mut terminate_rx) = oneshot::channel::<TaskTerminateReason>();
142 self.terminate_tx = Some(terminate_tx);
143
144 let (internal_terminate_tx, mut internal_terminate_rx) =
145 oneshot::channel::<TaskTerminateReason>();
146 self.shared_context
147 .set_internal_terminate_tx(internal_terminate_tx)
148 .await;
149
150 let shared_context = self.shared_context.clone();
151
152 tokio::spawn(async move {
153 let mut process_exited = false;
154 let mut termination_requested = false;
155 let mut stdout_eof = false;
156 let mut stderr_eof = false;
157
158 // Create timeout future once outside the loop to prevent reset on each iteration.
159 // This fixes a bug where the timeout would never trigger if other select!
160 // branches (stdout/stderr) were frequently selected, causing the timeout future
161 // to be dropped and recreated on each iteration.
162 let mut timeout_future = Self::create_timeout_future(shared_context.clone());
163 let mut timeout_triggered = false;
164
165 loop {
166 // Exit conditions
167 if process_exited && stdout_eof && stderr_eof {
168 break;
169 }
170
171 // Force exit if termination was requested and streams are taking too long
172 if termination_requested && stdout_eof && stderr_eof {
173 break;
174 }
175 tokio::select! {
176 line = stdout.next_line(), if !stdout_eof =>
177 stdout_eof = Self::handle_output(shared_context.clone(), line, &event_tx, StreamSource::Stdout).await,
178 line = stderr.next_line(), if !stderr_eof =>
179 stderr_eof = Self::handle_output(shared_context.clone(), line, &event_tx, StreamSource::Stderr).await,
180
181 _ = &mut timeout_future, if !timeout_triggered => {
182 timeout_triggered = true;
183 Self::handle_timeout(shared_context.clone()).await;
184 },
185
186 reason = Self::await_oneshot(&mut terminate_rx, termination_requested) =>
187 Self::handle_terminate(shared_context.clone(), &mut child, reason, &mut termination_requested, &event_tx).await,
188 reason = Self::await_oneshot(&mut internal_terminate_rx, termination_requested) =>
189 Self::handle_terminate(shared_context.clone(), &mut child, reason, &mut termination_requested, &event_tx).await,
190
191 result = child.wait() => Self::handle_wait_result(shared_context.clone(), result,&mut process_exited).await,
192 }
193 }
194 Self::handle_result(shared_context.clone(), &event_tx).await;
195 });
196 Ok(())
197 }
198 async fn await_oneshot<T>(
199 rx: &mut oneshot::Receiver<T>,
200 termination_requested: bool,
201 ) -> Result<T, oneshot::error::RecvError> {
202 if termination_requested {
203 std::future::pending().await
204 } else {
205 rx.await
206 }
207 }
208}