TaskMonitor

Struct TaskMonitor 

Source
pub struct TaskMonitor {
    pub tasks: TcrmTasks,
    pub tasks_spawner: HashMap<String, TaskSpawner>,
    pub dependencies: HashMap<String, Vec<String>>,
    pub dependents: HashMap<String, Vec<String>>,
    pub stdin_senders: HashMap<String, Sender<String>>,
}
Expand description

Main task monitor for managing and executing task graphs.

The TaskMonitor is responsible for:

  • Validating task dependencies and detecting circular dependencies
  • Managing task spawners and their lifecycle
  • Handling stdin communication for interactive tasks
  • Tracking dependency relationships for proper execution order

§Examples

§Basic Usage

use std::collections::HashMap;
use tcrm_monitor::monitor::{TaskMonitor, config::{TaskSpec, TaskShell}};
use tcrm_task::tasks::config::TaskConfig;

let mut tasks = HashMap::new();

tasks.insert(
    "compile".to_string(),
    TaskSpec::new(TaskConfig::new("cargo").args(["build"]))
        .shell(TaskShell::Auto)
);

let monitor = TaskMonitor::new(tasks)?;

§With Dependencies

use std::collections::HashMap;
use tcrm_monitor::monitor::{TaskMonitor, config::{TaskSpec, TaskShell}};
use tcrm_task::tasks::config::TaskConfig;

let mut tasks = HashMap::new();

tasks.insert(
    "test".to_string(),
    TaskSpec::new(TaskConfig::new("cargo").args(["test"]))
        .shell(TaskShell::Auto)
);

tasks.insert(
    "build".to_string(),
    TaskSpec::new(TaskConfig::new("cargo").args(["build", "--release"]))
        .dependencies(["test"])
        .shell(TaskShell::Auto)
);

let monitor = TaskMonitor::new(tasks)?;

§Interactive Tasks with Stdin

use std::collections::HashMap;
use tcrm_monitor::monitor::{TaskMonitor, config::{TaskSpec, TaskShell}};
use tcrm_task::tasks::config::TaskConfig;

let mut tasks = HashMap::new();

tasks.insert(
    "interactive".to_string(),
    TaskSpec::new(
        TaskConfig::new("python")
            .args(["-c", "input('Enter something: ')"])
            .enable_stdin(true)
    )
    .shell(TaskShell::Auto)
);

let monitor = TaskMonitor::new(tasks)?;
// The monitor automatically sets up stdin channels for tasks that need them

Fields§

§tasks: TcrmTasks

Collection of task specifications indexed by task name

§tasks_spawner: HashMap<String, TaskSpawner>

Task spawners for managing individual task execution

§dependencies: HashMap<String, Vec<String>>

Mapping of tasks to their direct dependencies (tasks they depend on)

§dependents: HashMap<String, Vec<String>>

Mapping of tasks to their dependents (tasks that depend on them)

§stdin_senders: HashMap<String, Sender<String>>

Stdin senders for tasks that have stdin enabled

Implementations§

Source§

impl TaskMonitor

Source

pub async fn execute_all_direct(&mut self, event_tx: Option<Sender<TaskEvent>>)

Execute all tasks using the direct execution strategy.

This method executes tasks in dependency order, running independent tasks in parallel. It processes task events and manages the execution flow until all tasks complete.

§Arguments
  • event_tx - Optional sender to forward task events to external listeners
§Behavior
  1. Starts all tasks that have no dependencies
  2. Listens for task events (started, output, ready, stopped, error)
  3. When a task becomes ready, starts its dependents if their dependencies are satisfied
  4. Continues until all tasks have completed or failed
  5. Handles task termination based on terminate_after_dependents_finished flag
§Examples
use std::collections::HashMap;
use tokio::sync::mpsc;
use tcrm_monitor::monitor::{TaskMonitor, config::TaskSpec};
use tcrm_task::tasks::{config::TaskConfig, event::TaskEvent};
use tcrm_monitor::monitor::config::TaskShell;

let mut tasks = HashMap::new();
tasks.insert(
    "test".to_string(),
    TaskSpec::new(TaskConfig::new("echo").args(["Running tests"])).shell(TaskShell::Auto)
);

let mut monitor = TaskMonitor::new(tasks)?;

// Execute without event monitoring
monitor.execute_all_direct(None).await;

// Create a new monitor for the second example
let mut tasks2 = HashMap::new();
tasks2.insert(
    "test2".to_string(),
    TaskSpec::new(TaskConfig::new("echo").args(["Running tests 2"])).shell(TaskShell::Auto)
);
let mut monitor2 = TaskMonitor::new(tasks2)?;

// Or with event monitoring
let (event_tx, mut event_rx) = mpsc::channel(100);
let event_handler = tokio::spawn(async move {
    while let Some(event) = event_rx.recv().await {
        println!("Task event: {:?}", event);
        // Break on certain events to avoid hanging
        if matches!(event, TaskEvent::Stopped { .. }) {
            break;
        }
    }
});

monitor2.execute_all_direct(Some(event_tx)).await;
let _ = event_handler.await;
Source

pub async fn execute_all_direct_with_control( &mut self, event_tx: Option<Sender<TaskMonitorEvent>>, control_rx: Receiver<TaskMonitorControlCommand>, )

Execute all tasks with real-time control capabilities.

This method provides advanced task execution with real-time control capabilities including the ability to send stdin to running tasks, request task termination, and gracefully stop all execution.

§Parameters
  • event_tx - Optional channel to receive task execution events
  • control_rx - Channel to receive control commands during execution
§Control Commands

The control channel accepts TaskMonitorControlCommand commands:

  • SendStdin { task_name, data } - Send input to a specific task’s stdin
  • TerminateTask { task_name } - Request termination of a specific task
  • TerminateAll - Request termination of all running tasks
§Examples
use std::collections::HashMap;
use tokio::sync::mpsc;
use tcrm_monitor::monitor::{
    tasks::TaskMonitor,
    config::{TaskSpec, TaskShell},
    event::{TaskMonitorControlCommand, TaskMonitorEvent}
};
use tcrm_task::tasks::config::TaskConfig;

let mut tasks = HashMap::new();
tasks.insert(
    "interactive_task".to_string(),
    TaskSpec::new(TaskConfig::new("cat"))  // cat reads from stdin
        .shell(TaskShell::Auto)
);

let mut monitor = TaskMonitor::new(tasks)?;
let (event_tx, mut event_rx) = mpsc::channel(100);
let (control_tx, control_rx) = mpsc::channel(10);

// Spawn control task
let control_handle = tokio::spawn(async move {
    // Send some input to the task
    control_tx.send(TaskMonitorControlCommand::SendStdin {
        task_name: "interactive_task".to_string(),
        input: "Hello, World!\n".to_string(),
    }).await.unwrap();

    // Wait a bit then stop
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    control_tx.send(TaskMonitorControlCommand::TerminateAllTasks).await.unwrap();
});

// Spawn event listener
let event_handle = tokio::spawn(async move {
    while let Some(event) = event_rx.recv().await {
        match event {
            TaskMonitorEvent::Completed { .. } => break,
            _ => {}
        }
    }
});

// Execute with control
monitor.execute_all_direct_with_control(Some(event_tx), control_rx).await;

// Wait for background tasks
control_handle.await?;
event_handle.await?;
Source§

impl TaskMonitor

Source

pub fn new(tasks: TcrmTasks) -> Result<Self, TaskMonitorError>

Creates a new task monitor from a collection of task specifications.

This method performs several important initialization steps:

  1. Builds dependency maps for both dependencies and dependents
  2. Validates for circular dependencies
  3. Applies shell configuration to tasks
  4. Creates task spawners for each task
  5. Sets up stdin channels for interactive tasks
§Arguments
  • tasks - A HashMap of task names to task specifications
§Returns
  • Ok(TaskMonitor) - Successfully created task monitor with all spawners initialized
  • Err(TaskMonitorError) - If dependency validation fails or circular dependencies detected
§Errors
§Examples
use std::collections::HashMap;
use tcrm_monitor::monitor::{TaskMonitor, config::TaskSpec};
use tcrm_task::tasks::config::TaskConfig;

let mut tasks = HashMap::new();
tasks.insert(
    "test".to_string(),
    TaskSpec::new(TaskConfig::new("cargo").args(["test"]))
);

let monitor = TaskMonitor::new(tasks)?;
// Task spawners and stdin channels are automatically set up
§Interactive Tasks

Tasks with stdin enabled automatically get stdin channels:

use std::collections::HashMap;
use tcrm_monitor::monitor::{TaskMonitor, config::TaskSpec};
use tcrm_task::tasks::config::TaskConfig;

let mut tasks = HashMap::new();
tasks.insert(
    "interactive".to_string(),
    TaskSpec::new(
        TaskConfig::new("read").args(["input"])
            .enable_stdin(true)
    )
);

let monitor = TaskMonitor::new(tasks)?;
// monitor.stdin_senders now contains a channel for "interactive"

Trait Implementations§

Source§

impl Debug for TaskMonitor

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.