pub struct TaskSpawner { /* private fields */ }
Expand description
Spawns and manages the lifecycle of a task
TaskSpawner
handles the execution of system processes with comprehensive
monitoring, state management, and event emission. It provides both
synchronous and asynchronous interfaces for process management.
§Features
- State Management: Track task execution through Pending, Initiating, Running, Ready, and Finished states
- Event Emission: Real-time events for output, state changes, and lifecycle events
- Timeout Handling: Automatic termination when tasks exceed configured timeouts
- Stdin Support: Send input to running processes when enabled
- Ready Detection: Automatic detection when long-running processes are ready
- Process Control: Start, stop, and terminate processes with proper cleanup
§Examples
§Simple Command Execution
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = TaskConfig::new("cmd")
.args(["/C", "echo", "Hello World"]);
let (tx, mut rx) = mpsc::channel(100);
let mut spawner = TaskSpawner::new("hello".to_string(), config);
spawner.start_direct(tx).await?;
// Process events
while let Some(event) = rx.recv().await {
println!("Event: {:?}", event);
if matches!(event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
break;
}
}
Ok(())
}
§Long-running Process with Ready Detection
use tcrm_task::tasks::{config::{TaskConfig, StreamSource}, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = TaskConfig::new("cmd")
.args(["/C", "echo", "Server listening"])
.ready_indicator("Server listening")
.ready_indicator_source(StreamSource::Stdout)
.timeout_ms(30000);
let (tx, mut rx) = mpsc::channel(100);
let mut spawner = TaskSpawner::new("server".to_string(), config);
spawner.start_direct(tx).await?;
// Wait for ready event
while let Some(event) = rx.recv().await {
if matches!(event, tcrm_task::tasks::event::TaskEvent::Ready { .. }) {
println!("Server is ready to accept requests!");
// Server is now ready, can start sending requests
break;
}
}
Ok(())
}
§Interactive Process with Stdin
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = TaskConfig::new("cmd")
.args(["/C", "echo", "Hello"])
.enable_stdin(true);
let (tx, mut rx) = mpsc::channel(100);
let (stdin_tx, stdin_rx) = mpsc::channel(10);
let mut spawner = TaskSpawner::new("cmd".to_string(), config);
// Set up stdin channel - note: set_stdin consumes self and returns Self
spawner = spawner.set_stdin(stdin_rx);
spawner.start_direct(tx).await?;
// Send input to the process
stdin_tx.send("print('Hello from stdin!')".to_string()).await?;
stdin_tx.send("exit()".to_string()).await?;
// Process events
while let Some(event) = rx.recv().await {
match event {
tcrm_task::tasks::event::TaskEvent::Output { line, .. } => {
println!("Output: {}", line);
}
tcrm_task::tasks::event::TaskEvent::Stopped { .. } => break,
_ => {}
}
}
Ok(())
}
Implementations§
Source§impl TaskSpawner
impl TaskSpawner
Sourcepub async fn start_direct(
&mut self,
event_tx: Sender<TaskEvent>,
) -> Result<u32, TaskError>
pub async fn start_direct( &mut self, event_tx: Sender<TaskEvent>, ) -> Result<u32, TaskError>
Start the task and execute it directly with real-time event monitoring
Validates the configuration, spawns the process, and sets up comprehensive monitoring including output capture, timeout handling, stdin support, and ready detection. Events are sent through the provided channel as the task executes.
§Process Lifecycle
- Validation: Configuration is validated for security and correctness
- Process Spawn: System process is created with configured parameters
- Monitoring Setup: Watchers are spawned for stdout/stderr, stdin, timeouts, and process completion
- Event Emission: Real-time events are sent as the process executes
- Cleanup: Process and resources are cleaned up when execution completes
§Arguments
event_tx
- Channel sender through which task events are delivered in real time
§Returns
Ok(process_id)
- The system process ID if the task was started successfully
Err(TaskError)
- Configuration validation error, spawn failure, or other issues
§Events Emitted
TaskEvent::Started
- Process has been spawned and is running
TaskEvent::Output
- Output line received from stdout/stderr
TaskEvent::Ready
- Ready indicator detected (for long-running processes)
TaskEvent::Stopped
- Process has completed with exit code and reason
TaskEvent::Error
- An error occurred during execution
§Examples
§Simple Command
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = TaskConfig::new("cmd").args(["/C", "echo", "Hello, World!"]);
let mut spawner = TaskSpawner::new("greeting".to_string(), config);
let (tx, mut rx) = mpsc::channel(100);
let process_id = spawner.start_direct(tx).await?;
println!("Started process with ID: {}", process_id);
// Process all events until completion
while let Some(event) = rx.recv().await {
match event {
tcrm_task::tasks::event::TaskEvent::Output { line, .. } => {
println!("Output: {}", line);
}
tcrm_task::tasks::event::TaskEvent::Stopped { exit_code, .. } => {
println!("Process finished with exit code: {:?}", exit_code);
break;
}
_ => {}
}
}
Ok(())
}
§Long-running Process with Ready Detection
use tcrm_task::tasks::{
config::{TaskConfig, StreamSource},
async_tokio::spawner::TaskSpawner,
event::TaskEvent
};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = TaskConfig::new("cmd")
.args(["/C", "echo", "Server listening on"])
.ready_indicator("Server listening on")
.ready_indicator_source(StreamSource::Stdout)
.timeout_ms(30000); // 30 second timeout
let mut spawner = TaskSpawner::new("web-server".to_string(), config);
let (tx, mut rx) = mpsc::channel(100);
spawner.start_direct(tx).await?;
// Wait for the server to be ready
while let Some(event) = rx.recv().await {
match event {
TaskEvent::Ready { task_name } => {
println!("Server '{}' is ready to accept requests!", task_name);
// Now you can start sending requests to the server
break;
}
TaskEvent::Error { error, .. } => {
eprintln!("Server failed to start: {}", error);
return Err(error.into());
}
TaskEvent::Output { line, .. } => {
println!("Server log: {}", line);
}
_ => {}
}
}
Ok(())
}
§Interactive Process with Stdin
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = TaskConfig::new("python")
.args(["-i"]) // Interactive mode
.enable_stdin(true);
let (stdin_tx, stdin_rx) = mpsc::channel(10);
let mut spawner = TaskSpawner::new("python-repl".to_string(), config)
.set_stdin(stdin_rx);
let (tx, mut rx) = mpsc::channel(100);
spawner.start_direct(tx).await?;
// Send some Python commands
stdin_tx.send("print('Hello from Python!')".to_string()).await?;
stdin_tx.send("2 + 2".to_string()).await?;
stdin_tx.send("exit()".to_string()).await?;
// Process output
while let Some(event) = rx.recv().await {
match event {
tcrm_task::tasks::event::TaskEvent::Output { line, .. } => {
println!("Python: {}", line);
}
tcrm_task::tasks::event::TaskEvent::Stopped { .. } => break,
_ => {}
}
}
Ok(())
}
§Validation
This method validates the task configuration before execution. If validation fails, a TaskError is returned (see Errors below).
§Watchers
The method spawns multiple async watchers for different aspects of process monitoring:
- Output watchers (stdout/stderr)
- Stdin watcher (if enabled)
- Timeout watcher (if configured)
- Process completion watcher
- Result aggregation watcher
All watchers run concurrently for responsiveness.
§Errors
Returns a TaskError
if:
- Task configuration validation fails
- Process fails to start due to invalid command or working directory
- Unable to obtain process ID from started child process
Source§impl TaskSpawner
impl TaskSpawner
Sourcepub fn new(task_name: String, config: TaskConfig) -> TaskSpawner
pub fn new(task_name: String, config: TaskConfig) -> TaskSpawner
Create a new task spawner for the given task name and configuration
Creates a new TaskSpawner
instance in the Pending state. The configuration
is not validated until start_direct
is called.
§Arguments
task_name
- Unique identifier for this task instance
config
- Task configuration defining command, arguments, environment, etc.
§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
let config = TaskConfig::new("echo").args(["hello"]);
let spawner = TaskSpawner::new("my-task".to_string(), config);
Sourcepub fn set_stdin(self, stdin_rx: Receiver<String>) -> TaskSpawner
pub fn set_stdin(self, stdin_rx: Receiver<String>) -> TaskSpawner
Set the stdin receiver for the task, enabling asynchronous input
Configures a channel for sending input to the process stdin. This method
has no effect if enable_stdin
is false in the task configuration.
§Arguments
stdin_rx
- Receiver channel for stdin input strings
§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;
let config = TaskConfig::new("python")
.args(["-i"])
.enable_stdin(true);
let (stdin_tx, stdin_rx) = mpsc::channel(10);
let spawner = TaskSpawner::new("interactive".to_string(), config)
.set_stdin(stdin_rx);
Sourcepub async fn get_state(&self) -> TaskState
pub async fn get_state(&self) -> TaskState
Get the current state of the task
Returns the current execution state of the task. States progress through: Pending → Initiating → Running → (Ready) → Finished
§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner, state::TaskState};
#[tokio::main]
async fn main() {
let config = TaskConfig::new("echo");
let spawner = TaskSpawner::new("test".to_string(), config);
assert_eq!(spawner.get_state().await, TaskState::Pending);
}
Sourcepub async fn is_running(&self) -> bool
pub async fn is_running(&self) -> bool
Check if the task is currently running
Returns true if the task state is Running, false otherwise.
§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
#[tokio::main]
async fn main() {
let config = TaskConfig::new("echo");
let spawner = TaskSpawner::new("test".to_string(), config);
assert!(!spawner.is_running().await); // Not running initially
}
Sourcepub async fn is_ready(&self) -> bool
pub async fn is_ready(&self) -> bool
Check if the task is currently ready
Returns true if the task state is Ready, false otherwise. The Ready state indicates a long-running process has signaled it’s ready to accept requests (via the ready indicator).
§Examples
use tcrm_task::tasks::{config::{TaskConfig, StreamSource}, async_tokio::spawner::TaskSpawner};
#[tokio::main]
async fn main() {
let config = TaskConfig::new("my-server")
.ready_indicator("Server ready")
.ready_indicator_source(StreamSource::Stdout);
let spawner = TaskSpawner::new("server".to_string(), config);
assert!(!spawner.is_ready().await); // Not ready initially
}
Sourcepub fn uptime(&self) -> Duration
pub fn uptime(&self) -> Duration
Get the uptime of the task since creation
Returns the duration since the TaskSpawner
was created, regardless
of the current execution state.
§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use std::time::Duration;
#[tokio::main]
async fn main() {
let config = TaskConfig::new("echo");
let spawner = TaskSpawner::new("test".to_string(), config);
let uptime = spawner.uptime();
assert!(uptime < Duration::from_secs(1)); // Just created
}
Sourcepub async fn get_task_info(&self) -> TaskInfo
pub async fn get_task_info(&self) -> TaskInfo
Get comprehensive information about the task
Returns a TaskInfo
struct containing the task name, current state,
uptime, creation time, and completion time (if finished).
§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
#[tokio::main]
async fn main() {
let config = TaskConfig::new("echo").args(["hello"]);
let spawner = TaskSpawner::new("info-test".to_string(), config);
let info = spawner.get_task_info().await;
println!("Task '{}' has been running for {:?}", info.name, info.uptime);
}
Sourcepub async fn get_process_id(&self) -> Option<u32>
pub async fn get_process_id(&self) -> Option<u32>
Get the process ID of the running task (if any)
Returns the system process ID if the task is currently running, or None if the task hasn’t started or has finished.
§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = TaskConfig::new("cmd").args(["/C", "ping", "127.0.0.1", "-n", "2"]);
let mut spawner = TaskSpawner::new("pid-test".to_string(), config);
assert_eq!(spawner.get_process_id().await, None); // Not started yet
let (tx, _rx) = mpsc::channel(100);
spawner.start_direct(tx).await?;
// Now should have a process ID
let pid = spawner.get_process_id().await;
assert!(pid.is_some());
Ok(())
}
Sourcepub async fn send_terminate_signal(
&self,
reason: TaskTerminateReason,
) -> Result<(), TaskError>
pub async fn send_terminate_signal( &self, reason: TaskTerminateReason, ) -> Result<(), TaskError>
Send a termination signal to the running task
Requests graceful termination of the running process with the specified reason. The process may take some time to respond to the termination signal.
§Arguments
reason
- The reason for termination (Timeout, Cleanup, etc.)
§Returns
Ok(())
- The termination signal was sent successfully
Err(TaskError::Channel)
- The signal could not be sent
§Errors
Returns a TaskError::Channel
if the internal termination channel
has been closed and the signal cannot be delivered to the task.
§Examples
use tcrm_task::tasks::{
config::TaskConfig,
async_tokio::spawner::TaskSpawner,
event::TaskTerminateReason
};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = TaskConfig::new("cmd").args(["/C", "ping", "127.0.0.1", "-n", "10"]); // Long-running task
let mut spawner = TaskSpawner::new("terminate-test".to_string(), config);
let (tx, mut rx) = mpsc::channel(100);
spawner.start_direct(tx).await?;
// Wait a bit, then terminate
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
spawner.send_terminate_signal(TaskTerminateReason::Cleanup).await?;
// Process events until stopped
while let Some(event) = rx.recv().await {
if matches!(event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
break;
}
}
Ok(())
}