tcrm_task/tasks/async_tokio/spawner.rs
1use std::sync::Arc;
2use std::time::Duration;
3use tokio::sync::{Mutex, RwLock, mpsc, oneshot};
4use tokio::task::JoinHandle;
5use tokio::time::{Instant, timeout};
6
7use crate::tasks::error::TaskError;
8use crate::tasks::event::TaskTerminateReason;
9use crate::tasks::{config::TaskConfig, state::TaskState};
10
11/// Information about a running or completed task
12///
13/// Provides metadata about the task execution including timing, state, and lifecycle information.
14///
15/// # Examples
16///
17/// ```rust
18/// use tcrm_task::tasks::async_tokio::spawner::{TaskSpawner, TaskInfo};
19/// use tcrm_task::tasks::config::TaskConfig;
20///
21/// #[tokio::main]
22/// async fn main() {
23/// let config = TaskConfig::new("cmd").args(["/C", "echo", "hello"]);
24/// let spawner = TaskSpawner::new("test".to_string(), config);
25///
26/// let info: TaskInfo = spawner.get_task_info().await;
27/// println!("Task {} is in state {:?}", info.name, info.state);
28/// }
29/// ```
30#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
31#[derive(Debug, Clone)]
32pub struct TaskInfo {
33 /// Name of the task
34 pub name: String,
35 /// Current execution state
36 pub state: TaskState,
37 /// How long the task has been running
38 pub uptime: Duration,
39 /// When the task was created
40 #[cfg_attr(feature = "serde", serde(skip, default = "default_instant"))]
41 pub created_at: Instant,
42 /// When the task finished (if completed)
43 #[cfg_attr(feature = "serde", serde(skip, default))]
44 pub finished_at: Option<Instant>,
45}
46
47#[cfg(feature = "serde")]
48/// Returns the current instant for serde default value.
49fn default_instant() -> Instant {
50 Instant::now()
51}
52
53/// Spawns and manages the lifecycle of a task
54///
55/// `TaskSpawner` handles the execution of system processes with comprehensive
56/// monitoring, state management, and event emission. It provides both
57/// synchronous and asynchronous interfaces for process management.
58///
59/// # Features
60///
61/// - **State Management**: Track task execution through Pending, Running, Ready, and Finished states
62/// - **Event Emission**: Real-time events for output, state changes, and lifecycle events
63/// - **Timeout Handling**: Automatic termination when tasks exceed configured timeouts
64/// - **Stdin Support**: Send input to running processes when enabled
65/// - **Ready Detection**: Automatic detection when long-running processes are ready
66/// - **Process Control**: Start, stop, and terminate processes with proper cleanup
67///
68/// # Examples
69///
70/// ## Simple Command Execution
71/// ```rust
72/// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
73/// use tokio::sync::mpsc;
74///
75/// #[tokio::main]
76/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
77/// let config = TaskConfig::new("cmd")
78/// .args(["/C", "echo", "Hello World"]);
79///
80/// let (tx, mut rx) = mpsc::channel(100);
81/// let mut spawner = TaskSpawner::new("hello".to_string(), config);
82///
83/// spawner.start_direct(tx).await?;
84///
85/// // Process events
86/// while let Some(event) = rx.recv().await {
87/// println!("Event: {:?}", event);
88/// if matches!(event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
89/// break;
90/// }
91/// }
92///
93/// Ok(())
94/// }
95/// ```
96///
97/// ## Long-running Process with Ready Detection
98/// ```rust
99/// use tcrm_task::tasks::{config::{TaskConfig, StreamSource}, async_tokio::spawner::TaskSpawner};
100/// use tokio::sync::mpsc;
101///
102/// #[tokio::main]
103/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
104/// let config = TaskConfig::new("cmd")
105/// .args(["/C", "echo", "Server listening"])
106/// .ready_indicator("Server listening")
107/// .ready_indicator_source(StreamSource::Stdout)
108/// .timeout_ms(30000);
109///
110/// let (tx, mut rx) = mpsc::channel(100);
111/// let mut spawner = TaskSpawner::new("server".to_string(), config);
112///
113/// spawner.start_direct(tx).await?;
114///
115/// // Wait for ready event
116/// while let Some(event) = rx.recv().await {
117/// if matches!(event, tcrm_task::tasks::event::TaskEvent::Ready { .. }) {
118/// println!("Server is ready to accept requests!");
119/// // Server is now ready, can start sending requests
120/// break;
121/// }
122/// }
123///
124/// Ok(())
125/// }
126/// ```
127///
128/// ## Interactive Process with Stdin
129/// ```rust
130/// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
131/// use tokio::sync::mpsc;
132///
133/// #[tokio::main]
134/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
135/// let config = TaskConfig::new("cmd")
136/// .args(["/C", "echo", "Hello"])
137/// .enable_stdin(true);
138///
139/// let (tx, mut rx) = mpsc::channel(100);
140/// let (stdin_tx, stdin_rx) = mpsc::channel(10);
141/// let mut spawner = TaskSpawner::new("cmd".to_string(), config);
142///
143/// // Set up stdin channel - note: set_stdin consumes self and returns Self
144/// spawner = spawner.set_stdin(stdin_rx);
145///
146/// spawner.start_direct(tx).await?;
147///
148/// // Send input to the process
149/// stdin_tx.send("print('Hello from stdin!')".to_string()).await?;
150/// stdin_tx.send("exit()".to_string()).await?;
151///
152/// // Process events
153/// while let Some(event) = rx.recv().await {
154/// match event {
155/// tcrm_task::tasks::event::TaskEvent::Output { line, .. } => {
156/// println!("Output: {}", line);
157/// }
158/// tcrm_task::tasks::event::TaskEvent::Stopped { .. } => break,
159/// _ => {}
160/// }
161/// }
162///
163/// Ok(())
164/// }
165/// ```
166#[derive(Debug)]
167pub struct TaskSpawner {
168 pub(crate) config: TaskConfig,
169 pub(crate) task_name: String,
170 pub(crate) state: Arc<RwLock<TaskState>>,
171 pub(crate) terminate_tx: Arc<Mutex<Option<oneshot::Sender<TaskTerminateReason>>>>,
172 pub(crate) process_id: Arc<RwLock<Option<u32>>>,
173 pub(crate) created_at: Instant,
174 pub(crate) finished_at: Arc<RwLock<Option<Instant>>>,
175 pub(crate) stdin_rx: Option<mpsc::Receiver<String>>,
176}
177
178impl TaskSpawner {
179 /// Create a new task spawner for the given task name and configuration
180 ///
181 /// Creates a new `TaskSpawner` instance in the Pending state. The configuration
182 /// is not validated until `start_direct` is called.
183 ///
184 /// # Arguments
185 ///
186 /// * `task_name` - Unique identifier for this task instance
187 /// * `config` - Task configuration defining command, arguments, environment, etc.
188 ///
189 /// # Examples
190 /// ```rust
191 /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
192 ///
193 /// let config = TaskConfig::new("echo").args(["hello"]);
194 /// let spawner = TaskSpawner::new("my-task".to_string(), config);
195 /// ```
196 #[must_use]
197 pub fn new(task_name: String, config: TaskConfig) -> Self {
198 Self {
199 task_name,
200 config,
201 state: Arc::new(RwLock::new(TaskState::Pending)),
202 terminate_tx: Arc::new(Mutex::new(None)),
203 process_id: Arc::new(RwLock::new(None)),
204 created_at: Instant::now(),
205 finished_at: Arc::new(RwLock::new(None)),
206 stdin_rx: None,
207 }
208 }
209
210 /// Set the stdin receiver for the task, enabling asynchronous input
211 ///
212 /// Configures a channel for sending input to the process stdin. This method
213 /// has no effect if `enable_stdin` is false in the task configuration.
214 ///
215 /// # Arguments
216 ///
217 /// * `stdin_rx` - Receiver channel for stdin input strings
218 ///
219 /// # Examples
220 /// ```rust
221 /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
222 /// use tokio::sync::mpsc;
223 ///
224 /// let config = TaskConfig::new("python")
225 /// .args(["-i"])
226 /// .enable_stdin(true);
227 ///
228 /// let (stdin_tx, stdin_rx) = mpsc::channel(10);
229 /// let spawner = TaskSpawner::new("interactive".to_string(), config)
230 /// .set_stdin(stdin_rx);
231 /// ```
232 #[must_use]
233 pub fn set_stdin(mut self, stdin_rx: mpsc::Receiver<String>) -> Self {
234 if self.config.enable_stdin.unwrap_or_default() {
235 self.stdin_rx = Some(stdin_rx);
236 }
237 self
238 }
239
240 /// Get the current state of the task
241 ///
242 /// Returns the current execution state of the task. States progress through:
243 /// Pending → Initiating → Running → (Ready) → Finished
244 ///
245 /// # Examples
246 /// ```rust
247 /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner, state::TaskState};
248 ///
249 /// #[tokio::main]
250 /// async fn main() {
251 /// let config = TaskConfig::new("echo");
252 /// let spawner = TaskSpawner::new("test".to_string(), config);
253 ///
254 /// assert_eq!(spawner.get_state().await, TaskState::Pending);
255 /// }
256 /// ```
257 pub async fn get_state(&self) -> TaskState {
258 self.state.read().await.clone()
259 }
260
261 /// Check if the task is currently running
262 ///
263 /// Returns true if the task state is Running, false otherwise.
264 ///
265 /// # Examples
266 /// ```rust
267 /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
268 ///
269 /// #[tokio::main]
270 /// async fn main() {
271 /// let config = TaskConfig::new("echo");
272 /// let spawner = TaskSpawner::new("test".to_string(), config);
273 ///
274 /// assert!(!spawner.is_running().await); // Not running initially
275 /// }
276 /// ```
277 pub async fn is_running(&self) -> bool {
278 let state = self.state.read().await.clone();
279 state == TaskState::Running
280 }
281
282 /// Check if the task is currently ready
283 ///
284 /// Returns true if the task state is Ready, false otherwise.
285 /// The Ready state indicates a long-running process has signaled it's
286 /// ready to accept requests (via the ready indicator).
287 ///
288 /// # Examples
289 /// ```rust
290 /// use tcrm_task::tasks::{config::{TaskConfig, StreamSource}, async_tokio::spawner::TaskSpawner};
291 ///
292 /// #[tokio::main]
293 /// async fn main() {
294 /// let config = TaskConfig::new("my-server")
295 /// .ready_indicator("Server ready")
296 /// .ready_indicator_source(StreamSource::Stdout);
297 /// let spawner = TaskSpawner::new("server".to_string(), config);
298 ///
299 /// assert!(!spawner.is_ready().await); // Not ready initially
300 /// }
301 /// ```
302 pub async fn is_ready(&self) -> bool {
303 let state = self.state.read().await.clone();
304 state == TaskState::Ready
305 }
306
307 /// Get the uptime of the task since creation
308 ///
309 /// Returns the duration since the `TaskSpawner` was created, regardless
310 /// of the current execution state.
311 ///
312 /// # Examples
313 /// ```rust
314 /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
315 /// use std::time::Duration;
316 ///
317 /// #[tokio::main]
318 /// async fn main() {
319 /// let config = TaskConfig::new("echo");
320 /// let spawner = TaskSpawner::new("test".to_string(), config);
321 ///
322 /// let uptime = spawner.uptime();
323 /// assert!(uptime < Duration::from_secs(1)); // Just created
324 /// }
325 /// ```
326 #[must_use]
327 pub fn uptime(&self) -> Duration {
328 self.created_at.elapsed()
329 }
330
331 /// Get comprehensive information about the task
332 ///
333 /// Returns a `TaskInfo` struct containing the task name, current state,
334 /// uptime, creation time, and completion time (if finished).
335 ///
336 /// # Examples
337 /// ```rust
338 /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
339 ///
340 /// #[tokio::main]
341 /// async fn main() {
342 /// let config = TaskConfig::new("echo").args(["hello"]);
343 /// let spawner = TaskSpawner::new("info-test".to_string(), config);
344 ///
345 /// let info = spawner.get_task_info().await;
346 /// println!("Task '{}' has been running for {:?}", info.name, info.uptime);
347 /// }
348 /// ```
349 pub async fn get_task_info(&self) -> TaskInfo {
350 TaskInfo {
351 name: self.task_name.clone(),
352 state: self.get_state().await,
353 uptime: self.uptime(),
354 created_at: self.created_at,
355 finished_at: *self.finished_at.read().await,
356 }
357 }
358
359 /// Get the process ID of the running task (if any)
360 ///
361 /// Returns the system process ID if the task is currently running,
362 /// or None if the task hasn't started or has finished.
363 ///
364 /// # Examples
365 /// ```rust
366 /// use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
367 /// use tokio::sync::mpsc;
368 ///
369 /// #[tokio::main]
370 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
371 /// let config = TaskConfig::new("cmd").args(["/C", "ping", "127.0.0.1", "-n", "2"]);
372 /// let mut spawner = TaskSpawner::new("pid-test".to_string(), config);
373 ///
374 /// assert_eq!(spawner.get_process_id().await, None); // Not started yet
375 ///
376 /// let (tx, _rx) = mpsc::channel(100);
377 /// spawner.start_direct(tx).await?;
378 ///
379 /// // Now should have a process ID
380 /// let pid = spawner.get_process_id().await;
381 /// assert!(pid.is_some());
382 ///
383 /// Ok(())
384 /// }
385 /// ```
386 pub async fn get_process_id(&self) -> Option<u32> {
387 *self.process_id.read().await
388 }
389
390 /// Update the state of the task
391 ///
392 /// Internal method used by the spawner to update task state during execution.
393 pub(crate) async fn update_state(&self, new_state: TaskState) {
394 let mut state = self.state.write().await;
395 *state = new_state;
396 }
397
398 /// Send a termination signal to the running task
399 ///
400 /// Requests graceful termination of the running process with the specified reason.
401 /// The process may take some time to respond to the termination signal.
402 ///
403 /// # Arguments
404 ///
405 /// * `reason` - The reason for termination (Timeout, Cleanup, etc.)
406 ///
407 /// # Returns
408 ///
409 /// - `Ok(())` if the termination signal was sent successfully
410 /// - `Err(TaskError::Channel)` if the signal could not be sent
411 ///
412 /// # Errors
413 ///
414 /// Returns a [`TaskError::Channel`] if the internal termination channel
415 /// has been closed and the signal cannot be delivered to the task.
416 ///
417 /// # Examples
418 /// ```rust
419 /// use tcrm_task::tasks::{
420 /// config::TaskConfig,
421 /// async_tokio::spawner::TaskSpawner,
422 /// event::TaskTerminateReason
423 /// };
424 /// use tokio::sync::mpsc;
425 ///
426 /// #[tokio::main]
427 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
428 /// let config = TaskConfig::new("cmd").args(["/C", "ping", "127.0.0.1", "-n", "10"]); // Long-running task
429 /// let mut spawner = TaskSpawner::new("terminate-test".to_string(), config);
430 ///
431 /// let (tx, mut rx) = mpsc::channel(100);
432 /// spawner.start_direct(tx).await?;
433 ///
434 /// // Wait a bit, then terminate
435 /// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
436 /// spawner.send_terminate_signal(TaskTerminateReason::Cleanup).await?;
437 ///
438 /// // Process events until stopped
439 /// while let Some(event) = rx.recv().await {
440 /// if matches!(event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
441 /// break;
442 /// }
443 /// }
444 ///
445 /// Ok(())
446 /// }
447 /// ```
448 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
449 pub async fn send_terminate_signal(
450 &self,
451 reason: TaskTerminateReason,
452 ) -> Result<(), TaskError> {
453 if let Some(tx) = self.terminate_tx.lock().await.take() {
454 if tx.send(reason.clone()).is_err() {
455 let msg = "Terminate channel closed while sending signal";
456 #[cfg(feature = "tracing")]
457 tracing::warn!(terminate_reason=?reason, msg);
458 return Err(TaskError::Channel(msg.to_string()));
459 }
460 } else {
461 let msg = "Terminate signal already sent or channel missing";
462 #[cfg(feature = "tracing")]
463 tracing::warn!(msg);
464 return Err(TaskError::Channel(msg.to_string()));
465 }
466
467 Ok(())
468 }
469}
470
471/// Waits for all spawned task handles to complete, with a timeout
472///
473/// Returns an error if any handle fails or times out
474pub(crate) async fn join_all_handles(
475 task_handles: &mut Vec<JoinHandle<()>>,
476) -> Result<(), TaskError> {
477 if task_handles.is_empty() {
478 return Ok(());
479 }
480
481 let handles = std::mem::take(task_handles);
482 let mut errors = Vec::new();
483
484 for mut handle in handles {
485 match timeout(Duration::from_secs(5), &mut handle).await {
486 Ok(Ok(())) => {}
487 Ok(Err(join_err)) => {
488 let err_msg = format!("Handle [{}] join failed: {:?}", handle.id(), join_err);
489
490 errors.push(err_msg);
491 }
492 Err(_) => {
493 let err_msg = format!("Handle [{}] join timeout, aborting", handle.id());
494 handle.abort(); // ensure it’s killed
495 errors.push(err_msg);
496 }
497 }
498 }
499
500 if !errors.is_empty() {
501 return Err(TaskError::Handle(format!(
502 "Multiple task handles join failures: {}",
503 errors.join("; ")
504 )));
505 }
506
507 Ok(())
508}
509#[cfg(test)]
510mod tests {
511 use std::time::Duration;
512 use tokio::sync::mpsc;
513 use tokio::time::sleep;
514
515 use crate::tasks::{
516 async_tokio::spawner::{TaskInfo, TaskSpawner},
517 config::TaskConfig,
518 error::TaskError,
519 event::TaskTerminateReason,
520 state::TaskState,
521 };
522
523 #[tokio::test]
524 async fn task_spawner_is_running_returns_true_when_state_running() {
525 let config = TaskConfig::new("echo");
526 let spawner = TaskSpawner::new("running_task".to_string(), config);
527 assert!(
528 !spawner.is_running().await,
529 "Should not be running initially"
530 );
531 spawner.update_state(TaskState::Running).await;
532 assert!(spawner.is_running().await, "Should be running after update");
533 }
534
535 #[tokio::test]
536 async fn task_spawner_is_ready_returns_true_when_state_ready() {
537 let config = TaskConfig::new("echo");
538 let spawner = TaskSpawner::new("ready_task".to_string(), config);
539 assert!(!spawner.is_ready().await, "Should not be ready initially");
540 spawner.update_state(TaskState::Ready).await;
541 assert!(spawner.is_ready().await, "Should be ready after update");
542 }
543
544 #[tokio::test]
545 async fn task_spawner_initial_state_is_pending() {
546 let config = TaskConfig::new("echo");
547 let spawner = TaskSpawner::new("pending_task".to_string(), config);
548 let state = spawner.get_state().await;
549 assert_eq!(state, TaskState::Pending, "Initial state should be Pending");
550 }
551
552 #[tokio::test]
553 async fn task_spawner_update_state_changes_state() {
554 let config = TaskConfig::new("echo");
555 let spawner = TaskSpawner::new("update_task".to_string(), config);
556 spawner.update_state(TaskState::Running).await;
557 let state = spawner.get_state().await;
558 assert_eq!(
559 state,
560 TaskState::Running,
561 "State should be Running after update"
562 );
563 }
564
565 #[tokio::test]
566 async fn task_spawner_uptime_increases_over_time() {
567 let config = TaskConfig::new("echo");
568 let spawner = TaskSpawner::new("uptime_task".to_string(), config);
569 let uptime1 = spawner.uptime();
570 sleep(Duration::from_millis(20)).await;
571 let uptime2 = spawner.uptime();
572 assert!(uptime2 > uptime1, "Uptime should increase after sleep");
573 }
574
575 #[tokio::test]
576 async fn task_spawner_get_task_info_returns_correct_info() {
577 let config = TaskConfig::new("echo");
578 let spawner = TaskSpawner::new("info_task".to_string(), config);
579 let info: TaskInfo = spawner.get_task_info().await;
580 assert_eq!(info.name, "info_task");
581 assert_eq!(info.state, TaskState::Pending);
582 assert!(info.uptime >= Duration::ZERO);
583 }
584
585 #[tokio::test]
586 async fn task_spawner_process_id_initially_none() {
587 let config = TaskConfig::new("echo");
588 let spawner = TaskSpawner::new("process_id_task".to_string(), config);
589 assert_eq!(spawner.get_process_id().await, None);
590 }
591
592 #[tokio::test]
593 async fn task_spawner_stdin_disabled_ignores_channel() {
594 let config = TaskConfig::new("echo").enable_stdin(false);
595 let (_, rx) = mpsc::channel(100);
596
597 let spawner = TaskSpawner::new("no_stdin".to_string(), config).set_stdin(rx);
598 assert!(spawner.stdin_rx.is_none());
599 }
600
601 #[tokio::test]
602 async fn task_spawner_send_terminate_signal_no_channel() {
603 let config = TaskConfig::new("echo");
604 let spawner = TaskSpawner::new("no_channel".to_string(), config);
605
606 let result = spawner
607 .send_terminate_signal(TaskTerminateReason::Cleanup)
608 .await;
609 assert!(result.is_err());
610 if let Err(TaskError::Channel(msg)) = result {
611 assert_eq!(msg, "Terminate signal already sent or channel missing");
612 } else {
613 panic!("Expected Channel error");
614 }
615 }
616
617 // TODO: Consider adding serde support for TaskInfo, not skipping Instant fields
618 #[cfg(feature = "serde")]
619 #[tokio::test]
620 async fn task_info_serde() {
621 use serde_json;
622
623 let config = TaskConfig::new("echo");
624 let spawner = TaskSpawner::new("serde_task".to_string(), config);
625 let info = spawner.get_task_info().await;
626
627 // This should work even with Instant fields skipped
628 let serialized = serde_json::to_string(&info).unwrap();
629 println!("Serialized TaskInfo: {}", serialized);
630 assert!(serialized.contains("serde_task"));
631 assert!(serialized.contains("Pending"));
632 }
633}