Module task

Task execution framework for Genja.

This module provides the core task execution infrastructure for Genja, enabling structured task definition, execution, and result tracking across multiple hosts. It defines traits, types, and utilities for building task-based automation workflows with support for nested sub-tasks, rich result metadata, and flexible error handling.

§Overview

The task system is built around several key concepts:

  • Task Definition: Tasks implement the Task trait, which combines metadata (TaskInfo) with execution logic and optional sub-tasks.
  • Task Collections: Tasks stores an ordered list of root TaskDefinition values. Each root task may own its own sub-task tree, so a Tasks value represents a forest of task trees.
  • Task Execution: Tasks execute against hosts and return a HostTaskResult for each host, indicating success, failure, or skip.
  • Result Tracking: The TaskResults structure maintains a hierarchical tree of execution results for tasks and their sub-tasks across all hosts.
  • Rich Metadata: Tasks can attach detailed metadata including timing information, warnings, messages, diffs, and custom data to their results.
  • Summaries and Serialization: Task results can be aggregated with TaskResults::host_summary and TaskResults::task_summary, then exported in either human-readable or raw JSON forms.
  • Execution Logging: Task execution emits structured log events for task start, skip, failure, and finish states, including per-host duration information.

§Task Lifecycle

The following diagram illustrates the complete task execution flow:

┌─────────────────────────────────────────────────────────────────────────┐
│                          Task Execution Flow                            │
└─────────────────────────────────────────────────────────────────────────┘

1. Task Definition
   ┌──────────────┐
   │ User defines │
   │ Task struct  │──────┐
   └──────────────┘      │
                         │ implements Task trait
                         ▼
                  ┌─────────────┐
                  │ Task trait  │
                  │ - TaskInfo  │
                  │ - sub_tasks │
                  │ - start()   │
                  └─────────────┘

2. Task Wrapping
   ┌──────────────────┐
   │ TaskDefinition   │
   │ wraps Task impl  │
   └──────────────────┘
          │
          │ provides polymorphic interface
          ▼

3. Task Execution (via Runner Plugin)
   ┌─────────────────────────────────────────────────────────────┐
   │ Runner Plugin (e.g., ThreadedRunner, SerialRunner)          │
   │                                                             │
   │  For each task result tree:                                 │
   │    ┌──────────────────────────────────────────────┐         │
   │    │ 1. Resolve selected PluginProcessor values   │         │
   │    │    - From TaskInfo::processor_names()        │         │
   │    │    - Via TaskProcessorResolver               │         │
   │    └──────────────────────────────────────────────┘         │
   │                      │                                      │
   │                      ▼                                      │
   │    ┌──────────────────────────────────────────────┐         │
   │    │ 2. PluginProcessor hook: on_task_start       │         │
   │    │    - Runs before host results are collected  │         │
   │    │    - Can initialize or modify TaskResults    │         │
   │    └──────────────────────────────────────────────┘         │
   │                      │                                      │
   │                      ▼                                      │
   │    ┌──────────────────────────────────────────────┐         │
   │    │ 3. For each host in inventory                │         │
   │    │                                              │         │
   │    │    PluginProcessor hook:                     │         │
   │    │      on_instance_start                       │         │
   │    │      - Receives task, parent, depth, host    │         │
   │    │                                              │         │
   │    │    Execute task.start(...)                   │         │
   │    │      - Record start timestamp                │         │
   │    │      - Optionally open task-scoped           │         │
   │    │        connection via TaskConnectionResolver │         │
   │    │      - Call task implementation              │         │
   │    │      - Record finish timestamp               │         │
   │    │                                              │         │
   │    │    Capture Result                            │         │
   │    │      - Success, failure, or skip             │         │
   │    │                                              │         │
   │    │    PluginProcessor hook:                     │         │
   │    │      on_instance_finish                      │         │
   │    │      - Can inspect or mutate HostTaskResult  │         │
   │    │                                              │         │
   │    │    Process sub-tasks, if any                 │         │
   │    │      - Each sub-task selects own processors  │         │
   │    │      - Recursively execute sub_tasks()       │         │
   │    └──────────────────────────────────────────────┘         │
   │                      │                                      │
   │                      ▼                                      │
   │    ┌──────────────────────────────────────────────┐         │
   │    │ 4. PluginProcessor hook: on_task_finish      │         │
   │    │    - Runs after host results are collected   │         │
   │    │    - Can finalize or modify TaskResults      │         │
   │    └──────────────────────────────────────────────┘         │
   └─────────────────────────────────────────────────────────────┘

4. Result Collection
   ┌──────────────────────────────────────────────────────────┐
   │ TaskResults (per task)                                   │
   │                                                          │
   │  ┌─────────────────────────────────────────┐             │
   │  │ Host Results (Map<hostname, result>)    │             │
   │  │  - router1 → HostTaskResult::Passed     │             │
   │  │  - router2 → HostTaskResult::Failed     │             │
   │  │  - router3 → HostTaskResult::Skipped    │             │
   │  └─────────────────────────────────────────┘             │
   │                                                          │
   │  ┌───────────────────────────────────────────┐           │
   │  │ Sub-task Results (Map<name, TaskResults>) │           │
   │  │  - "validate" → TaskResults               │           │
   │  │  - "deploy" → TaskResults                 │           │
   │  └───────────────────────────────────────────┘           │
   │                                                          │
   │  ┌─────────────────────────────────────────┐             │
   │  │ Execution Timing                        │             │
   │  │  - started_at: SystemTime               │             │
   │  │  - finished_at: SystemTime              │             │
   │  │  - duration: calculated from timestamps │             │
   │  └─────────────────────────────────────────┘             │
   └──────────────────────────────────────────────────────────┘

5. Result Aggregation
   ┌────────────────────────────────────────────────────────┐
   │ Final TaskResults tree                                 │
   │                                                        │
   │  root_task                                             │
   │  ├── host_results: {router1, router2, router3}         │
   │  ├── timing: {started_at, finished_at, duration}       │
   │  └── sub_tasks:                                        │
   │      ├── validate                                      │
   │      │   ├── host_results: {...}                       │
   │      │   └── timing: {...}                             │
   │      └── deploy                                        │
   │          ├── host_results: {...}                       │
   │          ├── timing: {...}                             │
   │          └── sub_tasks:                                │
   │              └── verify                                │
   │                  ├── host_results: {...}               │
   │                  └── timing: {...}                     │
   └────────────────────────────────────────────────────────┘

§Key Lifecycle Stages

  1. Definition: A task struct implements the Task trait, providing metadata (name, plugin), execution logic (start()), and optional sub-tasks.

  2. Wrapping: The task is wrapped in a TaskDefinition for polymorphic handling, allowing heterogeneous collections of tasks to be stored and executed uniformly.

  3. Execution: A runner plugin (e.g., ThreadedRunner, SerialRunner) orchestrates task execution across selected hosts:

    • Optionally resolves a task-scoped connection via TaskConnectionResolver
    • Calls Task::start for each host with a TaskRuntimeContext
    • Records timing information (start, finish, duration)
    • Captures results (success, failure, or skip)
    • Recursively processes sub-tasks up to max_depth

  4. Result Collection: Each host produces a HostTaskResult that is stored in the task’s TaskResults structure. Sub-tasks create nested TaskResults nodes, forming a tree structure that mirrors the task hierarchy.

  5. Aggregation: Results are collected into a TaskResults tree that provides:

    • Per-host outcomes for each task
    • Execution timing at each level
    • Nested sub-task results
    • Summary statistics and status

§Core Traits

§Task

The primary trait that all tasks must implement. It combines TaskInfo for metadata with execution methods and optional hierarchical sub-task structures.

In the common workflow, the genja_task attribute macro generates both TaskInfo and Task for you from an inherent impl block. If you call generated metadata methods such as name() or connection_plugin_name() directly, import TaskInfo so those trait methods are in scope.

use genja_core::inventory::Host;
use genja_core::task::{
    HostTaskResult, Task, TaskInfo, TaskRuntimeContext, TaskSuccess,
};
use genja_core::genja_task;

#[derive(Default)]
struct DeployTask {
    options: Option<serde_json::Value>,
    config_file: String,
}

#[genja_task(name = "deploy", connection_plugin_name = "ssh")]
impl DeployTask {
    async fn start_async(
        &self,
        _host: &Host,
        _context: &TaskRuntimeContext,
    ) -> Result<HostTaskResult, genja_core::task::TaskError> {
        Ok(HostTaskResult::passed(
            TaskSuccess::new()
                .with_changed(true)
                .with_summary("Configuration deployed successfully")
        ))
    }
}

let task = DeployTask {
    options: None,
    config_file: "router.conf".to_string()
};

assert_eq!(task.name(), "deploy");
assert_eq!(task.connection_plugin_name(), Some("ssh"));

§TaskInfo

Provides metadata about a task including its name, associated plugin, connection requirements, and optional configuration. This trait is typically auto-implemented by the genja_task attribute macro. Static metadata comes from macro arguments such as name, connection_plugin_name, and processors, while dynamic metadata can be provided through helper methods such as options().

§Task Processors

TaskProcessor provides lifecycle hooks for processing task results without changing the task’s execution implementation. A task selects processors by name, and the runtime resolves those names through a TaskProcessorResolver. In the full Genja runtime, the plugin manager implements the resolver, so invalid processor names fail with GenjaError::PluginNotFound.
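
The name-based lookup described above can be pictured with a small standalone model. This is a sketch only: `SketchProcessor`, `SketchResolver`, `MapResolver`, and `ResolveError` are hypothetical stand-ins written for illustration, not the genja_core TaskProcessor, TaskProcessorResolver, or GenjaError types, whose real signatures are not shown in this page.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for a processor; not the genja_core trait.
trait SketchProcessor {
    fn label(&self) -> &str;
}

struct Audit;

impl SketchProcessor for Audit {
    fn label(&self) -> &str {
        "audit"
    }
}

/// Hypothetical error playing the role of a "plugin not found" failure.
#[derive(Debug, PartialEq)]
enum ResolveError {
    NotFound(String),
}

/// Hypothetical resolver interface: turn a processor name into a processor.
trait SketchResolver {
    fn resolve(&self, name: &str) -> Result<Arc<dyn SketchProcessor>, ResolveError>;
}

/// A resolver backed by a plain map of registered processors.
struct MapResolver {
    registered: HashMap<String, Arc<dyn SketchProcessor>>,
}

impl SketchResolver for MapResolver {
    fn resolve(&self, name: &str) -> Result<Arc<dyn SketchProcessor>, ResolveError> {
        self.registered
            .get(name)
            .cloned()
            .ok_or_else(|| ResolveError::NotFound(name.to_string()))
    }
}

/// Resolve every selected name up front; the first unknown name aborts.
fn resolve_all(
    resolver: &dyn SketchResolver,
    names: &[&str],
) -> Result<Vec<Arc<dyn SketchProcessor>>, ResolveError> {
    names.iter().map(|name| resolver.resolve(name)).collect()
}

/// Build a resolver that only knows the "audit" processor.
fn sample_resolver() -> MapResolver {
    let mut registered: HashMap<String, Arc<dyn SketchProcessor>> = HashMap::new();
    registered.insert("audit".to_string(), Arc::new(Audit));
    MapResolver { registered }
}
```

With `sample_resolver()`, `resolve_all(&resolver, &["audit"])` yields one processor, while `resolve_all(&resolver, &["audit", "metrics"])` returns `ResolveError::NotFound("metrics")` without producing a partial list.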

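Processors can be selected in three ways: with the processors argument of the genja_task macro, by defining a processor_names() method on the task, or on a root task through TaskDefinition::with_processor: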

use genja_core::inventory::Host;
use genja_core::task::{
    HostTaskResult, Task, TaskDefinition, TaskRuntimeContext, TaskSuccess,
};
use genja_core::genja_task;

#[derive(Default)]
struct AttributeTask;

#[derive(Default)]
struct FieldTask {
    processor_names: Vec<String>,
}

#[genja_task(name = "attribute", processors = ["audit"])]
impl AttributeTask {
    async fn start_async(
        &self,
        _host: &Host,
        _context: &TaskRuntimeContext,
    ) -> Result<HostTaskResult, genja_core::task::TaskError> {
        Ok(HostTaskResult::passed(TaskSuccess::new()))
    }
}

#[genja_task(name = "field")]
impl FieldTask {
    async fn start_async(
        &self,
        _host: &Host,
        _context: &TaskRuntimeContext,
    ) -> Result<HostTaskResult, genja_core::task::TaskError> {
        Ok(HostTaskResult::passed(TaskSuccess::new()))
    }

    fn processor_names(&self) -> Vec<&str> {
        self.processor_names.iter().map(String::as_str).collect()
    }
}

let _attribute_task = AttributeTask;
let _field_task = FieldTask {
    processor_names: vec!["audit".to_string()],
};

let _root_override = TaskDefinition::new(AttributeTask)
    .with_processor("metrics");

Processor selection is per task. Sub-tasks do not inherit their parent’s processor list unless the sub-task itself returns the same processor names.

§Sub Tasks

Tasks can define sub-tasks that execute after the parent task completes. Sub-tasks receive their own runtime context and execution depth when they run. Implement fn sub_tasks(&self) -> Vec<Arc<dyn Task>> inside a #[genja_task(...)] impl block when a task needs child tasks.
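
To make the shape of a sub-task tree concrete, here is a minimal standalone sketch. `SketchTask` is a hypothetical trait that mirrors only the `sub_tasks(&self) -> Vec<Arc<dyn ...>>` signature mentioned above; the real Task trait has more methods and is normally generated by the genja_task macro. The task names are illustrative.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a task trait; not the genja_core `Task` trait.
trait SketchTask {
    fn name(&self) -> &str;

    /// Leaf tasks keep the default and report no children.
    fn sub_tasks(&self) -> Vec<Arc<dyn SketchTask>> {
        Vec::new()
    }
}

struct Validate;
struct Verify;
struct Deploy;

impl SketchTask for Validate {
    fn name(&self) -> &str {
        "validate"
    }
}

impl SketchTask for Verify {
    fn name(&self) -> &str {
        "verify"
    }
}

impl SketchTask for Deploy {
    fn name(&self) -> &str {
        "deploy"
    }

    /// Children are returned in the order they should run.
    fn sub_tasks(&self) -> Vec<Arc<dyn SketchTask>> {
        vec![Arc::new(Validate), Arc::new(Verify)]
    }
}

/// Depth-first walk: the parent first, then its children in declared order.
fn walk(task: &dyn SketchTask, depth: usize, out: &mut Vec<(usize, String)>) {
    out.push((depth, task.name().to_string()));
    for child in task.sub_tasks() {
        walk(&*child, depth + 1, out);
    }
}

/// Flatten the sample tree into (depth, name) pairs.
fn deploy_tree() -> Vec<(usize, String)> {
    let mut out = Vec::new();
    walk(&Deploy, 0, &mut out);
    out
}
```

Calling `deploy_tree()` lists `deploy` at depth 0 followed by `validate` and `verify` at depth 1, which is the order a parent-then-children traversal would visit them.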

§Behavioral Rules

The execution model is intentionally simple and deterministic:

  • The parent task’s execution method runs before any of its sub-tasks.
  • The parent task’s HostTaskResult is inserted into TaskResults before sub-task execution begins.
  • Sub-tasks run in the order returned by Task::sub_tasks().
  • Each host is executed independently. When running a single task, the full task tree is executed once per selected host.
  • Tasks preserves insertion order for root tasks. Runners execute each root task definition in that order unless a runner explicitly documents a different scheduling policy, and each returned TaskResults entry corresponds to the root task at the same position.
  • Sub-task results are grouped by sub-task name. The TaskResults node for a given sub-task contains per-host results accumulated across all hosts.
  • The framework does not automatically skip sub-tasks when a parent fails or is skipped. If you want that behavior, return an explicit HostTaskResult::Skipped from the sub-task or encode the condition in the task itself.
  • Processor hooks run only for tasks that selected processor names. The root task can also be given processor names through TaskDefinition::with_processor or TaskDefinition::with_processors.
  • For a task result tree, processor order is: on_task_start, then for each selected host on_instance_start, Task::start, on_instance_finish, then any sub-task trees, then on_task_finish.
  • If processor name resolution fails, execution returns GenjaError::PluginNotFound. If a processor hook returns an error, execution stops and propagates that error.
  • max_depth is checked using depth > max_depth. This means max_depth = 0 still allows the root task at depth 0, but rejects all sub-tasks at depth 1.
  • Exceeding max_depth records an internal HostTaskResult::Failed for the host at that task node instead of returning an outer execution error.
  • If Task::start returns TaskError, the framework captures it as a TaskFailure with timing metadata and stores it in the host results tree.
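
Several of these rules (parent result recorded first, no automatic skipping, the `depth > max_depth` check, and per-host accumulation in one node per sub-task name) can be traced in a small standalone model. Everything here is hypothetical: `SketchTask`, `Outcome`, `ResultNode`, and the failure message are illustration-only names, and the model ignores processors, connections, and async execution.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-host outcome (skip is omitted to keep the model small).
#[derive(Debug, Clone, PartialEq)]
enum Outcome {
    Passed,
    Failed(String),
}

/// Hypothetical task: a fixed outcome plus ordered children.
struct SketchTask {
    name: &'static str,
    outcome: Outcome,
    children: Vec<SketchTask>,
}

/// Hypothetical result node: per-host outcomes plus nodes keyed by sub-task name.
#[derive(Debug, Default)]
struct ResultNode {
    hosts: BTreeMap<String, Outcome>,
    sub_tasks: BTreeMap<String, ResultNode>,
}

fn run_for_host(
    task: &SketchTask,
    host: &str,
    node: &mut ResultNode,
    depth: usize,
    max_depth: usize,
) {
    // Exceeding the limit is recorded as a failure on this node,
    // not returned as an outer error.
    if depth > max_depth {
        node.hosts.insert(
            host.to_string(),
            Outcome::Failed("max depth exceeded".to_string()),
        );
        return;
    }
    // The parent's result is stored before any sub-task runs.
    node.hosts.insert(host.to_string(), task.outcome.clone());
    // Sub-tasks run in declared order, even if the parent failed.
    for child in &task.children {
        let child_node = node.sub_tasks.entry(child.name.to_string()).or_default();
        run_for_host(child, host, child_node, depth + 1, max_depth);
    }
}

/// Run the whole tree once per host, accumulating into one result tree.
fn run(task: &SketchTask, hosts: &[&str], max_depth: usize) -> ResultNode {
    let mut root = ResultNode::default();
    for host in hosts {
        run_for_host(task, host, &mut root, 0, max_depth);
    }
    root
}

/// A failing parent with one passing child.
fn sample_task() -> SketchTask {
    SketchTask {
        name: "deploy",
        outcome: Outcome::Failed("boom".to_string()),
        children: vec![SketchTask {
            name: "verify",
            outcome: Outcome::Passed,
            children: vec![],
        }],
    }
}
```

With `max_depth = 1`, the `verify` node ends up holding a `Passed` entry for every host even though `deploy` failed. With `max_depth = 0`, the root still runs at depth 0, and `verify` records the depth failure for each host instead.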

§Task Results

§HostTaskResult

Represents the outcome of executing a task on a single host. It can be:

  • Passed: Task completed successfully, with optional metadata in TaskSuccess
  • Failed: Task encountered an error, with details in TaskFailure
  • Skipped: Task was not executed, with the reason in TaskSkip

use genja_core::task::{HostTaskResult, TaskSuccess, TaskFailure, TaskFailureKind};
use serde_json::json;

// Success with metadata
let success = HostTaskResult::passed(
    TaskSuccess::new()
        .with_result(json!({"status": "deployed"}))
        .with_changed(true)
        .with_diff("+ new_config_line")
);

// Failure with classification
let failure = HostTaskResult::failed(
    TaskFailure::new(std::io::Error::new(
        std::io::ErrorKind::ConnectionRefused,
        "connection refused"
    ))
    .with_kind(TaskFailureKind::Connection)
    .with_retryable(true)
);

// Skipped with reason
let skipped = HostTaskResult::skipped_with_reason("parent_failed");

§TaskResults

A hierarchical structure that stores execution results for a task and all its sub-tasks across multiple hosts. It provides methods for querying results, tracking success/failure counts, computing summaries, serializing output, and navigating the task tree.

use genja_core::task::{TaskResults, HostTaskResult, TaskSuccess};

let mut results = TaskResults::new("deploy")
    .with_summary("Deployment completed");

results.insert_host_result(
    "router1",
    HostTaskResult::passed(TaskSuccess::new().with_changed(true))
);

results.insert_host_result(
    "router2",
    HostTaskResult::skipped_with_reason("maintenance_mode")
);

// Query results
assert_eq!(results.passed_hosts().len(), 1);
assert!(results.host_result("router1").unwrap().is_passed());
assert!(results.host_result("router2").unwrap().is_skipped());

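In addition to raw host and sub-task access, TaskResults supports aggregation through TaskResults::host_summary and TaskResults::task_summary, and export in either human-readable or raw JSON forms.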
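
As a rough picture of how per-node counts and whole-tree counts over a result tree could be computed, consider the standalone sketch below. It does not reproduce the actual return types of host_summary or task_summary; `Outcome`, `Counts`, `ResultNode`, `host_counts`, and `tree_counts` are hypothetical names, and the exact fields the real summaries expose are an assumption.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-host outcome without payloads.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Passed,
    Failed,
    Skipped,
}

/// Hypothetical aggregate of host outcomes.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
struct Counts {
    passed: usize,
    failed: usize,
    skipped: usize,
}

/// Hypothetical result node mirroring the tree described in this module.
#[derive(Default)]
struct ResultNode {
    hosts: BTreeMap<String, Outcome>,
    sub_tasks: BTreeMap<String, ResultNode>,
}

impl ResultNode {
    /// Counts for this node only.
    fn host_counts(&self) -> Counts {
        let mut counts = Counts::default();
        for outcome in self.hosts.values() {
            match outcome {
                Outcome::Passed => counts.passed += 1,
                Outcome::Failed => counts.failed += 1,
                Outcome::Skipped => counts.skipped += 1,
            }
        }
        counts
    }

    /// Counts for this node plus every nested sub-task node.
    fn tree_counts(&self) -> Counts {
        let mut total = self.host_counts();
        for child in self.sub_tasks.values() {
            let nested = child.tree_counts();
            total.passed += nested.passed;
            total.failed += nested.failed;
            total.skipped += nested.skipped;
        }
        total
    }
}

/// A root node with two hosts and one "verify" sub-task node.
fn sample_tree() -> ResultNode {
    let mut verify = ResultNode::default();
    verify.hosts.insert("router1".to_string(), Outcome::Passed);
    verify.hosts.insert("router2".to_string(), Outcome::Skipped);

    let mut root = ResultNode::default();
    root.hosts.insert("router1".to_string(), Outcome::Passed);
    root.hosts.insert("router2".to_string(), Outcome::Failed);
    root.sub_tasks.insert("verify".to_string(), verify);
    root
}
```

For `sample_tree()`, `host_counts()` on the root reports one passed and one failed host, while `tree_counts()` also folds in the `verify` node and reports two passed, one failed, and one skipped.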

§Task Metadata

§TaskSuccess

Contains rich metadata about successful task execution:

  • result: Structured data returned by the task (JSON)
  • changed: Whether the task modified the target system
  • diff: Text representation of changes made
  • summary: Human-readable summary of execution
  • warnings: Non-fatal issues encountered
  • messages: Structured log messages with levels and codes
  • metadata: Additional custom data
  • timing: Start time, finish time, and duration
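
The timing entry pairs two timestamps with a derived duration. A minimal sketch of that derivation using only the standard library follows; the `Timing` struct and the `timed` helper are hypothetical, with field names borrowed from the lifecycle diagram, and the fallback to zero when the clock moves backwards is an assumption rather than documented framework behavior.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical timing record; not the genja_core type.
struct Timing {
    started_at: SystemTime,
    finished_at: SystemTime,
}

impl Timing {
    /// Duration derived from the two timestamps. `duration_since` returns an
    /// error if the finish time is earlier than the start time (for example
    /// after a clock adjustment), so this sketch falls back to zero.
    fn duration(&self) -> Duration {
        self.finished_at
            .duration_since(self.started_at)
            .unwrap_or(Duration::ZERO)
    }
}

/// Run a closure and capture wall-clock timestamps around it.
fn timed<T>(work: impl FnOnce() -> T) -> (T, Timing) {
    let started_at = SystemTime::now();
    let value = work();
    let finished_at = SystemTime::now();
    (value, Timing { started_at, finished_at })
}

/// A fixed example: starts 10 s and finishes 12.5 s after the Unix epoch.
fn sample_timing() -> Timing {
    Timing {
        started_at: UNIX_EPOCH + Duration::from_secs(10),
        finished_at: UNIX_EPOCH + Duration::from_millis(12_500),
    }
}
```

`sample_timing().duration()` is 2.5 seconds. For measuring elapsed time alone, `std::time::Instant` is monotonic and avoids the backwards-clock case; `SystemTime` is used here because the diagram lists the timestamps as SystemTime values.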

§TaskFailure

Contains detailed error information for failed tasks:

  • error: The underlying error that caused the failure
  • kind: Classification of the failure (TaskFailureKind)
  • retryable: Whether the operation can be retried
  • details: Additional context about the failure (JSON)
  • warnings: Non-fatal issues that preceded the failure
  • messages: Structured log messages

§TaskSkip

Contains information about why a task was skipped:

  • reason: Machine-readable skip reason
  • message: Human-readable explanation

§Failure Classification

The TaskFailureKind enum categorizes failures to enable appropriate error handling and retry logic:

  • Connection: Network or connectivity issues (often retryable)
  • Authentication: Credential or permission problems
  • Validation: Invalid input or configuration
  • Timeout: Operation exceeded time limit (often retryable)
  • Command: Remote command execution failed
  • Unsupported: Operation not supported by target
  • Internal: Genja/framework implementation error
  • External: Error returned from a task, plugin, or external dependency
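
One way a caller might turn this classification into retry logic is sketched below. The policy itself (retry only Connection and Timeout, exponential backoff capped at 30 seconds) is an assumption chosen for illustration, not something the framework prescribes, and `FailureKind`, `RetryDecision`, and `decide` are hypothetical names that only mirror the list above.

```rust
use std::time::Duration;

/// Hypothetical mirror of the failure kinds listed above.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FailureKind {
    Connection,
    Authentication,
    Validation,
    Timeout,
    Command,
    Unsupported,
    Internal,
    External,
}

#[derive(Debug, PartialEq, Eq)]
enum RetryDecision {
    RetryAfter(Duration),
    GiveUp,
}

/// Decide whether to retry a failed attempt.
///
/// `attempt` is zero-based; `retryable` is the flag carried by the failure.
fn decide(kind: FailureKind, retryable: bool, attempt: u32, max_attempts: u32) -> RetryDecision {
    if !retryable || attempt >= max_attempts {
        return RetryDecision::GiveUp;
    }
    match kind {
        FailureKind::Connection | FailureKind::Timeout => {
            // Exponential backoff: 1s, 2s, 4s, ... capped at 30s.
            let secs = 1u64.checked_shl(attempt).unwrap_or(u64::MAX).min(30);
            RetryDecision::RetryAfter(Duration::from_secs(secs))
        }
        // Credential, validation, and similar failures rarely fix themselves.
        _ => RetryDecision::GiveUp,
    }
}
```

Under this policy, `decide(FailureKind::Timeout, true, 2, 5)` asks for a 4 second wait, while an Authentication failure gives up immediately even when it is flagged as retryable.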

§Message System

Tasks can emit structured messages during execution using TaskMessage:

use genja_core::task::{TaskMessage, MessageLevel};
use std::time::SystemTime;

let message = TaskMessage::new(MessageLevel::Warning, "High latency detected")
    .with_code("latency_warn")
    .with_timestamp(SystemTime::now());

Message levels include:

  • Info: Informational messages
  • Warning: Non-fatal issues
  • Error: Error details
  • Debug: Debugging information

The task framework itself also emits log records through the log crate during execution, including task start, skip, failure, and finish events with host and duration context.

§Task Execution

§TaskDefinition

A wrapper around task implementations that provides execution control and enforces the task execution flow. It handles recursive sub-task execution with depth limiting to prevent infinite recursion.

use genja_core::inventory::{BaseBuilderHost, Host};
use genja_core::genja_task;
use genja_core::task::{
    HostTaskResult, Task, TaskDefinition, TaskResults, TaskRuntimeContext, TaskSuccess,
};
use tokio::runtime::Builder;

#[derive(Default)]
struct DeployTask;

#[genja_task(name = "deploy", connection_plugin_name = "ssh")]
impl DeployTask {
    async fn start_async(
        &self,
        _host: &Host,
        _context: &TaskRuntimeContext,
    ) -> Result<HostTaskResult, genja_core::task::TaskError> {
        Ok(HostTaskResult::passed(
            TaskSuccess::new().with_summary("deploy complete"),
        ))
    }
}

let task = TaskDefinition::new(DeployTask);
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("deploy");

let runtime = Builder::new_current_thread().enable_all().build().unwrap();
runtime
    .block_on(task.start("router1", &host, &mut results, 1))
    .expect("task execution should succeed");

assert!(results.host_result("router1").unwrap().is_passed());

§Tasks

A collection type for managing an ordered list of root task definitions. Each root task may have its own nested sub-task tree. Runtime runners execute the list as a forest of task trees and return results in the same root order.

use genja_core::inventory::Host;
use genja_core::genja_task;
use genja_core::task::{HostTaskResult, Task, TaskRuntimeContext, TaskSuccess, Tasks};

struct WorkflowTask {
    name: &'static str,
}

#[genja_task(name = "workflow")]
impl WorkflowTask {
    async fn start_async(
        &self,
        _host: &Host,
        _context: &TaskRuntimeContext,
    ) -> Result<HostTaskResult, genja_core::task::TaskError> {
        Ok(HostTaskResult::passed(TaskSuccess::new()))
    }

    fn options(&self) -> Option<&serde_json::Value> {
        None
    }
}

let mut tasks = Tasks::new();
tasks.add_task(WorkflowTask { name: "collect_facts" });
tasks.add_task(WorkflowTask { name: "deploy_changes" });
tasks.add_task(WorkflowTask { name: "verify_health" });

assert_eq!(tasks.len(), 3);
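
The ordering guarantee (results come back in the same order as the root tasks were added) can be illustrated with a standalone model of a heterogeneous, insertion-ordered collection. `SketchTask` and `SketchTasks` are hypothetical; they only imitate the `add_task` and `len` calls shown above and say nothing about how the real Tasks type or runners are implemented.

```rust
/// Hypothetical minimal task interface; not the genja_core `Task` trait.
trait SketchTask {
    fn name(&self) -> &str;
    /// Returns whether the task changed the host.
    fn run(&self, host: &str) -> bool;
}

struct CollectFacts;

struct DeployChanges {
    dry_run: bool,
}

impl SketchTask for CollectFacts {
    fn name(&self) -> &str {
        "collect_facts"
    }

    fn run(&self, _host: &str) -> bool {
        false // read-only task
    }
}

impl SketchTask for DeployChanges {
    fn name(&self) -> &str {
        "deploy_changes"
    }

    fn run(&self, _host: &str) -> bool {
        !self.dry_run
    }
}

/// Ordered collection of boxed root tasks of different concrete types.
#[derive(Default)]
struct SketchTasks {
    roots: Vec<Box<dyn SketchTask>>,
}

impl SketchTasks {
    fn add_task(&mut self, task: impl SketchTask + 'static) {
        self.roots.push(Box::new(task));
    }

    fn len(&self) -> usize {
        self.roots.len()
    }

    /// Run roots in insertion order; entry i belongs to root i.
    fn run_all(&self, host: &str) -> Vec<(String, bool)> {
        self.roots
            .iter()
            .map(|task| (task.name().to_string(), task.run(host)))
            .collect()
    }
}

/// Two root tasks added in a fixed order.
fn sample_tasks() -> SketchTasks {
    let mut tasks = SketchTasks::default();
    tasks.add_task(CollectFacts);
    tasks.add_task(DeployChanges { dry_run: false });
    tasks
}
```

`sample_tasks().run_all("router1")` returns the `collect_facts` entry first and the `deploy_changes` entry second, matching the order of the `add_task` calls.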

§Advanced Usage

§Hierarchical Task Execution

Tasks can define sub-tasks that execute after the parent task completes.

Structs§

BlockingTaskConnection
BlockingTaskRuntimeContext
TaskDefinition
A wrapper around a task implementation that enforces the task trait flow.
TaskError
Represents an error that occurred during task execution.
TaskExecutionContext
Execution context passed into task implementations.
TaskFailure
Represents a failed task execution with comprehensive error information and context.
TaskHostSummary
Aggregate host outcome counts for a task result node.
TaskMessage
Represents a structured message generated during task execution.
TaskProcessorContext
Execution context passed to task processors.
TaskResults
Results of a task execution, including timing, host outcomes, and nested sub-task results.
TaskResultsSummary
Recursive summary of a task result tree.
TaskRuntimeContext
Runtime context passed into task implementations.
TaskSkip
Represents information about a skipped task execution.
TaskSuccess
Represents the successful execution of a task on a host.
Tasks
A collection of task definitions that can be executed together.

Enums§

HostTaskResult
Represents the execution outcome of a task on a single host.
MessageLevel
Represents the severity level of a task message.
TaskExecutionMode
Sub-task provider interface.
TaskFailureKind
Categorizes the type of failure that occurred during task execution.

Traits§

Task
Core task interface required for execution.
TaskConnectionResolver
Opens or verifies task-scoped connections before execution.
TaskInfo
Task metadata required for execution.
TaskProcessor
Processes task results during aggregate and per-host task lifecycles.
TaskProcessorResolver
Resolves named task processors at execution time.