pub struct TaskMonitor {
pub tasks: TcrmTasks,
pub tasks_spawner: HashMap<String, TaskSpawner>,
pub dependencies: HashMap<String, Vec<String>>,
pub dependents: HashMap<String, Vec<String>>,
pub stdin_senders: HashMap<String, Sender<String>>,
}
Expand description
Main task monitor for managing and executing task graphs.
The TaskMonitor is responsible for:
- Validating task dependencies and detecting circular dependencies
- Managing task spawners and their lifecycle
- Handling stdin communication for interactive tasks
- Tracking dependency relationships for proper execution order
§Examples
§Basic Usage
use std::collections::HashMap;
use tcrm_monitor::monitor::{TaskMonitor, config::{TaskSpec, TaskShell}};
use tcrm_task::tasks::config::TaskConfig;
let mut tasks = HashMap::new();
tasks.insert(
"compile".to_string(),
TaskSpec::new(TaskConfig::new("cargo").args(["build"]))
.shell(TaskShell::Auto)
);
let monitor = TaskMonitor::new(tasks)?;
§With Dependencies
use std::collections::HashMap;
use tcrm_monitor::monitor::{TaskMonitor, config::{TaskSpec, TaskShell}};
use tcrm_task::tasks::config::TaskConfig;
let mut tasks = HashMap::new();
tasks.insert(
"test".to_string(),
TaskSpec::new(TaskConfig::new("cargo").args(["test"]))
.shell(TaskShell::Auto)
);
tasks.insert(
"build".to_string(),
TaskSpec::new(TaskConfig::new("cargo").args(["build", "--release"]))
.dependencies(["test"])
.shell(TaskShell::Auto)
);
let monitor = TaskMonitor::new(tasks)?;
§Interactive Tasks with Stdin
use std::collections::HashMap;
use tcrm_monitor::monitor::{TaskMonitor, config::{TaskSpec, TaskShell}};
use tcrm_task::tasks::config::TaskConfig;
let mut tasks = HashMap::new();
tasks.insert(
"interactive".to_string(),
TaskSpec::new(
TaskConfig::new("python")
.args(["-c", "input('Enter something: ')"])
.enable_stdin(true)
)
.shell(TaskShell::Auto)
);
let monitor = TaskMonitor::new(tasks)?;
// The monitor automatically sets up stdin channels for tasks that need them
Fields§
§tasks: TcrmTasks
Collection of task specifications indexed by task name
tasks_spawner: HashMap<String, TaskSpawner>
Task spawners for managing individual task execution
dependencies: HashMap<String, Vec<String>>
Mapping of tasks to their direct dependencies (tasks they depend on)
dependents: HashMap<String, Vec<String>>
Mapping of tasks to their dependents (tasks that depend on them)
stdin_senders: HashMap<String, Sender<String>>
Stdin senders for tasks that have stdin enabled
Implementations§
Source§impl TaskMonitor
impl TaskMonitor
Sourcepub async fn execute_all_direct(&mut self, event_tx: Option<Sender<TaskEvent>>)
pub async fn execute_all_direct(&mut self, event_tx: Option<Sender<TaskEvent>>)
Execute all tasks using the direct execution strategy.
This method executes tasks in dependency order, running independent tasks in parallel. It processes task events and manages the execution flow until all tasks complete.
§Arguments
- event_tx - Optional sender to forward task events to external listeners
§Behavior
- Starts all tasks that have no dependencies
- Listens for task events (started, output, ready, stopped, error)
- When a task becomes ready, starts its dependents if their dependencies are satisfied
- Continues until all tasks have completed or failed
- Handles task termination based on the terminate_after_dependents_finished flag
§Examples
use std::collections::HashMap;
use tokio::sync::mpsc;
use tcrm_monitor::monitor::{TaskMonitor, config::TaskSpec};
use tcrm_task::tasks::{config::TaskConfig, event::TaskEvent};
use tcrm_monitor::monitor::config::TaskShell;
let mut tasks = HashMap::new();
tasks.insert(
"test".to_string(),
TaskSpec::new(TaskConfig::new("echo").args(["Running tests"])).shell(TaskShell::Auto)
);
let mut monitor = TaskMonitor::new(tasks)?;
// Execute without event monitoring
monitor.execute_all_direct(None).await;
// Create a new monitor for the second example
let mut tasks2 = HashMap::new();
tasks2.insert(
"test2".to_string(),
TaskSpec::new(TaskConfig::new("echo").args(["Running tests 2"])).shell(TaskShell::Auto)
);
let mut monitor2 = TaskMonitor::new(tasks2)?;
// Or with event monitoring
let (event_tx, mut event_rx) = mpsc::channel(100);
let event_handler = tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
println!("Task event: {:?}", event);
// Break on certain events to avoid hanging
if matches!(event, TaskEvent::Stopped { .. }) {
break;
}
}
});
monitor2.execute_all_direct(Some(event_tx)).await;
let _ = event_handler.await;
Sourcepub async fn execute_all_direct_with_control(
&mut self,
event_tx: Option<Sender<TaskMonitorEvent>>,
control_rx: Receiver<TaskMonitorControlCommand>,
)
pub async fn execute_all_direct_with_control( &mut self, event_tx: Option<Sender<TaskMonitorEvent>>, control_rx: Receiver<TaskMonitorControlCommand>, )
Execute all tasks with real-time control capabilities.
This method provides advanced task execution with real-time control capabilities including the ability to send stdin to running tasks, request task termination, and gracefully stop all execution.
§Parameters
- event_tx - Optional channel to receive task execution events
- control_rx - Channel to receive control commands during execution
§Control Commands
The control channel accepts TaskMonitorControlCommand commands:
- SendStdin { task_name, input } - Send input to a specific task’s stdin
- TerminateTask { task_name } - Request termination of a specific task
- TerminateAllTasks - Request termination of all running tasks
§Examples
use std::collections::HashMap;
use tokio::sync::mpsc;
use tcrm_monitor::monitor::{
tasks::TaskMonitor,
config::{TaskSpec, TaskShell},
event::{TaskMonitorControlCommand, TaskMonitorEvent}
};
use tcrm_task::tasks::config::TaskConfig;
let mut tasks = HashMap::new();
tasks.insert(
"interactive_task".to_string(),
TaskSpec::new(TaskConfig::new("cat")) // cat reads from stdin
.shell(TaskShell::Auto)
);
let mut monitor = TaskMonitor::new(tasks)?;
let (event_tx, mut event_rx) = mpsc::channel(100);
let (control_tx, control_rx) = mpsc::channel(10);
// Spawn control task
let control_handle = tokio::spawn(async move {
// Send some input to the task
control_tx.send(TaskMonitorControlCommand::SendStdin {
task_name: "interactive_task".to_string(),
input: "Hello, World!\n".to_string(),
}).await.unwrap();
// Wait a bit then stop
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
control_tx.send(TaskMonitorControlCommand::TerminateAllTasks).await.unwrap();
});
// Spawn event listener
let event_handle = tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
match event {
TaskMonitorEvent::Completed { .. } => break,
_ => {}
}
}
});
// Execute with control
monitor.execute_all_direct_with_control(Some(event_tx), control_rx).await;
// Wait for background tasks
control_handle.await?;
event_handle.await?;
Source§impl TaskMonitor
impl TaskMonitor
Sourcepub fn new(tasks: TcrmTasks) -> Result<Self, TaskMonitorError>
pub fn new(tasks: TcrmTasks) -> Result<Self, TaskMonitorError>
Creates a new task monitor from a collection of task specifications.
This method performs several important initialization steps:
- Builds dependency maps for both dependencies and dependents
- Validates for circular dependencies
- Applies shell configuration to tasks
- Creates task spawners for each task
- Sets up stdin channels for interactive tasks
§Arguments
- tasks - A HashMap of task names to task specifications
§Returns
- Ok(TaskMonitor) - Successfully created task monitor with all spawners initialized
- Err(TaskMonitorError) - If dependency validation fails or circular dependencies are detected
§Errors
- TaskMonitorError::CircularDependency - If circular dependencies are detected
- TaskMonitorError::DependencyNotFound - If a task depends on a non-existent task
§Examples
use std::collections::HashMap;
use tcrm_monitor::monitor::{TaskMonitor, config::TaskSpec};
use tcrm_task::tasks::config::TaskConfig;
let mut tasks = HashMap::new();
tasks.insert(
"test".to_string(),
TaskSpec::new(TaskConfig::new("cargo").args(["test"]))
);
let monitor = TaskMonitor::new(tasks)?;
// Task spawners and stdin channels are automatically set up
§Interactive Tasks
Tasks with stdin enabled automatically get stdin channels:
use std::collections::HashMap;
use tcrm_monitor::monitor::{TaskMonitor, config::TaskSpec};
use tcrm_task::tasks::config::TaskConfig;
let mut tasks = HashMap::new();
tasks.insert(
"interactive".to_string(),
TaskSpec::new(
TaskConfig::new("read").args(["input"])
.enable_stdin(true)
)
);
let monitor = TaskMonitor::new(tasks)?;
// monitor.stdin_senders now contains a channel for "interactive"