1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
use tokio::sync::oneshot;
use crate::tasks::{
config::StreamSource, error::TaskError, event::TaskTerminateReason, state::TaskState,
tokio::executor::TaskExecutor,
};
impl TaskExecutor {
/// Start execution in a coordinated async event loop.
///
/// This is the main execution method that spawns the process, sets up event monitoring,
/// and manages the complete process lifecycle. It handles stdout/stderr streaming,
/// timeout management, termination signals, and process cleanup in a coordinated
/// async event loop using `tokio::select!`.
///
/// # Arguments
///
/// * `event_tx` - Not a parameter of this method: the channel sender given to `TaskExecutor::new` (see the examples below), used for emitting [`TaskEvent`]s during process execution
///
/// # Returns
///
/// * `Ok(())` - Process coordination started successfully
/// * `Err(TaskError)` - Configuration validation or process spawning failed
///
/// # Errors
///
/// Returns [`TaskError`] for:
/// - [`TaskError::InvalidConfiguration`] - Configuration validation failed
/// - [`TaskError::IO`] - Process spawning failed
/// - [`TaskError::Handle`] - Process handle or watcher setup failed
///
/// # Events Emitted
///
/// During execution, the following events are sent via `event_tx`:
/// - [`TaskEvent::Started`] - Process spawned successfully
/// - [`TaskEvent::Output`] - Lines from stdout/stderr
/// - [`TaskEvent::Ready`] - Ready indicator detected (if configured)
/// - [`TaskEvent::Stopped`] - Process completed with exit code
/// - [`TaskEvent::Error`] - Errors during execution
///
/// # Examples
///
/// ## Basic Usage
/// ```rust
/// use tcrm_task::tasks::{config::TaskConfig, tokio::executor::TaskExecutor};
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// #[cfg(windows)]
/// let config = TaskConfig::new("cmd").args(["/C", "echo", "test"]);
/// #[cfg(unix)]
/// let config = TaskConfig::new("echo").args(["test"]);
///
/// config.validate()?;
///
/// let (tx, mut rx) = mpsc::channel(100);
/// let mut executor = TaskExecutor::new(config, tx);
///
/// // Start coordination - returns immediately, process runs in background
/// executor.coordinate_start().await?;
///
/// // Process events until completion
/// while let Some(envelope) = rx.recv().await {
/// println!("Event: {:?}", envelope.event);
/// if matches!(envelope.event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
/// break;
/// }
/// }
///
/// Ok(())
/// }
/// ```
///
/// ## With Ready Indicator
/// ```rust
/// use tcrm_task::tasks::{
/// config::{TaskConfig, StreamSource},
/// tokio::executor::TaskExecutor,
/// event::TaskEvent
/// };
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// #[cfg(windows)]
/// let config = TaskConfig::new("cmd")
/// .args(["/C", "echo", "Server ready"])
/// .ready_indicator("Server ready")
/// .ready_indicator_source(StreamSource::Stdout);
///
/// #[cfg(unix)]
/// let config = TaskConfig::new("echo")
/// .args(["Server ready"])
/// .ready_indicator("Server ready")
/// .ready_indicator_source(StreamSource::Stdout);
///
/// let (tx, mut rx) = mpsc::channel(100);
/// let mut executor = TaskExecutor::new(config, tx);
///
/// executor.coordinate_start().await?;
///
/// while let Some(envelope) = rx.recv().await {
/// match envelope.event {
/// TaskEvent::Ready => {
/// println!("Process is ready!");
/// // Can now interact with the running process
/// }
/// TaskEvent::Stopped { .. } => break,
/// _ => {}
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub async fn coordinate_start(&mut self) -> Result<(), TaskError> {
    Self::update_state(&self.shared_context, TaskState::Initiating);
    self.validate_config().await?;

    // Build the command; with the `process-group` feature it is additionally
    // passed through `setup_process_group` before spawning.
    let cmd = self.setup_command();
    #[cfg(feature = "process-group")]
    let cmd = self.setup_process_group(cmd).await?;

    let mut child = self.spawn_child(cmd).await?;
    self.store_stdin(&mut child).await?;
    let (mut stdout, mut stderr) = self.take_std_output_reader(&mut child).await?;

    // Termination channel whose sender is kept on the executor itself.
    let (terminate_tx, mut terminate_rx) = oneshot::channel::<TaskTerminateReason>();
    self.terminate_tx = Some(terminate_tx);

    // Second termination channel whose sender is handed to the shared context.
    let (internal_terminate_tx, mut internal_terminate_rx) =
        oneshot::channel::<TaskTerminateReason>();
    self.shared_context
        .set_internal_terminate_tx(internal_terminate_tx)
        .await;

    let shared_context = self.shared_context.clone();

    // The event loop runs detached; this method returns as soon as it is spawned.
    tokio::spawn(async move {
        let mut process_exited = false;
        let mut termination_requested = false;
        let mut stdout_eof = false;
        let mut stderr_eof = false;

        // Create timeout future once outside the loop to prevent reset on each iteration.
        // This fixes a bug where the timeout would never trigger if other select!
        // branches (stdout/stderr) were frequently selected, causing the timeout future
        // to be dropped and recreated on each iteration.
        let mut timeout_future = Self::create_timeout_future(shared_context.clone());
        let mut timeout_triggered = false;

        loop {
            // Exit once both output streams reached EOF and either the process
            // exit was observed or a termination was requested (in the latter
            // case the loop stops even if the exit status was not observed).
            if (process_exited || termination_requested) && stdout_eof && stderr_eof {
                break;
            }

            tokio::select! {
                line = stdout.next_line(), if !stdout_eof => {
                    stdout_eof = Self::handle_output(
                        shared_context.clone(),
                        line,
                        StreamSource::Stdout,
                    )
                    .await;
                },
                line = stderr.next_line(), if !stderr_eof => {
                    stderr_eof = Self::handle_output(
                        shared_context.clone(),
                        line,
                        StreamSource::Stderr,
                    )
                    .await;
                },
                _ = &mut timeout_future, if !timeout_triggered => {
                    timeout_triggered = true;
                    Self::handle_timeout(shared_context.clone()).await;
                },
                // Both termination branches stay enabled for the whole loop;
                // `await_oneshot` simply never resolves once termination was requested,
                // so `select!` always has at least one enabled branch.
                reason = Self::await_oneshot(&mut terminate_rx, termination_requested) => {
                    Self::handle_terminate(
                        shared_context.clone(),
                        &mut child,
                        reason,
                        &mut termination_requested,
                    )
                    .await;
                },
                reason = Self::await_oneshot(&mut internal_terminate_rx, termination_requested) => {
                    Self::handle_terminate(
                        shared_context.clone(),
                        &mut child,
                        reason,
                        &mut termination_requested,
                    )
                    .await;
                },
                // Only wait for the exit status until it has been recorded.
                // NOTE(review): assuming `child` is a `tokio::process::Child`, `wait()`
                // resolves immediately with the cached status after the process exited,
                // so without this guard the branch would be ready on every iteration
                // while the streams are still draining, re-running `handle_wait_result`
                // and spinning the loop. The guard only takes effect once
                // `handle_wait_result` has set `process_exited` through the `&mut` it
                // is given — presumably on a successful wait; confirm in that helper.
                result = child.wait(), if !process_exited => {
                    Self::handle_wait_result(
                        shared_context.clone(),
                        result,
                        &mut process_exited,
                    )
                    .await;
                },
            }
        }

        Self::handle_result(shared_context.clone()).await;
    });

    Ok(())
}
/// Awaits a oneshot receiver unless termination has already been requested.
///
/// # Arguments
///
/// * `rx` - Receiver to await when no termination has been requested yet
/// * `termination_requested` - When `true`, the receiver is left untouched and
///   the returned future never resolves
///
/// # Returns
///
/// The result of awaiting `rx` (the sent value or a `RecvError`). When
/// `termination_requested` is `true` this function never returns.
async fn await_oneshot<T>(
    rx: &mut oneshot::Receiver<T>,
    termination_requested: bool,
) -> Result<T, oneshot::error::RecvError> {
    // Normal path: no termination requested so far, so listen on the channel.
    if !termination_requested {
        return rx.await;
    }

    // Termination already requested: stay pending forever instead of polling `rx`.
    std::future::pending().await
}
}