Expand description
Task execution framework for Genja.
This module provides the core task execution infrastructure for Genja, enabling structured task definition, execution, and result tracking across multiple hosts. It defines traits, types, and utilities for building task-based automation workflows with support for nested sub-tasks, rich result metadata, and flexible error handling.
§Overview
The task system is built around several key concepts:
- Task Definition: Tasks implement the
Tasktrait, which combines metadata (TaskInfo) with execution logic and optional sub-tasks. - Task Collections:
Tasksstores an ordered list of rootTaskDefinitionvalues. Each root task may own its own sub-task tree, so aTasksvalue represents a forest of task trees. - Task Execution: Tasks execute against hosts and return
HostTaskResultindicating success, failure, or skip status. - Result Tracking: The
TaskResultsstructure maintains a hierarchical tree of execution results for tasks and their sub-tasks across all hosts. - Rich Metadata: Tasks can attach detailed metadata including timing information, warnings, messages, diffs, and custom data to their results.
- Summaries and Serialization: Task results can be aggregated with
TaskResults::host_summaryandTaskResults::task_summary, then exported in either human-readable or raw JSON forms. - Execution Logging: Task execution emits structured log events for task start, skip, failure, and finish states, including per-host duration information.
§Task Lifecycle
The following diagram illustrates the complete task execution flow:
┌─────────────────────────────────────────────────────────────────────────┐
│ Task Execution Flow │
└─────────────────────────────────────────────────────────────────────────┘
1. Task Definition
┌──────────────┐
│ User defines │
│ Task struct │──────┐
└──────────────┘ │
│ implements Task trait
▼
┌─────────────┐
│ Task trait │
│ - TaskInfo │
│ - sub_tasks │
│ - start() │
└─────────────┘
2. Task Wrapping
┌──────────────────┐
│ TaskDefinition │
│ wraps Task impl │
└──────────────────┘
│
│ provides polymorphic interface
▼
3. Task Execution (via Runner Plugin)
┌─────────────────────────────────────────────────────────────┐
│ Runner Plugin (e.g., ThreadedRunner, SerialRunner) │
│ │
│ For each task result tree: │
│ ┌──────────────────────────────────────────────┐ │
│ │ 1. Resolve selected PluginProcessor values │ │
│ │ - From TaskInfo::processor_names() │ │
│ │ - Via TaskProcessorResolver │ │
│ └──────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ 2. PluginProcessor hook: on_task_start │ │
│ │ - Runs before host results are collected │ │
│ │ - Can initialize or modify TaskResults │ │
│ └──────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ 3. For each host in inventory │ │
│ │ │ │
│ │ PluginProcessor hook: │ │
│ │ on_instance_start │ │
│ │ - Receives task, parent, depth, host │ │
│ │ │ │
│ │ Execute task.start(...) │ │
│ │ - Record start timestamp │ │
│ │ - Optionally open task-scoped │ │
│ │ connection via TaskConnectionResolver │ │
│ │ - Call task implementation │ │
│ │ - Record finish timestamp │ │
│ │ │ │
│ │ Capture Result │ │
│ │ - Success, failure, or skip │ │
│ │ │ │
│ │ PluginProcessor hook: │ │
│ │ on_instance_finish │ │
│ │ - Can inspect or mutate HostTaskResult │ │
│ │ │ │
│ │ Process sub-tasks, if any │ │
│ │ - Each sub-task selects own processors │ │
│ │ - Recursively execute sub_tasks() │ │
│ └──────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ 4. PluginProcessor hook: on_task_finish │ │
│ │ - Runs after host results are collected │ │
│ │ - Can finalize or modify TaskResults │ │
│ └──────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
4. Result Collection
┌──────────────────────────────────────────────────────────┐
│ TaskResults (per task) │
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ Host Results (Map<hostname, result>) │ │
│ │ - router1 → HostTaskResult::Passed │ │
│ │ - router2 → HostTaskResult::Failed │ │
│ │ - router3 → HostTaskResult::Skipped │ │
│ └─────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────┐ │
│ │ Sub-task Results (Map<name, TaskResults>) │ │
│ │ - "validate" → TaskResults │ │
│ │ - "deploy" → TaskResults │ │
│ └───────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ Execution Timing │ │
│ │ - started_at: SystemTime │ │
│ │ - finished_at: SystemTime │ │
│ │ - duration: calculated from timestamps │ │
│ └─────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────┘
5. Result Aggregation
┌────────────────────────────────────────────────────────┐
│ Final TaskResults tree │
│ │
│ root_task │
│ ├── host_results: {router1, router2, router3} │
│ ├── timing: {started_at, finished_at, duration} │
│ └── sub_tasks: │
│ ├── validate │
│ │ ├── host_results: {...} │
│ │ └── timing: {...} │
│ └── deploy │
│ ├── host_results: {...} │
│ ├── timing: {...} │
│ └── sub_tasks: │
│ └── verify │
│ ├── host_results: {...} │
│ └── timing: {...} │
└────────────────────────────────────────────────────────┘§Key Lifecycle Stages
-
Definition: A task struct implements the
Tasktrait, providing metadata (name, plugin), execution logic (start()), and optional sub-tasks. -
Wrapping: The task is wrapped in a
TaskDefinitionfor polymorphic handling, allowing heterogeneous collections of tasks to be stored and executed uniformly. -
Execution: A runner plugin (e.g.,
ThreadedRunner,SerialRunner) orchestrates task execution across selected hosts:- Optionally resolves a task-scoped connection via
TaskConnectionResolver - Calls
Task::startfor each host with aTaskRuntimeContext - Records timing information (start, finish, duration)
- Captures results (success, failure, or skip)
- Recursively processes sub-tasks up to
max_depth
- Optionally resolves a task-scoped connection via
-
Result Collection: Each host produces a
HostTaskResultthat is stored in the task’sTaskResultsstructure. Sub-tasks create nestedTaskResultsnodes, forming a tree structure that mirrors the task hierarchy. -
Aggregation: Results are collected into a
TaskResultstree that provides:- Per-host outcomes for each task
- Execution timing at each level
- Nested sub-task results
- Summary statistics and status
§Core Traits
§Task
The primary trait that all tasks must implement. It combines TaskInfo for
metadata with execution methods and optional hierarchical sub-task structures.
In the common workflow, the genja_task attribute macro
generates both TaskInfo and Task for you from an inherent impl block.
If you call generated metadata methods such as name() or
connection_plugin_name() directly, import TaskInfo so those trait methods
are in scope.
use genja_core::inventory::Host;
use genja_core::task::{
HostTaskResult, Task, TaskInfo, TaskRuntimeContext, TaskSuccess,
};
use genja_core::genja_task;
#[derive(Default)]
struct DeployTask {
options: Option<serde_json::Value>,
config_file: String,
}
#[genja_task(name = "deploy", connection_plugin_name = "ssh")]
impl DeployTask {
async fn start_async(
&self,
_host: &Host,
_context: &TaskRuntimeContext,
) -> Result<HostTaskResult, genja_core::task::TaskError> {
Ok(HostTaskResult::passed(
TaskSuccess::new()
.with_changed(true)
.with_summary("Configuration deployed successfully")
))
}
}
let task = DeployTask {
options: None,
config_file: "router.conf".to_string()
};
assert_eq!(task.name(), "deploy");
assert_eq!(task.connection_plugin_name(), Some("ssh"));§TaskInfo
Provides metadata about a task including its name, associated plugin, connection
requirements, and optional configuration. This trait is typically
auto-implemented by the genja_task attribute macro.
Static metadata comes from macro arguments such as name,
connection_plugin_name, and processors, while dynamic metadata can be
provided through helper methods such as options().
§Task Processors
TaskProcessor provides lifecycle hooks for processing task results without
changing the task’s execution implementation. A task selects processors by name,
and the runtime resolves those names through a TaskProcessorResolver. In the
full Genja runtime, the plugin manager implements the resolver, so invalid
processor names fail with GenjaError::PluginNotFound.
Processors can be selected in three ways:
use genja_core::inventory::Host;
use genja_core::task::{
HostTaskResult, Task, TaskDefinition, TaskRuntimeContext, TaskSuccess,
};
use genja_core::genja_task;
#[derive(Default)]
struct AttributeTask;
#[derive(Default)]
struct FieldTask {
processor_names: Vec<String>,
}
#[genja_task(name = "attribute", processors = ["audit"])]
impl AttributeTask {
async fn start_async(
&self,
_host: &Host,
_context: &TaskRuntimeContext,
) -> Result<HostTaskResult, genja_core::task::TaskError> {
Ok(HostTaskResult::passed(TaskSuccess::new()))
}
}
#[genja_task(name = "field")]
impl FieldTask {
async fn start_async(
&self,
_host: &Host,
_context: &TaskRuntimeContext,
) -> Result<HostTaskResult, genja_core::task::TaskError> {
Ok(HostTaskResult::passed(TaskSuccess::new()))
}
fn processor_names(&self) -> Vec<&str> {
self.processor_names.iter().map(String::as_str).collect()
}
}
let _attribute_task = AttributeTask;
let _field_task = FieldTask {
processor_names: vec!["audit".to_string()],
};
let _root_override = TaskDefinition::new(AttributeTask)
.with_processor("metrics");Processor selection is per task. Sub-tasks do not inherit their parent’s processor list unless the sub-task itself returns the same processor names.
§Sub Tasks
Tasks can define sub-tasks that execute after the parent task completes.
Sub-tasks receive their own runtime context and execution depth when they
run. Implement fn sub_tasks(&self) -> Vec<Arc<dyn Task>> inside a
#[genja_task(...)] impl block when a task needs child tasks.
§Behavioral Rules
The execution model is intentionally simple and deterministic:
- The parent task’s execution method runs before any of its sub-tasks.
- The parent task’s
HostTaskResultis inserted intoTaskResultsbefore sub-task execution begins. - Sub-tasks run in the order returned by
Task::sub_tasks(). - Each host is executed independently. When running a single task, the full task tree is executed once per selected host.
Taskspreserves insertion order for root tasks. Runners execute each root task definition in that order unless a runner explicitly documents a different scheduling policy, and each returnedTaskResultsentry corresponds to the root task at the same position.- Sub-task results are grouped by sub-task name. The
TaskResultsnode for a given sub-task contains per-host results accumulated across all hosts. - The framework does not automatically skip sub-tasks when a parent fails or is
skipped. If you want that behavior, return an explicit
HostTaskResult::Skippedfrom the sub-task or encode the condition in the task itself. - Processor hooks run only for tasks that selected processor names. The root
task can also be given processor names through
TaskDefinition::with_processororTaskDefinition::with_processors. - For a task result tree, processor order is:
on_task_start, then for each selected hoston_instance_start,Task::start,on_instance_finish, then any sub-task trees, thenon_task_finish. - If processor name resolution fails, execution returns
GenjaError::PluginNotFound. If a processor hook returns an error, execution stops and propagates that error. max_depthis checked usingdepth > max_depth. This meansmax_depth = 0still allows the root task at depth0, but rejects all sub-tasks at depth1.- Exceeding
max_depthrecords an internalHostTaskResult::Failedfor the host at that task node instead of returning an outer execution error. - If
Task::startreturnsTaskError, the framework captures it as aTaskFailurewith timing metadata and stores it in the host results tree.
§Task Results
§HostTaskResult
Represents the outcome of executing a task on a single host. It can be:
- Passed: Task completed successfully with optional metadata in
TaskSuccess - Failed: Task encountered an error with details in
TaskFailure - Skipped: Task was not executed with reason in
TaskSkip
use genja_core::task::{HostTaskResult, TaskSuccess, TaskFailure, TaskFailureKind};
use serde_json::json;
// Success with metadata
let success = HostTaskResult::passed(
TaskSuccess::new()
.with_result(json!({"status": "deployed"}))
.with_changed(true)
.with_diff("+ new_config_line")
);
// Failure with classification
let failure = HostTaskResult::failed(
TaskFailure::new(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
"connection refused"
))
.with_kind(TaskFailureKind::Connection)
.with_retryable(true)
);
// Skipped with reason
let skipped = HostTaskResult::skipped_with_reason("parent_failed");§TaskResults
A hierarchical structure that stores execution results for a task and all its sub-tasks across multiple hosts. It provides methods for querying results, tracking success/failure counts, computing summaries, serializing output, and navigating the task tree.
use genja_core::task::{TaskResults, HostTaskResult, TaskSuccess};
let mut results = TaskResults::new("deploy")
.with_summary("Deployment completed");
results.insert_host_result(
"router1",
HostTaskResult::passed(TaskSuccess::new().with_changed(true))
);
results.insert_host_result(
"router2",
HostTaskResult::skipped_with_reason("maintenance_mode")
);
// Query results
assert_eq!(results.passed_hosts().len(), 1);
assert!(results.host_result("router1").unwrap().is_passed());
assert!(results.host_result("router2").unwrap().is_skipped());In addition to raw host and sub-task access, TaskResults supports:
- aggregate host counts via
TaskResults::host_summary - recursive task-tree summaries via
TaskResults::task_summary - human-readable JSON via
TaskResults::to_json_stringandTaskResults::to_pretty_json_string - raw serde JSON via
TaskResults::to_raw_json_stringandTaskResults::to_raw_pretty_json_string
§Task Metadata
§TaskSuccess
Contains rich metadata about successful task execution:
- result: Structured data returned by the task (JSON)
- changed: Whether the task modified the target system
- diff: Text representation of changes made
- summary: Human-readable summary of execution
- warnings: Non-fatal issues encountered
- messages: Structured log messages with levels and codes
- metadata: Additional custom data
- timing: Start time, finish time, and duration
§TaskFailure
Contains detailed error information for failed tasks:
- error: The underlying error that caused the failure
- kind: Classification of the failure (
TaskFailureKind) - retryable: Whether the operation can be retried
- details: Additional context about the failure (JSON)
- warnings: Non-fatal issues that preceded the failure
- messages: Structured log messages
§TaskSkip
Contains information about why a task was skipped:
- reason: Machine-readable skip reason
- message: Human-readable explanation
§Failure Classification
The TaskFailureKind enum categorizes failures to enable appropriate error
handling and retry logic:
- Connection: Network or connectivity issues (often retryable)
- Authentication: Credential or permission problems
- Validation: Invalid input or configuration
- Timeout: Operation exceeded time limit (often retryable)
- Command: Remote command execution failed
- Unsupported: Operation not supported by target
- Internal: Genja/framework implementation error
- External: Error returned from a task, plugin, or external dependency
§Message System
Tasks can emit structured messages during execution using TaskMessage:
use genja_core::task::{TaskMessage, MessageLevel};
use std::time::SystemTime;
let message = TaskMessage::new(MessageLevel::Warning, "High latency detected")
.with_code("latency_warn")
.with_timestamp(SystemTime::now());Message levels include:
- Info: Informational messages
- Warning: Non-fatal issues
- Error: Error details
- Debug: Debugging information
The task framework itself also emits log records through the log crate during
execution, including task start, skip, failure, and finish events with host and
duration context.
§Task Execution
§TaskDefinition
A wrapper around task implementations that provides execution control and enforces the task execution flow. It handles recursive sub-task execution with depth limiting to prevent infinite recursion.
use genja_core::inventory::{BaseBuilderHost, Host};
use genja_core::genja_task;
use genja_core::task::{
HostTaskResult, Task, TaskDefinition, TaskResults, TaskRuntimeContext, TaskSuccess,
};
use tokio::runtime::Builder;
#[derive(Default)]
struct DeployTask;
#[genja_task(name = "deploy", connection_plugin_name = "ssh")]
impl DeployTask {
async fn start_async(
&self,
_host: &Host,
_context: &TaskRuntimeContext,
) -> Result<HostTaskResult, genja_core::task::TaskError> {
Ok(HostTaskResult::passed(
TaskSuccess::new().with_summary("deploy complete"),
))
}
}
let task = TaskDefinition::new(DeployTask);
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("deploy");
let runtime = Builder::new_current_thread().enable_all().build().unwrap();
runtime
.block_on(task.start("router1", &host, &mut results, 1))
.expect("task execution should succeed");
assert!(results.host_result("router1").unwrap().is_passed());§Tasks
A collection type for managing an ordered list of root task definitions. Each root task may have its own nested sub-task tree. Runtime runners execute the list as a forest of task trees and return results in the same root order.
use genja_core::inventory::Host;
use genja_core::genja_task;
use genja_core::task::{HostTaskResult, Task, TaskRuntimeContext, TaskSuccess, Tasks};
struct WorkflowTask {
name: &'static str,
}
#[genja_task(name = "workflow")]
impl WorkflowTask {
async fn start_async(
&self,
_host: &Host,
_context: &TaskRuntimeContext,
) -> Result<HostTaskResult, genja_core::task::TaskError> {
Ok(HostTaskResult::passed(TaskSuccess::new()))
}
fn options(&self) -> Option<&serde_json::Value> {
None
}
}
let mut tasks = Tasks::new();
tasks.add_task(WorkflowTask { name: "collect_facts" });
tasks.add_task(WorkflowTask { name: "deploy_changes" });
tasks.add_task(WorkflowTask { name: "verify_health" });
assert_eq!(tasks.len(), 3);§Advanced Usage
§Hierarchical Task Execution
Tasks can define sub-tasks that execute after the parent task completes. This
Structs§
- Blocking
Task Connection - Blocking
Task Runtime Context - Task
Definition - A wrapper around a task implementation that enforces the task trait flow.
- Task
Error - Represents an error that occurred during task execution.
- Task
Execution Context - Execution context passed into task implementations.
- Task
Failure - Represents a failed task execution with comprehensive error information and context.
- Task
Host Summary - Aggregate host outcome counts for a task result node.
- Task
Message - Represents a structured message generated during task execution.
- Task
Processor Context - Execution context passed to task processors.
- Task
Results - Results of a task execution, including timing, host outcomes, and nested sub-task results.
- Task
Results Summary - Recursive summary of a task result tree.
- Task
Runtime Context - Runtime context passed into task implementations.
- Task
Skip - Represents information about a skipped task execution.
- Task
Success - Represents the successful execution of a task on a host.
- Tasks
- A collection of task definitions that can be executed together.
Enums§
- Host
Task Result - Represents the execution outcome of a task on a single host.
- Message
Level - Represents the severity level of a task message.
- Task
Execution Mode - Sub-task provider interface.
- Task
Failure Kind - Categorizes the type of failure that occurred during task execution.
Traits§
- Task
- Core task interface required for execution.
- Task
Connection Resolver - Opens or verifies task-scoped connections before execution.
- Task
Info - Task metadata required for execution.
- Task
Processor - Processes task results during aggregate and per-host task lifecycles.
- Task
Processor Resolver - Resolves named task processors at execution time.