TaskSpawner

Struct TaskSpawner 

pub struct TaskSpawner { /* private fields */ }

Spawns and manages the lifecycle of a task

TaskSpawner handles the execution of system processes with comprehensive monitoring, state management, and event emission. It provides both synchronous and asynchronous interfaces for process management.

§Features

  • State Management: Track task execution through Pending, Initiating, Running, Ready, and Finished states
  • Event Emission: Real-time events for output, state changes, and lifecycle events
  • Timeout Handling: Automatic termination when tasks exceed configured timeouts
  • Stdin Support: Send input to running processes when enabled
  • Ready Detection: Automatic detection when long-running processes are ready
  • Process Control: Start, stop, and terminate processes with proper cleanup

§Examples

§Simple Command Execution

use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = TaskConfig::new("cmd")
        .args(["/C", "echo", "Hello World"]);

    let (tx, mut rx) = mpsc::channel(100);
    let mut spawner = TaskSpawner::new("hello".to_string(), config);
     
    spawner.start_direct(tx).await?;

    // Process events
    while let Some(event) = rx.recv().await {
        println!("Event: {:?}", event);
        if matches!(event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
            break;
        }
    }

    Ok(())
}

§Long-running Process with Ready Detection

use tcrm_task::tasks::{config::{TaskConfig, StreamSource}, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = TaskConfig::new("cmd")
        .args(["/C", "echo", "Server listening"])
        .ready_indicator("Server listening")
        .ready_indicator_source(StreamSource::Stdout)
        .timeout_ms(30000);

    let (tx, mut rx) = mpsc::channel(100);
    let mut spawner = TaskSpawner::new("server".to_string(), config);
     
    spawner.start_direct(tx).await?;

    // Wait for ready event
    while let Some(event) = rx.recv().await {
        if matches!(event, tcrm_task::tasks::event::TaskEvent::Ready { .. }) {
            println!("Server is ready to accept requests!");
            // Server is now ready, can start sending requests
            break;
        }
    }

    Ok(())
}

§Interactive Process with Stdin

use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = TaskConfig::new("python")
        .args(["-i"]) // Interactive mode, so the process reads commands from stdin
        .enable_stdin(true);

    let (tx, mut rx) = mpsc::channel(100);
    let (stdin_tx, stdin_rx) = mpsc::channel(10);
    let mut spawner = TaskSpawner::new("cmd".to_string(), config);
     
    // Set up stdin channel - note: set_stdin consumes self and returns Self
    spawner = spawner.set_stdin(stdin_rx);
     
    spawner.start_direct(tx).await?;

    // Send input to the process
    stdin_tx.send("print('Hello from stdin!')".to_string()).await?;
    stdin_tx.send("exit()".to_string()).await?;

    // Process events
    while let Some(event) = rx.recv().await {
        match event {
            tcrm_task::tasks::event::TaskEvent::Output { line, .. } => {
                println!("Output: {}", line);
            }
            tcrm_task::tasks::event::TaskEvent::Stopped { .. } => break,
            _ => {}
        }
    }

    Ok(())
}

Implementations§

impl TaskSpawner

pub async fn start_direct(&mut self, event_tx: Sender<TaskEvent>) -> Result<u32, TaskError>

Start the task and execute it directly with real-time event monitoring

Validates the configuration, spawns the process, and sets up comprehensive monitoring including output capture, timeout handling, stdin support, and ready detection. Events are sent through the provided channel as the task executes.

§Process Lifecycle
  1. Validation: Configuration is validated for security and correctness
  2. Process Spawn: System process is created with configured parameters
  3. Monitoring Setup: Watchers are spawned for stdout/stderr, stdin, timeouts, and process completion
  4. Event Emission: Real-time events are sent as the process executes
  5. Cleanup: Process and resources are cleaned up when execution completes
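
The monitoring and event-emission steps above can be pictured with a small std-only sketch. It is not the crate's implementation: `SketchEvent` and `pump_lines` are hypothetical names, and a blocking reader plus `std::sync::mpsc` stand in for the async watchers. The function emits a start event, one output event per line, and a stop event.

```rust
use std::io::BufRead;
use std::sync::mpsc::Sender;

/// Hypothetical event type, loosely modelled on the events listed on this page.
#[derive(Debug, Clone, PartialEq)]
pub enum SketchEvent {
    Started,
    Output(String),
    Stopped,
}

/// Emits `Started`, one `Output` per line read, then `Stopped`.
/// Returns the number of lines forwarded.
/// Send errors are ignored so that a dropped receiver does not abort the
/// reader (an assumption made to keep the sketch short).
pub fn pump_lines<R: BufRead>(reader: R, tx: &Sender<SketchEvent>) -> usize {
    let _ = tx.send(SketchEvent::Started);
    let mut count = 0;
    for line in reader.lines() {
        match line {
            Ok(text) => {
                count += 1;
                let _ = tx.send(SketchEvent::Output(text));
            }
            Err(_) => break, // stop on the first read error
        }
    }
    let _ = tx.send(SketchEvent::Stopped);
    count
}
```

With a real child process, the reader could be a `std::io::BufReader` wrapped around the child's stdout handle; the sketch accepts any `BufRead` so it can also be exercised with in-memory data.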
§Arguments
  • event_tx - Channel sender through which task events are delivered in real time
§Returns
  • Ok(process_id) - The system process ID if the task was started successfully
  • Err(TaskError) - Configuration validation error, spawn failure, or other issues
§Events Emitted
  • TaskEvent::Started - Process has been spawned and is running
  • TaskEvent::Output - Output line received from stdout/stderr
  • TaskEvent::Ready - Ready indicator detected (for long-running processes)
  • TaskEvent::Stopped - Process has completed with exit code and reason
  • TaskEvent::Error - An error occurred during execution
§Examples
§Simple Command
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = TaskConfig::new("cmd").args(["/C", "echo", "Hello, World!"]);
    let mut spawner = TaskSpawner::new("greeting".to_string(), config);
     
    let (tx, mut rx) = mpsc::channel(100);
    let process_id = spawner.start_direct(tx).await?;
    println!("Started process with ID: {}", process_id);

    // Process all events until completion
    while let Some(event) = rx.recv().await {
        match event {
            tcrm_task::tasks::event::TaskEvent::Output { line, .. } => {
                println!("Output: {}", line);
            }
            tcrm_task::tasks::event::TaskEvent::Stopped { exit_code, .. } => {
                println!("Process finished with exit code: {:?}", exit_code);
                break;
            }
            _ => {}
        }
    }

    Ok(())
}
§Long-running Process with Ready Detection
use tcrm_task::tasks::{
    config::{TaskConfig, StreamSource},
    async_tokio::spawner::TaskSpawner,
    event::TaskEvent
};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = TaskConfig::new("cmd")
        .args(["/C", "echo", "Server listening on"])
        .ready_indicator("Server listening on")
        .ready_indicator_source(StreamSource::Stdout)
        .timeout_ms(30000); // 30 second timeout

    let mut spawner = TaskSpawner::new("web-server".to_string(), config);
    let (tx, mut rx) = mpsc::channel(100);
     
    spawner.start_direct(tx).await?;

    // Wait for the server to be ready
    while let Some(event) = rx.recv().await {
        match event {
            TaskEvent::Ready { task_name } => {
                println!("Server '{}' is ready to accept requests!", task_name);
                // Now you can start sending requests to the server
                break;
            }
            TaskEvent::Error { error, .. } => {
                eprintln!("Server failed to start: {}", error);
                return Err(error.into());
            }
            TaskEvent::Output { line, .. } => {
                println!("Server log: {}", line);
            }
            _ => {}
        }
    }

    Ok(())
}
§Interactive Process with Stdin
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = TaskConfig::new("python")
        .args(["-i"])  // Interactive mode
        .enable_stdin(true);

    let (stdin_tx, stdin_rx) = mpsc::channel(10);
    let mut spawner = TaskSpawner::new("python-repl".to_string(), config)
        .set_stdin(stdin_rx);
     
    let (tx, mut rx) = mpsc::channel(100);
    spawner.start_direct(tx).await?;

    // Send some Python commands
    stdin_tx.send("print('Hello from Python!')".to_string()).await?;
    stdin_tx.send("2 + 2".to_string()).await?;
    stdin_tx.send("exit()".to_string()).await?;

    // Process output
    while let Some(event) = rx.recv().await {
        match event {
            tcrm_task::tasks::event::TaskEvent::Output { line, .. } => {
                println!("Python: {}", line);
            }
            tcrm_task::tasks::event::TaskEvent::Stopped { .. } => break,
            _ => {}
        }
    }

    Ok(())
}
§Validation

This method validates the configuration before spawning anything. If validation fails, it returns a TaskError and no process is started.

§Watchers

The method spawns multiple async watchers for different aspects of process monitoring:

  • Output watchers (stdout/stderr)
  • Stdin watcher (if enabled)
  • Timeout watcher (if configured)
  • Process completion watcher
  • Result aggregation watcher

All watchers run concurrently for responsiveness.
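
As an illustration of how a timeout watcher can race against a completion watcher, here is a minimal std-only sketch. It is an assumption-laden stand-in, not the crate's code: `WaitOutcome` and `wait_with_timeout` are hypothetical names, and a blocking `recv_timeout` replaces the async timers the crate presumably uses.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// How the wait ended in this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitOutcome {
    /// The completion watcher reported an exit code in time.
    Exited(i32),
    /// The deadline passed first; a real watcher would now request termination.
    TimedOut,
    /// The completion watcher went away without reporting.
    WatcherGone,
}

/// Waits for an exit code on `done_rx`, giving up after `timeout`.
pub fn wait_with_timeout(done_rx: &Receiver<i32>, timeout: Duration) -> WaitOutcome {
    match done_rx.recv_timeout(timeout) {
        Ok(code) => WaitOutcome::Exited(code),
        Err(RecvTimeoutError::Timeout) => WaitOutcome::TimedOut,
        Err(RecvTimeoutError::Disconnected) => WaitOutcome::WatcherGone,
    }
}
```

Keeping the three outcomes distinct lets the caller tell a normal exit from a timeout-triggered termination, which is the distinction the termination reason on this page is meant to carry.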

§Errors

Returns a TaskError if:

  • Task configuration validation fails
  • Process fails to start due to invalid command or working directory
  • Unable to obtain process ID from started child process

impl TaskSpawner

pub fn new(task_name: String, config: TaskConfig) -> TaskSpawner

Create a new task spawner for the given task name and configuration

Creates a new TaskSpawner instance in the Pending state. The configuration is not validated until start_direct is called.
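
The deferred-validation pattern described here can be sketched with plain std types. Everything below is hypothetical (`SketchConfig`, `SketchSpawner`, and the "command must not be blank" rule are assumptions for illustration, not the crate's types or rules); the point is only that construction never fails and checks run at start time.

```rust
/// Hypothetical stand-in for a task configuration; not the crate's `TaskConfig`.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchConfig {
    pub command: String,
    pub args: Vec<String>,
}

impl SketchConfig {
    /// Building never fails: nothing is checked here.
    pub fn new(command: &str) -> Self {
        SketchConfig { command: command.to_string(), args: Vec::new() }
    }

    /// Consuming builder method, in the style of the `.args([...])` calls on this page.
    pub fn args<I, S>(mut self, args: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.args = args.into_iter().map(Into::into).collect();
        self
    }

    /// Assumed rule for illustration: the command must not be blank.
    pub fn validate(&self) -> Result<(), String> {
        if self.command.trim().is_empty() {
            Err("command must not be empty".to_string())
        } else {
            Ok(())
        }
    }
}

/// Hypothetical spawner that stores the config untouched until `start` runs.
pub struct SketchSpawner {
    pub name: String,
    pub config: SketchConfig,
}

impl SketchSpawner {
    pub fn new(name: &str, config: SketchConfig) -> Self {
        SketchSpawner { name: name.to_string(), config }
    }

    /// Validation happens here, before anything would be spawned.
    pub fn start(&self) -> Result<(), String> {
        self.config.validate()?;
        Ok(()) // a real implementation would spawn the process at this point
    }
}
```

A consequence of this design is that an invalid configuration only surfaces as an error when the task is started, so callers should handle the error at that call site rather than at construction.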

§Arguments
  • task_name - Unique identifier for this task instance
  • config - Task configuration defining command, arguments, environment, etc.
§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};

let config = TaskConfig::new("echo").args(["hello"]);
let spawner = TaskSpawner::new("my-task".to_string(), config);

pub fn set_stdin(self, stdin_rx: Receiver<String>) -> TaskSpawner

Set the stdin receiver for the task, enabling asynchronous input

Configures a channel for sending input to the process stdin. This method has no effect if enable_stdin is false in the task configuration.
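
To make the channel-to-stdin idea concrete, the following std-only sketch drains a receiver of strings into any writer. It is a sketch under stated assumptions, not the crate's stdin watcher: `forward_stdin` is a hypothetical helper, it uses a blocking `std::sync::mpsc` receiver instead of a tokio one, and it assumes each message is written as one newline-terminated line.

```rust
use std::io::{self, Write};
use std::sync::mpsc::Receiver;

/// Drains `stdin_rx` into `writer`, one line per message, until all senders
/// are dropped. Returns how many lines were written.
/// Assumption: each message is terminated with `\n`; the crate may differ.
pub fn forward_stdin<W: Write>(stdin_rx: Receiver<String>, writer: &mut W) -> io::Result<usize> {
    let mut written = 0;
    for message in stdin_rx {
        writer.write_all(message.as_bytes())?;
        writer.write_all(b"\n")?;
        writer.flush()?; // interactive programs expect input promptly
        written += 1;
    }
    Ok(written)
}
```

With a real child process, the writer could be the child's stdin handle; a `Vec<u8>` works equally well for checking what would have been sent.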

§Arguments
  • stdin_rx - Receiver channel for stdin input strings
§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;

let config = TaskConfig::new("python")
    .args(["-i"])
    .enable_stdin(true);

let (stdin_tx, stdin_rx) = mpsc::channel(10);
let spawner = TaskSpawner::new("interactive".to_string(), config)
    .set_stdin(stdin_rx);

pub async fn get_state(&self) -> TaskState

Get the current state of the task

Returns the current execution state of the task. States progress through: Pending → Initiating → Running → (Ready) → Finished
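
The progression above can be written down as a small transition check. This is a hypothetical mirror of the named states (`SketchState` is not the crate's `TaskState`), and it assumes that only forward steps are legal and that Ready is optional, so Running may go straight to Finished.

```rust
/// Hypothetical mirror of the states named above; not the crate's `TaskState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SketchState {
    Pending,
    Initiating,
    Running,
    Ready,
    Finished,
}

/// Returns true when `from -> to` is one forward step in the progression.
/// Assumption: `Ready` is optional, so `Running` may go straight to `Finished`.
pub fn is_valid_transition(from: SketchState, to: SketchState) -> bool {
    use self::SketchState::*;
    matches!(
        (from, to),
        (Pending, Initiating)
            | (Initiating, Running)
            | (Running, Ready)
            | (Running, Finished)
            | (Ready, Finished)
    )
}

/// Checks a whole recorded history, step by step.
pub fn is_valid_history(history: &[SketchState]) -> bool {
    history.windows(2).all(|w| is_valid_transition(w[0], w[1]))
}
```

A check like this can be handy in tests that record the states observed while a task runs and then assert that the sequence never moved backwards.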

§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner, state::TaskState};

#[tokio::main]
async fn main() {
    let config = TaskConfig::new("echo");
    let spawner = TaskSpawner::new("test".to_string(), config);
     
    assert_eq!(spawner.get_state().await, TaskState::Pending);
}

pub async fn is_running(&self) -> bool

Check if the task is currently running

Returns true if the task state is Running, false otherwise.

§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};

#[tokio::main]
async fn main() {
    let config = TaskConfig::new("echo");
    let spawner = TaskSpawner::new("test".to_string(), config);
     
    assert!(!spawner.is_running().await); // Not running initially
}

pub async fn is_ready(&self) -> bool

Check if the task is currently ready

Returns true if the task state is Ready, false otherwise. The Ready state indicates a long-running process has signaled it’s ready to accept requests (via the ready indicator).
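
One plausible way to detect a ready indicator is to scan output lines for a substring on the watched stream. The sketch below assumes exactly that; the crate's actual matching rule is not documented here, and `SketchSource` and `find_ready_line` are hypothetical names.

```rust
/// Which stream a line came from; a hypothetical stand-in for `StreamSource`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SketchSource {
    Stdout,
    Stderr,
}

/// Returns the index of the first line that signals readiness, if any.
/// Assumption: a line matches when it arrives on the watched stream and
/// contains the indicator as a substring.
pub fn find_ready_line(
    lines: &[(SketchSource, &str)],
    indicator: &str,
    watched: SketchSource,
) -> Option<usize> {
    lines
        .iter()
        .position(|(source, line)| *source == watched && line.contains(indicator))
}
```

Filtering by stream matters when a program logs similar text to both stdout and stderr: only the configured source should flip the task to Ready.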

§Examples
use tcrm_task::tasks::{config::{TaskConfig, StreamSource}, async_tokio::spawner::TaskSpawner};

#[tokio::main]
async fn main() {
    let config = TaskConfig::new("my-server")
        .ready_indicator("Server ready")
        .ready_indicator_source(StreamSource::Stdout);
    let spawner = TaskSpawner::new("server".to_string(), config);
     
    assert!(!spawner.is_ready().await); // Not ready initially
}

pub fn uptime(&self) -> Duration

Get the uptime of the task since creation

Returns the duration since the TaskSpawner was created, regardless of the current execution state.
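
Measuring time since creation, independent of execution state, is typically done by storing an `Instant` at construction. The following is a minimal sketch of that idea with a hypothetical `SketchClock` type; whether the crate stores its timestamp this way is an assumption.

```rust
use std::time::{Duration, Instant};

/// Hypothetical holder that records its creation time once.
pub struct SketchClock {
    created_at: Instant,
}

impl SketchClock {
    pub fn new() -> Self {
        SketchClock { created_at: Instant::now() }
    }

    /// Time elapsed since `new`, independent of any task state.
    pub fn uptime(&self) -> Duration {
        self.created_at.elapsed()
    }
}
```

Because `Instant` is monotonic, successive `uptime` readings never decrease, even if the system clock is adjusted.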

§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let config = TaskConfig::new("echo");
    let spawner = TaskSpawner::new("test".to_string(), config);
     
    let uptime = spawner.uptime();
    assert!(uptime < Duration::from_secs(1)); // Just created
}

pub async fn get_task_info(&self) -> TaskInfo

Get comprehensive information about the task

Returns a TaskInfo struct containing the task name, current state, uptime, creation time, and completion time (if finished).

§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};

#[tokio::main]
async fn main() {
    let config = TaskConfig::new("echo").args(["hello"]);
    let spawner = TaskSpawner::new("info-test".to_string(), config);
     
    let info = spawner.get_task_info().await;
    println!("Task '{}' was created {:?} ago", info.name, info.uptime);
}

pub async fn get_process_id(&self) -> Option<u32>

Get the process ID of the running task (if any)

Returns the system process ID if the task is currently running, or None if the task hasn’t started or has finished.

§Examples
use tcrm_task::tasks::{config::TaskConfig, async_tokio::spawner::TaskSpawner};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = TaskConfig::new("cmd").args(["/C", "ping", "127.0.0.1", "-n", "2"]);
    let mut spawner = TaskSpawner::new("pid-test".to_string(), config);
     
    assert_eq!(spawner.get_process_id().await, None); // Not started yet
     
    let (tx, _rx) = mpsc::channel(100);
    spawner.start_direct(tx).await?;
     
    // Now should have a process ID
    let pid = spawner.get_process_id().await;
    assert!(pid.is_some());
     
    Ok(())
}

pub async fn send_terminate_signal(&self, reason: TaskTerminateReason) -> Result<(), TaskError>

Send a termination signal to the running task

Requests graceful termination of the running process with the specified reason. The process may take some time to respond to the termination signal.

§Arguments
  • reason - The reason for termination (Timeout, Cleanup, etc.)
§Returns
  • Ok(()) if the termination signal was sent successfully
  • Err(TaskError::Channel) if the signal could not be sent
§Errors

Returns a TaskError::Channel if the internal termination channel has been closed and the signal cannot be delivered to the task.
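
The closed-channel failure mode can be reproduced with a std channel. This is only an illustration: `SketchReason`, `ChannelClosed`, and `request_terminate` are hypothetical names, and a synchronous `std::sync::mpsc` sender stands in for whatever internal channel the crate uses.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical termination reasons, named after the two mentioned above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SketchReason {
    Timeout,
    Cleanup,
}

/// Hypothetical error type standing in for a channel-style failure.
#[derive(Debug, PartialEq, Eq)]
pub struct ChannelClosed;

/// Forwards the reason to whoever supervises the process.
/// Fails only when the receiving side no longer exists.
pub fn request_terminate(
    terminate_tx: &Sender<SketchReason>,
    reason: SketchReason,
) -> Result<(), ChannelClosed> {
    terminate_tx.send(reason).map_err(|_| ChannelClosed)
}
```

Note that a successful send only means the request was queued; as stated above, the process may still take some time to act on it.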

§Examples
use tcrm_task::tasks::{
    config::TaskConfig,
    async_tokio::spawner::TaskSpawner,
    event::TaskTerminateReason
};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = TaskConfig::new("cmd").args(["/C", "ping", "127.0.0.1", "-n", "10"]); // Long-running task
    let mut spawner = TaskSpawner::new("terminate-test".to_string(), config);
     
    let (tx, mut rx) = mpsc::channel(100);
    spawner.start_direct(tx).await?;
     
    // Wait a bit, then terminate
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    spawner.send_terminate_signal(TaskTerminateReason::Cleanup).await?;
     
    // Process events until stopped
    while let Some(event) = rx.recv().await {
        if matches!(event, tcrm_task::tasks::event::TaskEvent::Stopped { .. }) {
            break;
        }
    }
     
    Ok(())
}

Trait Implementations§

impl Debug for TaskSpawner

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.