genja_core/task.rs
1//! Task execution framework for Genja.
2//!
3//! This module provides the core task execution infrastructure for Genja, enabling
4//! structured task definition, execution, and result tracking across multiple hosts.
5//! It defines traits, types, and utilities for building task-based automation workflows
6//! with support for nested sub-tasks, rich result metadata, and flexible error handling.
7//!
8//! # Overview
9//!
10//! The task system is built around several key concepts:
11//!
12//! - **Task Definition**: Tasks implement the [`Task`] trait, which combines metadata
13//! ([`TaskInfo`]) with execution logic and optional sub-tasks.
14//! - **Task Collections**: [`Tasks`] stores an ordered list of root
15//! [`TaskDefinition`] values. Each root task may own its own sub-task tree, so a
16//! `Tasks` value represents a forest of task trees.
17//! - **Task Execution**: Tasks execute against hosts and return [`HostTaskResult`]
18//! indicating success, failure, or skip status.
19//! - **Result Tracking**: The [`TaskResults`] structure maintains a hierarchical tree
20//! of execution results for tasks and their sub-tasks across all hosts.
21//! - **Rich Metadata**: Tasks can attach detailed metadata including timing information,
22//! warnings, messages, diffs, and custom data to their results.
23//! - **Summaries and Serialization**: Task results can be aggregated with
24//! [`TaskResults::host_summary`] and [`TaskResults::task_summary`], then exported in
25//! either human-readable or raw JSON forms.
26//! - **Execution Logging**: Task execution emits structured log events for task start,
27//! skip, failure, and finish states, including per-host duration information.
28//!
29//! # Task Lifecycle
30//! The following diagram illustrates the complete task execution flow:
31//!
32//! ```text
33//! ┌─────────────────────────────────────────────────────────────────────────┐
34//! │ Task Execution Flow │
35//! └─────────────────────────────────────────────────────────────────────────┘
36//!
37//! 1. Task Definition
38//! ┌──────────────┐
39//! │ User defines │
40//! │ Task struct │──────┐
41//! └──────────────┘ │
42//! │ implements Task trait
43//! ▼
44//! ┌─────────────┐
45//! │ Task trait │
46//! │ - TaskInfo │
47//! │ - sub_tasks │
48//! │ - start() │
49//! └─────────────┘
50//!
51//! 2. Task Wrapping
52//! ┌──────────────────┐
53//! │ TaskDefinition │
54//! │ wraps Task impl │
55//! └──────────────────┘
56//! │
57//! │ provides polymorphic interface
58//! ▼
59//!
60//! 3. Task Execution (via Runner Plugin)
61//! ┌─────────────────────────────────────────────────────────────┐
62//! │ Runner Plugin (e.g., ThreadedRunner, SerialRunner) │
63//! │ │
64//! │ For each task result tree: │
65//! │ ┌──────────────────────────────────────────────┐ │
66//! │ │ 1. Resolve selected PluginProcessor values │ │
67//! │ │ - From TaskInfo::processor_names() │ │
68//! │ │ - Via TaskProcessorResolver │ │
69//! │ └──────────────────────────────────────────────┘ │
70//! │ │ │
71//! │ ▼ │
72//! │ ┌──────────────────────────────────────────────┐ │
73//! │ │ 2. PluginProcessor hook: on_task_start │ │
74//! │ │ - Runs before host results are collected │ │
75//! │ │ - Can initialize or modify TaskResults │ │
76//! │ └──────────────────────────────────────────────┘ │
77//! │ │ │
78//! │ ▼ │
79//! │ ┌──────────────────────────────────────────────┐ │
80//! │ │ 3. For each host in inventory │ │
81//! │ │ │ │
82//! │ │ PluginProcessor hook: │ │
83//! │ │ on_instance_start │ │
84//! │ │ - Receives task, parent, depth, host │ │
85//! │ │ │ │
86//! │ │ Execute task.start(...) │ │
87//! │ │ - Record start timestamp │ │
88//! │ │ - Optionally open task-scoped │ │
89//! │ │ connection via TaskConnectionResolver │ │
90//! │ │ - Call task implementation │ │
91//! │ │ - Record finish timestamp │ │
92//! │ │ │ │
93//! │ │ Capture Result │ │
94//! │ │ - Success, failure, or skip │ │
95//! │ │ │ │
96//! │ │ PluginProcessor hook: │ │
97//! │ │ on_instance_finish │ │
98//! │ │ - Can inspect or mutate HostTaskResult │ │
99//! │ │ │ │
100//! │ │ Process sub-tasks, if any │ │
101//! │ │ - Each sub-task selects own processors │ │
102//! │ │ - Recursively execute sub_tasks() │ │
103//! │ └──────────────────────────────────────────────┘ │
104//! │ │ │
105//! │ ▼ │
106//! │ ┌──────────────────────────────────────────────┐ │
107//! │ │ 4. PluginProcessor hook: on_task_finish │ │
108//! │ │ - Runs after host results are collected │ │
109//! │ │ - Can finalize or modify TaskResults │ │
110//! │ └──────────────────────────────────────────────┘ │
111//! └─────────────────────────────────────────────────────────────┘
112//!
113//! 4. Result Collection
114//! ┌──────────────────────────────────────────────────────────┐
115//! │ TaskResults (per task) │
116//! │ │
117//! │ ┌─────────────────────────────────────────┐ │
118//! │ │ Host Results (Map<hostname, result>) │ │
119//! │ │ - router1 → HostTaskResult::Passed │ │
120//! │ │ - router2 → HostTaskResult::Failed │ │
121//! │ │ - router3 → HostTaskResult::Skipped │ │
122//! │ └─────────────────────────────────────────┘ │
123//! │ │
124//! │ ┌───────────────────────────────────────────┐ │
125//! │ │ Sub-task Results (Map<name, TaskResults>) │ │
126//! │ │ - "validate" → TaskResults │ │
127//! │ │ - "deploy" → TaskResults │ │
128//! │ └───────────────────────────────────────────┘ │
129//! │ │
130//! │ ┌─────────────────────────────────────────┐ │
131//! │ │ Execution Timing │ │
132//! │ │ - started_at: SystemTime │ │
133//! │ │ - finished_at: SystemTime │ │
134//! │ │ - duration: calculated from timestamps │ │
135//! │ └─────────────────────────────────────────┘ │
136//! └──────────────────────────────────────────────────────────┘
137//!
138//! 5. Result Aggregation
139//! ┌────────────────────────────────────────────────────────┐
140//! │ Final TaskResults tree │
141//! │ │
142//! │ root_task │
143//! │ ├── host_results: {router1, router2, router3} │
144//! │ ├── timing: {started_at, finished_at, duration} │
145//! │ └── sub_tasks: │
146//! │ ├── validate │
147//! │ │ ├── host_results: {...} │
148//! │ │ └── timing: {...} │
149//! │ └── deploy │
150//! │ ├── host_results: {...} │
151//! │ ├── timing: {...} │
152//! │ └── sub_tasks: │
153//! │ └── verify │
154//! │ ├── host_results: {...} │
155//! │ └── timing: {...} │
156//! └────────────────────────────────────────────────────────┘
157//! ```
158//!
159//! ## Key Lifecycle Stages
160//!
161//! 1. **Definition**: A task struct implements the `Task` trait, providing metadata
162//! (name, plugin), execution logic (`start()`), and optional sub-tasks.
163//!
164//! 2. **Wrapping**: The task is wrapped in a `TaskDefinition` for polymorphic handling,
165//! allowing heterogeneous collections of tasks to be stored and executed uniformly.
166//!
167//! 3. **Execution**: A runner plugin (e.g., `ThreadedRunner`, `SerialRunner`) orchestrates
168//! task execution across selected hosts:
169//! - Optionally resolves a task-scoped connection via [`TaskConnectionResolver`]
170//! - Calls [`Task::start`] for each host with a [`TaskRuntimeContext`]
171//! - Records timing information (start, finish, duration)
172//! - Captures results (success, failure, or skip)
173//! - Recursively processes sub-tasks up to `max_depth`
174//!
175//! 4. **Result Collection**: Each host produces a `HostTaskResult` that is stored in
176//! the task's `TaskResults` structure. Sub-tasks create nested `TaskResults` nodes,
177//! forming a tree structure that mirrors the task hierarchy.
178//!
179//! 5. **Aggregation**: Results are collected into a `TaskResults` tree that provides:
180//! - Per-host outcomes for each task
181//! - Execution timing at each level
182//! - Nested sub-task results
183//! - Summary statistics and status
184//!
185//! # Core Traits
186//!
187//! ## [`Task`]
188//!
189//! The primary trait that all tasks must implement. It combines [`TaskInfo`] for
190//! metadata with execution methods and optional hierarchical sub-task structures.
191//!
192//! In the common workflow, the [`genja_task`](crate::genja_task) attribute macro
193//! generates both [`TaskInfo`] and [`Task`] for you from an inherent `impl` block.
194//! If you call generated metadata methods such as `name()` or
195//! `connection_plugin_name()` directly, import [`TaskInfo`] so those trait methods
196//! are in scope.
197//!
198//! ```rust
199//! use genja_core::inventory::Host;
200//! use genja_core::task::{
201//! HostTaskResult, Task, TaskInfo, TaskRuntimeContext, TaskSuccess,
202//! };
203//! use genja_core::genja_task;
204//!
205//! #[derive(Default)]
206//! struct DeployTask {
207//! options: Option<serde_json::Value>,
208//! config_file: String,
209//! }
210//!
211//! #[genja_task(name = "deploy", connection_plugin_name = "ssh")]
212//! impl DeployTask {
213//! async fn start_async(
214//! &self,
215//! _host: &Host,
216//! _context: &TaskRuntimeContext,
217//! ) -> Result<HostTaskResult, genja_core::task::TaskError> {
218//! Ok(HostTaskResult::passed(
219//! TaskSuccess::new()
220//! .with_changed(true)
221//! .with_summary("Configuration deployed successfully")
222//! ))
223//! }
224//! }
225//!
226//! let task = DeployTask {
227//! options: None,
228//! config_file: "router.conf".to_string()
229//! };
230//!
231//! assert_eq!(task.name(), "deploy");
232//! assert_eq!(task.connection_plugin_name(), Some("ssh"));
233//! ```
234//!
235//! ## [`TaskInfo`]
236//!
237//! Provides metadata about a task including its name, associated plugin, connection
238//! requirements, and optional configuration. This trait is typically
239//! auto-implemented by the [`genja_task`](crate::genja_task) attribute macro.
240//! Static metadata comes from macro arguments such as `name`,
241//! `connection_plugin_name`, and `processors`, while dynamic metadata can be
242//! provided through helper methods such as `options()`.
243//!
244//! ## Task Processors
245//!
246//! [`TaskProcessor`] provides lifecycle hooks for processing task results without
247//! changing the task's execution implementation. A task selects processors by name,
248//! and the runtime resolves those names through a [`TaskProcessorResolver`]. In the
249//! full Genja runtime, the plugin manager implements the resolver, so invalid
250//! processor names fail with `GenjaError::PluginNotFound`.
251//!
252//! Processors can be selected in three ways:
253//!
254//! ```rust
255//! use genja_core::inventory::Host;
256//! use genja_core::task::{
257//! HostTaskResult, Task, TaskDefinition, TaskRuntimeContext, TaskSuccess,
258//! };
259//! use genja_core::genja_task;
260//!
261//! #[derive(Default)]
262//! struct AttributeTask;
263//!
264//! #[derive(Default)]
265//! struct FieldTask {
266//! processor_names: Vec<String>,
267//! }
268//!
269//! #[genja_task(name = "attribute", processors = ["audit"])]
270//! impl AttributeTask {
271//! async fn start_async(
272//! &self,
273//! _host: &Host,
274//! _context: &TaskRuntimeContext,
275//! ) -> Result<HostTaskResult, genja_core::task::TaskError> {
276//! Ok(HostTaskResult::passed(TaskSuccess::new()))
277//! }
278//! }
279//!
280//! #[genja_task(name = "field")]
281//! impl FieldTask {
282//! async fn start_async(
283//! &self,
284//! _host: &Host,
285//! _context: &TaskRuntimeContext,
286//! ) -> Result<HostTaskResult, genja_core::task::TaskError> {
287//! Ok(HostTaskResult::passed(TaskSuccess::new()))
288//! }
289//!
290//! fn processor_names(&self) -> Vec<&str> {
291//! self.processor_names.iter().map(String::as_str).collect()
292//! }
293//! }
294//!
295//! let _attribute_task = AttributeTask;
296//! let _field_task = FieldTask {
297//! processor_names: vec!["audit".to_string()],
298//! };
299//!
300//! let _root_override = TaskDefinition::new(AttributeTask)
301//! .with_processor("metrics");
302//! ```
303//!
304//! Processor selection is per task. Sub-tasks do not inherit their parent's
305//! processor list unless the sub-task itself returns the same processor names.
306//!
307//! ## Sub Tasks
308//!
309//! Tasks can define sub-tasks that execute after the parent task completes.
310//! Sub-tasks receive their own runtime context and execution depth when they
311//! run. Implement `fn sub_tasks(&self) -> Vec<Arc<dyn Task>>` inside a
312//! `#[genja_task(...)]` impl block when a task needs child tasks.
313//!
314//! # Behavioral Rules
315//!
316//! The execution model is intentionally simple and deterministic:
317//!
318//! - The parent task's execution method runs before any of its sub-tasks.
319//! - The parent task's [`HostTaskResult`] is inserted into [`TaskResults`] before
320//! sub-task execution begins.
321//! - Sub-tasks run in the order returned by [`Task::sub_tasks()`].
322//! - Each host is executed independently. When running a single task, the full task
323//! tree is executed once per selected host.
324//! - [`Tasks`] preserves insertion order for root tasks. Runners execute each root
325//! task definition in that order unless a runner explicitly documents a different
326//! scheduling policy, and each returned [`TaskResults`] entry corresponds to the
327//! root task at the same position.
328//! - Sub-task results are grouped by sub-task name. The `TaskResults` node for a
329//! given sub-task contains per-host results accumulated across all hosts.
330//! - The framework does not automatically skip sub-tasks when a parent fails or is
331//! skipped. If you want that behavior, return an explicit [`HostTaskResult::Skipped`]
332//! from the sub-task or encode the condition in the task itself.
333//! - Processor hooks run only for tasks that selected processor names. The root
334//! task can also be given processor names through [`TaskDefinition::with_processor`]
335//! or [`TaskDefinition::with_processors`].
336//! - For a task result tree, processor order is:
337//! `on_task_start`, then for each selected host `on_instance_start`,
338//! [`Task::start`], `on_instance_finish`, then any sub-task trees, then
339//! `on_task_finish`.
340//! - If processor name resolution fails, execution returns
341//! `GenjaError::PluginNotFound`. If a processor hook returns an error, execution
342//! stops and propagates that error.
343//! - `max_depth` is checked using `depth > max_depth`. This means `max_depth = 0`
344//! still allows the root task at depth `0`, but rejects all sub-tasks at depth `1`.
345//! - Exceeding `max_depth` records an internal [`HostTaskResult::Failed`] for the host
346//! at that task node instead of returning an outer execution error.
347//! - If [`Task::start`] returns [`TaskError`], the framework captures it as a
348//! [`TaskFailure`] with timing metadata and stores it in the host results tree.
349//!
350//! # Task Results
351//!
352//! ## [`HostTaskResult`]
353//!
354//! Represents the outcome of executing a task on a single host. It can be:
355//!
356//! - **Passed**: Task completed successfully with optional metadata in [`TaskSuccess`]
357//! - **Failed**: Task encountered an error with details in [`TaskFailure`]
358//! - **Skipped**: Task was not executed with reason in [`TaskSkip`]
359//!
360//! ```rust
361//! use genja_core::task::{HostTaskResult, TaskSuccess, TaskFailure, TaskFailureKind};
362//! use serde_json::json;
363//!
364//! // Success with metadata
365//! let success = HostTaskResult::passed(
366//! TaskSuccess::new()
367//! .with_result(json!({"status": "deployed"}))
368//! .with_changed(true)
369//! .with_diff("+ new_config_line")
370//! );
371//!
372//! // Failure with classification
373//! let failure = HostTaskResult::failed(
374//! TaskFailure::new(std::io::Error::new(
375//! std::io::ErrorKind::ConnectionRefused,
376//! "connection refused"
377//! ))
378//! .with_kind(TaskFailureKind::Connection)
379//! .with_retryable(true)
380//! );
381//!
382//! // Skipped with reason
383//! let skipped = HostTaskResult::skipped_with_reason("parent_failed");
384//! ```
385//!
386//! ## [`TaskResults`]
387//!
388//! A hierarchical structure that stores execution results for a task and all its
389//! sub-tasks across multiple hosts. It provides methods for querying results,
390//! tracking success/failure counts, computing summaries, serializing output, and
391//! navigating the task tree.
392//!
393//! ```rust
394//! use genja_core::task::{TaskResults, HostTaskResult, TaskSuccess};
395//!
396//! let mut results = TaskResults::new("deploy")
397//! .with_summary("Deployment completed");
398//!
399//! results.insert_host_result(
400//! "router1",
401//! HostTaskResult::passed(TaskSuccess::new().with_changed(true))
402//! );
403//!
404//! results.insert_host_result(
405//! "router2",
406//! HostTaskResult::skipped_with_reason("maintenance_mode")
407//! );
408//!
409//! // Query results
410//! assert_eq!(results.passed_hosts().len(), 1);
411//! assert!(results.host_result("router1").unwrap().is_passed());
412//! assert!(results.host_result("router2").unwrap().is_skipped());
413//! ```
414//!
415//! In addition to raw host and sub-task access, [`TaskResults`] supports:
416//!
417//! - aggregate host counts via [`TaskResults::host_summary`]
418//! - recursive task-tree summaries via [`TaskResults::task_summary`]
419//! - human-readable JSON via [`TaskResults::to_json_string`] and
420//! [`TaskResults::to_pretty_json_string`]
421//! - raw serde JSON via [`TaskResults::to_raw_json_string`] and
422//! [`TaskResults::to_raw_pretty_json_string`]
423//!
424//! # Task Metadata
425//!
426//! ## [`TaskSuccess`]
427//!
428//! Contains rich metadata about successful task execution:
429//!
430//! - **result**: Structured data returned by the task (JSON)
431//! - **changed**: Whether the task modified the target system
432//! - **diff**: Text representation of changes made
433//! - **summary**: Human-readable summary of execution
434//! - **warnings**: Non-fatal issues encountered
435//! - **messages**: Structured log messages with levels and codes
436//! - **metadata**: Additional custom data
437//! - **timing**: Start time, finish time, and duration
438//!
439//! ## [`TaskFailure`]
440//!
441//! Contains detailed error information for failed tasks:
442//!
443//! - **error**: The underlying error that caused the failure
444//! - **kind**: Classification of the failure ([`TaskFailureKind`])
445//! - **retryable**: Whether the operation can be retried
446//! - **details**: Additional context about the failure (JSON)
447//! - **warnings**: Non-fatal issues that preceded the failure
448//! - **messages**: Structured log messages
449//!
450//! ## [`TaskSkip`]
451//!
452//! Contains information about why a task was skipped:
453//!
454//! - **reason**: Machine-readable skip reason
455//! - **message**: Human-readable explanation
456//!
457//! # Failure Classification
458//!
459//! The [`TaskFailureKind`] enum categorizes failures to enable appropriate error
460//! handling and retry logic:
461//!
462//! - **Connection**: Network or connectivity issues (often retryable)
463//! - **Authentication**: Credential or permission problems
464//! - **Validation**: Invalid input or configuration
465//! - **Timeout**: Operation exceeded time limit (often retryable)
466//! - **Command**: Remote command execution failed
467//! - **Unsupported**: Operation not supported by target
468//! - **Internal**: Genja/framework implementation error
469//! - **External**: Error returned from a task, plugin, or external dependency
470//!
471//! # Message System
472//!
473//! Tasks can emit structured messages during execution using [`TaskMessage`]:
474//!
475//! ```rust
476//! use genja_core::task::{TaskMessage, MessageLevel};
477//! use std::time::SystemTime;
478//!
479//! let message = TaskMessage::new(MessageLevel::Warning, "High latency detected")
480//! .with_code("latency_warn")
481//! .with_timestamp(SystemTime::now());
482//! ```
483//!
484//! Message levels include:
485//! - **Info**: Informational messages
486//! - **Warning**: Non-fatal issues
487//! - **Error**: Error details
488//! - **Debug**: Debugging information
489//!
490//! The task framework itself also emits log records through the `log` crate during
491//! execution, including task start, skip, failure, and finish events with host and
492//! duration context.
493//!
494//! # Task Execution
495//!
496//! ## [`TaskDefinition`]
497//!
498//! A wrapper around task implementations that provides execution control and
499//! enforces the task execution flow. It handles recursive sub-task execution
500//! with depth limiting to prevent infinite recursion.
501//!
502//! ```rust
503//! use genja_core::inventory::{BaseBuilderHost, Host};
504//! use genja_core::genja_task;
505//! use genja_core::task::{
506//! HostTaskResult, Task, TaskDefinition, TaskResults, TaskRuntimeContext, TaskSuccess,
507//! };
508//! use tokio::runtime::Builder;
509//!
510//! #[derive(Default)]
511//! struct DeployTask;
512//!
513//! #[genja_task(name = "deploy", connection_plugin_name = "ssh")]
514//! impl DeployTask {
515//! async fn start_async(
516//! &self,
517//! _host: &Host,
518//! _context: &TaskRuntimeContext,
519//! ) -> Result<HostTaskResult, genja_core::task::TaskError> {
520//! Ok(HostTaskResult::passed(
521//! TaskSuccess::new().with_summary("deploy complete"),
522//! ))
523//! }
524//! }
525//!
526//! let task = TaskDefinition::new(DeployTask);
527//! let host = Host::builder().hostname("router1").build();
528//! let mut results = TaskResults::new("deploy");
529//!
530//! let runtime = Builder::new_current_thread().enable_all().build().unwrap();
531//! runtime
532//! .block_on(task.start("router1", &host, &mut results, 1))
533//! .expect("task execution should succeed");
534//!
535//! assert!(results.host_result("router1").unwrap().is_passed());
536//! ```
537//!
538//! ## [`Tasks`]
539//!
540//! A collection type for managing an ordered list of root task definitions.
541//! Each root task may have its own nested sub-task tree. Runtime runners execute
542//! the list as a forest of task trees and return results in the same root order.
543//!
544//! ```rust
545//! use genja_core::inventory::Host;
546//! use genja_core::genja_task;
547//! use genja_core::task::{HostTaskResult, Task, TaskRuntimeContext, TaskSuccess, Tasks};
548//!
549//! struct WorkflowTask {
550//! name: &'static str,
551//! }
552//!
553//! #[genja_task(name = "workflow")]
554//! impl WorkflowTask {
555//! async fn start_async(
556//! &self,
557//! _host: &Host,
558//! _context: &TaskRuntimeContext,
559//! ) -> Result<HostTaskResult, genja_core::task::TaskError> {
560//! Ok(HostTaskResult::passed(TaskSuccess::new()))
561//! }
562//!
563//! fn options(&self) -> Option<&serde_json::Value> {
564//! None
565//! }
566//! }
567//!
568//! let mut tasks = Tasks::new();
569//! tasks.add_task(WorkflowTask { name: "collect_facts" });
570//! tasks.add_task(WorkflowTask { name: "deploy_changes" });
571//! tasks.add_task(WorkflowTask { name: "verify_health" });
572//!
573//! assert_eq!(tasks.len(), 3);
574//! ```
575//!
576//! # Advanced Usage
577//!
578//! ## Hierarchical Task Execution
579//!
//! Tasks can define sub-tasks that execute after the parent task completes. This
//! yields a nested [`TaskResults`] tree that mirrors the task hierarchy.
581//!
582use crate::inventory::{Connection, Host};
583use crate::types::{CustomTreeMap, NatString};
584use async_recursion::async_recursion;
585use async_trait::async_trait;
586use log::{debug, info, warn};
587use serde::Serialize;
588use serde_json::Value;
589use std::any::{Any, type_name};
590use std::error::Error;
591use std::fmt;
592use std::ops::{Deref, DerefMut};
593use std::sync::Arc;
594use std::time::SystemTime;
595use tokio::runtime::Handle;
596use tokio::sync::Mutex;
597use tokio::task;
598
/// Represents an error that occurred during task execution.
///
/// `TaskError` wraps an underlying error and keeps both a type-erased handle to it
/// and the name of its concrete type. Everything is stored behind `Arc`, so the
/// value is cheap to clone and can be moved between threads.
///
/// The error stores:
/// - the wrapped error as a trait object (`dyn Error + Send + Sync`),
/// - the concrete type's name as a string, for debugging and logging,
/// - an optional `Any` view of the same value, used by [`TaskError::downcast_ref`].
///
/// `Display` prints exactly what the wrapped error prints, and
/// [`Error::source`] is forwarded to the wrapped error so its cause chain stays
/// reachable through the wrapper.
///
/// # Example
///
/// ```rust
/// use genja_core::task::TaskError;
/// use std::io;
///
/// // Create a TaskError from a standard error
/// let io_error = io::Error::new(io::ErrorKind::NotFound, "file not found");
/// let task_error = TaskError::new(io_error);
///
/// // Access error information
/// println!("Error: {}", task_error.error());
/// println!("Error type: {}", task_error.error_type());
///
/// // Attempt to downcast to the original error type
/// if let Some(io_err) = task_error.downcast_ref::<io::Error>() {
///     println!("Original IO error kind: {:?}", io_err.kind());
/// }
/// ```
#[derive(Clone)]
pub struct TaskError {
    /// Type-erased error used for `Display` and for [`TaskError::error`].
    error: Arc<dyn Error + Send + Sync + 'static>,
    /// `type_name` of the concrete error, or a placeholder when it is unknown.
    error_type: String,
    /// `Any` view of the wrapped value; `None` when the concrete type was
    /// already erased at construction time (see [`TaskError::from_arc`]).
    source: Option<Arc<dyn Any + Send + Sync + 'static>>,
}

/// Shared, type-erased value that can be downcast back to a concrete type.
// NOTE(review): not referenced in this part of the file; presumably used by the
// failure types defined further down — confirm before removing.
type TaskFailureSource = Arc<dyn Any + Send + Sync + 'static>;

impl TaskError {
    /// Wraps a concrete error, remembering its type name and keeping it
    /// available for [`TaskError::downcast_ref`].
    pub fn new<E>(error: E) -> Self
    where
        E: Error + Send + Sync + 'static,
    {
        // One allocation, two views: `dyn Any` for downcasting and `dyn Error`
        // for display and chaining.
        let concrete = Arc::new(error);
        let source: Arc<dyn Any + Send + Sync + 'static> = concrete.clone();
        let error: Arc<dyn Error + Send + Sync + 'static> = concrete;

        Self {
            error,
            error_type: type_name::<E>().to_string(),
            source: Some(source),
        }
    }

    /// Wraps an already type-erased error.
    ///
    /// The concrete type is unknown here, so [`TaskError::error_type`] reports a
    /// generic placeholder and [`TaskError::downcast_ref`] always returns `None`.
    pub fn from_arc(error: Arc<dyn Error + Send + Sync + 'static>) -> Self {
        Self {
            error,
            error_type: "dyn core::error::Error".to_string(),
            source: None,
        }
    }

    /// Returns the wrapped error as a trait object.
    pub fn error(&self) -> &(dyn Error + Send + Sync + 'static) {
        self.error.as_ref()
    }

    /// Returns the recorded type name of the wrapped error.
    pub fn error_type(&self) -> &str {
        &self.error_type
    }

    /// Returns the wrapped error as `E` if it was created through
    /// [`TaskError::new`] with exactly that type, otherwise `None`.
    pub fn downcast_ref<E>(&self) -> Option<&E>
    where
        E: 'static,
    {
        self.source
            .as_ref()
            .and_then(|source| source.downcast_ref::<E>())
    }
}

impl fmt::Debug for TaskError {
    /// Prints the type name and rendered message instead of the raw trait objects.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TaskError")
            .field("error_type", &self.error_type)
            .field("message", &self.error.to_string())
            .finish()
    }
}

impl fmt::Display for TaskError {
    /// Prints the wrapped error's own message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.error)
    }
}

impl Error for TaskError {
    /// Forwards to the wrapped error's source.
    ///
    /// `Display` already prints the wrapped error's message, so the wrapper is
    /// transparent: exposing the wrapped error's *cause* (rather than the wrapped
    /// error itself) keeps the chain intact without repeating a message.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        (*self.error).source()
    }
}
698
/// Minimal error type that carries nothing but an already-rendered message.
// NOTE(review): no use site is visible in this part of the file; presumably it
// stands in for a failure whose original error value is no longer available —
// confirm against the callers.
#[derive(Debug)]
struct CapturedTaskFailure {
    /// Text returned verbatim by `Display`.
    message: String,
}

impl fmt::Display for CapturedTaskFailure {
    /// Writes the stored message as-is.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.message)
    }
}

// No underlying cause is recorded, so the default `source()` (`None`) applies.
impl Error for CapturedTaskFailure {}
711
712fn format_timestamp_display(timestamp: SystemTime) -> String {
713 humantime::format_rfc3339_seconds(timestamp).to_string()
714}
715
/// Formats a nanosecond count as a short human-readable duration.
///
/// Values below one microsecond are printed as whole nanoseconds (`"999ns"`).
/// Larger values use the largest fitting unit out of `us`, `ms`, `s`, `m`, `h`
/// with at most three significant digits (`"1.5us"`, `"12.3ms"`, `"123ms"`).
///
/// When rounding would push the displayed number up to the size of the next
/// unit (for example `999_999` ns would round to `"1000us"`), the next unit is
/// used instead, so the result is `"1ms"` rather than an overflowed number.
fn format_duration_display(duration_ns: u128) -> String {
    // Nanoseconds per unit, smallest unit first.
    const UNITS: [(f64, &str); 5] = [
        (1_000.0, "us"),
        (1_000_000.0, "ms"),
        (1_000_000_000.0, "s"),
        (60_000_000_000.0, "m"),
        (3_600_000_000_000.0, "h"),
    ];

    if duration_ns < 1_000 {
        return format!("{duration_ns}ns");
    }

    let nanos = duration_ns as f64;
    let mut rendered = String::new();
    for (index, &(unit_ns, suffix)) in UNITS.iter().enumerate() {
        let number = format_decimal(nanos / unit_ns);

        // Decide on the *displayed* number, not the raw value: this covers both
        // "value belongs to a larger unit" and "rounding reached the next unit".
        let rolls_over = match UNITS.get(index + 1) {
            Some(&(next_ns, _)) => number
                .parse::<f64>()
                .map_or(false, |shown| shown >= next_ns / unit_ns),
            // Hours are the largest unit and simply keep growing.
            None => false,
        };

        rendered = format!("{number}{suffix}");
        if !rolls_over {
            break;
        }
    }
    rendered
}

/// Formats `value` with a magnitude-dependent precision and appends `unit`.
///
/// See [`format_decimal`] for the precision and trimming rules.
fn format_decimal_unit(value: f64, unit: &str) -> String {
    let number = format_decimal(value);
    format!("{number}{unit}")
}

/// Formats `value` with roughly three significant digits.
///
/// Uses 0 decimals from 100 upwards, 1 decimal from 10 upwards and 2 decimals
/// below that, then drops trailing zeros and a dangling decimal point
/// (`1.50` -> `"1.5"`, `10.0` -> `"10"`).
fn format_decimal(value: f64) -> String {
    let precision = if value >= 100.0 {
        0
    } else if value >= 10.0 {
        1
    } else {
        2
    };

    let formatted = format!("{value:.precision$}");
    if formatted.contains('.') {
        // Only trim when there is a fractional part; otherwise "1000" would
        // lose its significant zeros.
        formatted
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    } else {
        formatted
    }
}
762
763/// Results of a task execution, including timing, host outcomes, and nested sub-task results.
764///
765/// `TaskResults` captures the complete execution state of a task across multiple hosts,
766/// including timing information, a summary, per-host results, and any sub-tasks that were
767/// executed as part of this task. This structure forms a tree where each task can contain
768/// results for multiple hosts and multiple sub-tasks, allowing for hierarchical task execution
769/// tracking.
770///
771/// # Fields
772///
773/// * `task_name` - The name of the task that was executed.
774/// * `started_at` - The timestamp when the task execution started, if available.
775/// * `finished_at` - The timestamp when the task execution finished, if available.
776/// * `duration_ms` - The duration of the task execution in milliseconds, if available.
777/// * `summary` - An optional summary message describing the overall task execution.
778/// * `hosts` - A map of hostname to `HostTaskResult`, containing the execution result for each host.
779/// * `sub_tasks` - A map of sub-task name to `TaskResults`, containing results for any nested tasks.
780///
781/// # Example
782///
783/// ```rust
784/// use genja_core::task::{TaskResults, HostTaskResult, TaskSuccess};
785/// use std::time::SystemTime;
786///
787/// // Create a new task results container
788/// let mut results = TaskResults::new("deploy_config")
789/// .with_started_at(SystemTime::now())
790/// .with_summary("Deploying configuration to network devices");
791///
792/// // Add results for individual hosts
793/// results.insert_host_result(
794/// "router1",
795/// HostTaskResult::passed(
796/// TaskSuccess::new()
797/// .with_changed(true)
798/// .with_summary("Configuration deployed successfully")
799/// )
800/// );
801///
802/// results.insert_host_result(
803/// "router2",
804/// HostTaskResult::skipped_with_reason("Device in maintenance mode")
805/// );
806///
807/// // Create and add sub-task results
808/// let mut validation_results = TaskResults::new("validate_config");
809/// validation_results.insert_host_result(
810/// "router1",
811/// HostTaskResult::passed(TaskSuccess::new())
812/// );
813///
814/// results.insert_sub_task("validate_config", validation_results);
815///
816/// // Query results
817/// assert_eq!(results.task_name(), "deploy_config");
818/// assert_eq!(results.passed_hosts().len(), 1);
819/// assert!(results.sub_task("validate_config").is_some());
820/// ```
821#[derive(Debug, Clone, Default, Serialize)]
822pub struct TaskResults {
823 task_name: String,
824 started_at: Option<SystemTime>,
825 finished_at: Option<SystemTime>,
826 duration_ns: Option<u128>,
827 duration_ms: Option<u128>,
828 summary: Option<String>,
829 hosts: CustomTreeMap<HostTaskResult>,
830 sub_tasks: CustomTreeMap<TaskResults>,
831}
832
833#[derive(Serialize)]
834struct TaskResultsHumanJson<'a> {
835 task_name: &'a str,
836 started_at: Option<String>,
837 finished_at: Option<String>,
838 duration: Option<String>,
839 summary: Option<&'a str>,
840 hosts: CustomTreeMap<HostTaskResultHumanJson<'a>>,
841 sub_tasks: CustomTreeMap<TaskResultsHumanJson<'a>>,
842}
843
844#[derive(Serialize)]
845enum HostTaskResultHumanJson<'a> {
846 Passed(TaskSuccessHumanJson<'a>),
847 Failed(TaskFailureHumanJson<'a>),
848 Skipped(TaskSkipHumanJson<'a>),
849}
850
851#[derive(Serialize)]
852struct TaskSuccessHumanJson<'a> {
853 result: Option<&'a Value>,
854 changed: bool,
855 diff: Option<&'a str>,
856 summary: Option<&'a str>,
857 warnings: &'a [String],
858 messages: &'a [TaskMessage],
859 metadata: Option<&'a Value>,
860 started_at: Option<String>,
861 finished_at: Option<String>,
862 duration: Option<String>,
863}
864
/// Borrowed, display-oriented view of a [`TaskFailure`].
///
/// Error fields borrow from the source value; `started_at`, `finished_at`,
/// and `duration` hold the strings produced by the source's `*_display`
/// accessors.
#[derive(Serialize)]
struct TaskFailureHumanJson<'a> {
    kind: &'a TaskFailureKind,
    error_type: &'a str,
    message: &'a str,
    retryable: bool,
    details: Option<&'a Value>,
    warnings: &'a [String],
    messages: &'a [TaskMessage],
    started_at: Option<String>,
    finished_at: Option<String>,
    duration: Option<String>,
}
878
/// Borrowed, display-oriented view of a [`TaskSkip`]: its optional reason
/// and optional message.
#[derive(Serialize)]
struct TaskSkipHumanJson<'a> {
    reason: Option<&'a str>,
    message: Option<&'a str>,
}
884
/// Aggregate host outcome counts for a task result node.
///
/// Holds one counter per host outcome (passed, failed, skipped). The type is
/// `Copy`, so accessors hand it out by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Default)]
pub struct TaskHostSummary {
    /// Number of hosts on which the task passed.
    passed: usize,
    /// Number of hosts on which the task failed.
    failed: usize,
    /// Number of hosts on which the task was skipped.
    skipped: usize,
}
892
/// Recursive summary of a task result tree.
///
/// Each node records the task name, the host outcome counts for that task
/// only, the task duration when known, and one nested summary per sub-task.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct TaskResultsSummary {
    /// Name of the summarized task.
    task_name: String,
    /// Host outcome counts for this task node.
    hosts: TaskHostSummary,
    /// Task duration in nanoseconds, if available.
    duration_ns: Option<u128>,
    /// Summaries of the sub-tasks, keyed by task name.
    sub_tasks: CustomTreeMap<TaskResultsSummary>,
}
901
902impl TaskHostSummary {
903 /// Creates a new aggregate host summary.
904 pub fn new(passed: usize, failed: usize, skipped: usize) -> Self {
905 Self {
906 passed,
907 failed,
908 skipped,
909 }
910 }
911
912 /// Returns the number of hosts that passed.
913 pub fn passed(&self) -> usize {
914 self.passed
915 }
916
917 /// Returns the number of hosts that failed.
918 pub fn failed(&self) -> usize {
919 self.failed
920 }
921
922 /// Returns the number of hosts that were skipped.
923 pub fn skipped(&self) -> usize {
924 self.skipped
925 }
926
927 /// Returns the total number of hosts represented in this summary.
928 pub fn total(&self) -> usize {
929 self.passed + self.failed + self.skipped
930 }
931}
932
933impl TaskResultsSummary {
934 /// Returns the task name for this summary node.
935 pub fn task_name(&self) -> &str {
936 &self.task_name
937 }
938
939 /// Returns the host outcome counts for this summary node.
940 pub fn hosts(&self) -> TaskHostSummary {
941 self.hosts
942 }
943
944 /// Returns the duration in milliseconds for this summary node, if available.
945 pub fn duration_ms(&self) -> Option<u128> {
946 self.duration_ns.map(|duration_ns| duration_ns / 1_000_000)
947 }
948
949 /// Returns the duration in a human-readable format for this summary node, if available.
950 pub fn duration_display(&self) -> Option<String> {
951 self.duration_ns.map(format_duration_display)
952 }
953
954 /// Returns recursive sub-task summaries keyed by task name.
955 pub fn sub_tasks(&self) -> &CustomTreeMap<TaskResultsSummary> {
956 &self.sub_tasks
957 }
958}
959
960impl<'a> From<&'a HostTaskResult> for HostTaskResultHumanJson<'a> {
961 fn from(result: &'a HostTaskResult) -> Self {
962 match result {
963 HostTaskResult::Passed(success) => Self::Passed(TaskSuccessHumanJson::from(success)),
964 HostTaskResult::Failed(failure) => Self::Failed(TaskFailureHumanJson::from(failure)),
965 HostTaskResult::Skipped(skip) => Self::Skipped(TaskSkipHumanJson::from(skip)),
966 }
967 }
968}
969
970impl<'a> From<&'a TaskSuccess> for TaskSuccessHumanJson<'a> {
971 fn from(success: &'a TaskSuccess) -> Self {
972 Self {
973 result: success.result(),
974 changed: success.changed(),
975 diff: success.diff(),
976 summary: success.summary(),
977 warnings: success.warnings(),
978 messages: success.messages(),
979 metadata: success.metadata(),
980 started_at: success.started_at_display(),
981 finished_at: success.finished_at_display(),
982 duration: success.duration_display(),
983 }
984 }
985}
986
987impl<'a> From<&'a TaskFailure> for TaskFailureHumanJson<'a> {
988 fn from(failure: &'a TaskFailure) -> Self {
989 Self {
990 kind: failure.kind(),
991 error_type: failure.error_type(),
992 message: failure.message(),
993 retryable: failure.retryable(),
994 details: failure.details(),
995 warnings: failure.warnings(),
996 messages: failure.messages(),
997 started_at: failure.started_at_display(),
998 finished_at: failure.finished_at_display(),
999 duration: failure.duration_display(),
1000 }
1001 }
1002}
1003
1004impl<'a> From<&'a TaskSkip> for TaskSkipHumanJson<'a> {
1005 fn from(skip: &'a TaskSkip) -> Self {
1006 Self {
1007 reason: skip.reason(),
1008 message: skip.message(),
1009 }
1010 }
1011}
1012
1013impl<'a> From<&'a TaskResults> for TaskResultsHumanJson<'a> {
1014 fn from(results: &'a TaskResults) -> Self {
1015 let mut hosts = CustomTreeMap::new();
1016 for (hostname, host_result) in results.hosts().iter() {
1017 hosts.insert(hostname, HostTaskResultHumanJson::from(host_result));
1018 }
1019
1020 let mut sub_tasks = CustomTreeMap::new();
1021 for (task_name, task_results) in results.sub_tasks().iter() {
1022 sub_tasks.insert(task_name, TaskResultsHumanJson::from(task_results));
1023 }
1024
1025 Self {
1026 task_name: results.task_name(),
1027 started_at: results.started_at_display(),
1028 finished_at: results.finished_at_display(),
1029 duration: results.duration_display(),
1030 summary: results.summary(),
1031 hosts,
1032 sub_tasks,
1033 }
1034 }
1035}
1036
1037impl TaskResults {
1038 /// Creates a new `TaskResults` instance with the specified task name.
1039 ///
1040 /// This constructor initializes a `TaskResults` with the given task name and empty
1041 /// collections for hosts and sub-tasks. All timing and summary fields are set to `None`.
1042 ///
1043 /// # Parameters
1044 ///
1045 /// * `task_name` - The name of the task. Can be any type that implements `Into<String>`,
1046 /// such as `&str`, `String`, or other string-like types.
1047 ///
1048 /// # Returns
1049 ///
1050 /// A new `TaskResults` instance with the specified task name and default values for all
1051 /// other fields.
1052 pub fn new(task_name: impl Into<String>) -> Self {
1053 Self {
1054 task_name: task_name.into(),
1055 started_at: None,
1056 finished_at: None,
1057 duration_ns: None,
1058 duration_ms: None,
1059 summary: None,
1060 hosts: CustomTreeMap::new(),
1061 sub_tasks: CustomTreeMap::new(),
1062 }
1063 }
1064
1065 /// Returns the name of the task.
1066 ///
1067 /// # Returns
1068 ///
1069 /// A string slice containing the task name.
1070 pub fn task_name(&self) -> &str {
1071 &self.task_name
1072 }
1073
1074 /// Sets the task execution start timestamp.
1075 ///
1076 /// This is a builder method that consumes `self` and returns the modified instance,
1077 /// allowing for method chaining.
1078 ///
1079 /// # Parameters
1080 ///
1081 /// * `started_at` - The timestamp when the task execution started.
1082 ///
1083 /// # Returns
1084 ///
1085 /// The modified `TaskResults` instance with the start timestamp set.
1086 pub fn with_started_at(mut self, started_at: SystemTime) -> Self {
1087 self.started_at = Some(started_at);
1088 self
1089 }
1090
1091 /// Sets the task execution finish timestamp.
1092 ///
1093 /// This is a builder method that consumes `self` and returns the modified instance,
1094 /// allowing for method chaining.
1095 ///
1096 /// # Parameters
1097 ///
1098 /// * `finished_at` - The timestamp when the task execution finished.
1099 ///
1100 /// # Returns
1101 ///
1102 /// The modified `TaskResults` instance with the finish timestamp set.
1103 pub fn with_finished_at(mut self, finished_at: SystemTime) -> Self {
1104 self.finished_at = Some(finished_at);
1105 self
1106 }
1107
1108 /// Sets the task execution duration in milliseconds.
1109 ///
1110 /// This is a builder method that consumes `self` and returns the modified instance,
1111 /// allowing for method chaining.
1112 ///
1113 /// # Parameters
1114 ///
1115 /// * `duration_ms` - The duration of the task execution in milliseconds.
1116 ///
1117 /// # Returns
1118 ///
1119 /// The modified `TaskResults` instance with the duration set.
1120 pub fn with_duration_ms(mut self, duration_ms: u128) -> Self {
1121 self.duration_ns = Some(duration_ms.saturating_mul(1_000_000));
1122 self.duration_ms = Some(duration_ms);
1123 self
1124 }
1125
1126 /// Sets the task execution duration in nanoseconds.
1127 pub fn with_duration_ns(mut self, duration_ns: u128) -> Self {
1128 self.duration_ns = Some(duration_ns);
1129 self.duration_ms = Some(duration_ns / 1_000_000);
1130 self
1131 }
1132
1133 /// Sets a summary message describing the task execution.
1134 ///
1135 /// This is a builder method that consumes `self` and returns the modified instance,
1136 /// allowing for method chaining.
1137 ///
1138 /// # Parameters
1139 ///
1140 /// * `summary` - A human-readable summary message. Can be any type that implements
1141 /// `Into<String>`, such as `&str`, `String`, or other string-like types.
1142 ///
1143 /// # Returns
1144 ///
1145 /// The modified `TaskResults` instance with the summary set.
1146 pub fn with_summary(mut self, summary: impl Into<String>) -> Self {
1147 self.summary = Some(summary.into());
1148 self
1149 }
1150
1151 /// Merges another result tree for the same task into this one.
1152 ///
1153 /// Host results are inserted directly and sub-task trees are merged
1154 /// recursively. Aggregate timing is widened to cover the full execution
1155 /// window across both result trees.
1156 pub fn merge(&mut self, other: TaskResults) {
1157 let mut other = other;
1158 debug_assert_eq!(self.task_name, other.task_name);
1159
1160 if let (Some(started_at), Some(finished_at)) = (other.started_at, other.finished_at) {
1161 self.record_execution_timing(started_at, finished_at);
1162 } else {
1163 if self.started_at.is_none() {
1164 self.started_at = other.started_at;
1165 }
1166 if self.finished_at.is_none() {
1167 self.finished_at = other.finished_at;
1168 }
1169 if self.duration_ns.is_none() {
1170 self.duration_ns = other.duration_ns;
1171 }
1172 if self.duration_ms.is_none() {
1173 self.duration_ms = other.duration_ms;
1174 }
1175 }
1176
1177 if self.summary.is_none() {
1178 self.summary = other.summary;
1179 }
1180
1181 for (hostname, result) in std::mem::take(&mut other.hosts).into_iter() {
1182 self.insert_host_result(hostname, result);
1183 }
1184
1185 for (task_name, sub_results) in std::mem::take(&mut other.sub_tasks).into_iter() {
1186 if let Some(existing) = self.sub_task_mut(task_name.as_str()) {
1187 existing.merge(sub_results);
1188 } else {
1189 self.insert_sub_task(task_name, sub_results);
1190 }
1191 }
1192 }
1193
1194 fn record_execution_timing(&mut self, started_at: SystemTime, finished_at: SystemTime) {
1195 if self.started_at.is_none_or(|current| started_at < current) {
1196 self.started_at = Some(started_at);
1197 }
1198
1199 if self.finished_at.is_none_or(|current| finished_at > current) {
1200 self.finished_at = Some(finished_at);
1201 }
1202
1203 if let (Some(started_at), Some(finished_at)) = (self.started_at, self.finished_at) {
1204 let duration_ns = finished_at
1205 .duration_since(started_at)
1206 .map(|duration| duration.as_nanos())
1207 .unwrap_or(0);
1208 self.duration_ns = Some(duration_ns);
1209 self.duration_ms = Some(duration_ns / 1_000_000);
1210 }
1211 }
1212
1213 /// Returns the task execution start timestamp, if available.
1214 ///
1215 /// # Returns
1216 ///
1217 /// `Some(SystemTime)` if the start timestamp was set, `None` otherwise.
1218 pub fn started_at(&self) -> Option<SystemTime> {
1219 self.started_at
1220 }
1221
1222 /// Returns the task execution finish timestamp, if available.
1223 ///
1224 /// # Returns
1225 ///
1226 /// `Some(SystemTime)` if the finish timestamp was set, `None` otherwise.
1227 pub fn finished_at(&self) -> Option<SystemTime> {
1228 self.finished_at
1229 }
1230
1231 /// Returns the task execution duration in milliseconds, if available.
1232 ///
1233 /// # Returns
1234 ///
1235 /// `Some(u128)` if the duration was set, `None` otherwise.
1236 pub fn duration_ms(&self) -> Option<u128> {
1237 self.duration_ns
1238 .map(|duration_ns| duration_ns / 1_000_000)
1239 .or(self.duration_ms)
1240 }
1241
1242 /// Returns the task execution duration in nanoseconds, if available.
1243 pub fn duration_ns(&self) -> Option<u128> {
1244 self.duration_ns.or_else(|| {
1245 self.duration_ms
1246 .map(|duration_ms| duration_ms.saturating_mul(1_000_000))
1247 })
1248 }
1249
1250 /// Returns the task execution start timestamp in RFC 3339 format, if available.
1251 pub fn started_at_display(&self) -> Option<String> {
1252 self.started_at.map(format_timestamp_display)
1253 }
1254
1255 /// Returns the task execution finish timestamp in RFC 3339 format, if available.
1256 pub fn finished_at_display(&self) -> Option<String> {
1257 self.finished_at.map(format_timestamp_display)
1258 }
1259
1260 /// Returns the task execution duration in a human-readable format, if available.
1261 pub fn duration_display(&self) -> Option<String> {
1262 self.duration_ns().map(format_duration_display)
1263 }
1264
1265 /// Serializes task results as compact human-readable JSON.
1266 pub fn to_json_string(&self) -> Result<String, serde_json::Error> {
1267 serde_json::to_string(&TaskResultsHumanJson::from(self))
1268 }
1269
1270 /// Serializes task results as pretty-printed human-readable JSON.
1271 pub fn to_pretty_json_string(&self) -> Result<String, serde_json::Error> {
1272 serde_json::to_string_pretty(&TaskResultsHumanJson::from(self))
1273 }
1274
1275 /// Serializes task results as compact raw JSON using the struct's default serde representation.
1276 pub fn to_raw_json_string(&self) -> Result<String, serde_json::Error> {
1277 serde_json::to_string(self)
1278 }
1279
1280 /// Serializes task results as pretty-printed raw JSON using the struct's default serde representation.
1281 pub fn to_raw_pretty_json_string(&self) -> Result<String, serde_json::Error> {
1282 serde_json::to_string_pretty(self)
1283 }
1284
1285 /// Returns the task summary message, if available.
1286 ///
1287 /// # Returns
1288 ///
1289 /// `Some(&str)` if a summary was set, `None` otherwise.
1290 pub fn summary(&self) -> Option<&str> {
1291 self.summary.as_deref()
1292 }
1293
1294 /// Inserts or updates the execution result for a specific host.
1295 ///
1296 /// If a result already exists for the given hostname, it will be replaced with the new result.
1297 ///
1298 /// # Parameters
1299 ///
1300 /// * `hostname` - The hostname to associate with the result. Can be any type that implements
1301 /// `Into<NatString>`, such as `&str`, `String`, or `NatString`.
1302 /// * `result` - The `HostTaskResult` containing the execution outcome for this host.
1303 pub fn insert_host_result<K>(&mut self, hostname: K, result: HostTaskResult)
1304 where
1305 K: Into<NatString>,
1306 {
1307 self.hosts.insert(hostname.into(), result);
1308 }
1309
1310 /// Retrieves the execution result for a specific host.
1311 ///
1312 /// # Parameters
1313 ///
1314 /// * `hostname` - The hostname to look up.
1315 ///
1316 /// # Returns
1317 ///
1318 /// `Some(&HostTaskResult)` if a result exists for the hostname, `None` otherwise.
1319 pub fn host_result(&self, hostname: &str) -> Option<&HostTaskResult> {
1320 self.hosts.get(hostname)
1321 }
1322
1323 /// Retrieves a mutable reference to the execution result for a specific host.
1324 ///
1325 /// # Parameters
1326 ///
1327 /// * `hostname` - The hostname to look up.
1328 ///
1329 /// # Returns
1330 ///
1331 /// `Some(&mut HostTaskResult)` if a result exists for the hostname, `None` otherwise.
1332 pub fn host_result_mut(&mut self, hostname: &str) -> Option<&mut HostTaskResult> {
1333 self.hosts.get_mut(hostname)
1334 }
1335
1336 /// Returns a reference to the map of all host results.
1337 ///
1338 /// # Returns
1339 ///
1340 /// A reference to the `CustomTreeMap` containing hostname to `HostTaskResult` mappings.
1341 pub fn hosts(&self) -> &CustomTreeMap<HostTaskResult> {
1342 &self.hosts
1343 }
1344
1345 /// Inserts or updates the results for a sub-task.
1346 ///
1347 /// If results already exist for the given sub-task name, they will be replaced with the new results.
1348 ///
1349 /// # Parameters
1350 ///
1351 /// * `task_name` - The name of the sub-task. Can be any type that implements `Into<NatString>`,
1352 /// such as `&str`, `String`, or `NatString`.
1353 /// * `results` - The `TaskResults` containing the execution results for this sub-task.
1354 pub fn insert_sub_task<K>(&mut self, task_name: K, results: TaskResults)
1355 where
1356 K: Into<NatString>,
1357 {
1358 self.sub_tasks.insert(task_name.into(), results);
1359 }
1360
1361 /// Retrieves the results for a specific sub-task.
1362 ///
1363 /// # Parameters
1364 ///
1365 /// * `task_name` - The name of the sub-task to look up.
1366 ///
1367 /// # Returns
1368 ///
1369 /// `Some(&TaskResults)` if results exist for the sub-task, `None` otherwise.
1370 pub fn sub_task(&self, task_name: &str) -> Option<&TaskResults> {
1371 self.sub_tasks.get(task_name)
1372 }
1373
1374 /// Retrieves a mutable reference to the results for a specific sub-task.
1375 ///
1376 /// # Parameters
1377 ///
1378 /// * `task_name` - The name of the sub-task to look up.
1379 ///
1380 /// # Returns
1381 ///
1382 /// `Some(&mut TaskResults)` if results exist for the sub-task, `None` otherwise.
1383 pub fn sub_task_mut(&mut self, task_name: &str) -> Option<&mut TaskResults> {
1384 self.sub_tasks.get_mut(task_name)
1385 }
1386
1387 /// Returns a reference to the map of all sub-task results.
1388 ///
1389 /// # Returns
1390 ///
1391 /// A reference to the `CustomTreeMap` containing sub-task name to `TaskResults` mappings.
1392 pub fn sub_tasks(&self) -> &CustomTreeMap<TaskResults> {
1393 &self.sub_tasks
1394 }
1395
1396 /// Returns a list of hostnames for which the task execution passed.
1397 ///
1398 /// This method filters the host results and collects the hostnames where the
1399 /// `HostTaskResult` indicates a successful execution (passed state).
1400 ///
1401 /// # Returns
1402 ///
1403 /// A `Vec` containing references to the hostnames of all hosts where the task passed.
1404 pub fn passed_hosts(&self) -> Vec<&NatString> {
1405 self.hosts
1406 .iter()
1407 .filter_map(|(host, result)| result.is_passed().then_some(host))
1408 .collect()
1409 }
1410
1411 /// Returns a list of hostnames for which the task execution failed.
1412 ///
1413 /// This method filters the host results and collects the hostnames where the
1414 /// `HostTaskResult` indicates a failed execution.
1415 ///
1416 /// # Returns
1417 ///
1418 /// A `Vec` containing references to the hostnames of all hosts where the task failed.
1419 pub fn failed_hosts(&self) -> Vec<&NatString> {
1420 self.hosts
1421 .iter()
1422 .filter_map(|(host, result)| result.is_failed().then_some(host))
1423 .collect()
1424 }
1425
1426 /// Returns a list of hostnames for which the task execution was skipped.
1427 pub fn skipped_hosts(&self) -> Vec<&NatString> {
1428 self.hosts
1429 .iter()
1430 .filter_map(|(host, result)| result.is_skipped().then_some(host))
1431 .collect()
1432 }
1433
1434 /// Returns aggregate host counts for this task only.
1435 pub fn host_summary(&self) -> TaskHostSummary {
1436 TaskHostSummary::new(
1437 self.passed_hosts().len(),
1438 self.failed_hosts().len(),
1439 self.skipped_hosts().len(),
1440 )
1441 }
1442
1443 /// Returns a recursive summary of this task and all sub-tasks.
1444 pub fn task_summary(&self) -> TaskResultsSummary {
1445 let mut sub_tasks = CustomTreeMap::new();
1446 for (task_name, task_results) in self.sub_tasks().iter() {
1447 sub_tasks.insert(task_name, task_results.task_summary());
1448 }
1449
1450 TaskResultsSummary {
1451 task_name: self.task_name.clone(),
1452 hosts: self.host_summary(),
1453 duration_ns: self.duration_ns(),
1454 sub_tasks,
1455 }
1456 }
1457}
1458
/// Represents the execution outcome of a task on a single host.
///
/// `HostTaskResult` captures one of three possible states for a task execution:
/// - **Passed**: The task completed successfully, potentially with changes, warnings, or metadata.
/// - **Failed**: The task encountered an error and could not complete successfully.
/// - **Skipped**: The task was not executed, typically due to conditional logic or dependencies.
///
/// This enum provides a type-safe way to represent task outcomes and includes helper methods
/// to query the result state and extract the underlying success, failure, or skip details.
///
/// # Variants
///
/// * `Passed(TaskSuccess)` - The task executed successfully. Contains detailed information about
///   the execution including any results, changes made, warnings, and timing information.
///
/// * `Failed(TaskFailure)` - The task failed during execution. Contains error information,
///   failure classification, retry hints, and any warnings or messages collected before failure.
///
/// * `Skipped(TaskSkip)` - The task was skipped and not executed. Contains optional reason
///   and message explaining why the task was skipped.
///
/// # Example
///
/// ```rust
/// use genja_core::task::{HostTaskResult, TaskSuccess, TaskFailure};
///
/// // Create a successful result
/// let success = HostTaskResult::passed(
///     TaskSuccess::new()
///         .with_changed(true)
///         .with_summary("Configuration updated")
/// );
///
/// // Check the result state
/// assert!(success.is_passed());
/// assert!(!success.is_failed());
///
/// // Extract success details
/// if let Some(details) = success.success() {
///     assert!(details.changed());
/// }
///
/// // Create a skipped result
/// let skipped = HostTaskResult::skipped_with_reason("Host in maintenance mode");
/// assert!(skipped.is_skipped());
/// ```
#[derive(Debug, Clone, Serialize)]
pub enum HostTaskResult {
    /// The task completed successfully on the host.
    Passed(TaskSuccess),
    /// The task ran on the host but failed.
    Failed(TaskFailure),
    /// The task was not executed on the host.
    Skipped(TaskSkip),
}
1511
1512impl HostTaskResult {
1513 /// Creates a new `HostTaskResult` representing a successful task execution.
1514 ///
1515 /// This constructor wraps a `TaskSuccess` instance in the `Passed` variant,
1516 /// indicating that the task completed successfully on the host.
1517 ///
1518 /// # Parameters
1519 ///
1520 /// * `result` - The `TaskSuccess` containing details about the successful execution,
1521 /// including any results, changes made, warnings, and timing information.
1522 ///
1523 /// # Returns
1524 ///
1525 /// A `HostTaskResult::Passed` variant containing the provided success details.
1526 pub fn passed(result: TaskSuccess) -> Self {
1527 Self::Passed(result)
1528 }
1529
1530 /// Creates a new `HostTaskResult` representing a failed task execution.
1531 ///
1532 /// This constructor wraps a `TaskFailure` instance in the `Failed` variant,
1533 /// indicating that the task encountered an error and could not complete successfully.
1534 ///
1535 /// # Parameters
1536 ///
1537 /// * `failure` - The `TaskFailure` containing error information, failure classification,
1538 /// retry hints, and any warnings or messages collected before failure.
1539 ///
1540 /// # Returns
1541 ///
1542 /// A `HostTaskResult::Failed` variant containing the provided failure details.
1543 pub fn failed(failure: TaskFailure) -> Self {
1544 Self::Failed(failure)
1545 }
1546
1547 /// Creates a new `HostTaskResult` representing a skipped task execution.
1548 ///
1549 /// This constructor creates a `Skipped` variant with default (empty) skip details,
1550 /// indicating that the task was not executed on the host.
1551 ///
1552 /// # Returns
1553 ///
1554 /// A `HostTaskResult::Skipped` variant with default skip information (no reason or message).
1555 pub fn skipped() -> Self {
1556 Self::Skipped(TaskSkip::default())
1557 }
1558
1559 /// Creates a new `HostTaskResult` representing a skipped task execution with a reason.
1560 ///
1561 /// This constructor creates a `Skipped` variant with a specified reason explaining
1562 /// why the task was not executed on the host.
1563 ///
1564 /// # Parameters
1565 ///
1566 /// * `reason` - A machine-readable reason code or identifier explaining why the task
1567 /// was skipped. Can be any type that implements `Into<String>`, such as `&str`,
1568 /// `String`, or other string-like types.
1569 ///
1570 /// # Returns
1571 ///
1572 /// A `HostTaskResult::Skipped` variant with the specified reason set.
1573 pub fn skipped_with_reason(reason: impl Into<String>) -> Self {
1574 Self::Skipped(TaskSkip::new().with_reason(reason))
1575 }
1576
1577 /// Checks if the task execution passed (completed successfully).
1578 ///
1579 /// # Returns
1580 ///
1581 /// `true` if this result represents a successful task execution (`Passed` variant),
1582 /// `false` otherwise.
1583 pub fn is_passed(&self) -> bool {
1584 matches!(self, Self::Passed(_))
1585 }
1586
1587 /// Checks if the task execution failed.
1588 ///
1589 /// # Returns
1590 ///
1591 /// `true` if this result represents a failed task execution (`Failed` variant),
1592 /// `false` otherwise.
1593 pub fn is_failed(&self) -> bool {
1594 matches!(self, Self::Failed(_))
1595 }
1596
1597 /// Checks if the task execution was skipped.
1598 ///
1599 /// # Returns
1600 ///
1601 /// `true` if this result represents a skipped task execution (`Skipped` variant),
1602 /// `false` otherwise.
1603 pub fn is_skipped(&self) -> bool {
1604 matches!(self, Self::Skipped(_))
1605 }
1606
1607 /// Retrieves the success details if the task passed.
1608 ///
1609 /// This method extracts the `TaskSuccess` from a `Passed` variant, providing
1610 /// access to execution results, changes, warnings, and other success metadata.
1611 ///
1612 /// # Returns
1613 ///
1614 /// `Some(&TaskSuccess)` if this is a `Passed` result, `None` if the task failed
1615 /// or was skipped.
1616 pub fn success(&self) -> Option<&TaskSuccess> {
1617 match self {
1618 Self::Passed(success) => Some(success),
1619 Self::Failed(_) | Self::Skipped(_) => None,
1620 }
1621 }
1622
1623 /// Retrieves the failure details if the task failed.
1624 ///
1625 /// This method extracts the `TaskFailure` from a `Failed` variant, providing
1626 /// access to error information, failure classification, and retry hints.
1627 ///
1628 /// # Returns
1629 ///
1630 /// `Some(&TaskFailure)` if this is a `Failed` result, `None` if the task passed
1631 /// or was skipped.
1632 pub fn failure(&self) -> Option<&TaskFailure> {
1633 match self {
1634 Self::Failed(failure) => Some(failure),
1635 Self::Passed(_) | Self::Skipped(_) => None,
1636 }
1637 }
1638
1639 /// Retrieves the skip details if the task was skipped.
1640 ///
1641 /// This method extracts the `TaskSkip` from a `Skipped` variant, providing
1642 /// access to the reason and message explaining why the task was not executed.
1643 ///
1644 /// # Returns
1645 ///
1646 /// `Some(&TaskSkip)` if this is a `Skipped` result, `None` if the task passed
1647 /// or failed.
1648 pub fn skipped_detail(&self) -> Option<&TaskSkip> {
1649 match self {
1650 Self::Skipped(skip) => Some(skip),
1651 Self::Passed(_) | Self::Failed(_) => None,
1652 }
1653 }
1654
1655 fn with_execution_timing(
1656 self,
1657 started_at: SystemTime,
1658 finished_at: SystemTime,
1659 duration_ns: u128,
1660 ) -> Self {
1661 match self {
1662 Self::Passed(success) => Self::Passed(
1663 success
1664 .with_started_at(started_at)
1665 .with_finished_at(finished_at)
1666 .with_duration_ns(duration_ns),
1667 ),
1668 Self::Failed(failure) => Self::Failed(
1669 failure
1670 .with_started_at(started_at)
1671 .with_finished_at(finished_at)
1672 .with_duration_ns(duration_ns),
1673 ),
1674 Self::Skipped(skip) => Self::Skipped(skip),
1675 }
1676 }
1677}
1678
/// Represents the successful execution of a task on a host.
///
/// `TaskSuccess` captures detailed information about a task that completed successfully,
/// including the execution result, whether changes were made, timing information, and any
/// warnings or messages generated during execution. This structure provides a comprehensive
/// view of what happened during task execution, even when the task succeeded.
///
/// # Fields
///
/// * `result` - The structured result data produced by the task, if any. This can contain
///   arbitrary JSON data representing the task's output.
///
/// * `changed` - Indicates whether the task made any changes to the target system. This is
///   important for idempotency tracking and reporting.
///
/// * `diff` - A textual representation of changes made, useful for showing what was modified
///   before and after the task execution.
///
/// * `summary` - A human-readable summary message describing what the task accomplished.
///
/// * `warnings` - A list of warning messages generated during execution. Warnings indicate
///   potential issues that didn't prevent success but may require attention.
///
/// * `messages` - Structured messages with levels (Info, Warning, Error, Debug) that provide
///   detailed execution information beyond simple warnings.
///
/// * `metadata` - Additional structured metadata about the execution, such as version information,
///   configuration details, or other contextual data.
///
/// * `started_at` - The timestamp when the task execution started, if available.
///
/// * `finished_at` - The timestamp when the task execution finished, if available.
///
/// * `duration_ns` - The duration of the task execution in nanoseconds, if available.
///
/// * `duration_ms` - The duration of the task execution in milliseconds, if available.
///
/// # Example
///
/// ```rust
/// use genja_core::task::TaskSuccess;
/// use serde_json::json;
///
/// let success = TaskSuccess::new()
///     .with_result(json!({"status": "deployed"}))
///     .with_changed(true)
///     .with_summary("Configuration deployed successfully")
///     .with_warning("Using deprecated configuration format")
///     .with_diff("- old_value\n+ new_value");
///
/// assert!(success.changed());
/// assert_eq!(success.warnings().len(), 1);
/// ```
#[derive(Debug, Clone, Default, Serialize)]
pub struct TaskSuccess {
    result: Option<Value>,
    changed: bool,
    diff: Option<String>,
    summary: Option<String>,
    warnings: Vec<String>,
    messages: Vec<TaskMessage>,
    metadata: Option<Value>,
    started_at: Option<SystemTime>,
    finished_at: Option<SystemTime>,
    duration_ns: Option<u128>,
    duration_ms: Option<u128>,
}
1744
1745impl TaskSuccess {
1746 /// Creates a new `TaskSuccess` instance with default values.
1747 ///
1748 /// This constructor initializes a `TaskSuccess` with all fields set to their default values:
1749 /// no result data, no changes made, no diff, no summary, empty warnings and messages lists,
1750 /// no metadata, and no timing information.
1751 ///
1752 /// # Returns
1753 ///
1754 /// A new `TaskSuccess` instance with default values for all fields.
1755 pub fn new() -> Self {
1756 Self::default()
1757 }
1758
1759 /// Sets the structured result data produced by the task.
1760 ///
1761 /// This is a builder method that consumes `self` and returns the modified instance,
1762 /// allowing for method chaining. The result can contain arbitrary JSON data representing
1763 /// the task's output, such as configuration details, status information, or any other
1764 /// structured data relevant to the task execution.
1765 ///
1766 /// # Parameters
1767 ///
1768 /// * `result` - A `serde_json::Value` containing the structured result data.
1769 ///
1770 /// # Returns
1771 ///
1772 /// The modified `TaskSuccess` instance with the result data set.
1773 pub fn with_result(mut self, result: Value) -> Self {
1774 self.result = Some(result);
1775 self
1776 }
1777
1778 /// Sets whether the task made changes to the target system.
1779 ///
1780 /// This is a builder method that consumes `self` and returns the modified instance,
1781 /// allowing for method chaining. The changed flag is important for idempotency tracking
1782 /// and reporting, indicating whether the task modified the system state or found it
1783 /// already in the desired state.
1784 ///
1785 /// # Parameters
1786 ///
1787 /// * `changed` - `true` if the task made changes to the system, `false` if no changes
1788 /// were necessary or made.
1789 ///
1790 /// # Returns
1791 ///
1792 /// The modified `TaskSuccess` instance with the changed flag set.
1793 pub fn with_changed(mut self, changed: bool) -> Self {
1794 self.changed = changed;
1795 self
1796 }
1797
1798 /// Sets a textual representation of changes made by the task.
1799 ///
1800 /// This is a builder method that consumes `self` and returns the modified instance,
1801 /// allowing for method chaining. The diff typically shows what was modified, often
1802 /// in a before/after format, making it easy to understand what changed during execution.
1803 ///
1804 /// # Parameters
1805 ///
1806 /// * `diff` - A textual representation of the changes. Can be any type that implements
1807 /// `Into<String>`, such as `&str`, `String`, or other string-like types.
1808 ///
1809 /// # Returns
1810 ///
1811 /// The modified `TaskSuccess` instance with the diff set.
1812 pub fn with_diff(mut self, diff: impl Into<String>) -> Self {
1813 self.diff = Some(diff.into());
1814 self
1815 }
1816
1817 /// Sets a human-readable summary message describing what the task accomplished.
1818 ///
1819 /// This is a builder method that consumes `self` and returns the modified instance,
1820 /// allowing for method chaining. The summary provides a concise description of the
1821 /// task's outcome, useful for logging and reporting.
1822 ///
1823 /// # Parameters
1824 ///
1825 /// * `summary` - A human-readable summary message. Can be any type that implements
1826 /// `Into<String>`, such as `&str`, `String`, or other string-like types.
1827 ///
1828 /// # Returns
1829 ///
1830 /// The modified `TaskSuccess` instance with the summary set.
1831 pub fn with_summary(mut self, summary: impl Into<String>) -> Self {
1832 self.summary = Some(summary.into());
1833 self
1834 }
1835
1836 /// Adds a warning message to the list of warnings generated during execution.
1837 ///
1838 /// This is a builder method that consumes `self` and returns the modified instance,
1839 /// allowing for method chaining. Warnings indicate potential issues that didn't prevent
1840 /// success but may require attention. Multiple warnings can be added by calling this
1841 /// method multiple times.
1842 ///
1843 /// # Parameters
1844 ///
1845 /// * `warning` - A warning message. Can be any type that implements `Into<String>`,
1846 /// such as `&str`, `String`, or other string-like types.
1847 ///
1848 /// # Returns
1849 ///
1850 /// The modified `TaskSuccess` instance with the warning added to the warnings list.
1851 pub fn with_warning(mut self, warning: impl Into<String>) -> Self {
1852 self.warnings.push(warning.into());
1853 self
1854 }
1855
1856 /// Adds a structured message to the list of messages generated during execution.
1857 ///
1858 /// This is a builder method that consumes `self` and returns the modified instance,
1859 /// allowing for method chaining. Messages provide detailed execution information with
1860 /// associated severity levels. Multiple messages can be added by calling this method
1861 /// multiple times.
1862 ///
1863 /// # Parameters
1864 ///
1865 /// * `message` - A `TaskMessage` containing the message text, severity level, and
1866 /// optional code and timestamp.
1867 ///
1868 /// # Returns
1869 ///
1870 /// The modified `TaskSuccess` instance with the message added to the messages list.
1871 pub fn with_message(mut self, message: TaskMessage) -> Self {
1872 self.messages.push(message);
1873 self
1874 }
1875
1876 /// Sets additional structured metadata about the execution.
1877 ///
1878 /// This is a builder method that consumes `self` and returns the modified instance,
1879 /// allowing for method chaining. Metadata can contain arbitrary JSON data such as
1880 /// version information, configuration details, or other contextual data relevant to
1881 /// the task execution.
1882 ///
1883 /// # Parameters
1884 ///
1885 /// * `metadata` - A `serde_json::Value` containing the structured metadata.
1886 ///
1887 /// # Returns
1888 ///
1889 /// The modified `TaskSuccess` instance with the metadata set.
1890 pub fn with_metadata(mut self, metadata: Value) -> Self {
1891 self.metadata = Some(metadata);
1892 self
1893 }
1894
1895 /// Sets the task execution start timestamp.
1896 ///
1897 /// This is a builder method that consumes `self` and returns the modified instance,
1898 /// allowing for method chaining.
1899 ///
1900 /// # Parameters
1901 ///
1902 /// * `started_at` - The timestamp when the task execution started.
1903 ///
1904 /// # Returns
1905 ///
1906 /// The modified `TaskSuccess` instance with the start timestamp set.
1907 pub fn with_started_at(mut self, started_at: SystemTime) -> Self {
1908 self.started_at = Some(started_at);
1909 self
1910 }
1911
1912 /// Sets the task execution finish timestamp.
1913 ///
1914 /// This is a builder method that consumes `self` and returns the modified instance,
1915 /// allowing for method chaining.
1916 ///
1917 /// # Parameters
1918 ///
1919 /// * `finished_at` - The timestamp when the task execution finished.
1920 ///
1921 /// # Returns
1922 ///
1923 /// The modified `TaskSuccess` instance with the finish timestamp set.
1924 pub fn with_finished_at(mut self, finished_at: SystemTime) -> Self {
1925 self.finished_at = Some(finished_at);
1926 self
1927 }
1928
1929 /// Sets the task execution duration in milliseconds.
1930 ///
1931 /// This is a builder method that consumes `self` and returns the modified instance,
1932 /// allowing for method chaining.
1933 ///
1934 /// # Parameters
1935 ///
1936 /// * `duration_ms` - The duration of the task execution in milliseconds.
1937 ///
1938 /// # Returns
1939 ///
1940 /// The modified `TaskSuccess` instance with the duration set.
1941 pub fn with_duration_ms(mut self, duration_ms: u128) -> Self {
1942 self.duration_ns = Some(duration_ms.saturating_mul(1_000_000));
1943 self.duration_ms = Some(duration_ms);
1944 self
1945 }
1946
1947 /// Sets the task execution duration in nanoseconds.
1948 pub fn with_duration_ns(mut self, duration_ns: u128) -> Self {
1949 self.duration_ns = Some(duration_ns);
1950 self.duration_ms = Some(duration_ns / 1_000_000);
1951 self
1952 }
1953
1954 /// Returns the structured result data produced by the task, if available.
1955 ///
1956 /// # Returns
1957 ///
1958 /// `Some(&Value)` if result data was set, `None` otherwise.
1959 pub fn result(&self) -> Option<&Value> {
1960 self.result.as_ref()
1961 }
1962
1963 /// Returns whether the task made changes to the target system.
1964 ///
1965 /// # Returns
1966 ///
1967 /// `true` if the task made changes, `false` if no changes were made.
1968 pub fn changed(&self) -> bool {
1969 self.changed
1970 }
1971
1972 /// Returns the textual representation of changes made, if available.
1973 ///
1974 /// # Returns
1975 ///
1976 /// `Some(&str)` if a diff was set, `None` otherwise.
1977 pub fn diff(&self) -> Option<&str> {
1978 self.diff.as_deref()
1979 }
1980
1981 /// Returns the task summary message, if available.
1982 ///
1983 /// # Returns
1984 ///
1985 /// `Some(&str)` if a summary was set, `None` otherwise.
1986 pub fn summary(&self) -> Option<&str> {
1987 self.summary.as_deref()
1988 }
1989
1990 /// Returns a slice of all warning messages generated during execution.
1991 ///
1992 /// # Returns
1993 ///
1994 /// A slice containing all warning messages. Returns an empty slice if no warnings
1995 /// were generated.
1996 pub fn warnings(&self) -> &[String] {
1997 &self.warnings
1998 }
1999
2000 /// Returns a slice of all structured messages generated during execution.
2001 ///
2002 /// # Returns
2003 ///
2004 /// A slice containing all `TaskMessage` instances. Returns an empty slice if no
2005 /// messages were generated.
2006 pub fn messages(&self) -> &[TaskMessage] {
2007 &self.messages
2008 }
2009
2010 /// Returns the additional structured metadata, if available.
2011 ///
2012 /// # Returns
2013 ///
2014 /// `Some(&Value)` if metadata was set, `None` otherwise.
2015 pub fn metadata(&self) -> Option<&Value> {
2016 self.metadata.as_ref()
2017 }
2018
2019 /// Returns the task execution start timestamp, if available.
2020 ///
2021 /// # Returns
2022 ///
2023 /// `Some(SystemTime)` if the start timestamp was set, `None` otherwise.
2024 pub fn started_at(&self) -> Option<SystemTime> {
2025 self.started_at
2026 }
2027
2028 /// Returns the task execution finish timestamp, if available.
2029 ///
2030 /// # Returns
2031 ///
2032 /// `Some(SystemTime)` if the finish timestamp was set, `None` otherwise.
2033 pub fn finished_at(&self) -> Option<SystemTime> {
2034 self.finished_at
2035 }
2036
2037 /// Returns the task execution duration in milliseconds, if available.
2038 ///
2039 /// # Returns
2040 ///
2041 /// `Some(u128)` if the duration was set, `None` otherwise.
2042 pub fn duration_ms(&self) -> Option<u128> {
2043 self.duration_ns
2044 .map(|duration_ns| duration_ns / 1_000_000)
2045 .or(self.duration_ms)
2046 }
2047
2048 /// Returns the task execution duration in nanoseconds, if available.
2049 pub fn duration_ns(&self) -> Option<u128> {
2050 self.duration_ns.or_else(|| {
2051 self.duration_ms
2052 .map(|duration_ms| duration_ms.saturating_mul(1_000_000))
2053 })
2054 }
2055
2056 /// Returns the task execution start timestamp in RFC 3339 format, if available.
2057 pub fn started_at_display(&self) -> Option<String> {
2058 self.started_at.map(format_timestamp_display)
2059 }
2060
2061 /// Returns the task execution finish timestamp in RFC 3339 format, if available.
2062 pub fn finished_at_display(&self) -> Option<String> {
2063 self.finished_at.map(format_timestamp_display)
2064 }
2065
2066 /// Returns the task execution duration in a human-readable format, if available.
2067 pub fn duration_display(&self) -> Option<String> {
2068 self.duration_ns().map(format_duration_display)
2069 }
2070}
2071
2072/// Represents a failed task execution with comprehensive error information and context.
2073///
2074/// `TaskFailure` captures detailed information about why a task failed, including the underlying
2075/// error, failure classification, retry hints, and any warnings or messages collected during
2076/// execution before the failure occurred. This structure provides rich context for error handling,
2077/// logging, and determining whether a failed task should be retried.
2078///
2079/// The failure information includes timing data, structured details about the error, and the
2080/// ability to downcast to specific error types for specialized error handling.
2081///
2082/// # Fields
2083///
2084/// * `error` - The underlying error that caused the task to fail. This is a thread-safe,
2085/// reference-counted error that can be downcast to specific error types. Not serialized.
2086///
2087/// * `kind` - The classification of the failure (e.g., Connection, Authentication, Timeout).
2088/// This helps categorize errors for reporting and handling purposes.
2089///
2090/// * `error_type` - A string representation of the error's type name, useful for debugging
2091/// and logging when the actual error type information is needed.
2092///
2093/// * `message` - A human-readable error message describing what went wrong. This is typically
2094/// derived from the error's `Display` implementation.
2095///
2096/// * `retryable` - Indicates whether this failure is potentially transient and the task could
2097/// succeed if retried. This helps automation systems decide retry strategies.
2098///
2099/// * `details` - Optional structured data providing additional context about the failure,
2100/// such as error codes, affected resources, or diagnostic information.
2101///
2102/// * `warnings` - A list of warning messages that were generated during execution before the
2103/// failure occurred. These can provide context about what led to the failure.
2104///
2105/// * `messages` - Structured messages with levels (Info, Warning, Error, Debug) that were
2106/// collected during execution, providing a detailed execution trace up to the point of failure.
2107///
2108/// * `started_at` - The timestamp when the task execution started, if available.
2109///
2110/// * `finished_at` - The timestamp when the task execution failed, if available.
2111///
2112/// * `duration_ms` - The duration of the task execution in milliseconds before it failed,
2113/// if available.
2114///
2115/// # Example
2116///
2117/// ```rust
2118/// use genja_core::task::{TaskFailure, TaskFailureKind, TaskMessage, MessageLevel};
2119/// use serde_json::json;
2120/// use std::io;
2121///
2122/// let failure = TaskFailure::new(io::Error::new(io::ErrorKind::TimedOut, "connection timeout"))
2123/// .with_kind(TaskFailureKind::Timeout)
2124/// .with_retryable(true)
2125/// .with_details(json!({"timeout_seconds": 30}))
2126/// .with_warning("Slow network detected")
2127/// .with_message(TaskMessage::new(MessageLevel::Error, "Failed to connect to host"));
2128///
2129/// assert!(failure.retryable());
2130/// assert!(matches!(failure.kind(), TaskFailureKind::Timeout));
2131/// assert_eq!(failure.warnings().len(), 1);
2132/// ```
2133#[derive(Debug, Clone, Serialize)]
2134pub struct TaskFailure {
2135 #[serde(skip)]
2136 error: TaskError,
2137 #[serde(skip)]
2138 source: Option<TaskFailureSource>,
2139 kind: TaskFailureKind,
2140 error_type: String,
2141 message: String,
2142 retryable: bool,
2143 details: Option<Value>,
2144 warnings: Vec<String>,
2145 messages: Vec<TaskMessage>,
2146 started_at: Option<SystemTime>,
2147 finished_at: Option<SystemTime>,
2148 duration_ns: Option<u128>,
2149 duration_ms: Option<u128>,
2150}
2151
2152impl TaskFailure {
2153 /// Creates a new `TaskFailure` instance from an error.
2154 ///
2155 /// This constructor wraps any error type that implements the standard `Error` trait
2156 /// in a `TaskFailure`, capturing the error message and type information. The failure
2157 /// is initialized with default values: classified as `Internal`, not retryable, with
2158 /// no additional details, warnings, or messages, and no timing information.
2159 ///
2160 /// The error is stored as a thread-safe, reference-counted pointer (`Arc<dyn Error>`),
2161 /// allowing it to be cloned and shared across threads while preserving the ability
2162 /// to downcast to the original error type for specialized error handling.
2163 ///
2164 /// # Parameters
2165 ///
2166 /// * `error` - The error that caused the task to fail. Must implement `Error + Send + Sync + 'static`,
2167 /// ensuring it can be safely shared across threads and stored for the lifetime of the program.
2168 ///
2169 /// # Returns
2170 ///
2171 /// A new `TaskFailure` instance with the error wrapped and default values for all other fields.
2172 /// The failure kind is set to `Internal`, retryable is `false`, and all optional fields are `None`.
2173 pub fn new<E>(error: E) -> Self
2174 where
2175 E: Error + Send + Sync + 'static,
2176 {
2177 let source = Arc::new(error);
2178 let message = source.to_string();
2179 let error_type = type_name::<E>().to_string();
2180
2181 Self {
2182 kind: TaskFailureKind::Internal,
2183 error_type: error_type.clone(),
2184 message,
2185 error: TaskError {
2186 error: source.clone(),
2187 error_type,
2188 source: Some(source.clone()),
2189 },
2190 source: Some(source),
2191 retryable: false,
2192 details: None,
2193 warnings: Vec::new(),
2194 messages: Vec::new(),
2195 started_at: None,
2196 finished_at: None,
2197 duration_ns: None,
2198 duration_ms: None,
2199 }
2200 }
2201
2202 /// Creates a new `TaskFailure` from any thread-safe `'static` payload.
2203 ///
2204 /// This is useful when the failure value does not implement [`Error`] but
2205 /// still carries meaningful type and display information that should be
2206 /// stored with the task result.
2207 pub fn capture<E>(error: E) -> Self
2208 where
2209 E: fmt::Debug + fmt::Display + Send + Sync + 'static,
2210 {
2211 let source = Arc::new(error);
2212 let message = source.to_string();
2213 let error_type = type_name::<E>().to_string();
2214 let wrapped_error = Arc::new(CapturedTaskFailure {
2215 message: message.clone(),
2216 });
2217
2218 Self {
2219 kind: TaskFailureKind::Internal,
2220 error_type: error_type.clone(),
2221 message,
2222 error: TaskError {
2223 error: wrapped_error,
2224 error_type,
2225 source: Some(source.clone()),
2226 },
2227 source: Some(source),
2228 retryable: false,
2229 details: None,
2230 warnings: Vec::new(),
2231 messages: Vec::new(),
2232 started_at: None,
2233 finished_at: None,
2234 duration_ns: None,
2235 duration_ms: None,
2236 }
2237 }
2238
2239 /// Creates a new `TaskFailure` from an already-erased task error.
2240 ///
2241 /// Failures captured through the task execution boundary default to
2242 /// [`TaskFailureKind::External`], because the error originated outside the
2243 /// Genja framework itself.
2244 pub fn from_task_error(error: TaskError) -> Self {
2245 Self {
2246 kind: TaskFailureKind::External,
2247 error_type: error.error_type().to_string(),
2248 message: error.to_string(),
2249 error: error.clone(),
2250 source: error.source,
2251 retryable: false,
2252 details: None,
2253 warnings: Vec::new(),
2254 messages: Vec::new(),
2255 started_at: None,
2256 finished_at: None,
2257 duration_ns: None,
2258 duration_ms: None,
2259 }
2260 }
2261
2262 /// Sets the failure classification kind.
2263 ///
2264 /// This is a builder method that consumes `self` and returns the modified instance,
2265 /// allowing for method chaining. The kind categorizes the failure type (e.g., Connection,
2266 /// Authentication, Timeout) to help with error handling and reporting.
2267 ///
2268 /// # Parameters
2269 ///
2270 /// * `kind` - The `TaskFailureKind` classification for this failure.
2271 ///
2272 /// # Returns
2273 ///
2274 /// The modified `TaskFailure` instance with the failure kind set.
2275 pub fn with_kind(mut self, kind: TaskFailureKind) -> Self {
2276 self.kind = kind;
2277 self
2278 }
2279
2280 /// Sets whether this failure is retryable.
2281 ///
2282 /// This is a builder method that consumes `self` and returns the modified instance,
2283 /// allowing for method chaining. The retryable flag indicates whether the failure
2284 /// is potentially transient and the task could succeed if retried.
2285 ///
2286 /// # Parameters
2287 ///
2288 /// * `retryable` - `true` if the task should be retried after this failure, `false` if
2289 /// the failure is permanent and retrying would not help.
2290 ///
2291 /// # Returns
2292 ///
2293 /// The modified `TaskFailure` instance with the retryable flag set.
2294 pub fn with_retryable(mut self, retryable: bool) -> Self {
2295 self.retryable = retryable;
2296 self
2297 }
2298
2299 /// Sets additional structured details about the failure.
2300 ///
2301 /// This is a builder method that consumes `self` and returns the modified instance,
2302 /// allowing for method chaining. The details can contain arbitrary JSON data providing
2303 /// additional context about the failure, such as error codes, affected resources, or
2304 /// diagnostic information.
2305 ///
2306 /// # Parameters
2307 ///
2308 /// * `details` - A `serde_json::Value` containing the structured failure details.
2309 ///
2310 /// # Returns
2311 ///
2312 /// The modified `TaskFailure` instance with the details set.
2313 pub fn with_details(mut self, details: Value) -> Self {
2314 self.details = Some(details);
2315 self
2316 }
2317
2318 /// Adds a warning message to the list of warnings generated before the failure.
2319 ///
2320 /// This is a builder method that consumes `self` and returns the modified instance,
2321 /// allowing for method chaining. Warnings provide context about what occurred during
2322 /// execution before the failure happened. Multiple warnings can be added by calling
2323 /// this method multiple times.
2324 ///
2325 /// # Parameters
2326 ///
2327 /// * `warning` - A warning message. Can be any type that implements `Into<String>`,
2328 /// such as `&str`, `String`, or other string-like types.
2329 ///
2330 /// # Returns
2331 ///
2332 /// The modified `TaskFailure` instance with the warning added to the warnings list.
2333 pub fn with_warning(mut self, warning: impl Into<String>) -> Self {
2334 self.warnings.push(warning.into());
2335 self
2336 }
2337
2338 /// Adds a structured message to the list of messages generated before the failure.
2339 ///
2340 /// This is a builder method that consumes `self` and returns the modified instance,
2341 /// allowing for method chaining. Messages provide detailed execution information with
2342 /// associated severity levels that were collected before the failure occurred. Multiple
2343 /// messages can be added by calling this method multiple times.
2344 ///
2345 /// # Parameters
2346 ///
2347 /// * `message` - A `TaskMessage` containing the message text, severity level, and
2348 /// optional code and timestamp.
2349 ///
2350 /// # Returns
2351 ///
2352 /// The modified `TaskFailure` instance with the message added to the messages list.
2353 pub fn with_message(mut self, message: TaskMessage) -> Self {
2354 self.messages.push(message);
2355 self
2356 }
2357
2358 /// Sets the task execution start timestamp.
2359 ///
2360 /// This is a builder method that consumes `self` and returns the modified instance,
2361 /// allowing for method chaining.
2362 ///
2363 /// # Parameters
2364 ///
2365 /// * `started_at` - The timestamp when the task execution started.
2366 ///
2367 /// # Returns
2368 ///
2369 /// The modified `TaskFailure` instance with the start timestamp set.
2370 pub fn with_started_at(mut self, started_at: SystemTime) -> Self {
2371 self.started_at = Some(started_at);
2372 self
2373 }
2374
2375 /// Sets the task execution finish timestamp.
2376 ///
2377 /// This is a builder method that consumes `self` and returns the modified instance,
2378 /// allowing for method chaining.
2379 ///
2380 /// # Parameters
2381 ///
2382 /// * `finished_at` - The timestamp when the task execution failed.
2383 ///
2384 /// # Returns
2385 ///
2386 /// The modified `TaskFailure` instance with the finish timestamp set.
2387 pub fn with_finished_at(mut self, finished_at: SystemTime) -> Self {
2388 self.finished_at = Some(finished_at);
2389 self
2390 }
2391
2392 /// Sets the task execution duration in milliseconds.
2393 ///
2394 /// This is a builder method that consumes `self` and returns the modified instance,
2395 /// allowing for method chaining.
2396 ///
2397 /// # Parameters
2398 ///
2399 /// * `duration_ms` - The duration of the task execution in milliseconds before it failed.
2400 ///
2401 /// # Returns
2402 ///
2403 /// The modified `TaskFailure` instance with the duration set.
2404 pub fn with_duration_ms(mut self, duration_ms: u128) -> Self {
2405 self.duration_ns = Some(duration_ms.saturating_mul(1_000_000));
2406 self.duration_ms = Some(duration_ms);
2407 self
2408 }
2409
2410 /// Sets the task execution duration in nanoseconds.
2411 pub fn with_duration_ns(mut self, duration_ns: u128) -> Self {
2412 self.duration_ns = Some(duration_ns);
2413 self.duration_ms = Some(duration_ns / 1_000_000);
2414 self
2415 }
2416
2417 /// Attempts to downcast the underlying error to a specific error type.
2418 ///
2419 /// This method provides type-safe access to the original error type, allowing
2420 /// specialized error handling based on the concrete error type. If the underlying
2421 /// error is of type `E`, this returns a reference to it; otherwise, it returns `None`.
2422 ///
2423 /// # Type Parameters
2424 ///
2425 /// * `E` - The concrete error type to downcast to. Must implement `Error + 'static`.
2426 ///
2427 /// # Returns
2428 ///
2429 /// `Some(&E)` if the underlying error is of type `E`, `None` otherwise.
2430 pub fn downcast_ref<E>(&self) -> Option<&E>
2431 where
2432 E: 'static,
2433 {
2434 self.source
2435 .as_ref()
2436 .and_then(|source| source.downcast_ref::<E>())
2437 }
2438
2439 /// Returns a reference to the underlying error as a trait object.
2440 ///
2441 /// This method provides access to the error through the `Error` trait interface,
2442 /// allowing generic error handling without knowing the concrete error type.
2443 ///
2444 /// # Returns
2445 ///
2446 /// A reference to the underlying error as a trait object implementing
2447 /// `Error + Send + Sync + 'static`.
2448 pub fn error(&self) -> &(dyn Error + Send + Sync + 'static) {
2449 self.error.error()
2450 }
2451
2452 /// Returns the type name of the underlying error.
2453 ///
2454 /// This method provides the fully qualified type name of the error as a string,
2455 /// which is useful for debugging and logging when you need to know the concrete
2456 /// error type without downcasting.
2457 ///
2458 /// # Returns
2459 ///
2460 /// A string slice containing the fully qualified type name of the error.
2461 pub fn error_type(&self) -> &str {
2462 &self.error_type
2463 }
2464
2465 /// Returns the human-readable error message.
2466 ///
2467 /// This message is derived from the error's `Display` implementation and provides
2468 /// a description of what went wrong during task execution.
2469 ///
2470 /// # Returns
2471 ///
2472 /// A string slice containing the error message.
2473 pub fn message(&self) -> &str {
2474 &self.message
2475 }
2476
2477 /// Returns the failure classification kind.
2478 ///
2479 /// # Returns
2480 ///
2481 /// A reference to the `TaskFailureKind` indicating the category of this failure.
2482 pub fn kind(&self) -> &TaskFailureKind {
2483 &self.kind
2484 }
2485
2486 /// Returns whether this failure is retryable.
2487 ///
2488 /// # Returns
2489 ///
2490 /// `true` if the task should be retried after this failure, `false` if the failure
2491 /// is permanent and retrying would not help.
2492 pub fn retryable(&self) -> bool {
2493 self.retryable
2494 }
2495
2496 /// Returns the additional structured details about the failure, if available.
2497 ///
2498 /// # Returns
2499 ///
2500 /// `Some(&Value)` if failure details were set, `None` otherwise.
2501 pub fn details(&self) -> Option<&Value> {
2502 self.details.as_ref()
2503 }
2504
2505 /// Returns a slice of all warning messages generated before the failure.
2506 ///
2507 /// # Returns
2508 ///
2509 /// A slice containing all warning messages. Returns an empty slice if no warnings
2510 /// were generated.
2511 pub fn warnings(&self) -> &[String] {
2512 &self.warnings
2513 }
2514
2515 /// Returns a slice of all structured messages generated before the failure.
2516 ///
2517 /// # Returns
2518 ///
2519 /// A slice containing all `TaskMessage` instances. Returns an empty slice if no
2520 /// messages were generated.
2521 pub fn messages(&self) -> &[TaskMessage] {
2522 &self.messages
2523 }
2524
2525 /// Returns the task execution start timestamp, if available.
2526 ///
2527 /// # Returns
2528 ///
2529 /// `Some(SystemTime)` if the start timestamp was set, `None` otherwise.
2530 pub fn started_at(&self) -> Option<SystemTime> {
2531 self.started_at
2532 }
2533
2534 /// Returns the task execution finish timestamp, if available.
2535 ///
2536 /// # Returns
2537 ///
2538 /// `Some(SystemTime)` if the finish timestamp was set, `None` otherwise.
2539 pub fn finished_at(&self) -> Option<SystemTime> {
2540 self.finished_at
2541 }
2542
2543 /// Returns the task execution duration in milliseconds, if available.
2544 ///
2545 /// # Returns
2546 ///
2547 /// `Some(u128)` if the duration was set, `None` otherwise.
2548 pub fn duration_ms(&self) -> Option<u128> {
2549 self.duration_ns
2550 .map(|duration_ns| duration_ns / 1_000_000)
2551 .or(self.duration_ms)
2552 }
2553
2554 /// Returns the task execution duration in nanoseconds, if available.
2555 pub fn duration_ns(&self) -> Option<u128> {
2556 self.duration_ns.or_else(|| {
2557 self.duration_ms
2558 .map(|duration_ms| duration_ms.saturating_mul(1_000_000))
2559 })
2560 }
2561
2562 /// Returns the task execution start timestamp in RFC 3339 format, if available.
2563 pub fn started_at_display(&self) -> Option<String> {
2564 self.started_at.map(format_timestamp_display)
2565 }
2566
2567 /// Returns the task execution finish timestamp in RFC 3339 format, if available.
2568 pub fn finished_at_display(&self) -> Option<String> {
2569 self.finished_at.map(format_timestamp_display)
2570 }
2571
2572 /// Returns the task execution duration in a human-readable format, if available.
2573 pub fn duration_display(&self) -> Option<String> {
2574 self.duration_ns().map(format_duration_display)
2575 }
2576}
2577
2578/// Represents information about a skipped task execution.
2579///
2580/// `TaskSkip` captures details about why a task was not executed on a host.
2581/// Tasks can be skipped for various reasons, such as conditional logic (when clauses),
2582/// failed dependencies, maintenance mode, or other runtime conditions that prevent
2583/// execution. This structure provides both a machine-readable reason and a human-readable
2584/// message to explain the skip.
2585///
2586/// # Fields
2587///
2588/// * `reason` - An optional machine-readable reason code or identifier explaining why
2589/// the task was skipped (e.g., "parent_failed", "condition_not_met", "maintenance_mode").
2590///
2591/// * `message` - An optional human-readable message providing additional context about
2592/// why the task was skipped.
2593///
2594/// # Example
2595///
2596/// ```rust
2597/// use genja_core::task::TaskSkip;
2598///
2599/// let skip = TaskSkip::new()
2600/// .with_reason("condition_not_met")
2601/// .with_message("Host is not in the target environment");
2602///
2603/// assert_eq!(skip.reason(), Some("condition_not_met"));
2604/// assert_eq!(skip.message(), Some("Host is not in the target environment"));
2605/// ```
2606#[derive(Debug, Clone, Default, Serialize)]
2607pub struct TaskSkip {
2608 reason: Option<String>,
2609 message: Option<String>,
2610}
2611
2612impl TaskSkip {
2613 /// Creates a new `TaskSkip` instance with default values.
2614 ///
2615 /// This constructor initializes a `TaskSkip` with no reason or message set.
2616 /// Both fields will be `None` until explicitly set using the builder methods.
2617 ///
2618 /// # Returns
2619 ///
2620 /// A new `TaskSkip` instance with default values (no reason or message).
2621 pub fn new() -> Self {
2622 Self::default()
2623 }
2624
2625 /// Sets the machine-readable reason code explaining why the task was skipped.
2626 ///
2627 /// This is a builder method that consumes `self` and returns the modified instance,
2628 /// allowing for method chaining. The reason should be a concise identifier that
2629 /// can be used programmatically to categorize or filter skipped tasks.
2630 ///
2631 /// # Parameters
2632 ///
2633 /// * `reason` - A machine-readable reason code or identifier. Can be any type that
2634 /// implements `Into<String>`, such as `&str`, `String`, or other string-like types.
2635 /// Common examples include "condition_not_met", "parent_failed", or "maintenance_mode".
2636 ///
2637 /// # Returns
2638 ///
2639 /// The modified `TaskSkip` instance with the reason set.
2640 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
2641 self.reason = Some(reason.into());
2642 self
2643 }
2644
2645 /// Sets a human-readable message providing additional context about why the task was skipped.
2646 ///
2647 /// This is a builder method that consumes `self` and returns the modified instance,
2648 /// allowing for method chaining. The message should provide clear, user-friendly
2649 /// information about why the task was not executed.
2650 ///
2651 /// # Parameters
2652 ///
2653 /// * `message` - A human-readable explanation message. Can be any type that implements
2654 /// `Into<String>`, such as `&str`, `String`, or other string-like types.
2655 ///
2656 /// # Returns
2657 ///
2658 /// The modified `TaskSkip` instance with the message set.
2659 pub fn with_message(mut self, message: impl Into<String>) -> Self {
2660 self.message = Some(message.into());
2661 self
2662 }
2663
2664 /// Returns the machine-readable reason code, if available.
2665 ///
2666 /// # Returns
2667 ///
2668 /// `Some(&str)` if a reason was set, `None` otherwise.
2669 pub fn reason(&self) -> Option<&str> {
2670 self.reason.as_deref()
2671 }
2672
2673 /// Returns the human-readable message, if available.
2674 ///
2675 /// # Returns
2676 ///
2677 /// `Some(&str)` if a message was set, `None` otherwise.
2678 pub fn message(&self) -> Option<&str> {
2679 self.message.as_deref()
2680 }
2681}
2682
2683/// Represents a structured message generated during task execution.
2684///
2685/// `TaskMessage` captures detailed execution information with an associated severity level,
2686/// allowing tasks to emit structured logs, warnings, errors, and debug information during
2687/// execution. These messages provide a detailed execution trace that can be used for
2688/// debugging, auditing, and understanding task behavior beyond simple success or failure.
2689///
2690/// Messages can optionally include a machine-readable code for categorization and a
2691/// timestamp indicating when the message was generated.
2692///
2693/// # Fields
2694///
2695/// * `level` - The severity level of the message (Info, Warning, Error, Debug).
2696///
2697/// * `text` - The human-readable message text describing what occurred.
2698///
2699/// * `code` - An optional machine-readable code or identifier for categorizing or
2700/// filtering messages (e.g., "CONFIG_001", "WARN_DEPRECATED").
2701///
2702/// * `timestamp` - An optional timestamp indicating when the message was generated.
2703///
2704/// # Example
2705///
2706/// ```rust
2707/// use genja_core::task::{TaskMessage, MessageLevel};
2708/// use std::time::SystemTime;
2709///
2710/// let message = TaskMessage::new(MessageLevel::Warning, "Using deprecated API")
2711/// .with_code("WARN_DEPRECATED")
2712/// .with_timestamp(SystemTime::now());
2713///
2714/// assert_eq!(message.text(), "Using deprecated API");
2715/// assert_eq!(message.code(), Some("WARN_DEPRECATED"));
2716/// ```
2717#[derive(Debug, Clone, Serialize)]
2718pub struct TaskMessage {
2719 level: MessageLevel,
2720 text: String,
2721 code: Option<String>,
2722 timestamp: Option<SystemTime>,
2723}
2724
2725impl TaskMessage {
2726 /// Creates a new `TaskMessage` with the specified severity level and message text.
2727 ///
2728 /// This constructor initializes a `TaskMessage` with the provided level and text,
2729 /// with no code or timestamp set. Additional metadata can be added using the
2730 /// builder methods `with_code()` and `with_timestamp()`.
2731 ///
2732 /// # Parameters
2733 ///
2734 /// * `level` - The severity level of the message (Info, Warning, Error, or Debug).
2735 /// * `text` - The human-readable message text. Can be any type that implements
2736 /// `Into<String>`, such as `&str`, `String`, or other string-like types.
2737 ///
2738 /// # Returns
2739 ///
2740 /// A new `TaskMessage` instance with the specified level and text, and no code
2741 /// or timestamp set.
2742 pub fn new(level: MessageLevel, text: impl Into<String>) -> Self {
2743 Self {
2744 level,
2745 text: text.into(),
2746 code: None,
2747 timestamp: None,
2748 }
2749 }
2750
2751 /// Sets a machine-readable code for categorizing or filtering the message.
2752 ///
2753 /// This is a builder method that consumes `self` and returns the modified instance,
2754 /// allowing for method chaining. The code can be used to programmatically identify
2755 /// specific types of messages or group related messages together.
2756 ///
2757 /// # Parameters
2758 ///
2759 /// * `code` - A machine-readable code or identifier. Can be any type that implements
2760 /// `Into<String>`, such as `&str`, `String`, or other string-like types. Common
2761 /// examples include "CONFIG_001", "WARN_DEPRECATED", or "ERR_TIMEOUT".
2762 ///
2763 /// # Returns
2764 ///
2765 /// The modified `TaskMessage` instance with the code set.
2766 pub fn with_code(mut self, code: impl Into<String>) -> Self {
2767 self.code = Some(code.into());
2768 self
2769 }
2770
2771 /// Sets the timestamp indicating when the message was generated.
2772 ///
2773 /// This is a builder method that consumes `self` and returns the modified instance,
2774 /// allowing for method chaining. The timestamp helps track when events occurred
2775 /// during task execution.
2776 ///
2777 /// # Parameters
2778 ///
2779 /// * `timestamp` - The timestamp when the message was generated.
2780 ///
2781 /// # Returns
2782 ///
2783 /// The modified `TaskMessage` instance with the timestamp set.
2784 pub fn with_timestamp(mut self, timestamp: SystemTime) -> Self {
2785 self.timestamp = Some(timestamp);
2786 self
2787 }
2788
2789 /// Returns the severity level of the message.
2790 ///
2791 /// # Returns
2792 ///
2793 /// A reference to the `MessageLevel` indicating the severity of this message.
2794 pub fn level(&self) -> &MessageLevel {
2795 &self.level
2796 }
2797
2798 /// Returns the human-readable message text.
2799 ///
2800 /// # Returns
2801 ///
2802 /// A string slice containing the message text.
2803 pub fn text(&self) -> &str {
2804 &self.text
2805 }
2806
2807 /// Returns the machine-readable code, if available.
2808 ///
2809 /// # Returns
2810 ///
2811 /// `Some(&str)` if a code was set, `None` otherwise.
2812 pub fn code(&self) -> Option<&str> {
2813 self.code.as_deref()
2814 }
2815
2816 /// Returns the timestamp when the message was generated, if available.
2817 ///
2818 /// # Returns
2819 ///
2820 /// `Some(SystemTime)` if a timestamp was set, `None` otherwise.
2821 pub fn timestamp(&self) -> Option<SystemTime> {
2822 self.timestamp
2823 }
2824}
2825
2826/// Represents the severity level of a task message.
2827///
2828/// `MessageLevel` categorizes messages generated during task execution by their importance
2829/// and purpose. This allows consumers to filter, route, or display messages appropriately
2830/// based on their severity. The levels follow common logging conventions, from informational
2831/// messages to debug output.
2832///
2833/// # Variants
2834///
2835/// * `Info` - Informational messages that describe normal task execution progress or state.
2836/// These messages provide context about what the task is doing but don't indicate any issues.
2837///
2838/// * `Warning` - Warning messages that indicate potential issues or non-ideal conditions that
2839/// don't prevent task success. These should be reviewed but don't require immediate action.
2840///
2841/// * `Error` - Error messages that indicate serious problems, typically associated with task
2842/// failures. These messages describe what went wrong during execution.
2843///
2844/// * `Debug` - Debug messages that provide detailed technical information useful for
2845/// troubleshooting and development. These are typically more verbose and technical than
2846/// other message types.
2847///
2848/// # Example
2849///
2850/// ```rust
2851/// use genja_core::task::{TaskMessage, MessageLevel};
2852///
2853/// let info = TaskMessage::new(MessageLevel::Info, "Starting configuration deployment");
2854/// let warning = TaskMessage::new(MessageLevel::Warning, "Using deprecated API endpoint");
2855/// let error = TaskMessage::new(MessageLevel::Error, "Failed to connect to device");
2856/// let debug = TaskMessage::new(MessageLevel::Debug, "Raw response: {...}");
2857/// ```
2858#[derive(Debug, Clone, Serialize)]
2859pub enum MessageLevel {
2860 Info,
2861 Warning,
2862 Error,
2863 Debug,
2864}
2865
2866/// Categorizes the type of failure that occurred during task execution.
2867///
2868/// `TaskFailureKind` provides a classification system for task failures, allowing
2869/// error handling logic to distinguish between different categories of errors and
2870/// respond appropriately. This classification helps with error reporting, retry
2871/// logic, and determining whether failures are transient or permanent.
2872///
2873/// # Variants
2874///
2875/// * `Connection` - The task failed due to a connection error, such as network
2876/// unreachability, connection refused, or connection dropped. These failures
2877/// are often transient and may succeed on retry.
2878///
2879/// * `Authentication` - The task failed due to authentication or authorization
2880/// issues, such as invalid credentials, expired tokens, or insufficient
2881/// permissions. These typically require credential updates or permission changes.
2882///
2883/// * `Validation` - The task failed due to validation errors in input data,
2884/// configuration, or parameters. This indicates that the task cannot proceed
2885/// with the provided data and requires correction.
2886///
2887/// * `Timeout` - The task failed because it exceeded a time limit. This could
2888/// indicate slow network conditions, an overloaded target system, or an
2889/// operation that takes longer than expected. Often retryable.
2890///
2891/// * `Command` - The task failed during command execution on the target system,
2892/// such as a command returning a non-zero exit code or producing unexpected
2893/// output. This indicates the operation itself failed on the remote system.
2894///
2895/// * `Unsupported` - The task failed because the requested operation is not
2896/// supported by the target system, plugin, or current configuration. This
2897/// typically indicates a permanent failure that won't succeed on retry.
2898///
2899/// * `Internal` - The task failed due to a Genja/framework internal error, such
2900/// as a programming error, resource exhaustion, or unexpected engine state.
2901///
2902/// * `External` - The task failed because a task implementation, plugin, or
2903/// external dependency returned an error that Genja captured and stored as a
2904/// host failure.
2905///
2906/// # Example
2907///
2908/// ```rust
2909/// use genja_core::task::{TaskFailure, TaskFailureKind};
2910/// use std::io;
2911///
2912/// let connection_failure = TaskFailure::new(
2913/// io::Error::new(io::ErrorKind::ConnectionRefused, "connection refused")
2914/// )
2915/// .with_kind(TaskFailureKind::Connection)
2916/// .with_retryable(true);
2917///
2918/// let auth_failure = TaskFailure::new(
2919/// io::Error::new(io::ErrorKind::PermissionDenied, "invalid credentials")
2920/// )
2921/// .with_kind(TaskFailureKind::Authentication)
2922/// .with_retryable(false);
2923///
2924/// assert!(matches!(connection_failure.kind(), TaskFailureKind::Connection));
2925/// assert!(matches!(auth_failure.kind(), TaskFailureKind::Authentication));
2926/// ```
2927#[derive(Debug, Clone, Serialize)]
2928pub enum TaskFailureKind {
2929 Connection,
2930 Authentication,
2931 Validation,
2932 Timeout,
2933 Command,
2934 Unsupported,
2935 Internal,
2936 External,
2937}
2938
2939/// Task metadata required for execution.
2940///
2941/// Task authoring macros such as `#[genja_task(...)]` implement this trait
2942/// automatically. You only need to implement it manually when you are building
2943/// a `Task` implementation without the macro.
2944pub trait TaskInfo {
2945 /// Return the task's name.
2946 fn name(&self) -> &str;
2947
2948 /// Return the task's connection plugin name, if the task needs a connection.
2949 fn connection_plugin_name(&self) -> Option<&str> {
2950 None
2951 }
2952
2953 /// Build the task's connection key for a host, if the task needs a connection.
2954 fn get_connection_key(&self, hostname: &str) -> Option<crate::inventory::ConnectionKey> {
2955 self.connection_plugin_name()
2956 .map(|plugin_name| crate::inventory::ConnectionKey::new(hostname, plugin_name))
2957 }
2958
2959 /// Return the task's options payload, if set.
2960 fn options(&self) -> Option<&Value> {
2961 None
2962 }
2963
2964 /// Return processor plugin names selected for this task.
2965 fn processor_names(&self) -> Vec<&str> {
2966 Vec::new()
2967 }
2968}
2969
/// Selects which entry point of a [`Task`] the runtime should drive.
///
/// A task reports its mode through [`Task::execution_mode`]. The mode names the
/// flavour of execution the task implements: a blocking `start()` or an
/// asynchronous `start_async()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskExecutionMode {
    /// The task implements the blocking [`Task::start`] entry point.
    Blocking,
    /// The task implements the asynchronous [`Task::start_async`] entry point.
    Async,
}
2976
2977/// Core task interface required for execution.
2978///
2979/// # Example
2980/// ```rust
2981/// use genja_core::genja_task;
2982/// use genja_core::task::{Task, TaskRuntimeContext};
2983///
2984/// struct MyTask;
2985///
2986/// #[genja_task(name = "my_task", connection_plugin_name = "ssh")]
2987/// impl MyTask {
2988/// async fn start_async(
2989/// &self,
2990/// _host: &genja_core::inventory::Host,
2991/// _context: &TaskRuntimeContext,
2992/// ) -> Result<genja_core::task::HostTaskResult, genja_core::task::TaskError> {
2993/// Ok(genja_core::task::HostTaskResult::passed(
2994/// genja_core::task::TaskSuccess::new(),
2995/// ))
2996/// }
2997/// }
2998/// ```
2999#[async_trait]
3000pub trait Task: TaskInfo + Send + Sync {
3001 /// Start executing a blocking task with runtime execution context.
3002 fn start(
3003 &self,
3004 host: &Host,
3005 context: &BlockingTaskRuntimeContext,
3006 ) -> Result<HostTaskResult, TaskError> {
3007 let _ = (host, context);
3008 Err(TaskError::new(std::io::Error::other(
3009 "blocking start() not implemented",
3010 )))
3011 }
3012
3013 /// Start executing an async task with runtime execution context.
3014 async fn start_async(
3015 &self,
3016 host: &Host,
3017 context: &TaskRuntimeContext,
3018 ) -> Result<HostTaskResult, TaskError> {
3019 let _ = (host, context);
3020 Err(TaskError::new(std::io::Error::other(
3021 "async start_async() not implemented",
3022 )))
3023 }
3024
3025 /// Return any sub-tasks for this task.
3026 fn sub_tasks(&self) -> Vec<Arc<dyn Task>> {
3027 Vec::new()
3028 }
3029
3030 /// Declare how the runtime should execute this task.
3031 fn execution_mode(&self) -> TaskExecutionMode;
3032}
3033
/// Position of a task inside the task tree, as seen by a task implementation.
///
/// Holds the depth at which the task is running together with the maximum
/// depth allowed for the current execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskExecutionContext {
    current_depth: usize,
    max_depth: usize,
}

impl TaskExecutionContext {
    /// Creates a context for a task running at `current_depth` under the given
    /// `max_depth` limit. The two values are stored as given, without validation.
    pub fn new(current_depth: usize, max_depth: usize) -> Self {
        TaskExecutionContext {
            current_depth,
            max_depth,
        }
    }

    /// Returns the depth at which the task is running.
    pub fn current_depth(&self) -> usize {
        let Self { current_depth, .. } = self;
        *current_depth
    }

    /// Returns the maximum nesting depth allowed for this execution.
    pub fn max_depth(&self) -> usize {
        let Self { max_depth, .. } = self;
        *max_depth
    }
}
3057
3058/// Runtime context passed into task implementations.
3059#[derive(Debug, Clone)]
3060pub struct TaskRuntimeContext {
3061 execution: TaskExecutionContext,
3062 connection: Option<Arc<Mutex<dyn Connection>>>,
3063}
3064
3065impl TaskRuntimeContext {
3066 pub fn new(
3067 execution: TaskExecutionContext,
3068 connection: Option<Arc<Mutex<dyn Connection>>>,
3069 ) -> Self {
3070 Self {
3071 execution,
3072 connection,
3073 }
3074 }
3075
3076 pub fn execution(&self) -> &TaskExecutionContext {
3077 &self.execution
3078 }
3079
3080 pub fn current_depth(&self) -> usize {
3081 self.execution.current_depth()
3082 }
3083
3084 pub fn max_depth(&self) -> usize {
3085 self.execution.max_depth()
3086 }
3087
3088 pub fn connection(&self) -> Option<&Arc<Mutex<dyn Connection>>> {
3089 self.connection.as_ref()
3090 }
3091
3092 pub fn has_connection(&self) -> bool {
3093 self.connection.is_some()
3094 }
3095
3096 pub fn with_connection<R>(
3097 &self,
3098 f: impl FnOnce(&mut dyn Connection) -> Result<R, TaskError>,
3099 ) -> Result<Option<R>, TaskError> {
3100 let Some(connection) = &self.connection else {
3101 return Ok(None);
3102 };
3103
3104 let mut guard = connection.blocking_lock();
3105
3106 f(&mut *guard).map(Some)
3107 }
3108
3109 pub async fn execute_command(&self, command: &str) -> Result<Option<String>, TaskError> {
3110 let Some(connection) = &self.connection else {
3111 return Ok(None);
3112 };
3113
3114 let mut guard = connection.lock().await;
3115 guard
3116 .execute_command(command)
3117 .await
3118 .map(Some)
3119 .map_err(|err| TaskError::new(std::io::Error::other(err)))
3120 }
3121}
3122
3123#[derive(Clone)]
3124pub struct BlockingTaskConnection {
3125 inner: Arc<Mutex<dyn Connection>>,
3126 runtime_handle: Handle,
3127}
3128
3129impl fmt::Debug for BlockingTaskConnection {
3130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3131 f.debug_struct("BlockingTaskConnection").finish()
3132 }
3133}
3134
3135impl BlockingTaskConnection {
3136 pub fn new(inner: Arc<Mutex<dyn Connection>>, runtime_handle: Handle) -> Self {
3137 Self {
3138 inner,
3139 runtime_handle,
3140 }
3141 }
3142
3143 pub fn with_connection<R>(
3144 &self,
3145 f: impl FnOnce(&mut dyn Connection) -> Result<R, TaskError>,
3146 ) -> Result<R, TaskError> {
3147 let mut guard = self.inner.blocking_lock();
3148 f(&mut *guard)
3149 }
3150
3151 pub fn execute_command(&self, command: &str) -> Result<String, TaskError> {
3152 let command = command.to_string();
3153 let connection = Arc::clone(&self.inner);
3154 self.runtime_handle
3155 .block_on(async move {
3156 let mut guard = connection.lock().await;
3157 guard.execute_command(&command).await
3158 })
3159 .map_err(|err| TaskError::new(std::io::Error::other(err)))
3160 }
3161}
3162
3163#[derive(Debug, Clone)]
3164pub struct BlockingTaskRuntimeContext {
3165 execution: TaskExecutionContext,
3166 connection: Option<BlockingTaskConnection>,
3167}
3168
3169impl BlockingTaskRuntimeContext {
3170 pub fn new(
3171 execution: TaskExecutionContext,
3172 connection: Option<Arc<Mutex<dyn Connection>>>,
3173 runtime_handle: Handle,
3174 ) -> Self {
3175 Self {
3176 execution,
3177 connection: connection
3178 .map(|connection| BlockingTaskConnection::new(connection, runtime_handle)),
3179 }
3180 }
3181
3182 pub fn execution(&self) -> &TaskExecutionContext {
3183 &self.execution
3184 }
3185
3186 pub fn current_depth(&self) -> usize {
3187 self.execution.current_depth()
3188 }
3189
3190 pub fn max_depth(&self) -> usize {
3191 self.execution.max_depth()
3192 }
3193
3194 pub fn connection(&self) -> Option<&BlockingTaskConnection> {
3195 self.connection.as_ref()
3196 }
3197
3198 pub fn has_connection(&self) -> bool {
3199 self.connection.is_some()
3200 }
3201
3202 pub fn with_connection<R>(
3203 &self,
3204 f: impl FnOnce(&mut dyn Connection) -> Result<R, TaskError>,
3205 ) -> Result<Option<R>, TaskError> {
3206 let Some(connection) = &self.connection else {
3207 return Ok(None);
3208 };
3209
3210 connection.with_connection(f).map(Some)
3211 }
3212
3213 pub fn execute_command(&self, command: &str) -> Result<Option<String>, TaskError> {
3214 let Some(connection) = &self.connection else {
3215 return Ok(None);
3216 };
3217
3218 connection.execute_command(command).map(Some)
3219 }
3220}
3221
/// Context describing the task a processor hook is being invoked for.
///
/// The same type is used for aggregate task lifecycle events and for per-host
/// task instance events. Each task picks its processors through
/// [`TaskInfo::processor_names`], so deeply nested sub-tasks can opt into their
/// own processing without relying on parent-name rules.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskProcessorContext {
    task_name: String,
    parent_task_name: Option<String>,
    depth: usize,
    hostname: Option<String>,
}

impl TaskProcessorContext {
    /// Creates a processor context.
    ///
    /// * `task_name` - Name of the task the event refers to.
    /// * `parent_task_name` - Name of the parent task, or `None` for a task
    ///   without a parent.
    /// * `depth` - Depth of the task in the task tree.
    /// * `hostname` - Host the event refers to, or `None` when no host applies.
    pub fn new(
        task_name: impl Into<String>,
        parent_task_name: Option<impl Into<String>>,
        depth: usize,
        hostname: Option<impl Into<String>>,
    ) -> Self {
        let task_name = task_name.into();
        let parent_task_name = parent_task_name.map(|parent| parent.into());
        let hostname = hostname.map(|host| host.into());
        Self {
            task_name,
            parent_task_name,
            depth,
            hostname,
        }
    }

    /// Returns the name of the task.
    pub fn task_name(&self) -> &str {
        self.task_name.as_str()
    }

    /// Returns the parent task's name, or `None` when there is no parent.
    pub fn parent_task_name(&self) -> Option<&str> {
        let parent = self.parent_task_name.as_ref()?;
        Some(parent.as_str())
    }

    /// Returns the depth of the task in the task tree.
    pub fn depth(&self) -> usize {
        let Self { depth, .. } = self;
        *depth
    }

    /// Returns the hostname, or `None` when none was supplied.
    pub fn hostname(&self) -> Option<&str> {
        let hostname = self.hostname.as_ref()?;
        Some(hostname.as_str())
    }

    /// Returns `true` when the task has a parent task name.
    pub fn is_sub_task(&self) -> bool {
        self.parent_task_name().is_some()
    }
}
3271
3272/// Processes task results during aggregate and per-host task lifecycles.
3273///
3274/// All methods are no-ops by default. Implementers can override only the hooks
3275/// they need. Returning an error aborts the current runner path.
3276pub trait TaskProcessor: Send + Sync {
3277 /// Called when a task result tree starts.
3278 fn on_task_start(
3279 &self,
3280 _context: &TaskProcessorContext,
3281 _results: &mut TaskResults,
3282 ) -> Result<(), crate::GenjaError> {
3283 Ok(())
3284 }
3285
3286 /// Called after a task result tree is complete.
3287 fn on_task_finish(
3288 &self,
3289 _context: &TaskProcessorContext,
3290 _results: &mut TaskResults,
3291 ) -> Result<(), crate::GenjaError> {
3292 Ok(())
3293 }
3294
3295 /// Called immediately before a task runs on a host.
3296 fn on_instance_start(&self, _context: &TaskProcessorContext) -> Result<(), crate::GenjaError> {
3297 Ok(())
3298 }
3299
3300 /// Called after a task has produced its host result and execution timing has
3301 /// been attached, before that result is inserted into [`TaskResults`].
3302 fn on_instance_finish(
3303 &self,
3304 _context: &TaskProcessorContext,
3305 _result: &mut HostTaskResult,
3306 ) -> Result<(), crate::GenjaError> {
3307 Ok(())
3308 }
3309}
3310
3311impl fmt::Debug for dyn TaskProcessor {
3312 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3313 write!(f, "TaskProcessor")
3314 }
3315}
3316
3317/// Resolves named task processors at execution time.
3318///
3319/// This keeps `genja-core` independent from the plugin manager while allowing
3320/// the runtime to pass its existing plugin registry into a [`TaskDefinition`].
3321pub trait TaskProcessorResolver: Send + Sync {
3322 fn resolve_task_processor(&self, name: &str) -> Option<Arc<dyn TaskProcessor>>;
3323}
3324
3325/// Opens or verifies task-scoped connections before execution.
3326///
3327/// The full runtime can implement this trait to ensure the connection selected by
3328/// a task is available before the task body runs. Core task execution remains
3329/// generic by depending only on this trait rather than on a concrete runtime type.
3330#[async_trait]
3331pub trait TaskConnectionResolver: Send + Sync {
3332 /// Open or retrieve the connection required by `task` for `hostname`.
3333 async fn resolve_task_connection(
3334 &self,
3335 task: &dyn Task,
3336 hostname: &str,
3337 ) -> Result<Option<Arc<Mutex<dyn Connection>>>, crate::GenjaError>;
3338}
3339
3340impl fmt::Debug for dyn TaskProcessorResolver {
3341 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3342 write!(f, "TaskProcessorResolver")
3343 }
3344}
3345
3346impl fmt::Debug for dyn TaskConnectionResolver {
3347 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3348 write!(f, "TaskConnectionResolver")
3349 }
3350}
3351
3352/// A wrapper around a task implementation that enforces the task trait flow.
3353///
3354/// `TaskDefinition` encapsulates a task that implements the `Task` trait, providing
3355/// a unified interface for task execution and management. This wrapper enables
3356/// polymorphic task handling while maintaining type safety through trait objects.
3357///
3358/// The wrapper provides access to the underlying task through trait object references,
3359/// allowing the task to be executed and queried without knowing its concrete type.
3360/// This is particularly useful for storing heterogeneous collections of tasks and
3361/// executing them uniformly.
3362///
3363/// # Fields
3364///
3365/// * `inner` - A boxed trait object containing the actual task implementation.
3366/// The task must implement the `Task` trait, providing metadata, execution
3367/// logic, and sub-task management.
3368///
3369/// # Example
3370///
3371/// ```rust
3372/// use genja_core::genja_task;
3373/// use genja_core::inventory::Host;
3374/// use genja_core::task::{
3375/// HostTaskResult, Task, TaskDefinition, TaskInfo, TaskRuntimeContext, TaskSuccess,
3376/// };
3377///
3378/// struct MyTask;
3379///
3380/// #[genja_task(name = "deploy", connection_plugin_name = "ssh")]
3381/// impl MyTask {
3382/// async fn start_async(
3383/// &self,
3384/// _host: &Host,
3385/// _context: &TaskRuntimeContext,
3386/// ) -> Result<HostTaskResult, genja_core::task::TaskError> {
3387/// Ok(HostTaskResult::passed(TaskSuccess::new()))
3388/// }
3389/// }
3390///
3391/// let task = MyTask;
3392/// let definition = TaskDefinition::new(task);
3393/// assert_eq!(definition.name(), "deploy");
3394/// ```
3395#[derive(Clone)]
3396pub struct TaskDefinition {
3397 inner: Arc<dyn Task>,
3398 processor_resolver: Option<Arc<dyn TaskProcessorResolver>>,
3399 processor_names: Arc<Vec<String>>,
3400}
3401
3402impl TaskDefinition {
3403 /// Wrap a user-defined task that implements the Task trait.
3404 pub fn new<T: Task + 'static>(task: T) -> Self {
3405 Self {
3406 inner: Arc::new(task),
3407 processor_resolver: None,
3408 processor_names: Arc::new(Vec::new()),
3409 }
3410 }
3411
3412 /// Borrow the inner task as a trait object.
3413 pub fn as_task(&self) -> &dyn Task {
3414 self.inner.as_ref()
3415 }
3416
3417 /// Select a processor for the root task definition.
3418 ///
3419 /// Sub-tasks do not inherit this selection. They select processors through
3420 /// their own [`TaskInfo::processor_names`] implementation.
3421 pub fn with_processor(mut self, processor_name: impl Into<String>) -> Self {
3422 Arc::make_mut(&mut self.processor_names).push(processor_name.into());
3423 self
3424 }
3425
3426 /// Select multiple processors for the root task definition.
3427 pub fn with_processors<I, S>(mut self, processor_names: I) -> Self
3428 where
3429 I: IntoIterator<Item = S>,
3430 S: Into<String>,
3431 {
3432 Arc::make_mut(&mut self.processor_names)
3433 .extend(processor_names.into_iter().map(Into::into));
3434 self
3435 }
3436
3437 /// Attach the processor resolver used during execution.
3438 pub fn with_processor_resolver(
3439 mut self,
3440 processor_resolver: Arc<dyn TaskProcessorResolver>,
3441 ) -> Self {
3442 self.processor_resolver = Some(processor_resolver);
3443 self
3444 }
3445
3446 /// Returns the root task's selected processor names.
3447 pub fn processor_names(&self) -> Vec<&str> {
3448 if self.processor_names.is_empty() {
3449 self.inner.processor_names()
3450 } else {
3451 self.processor_names.iter().map(String::as_str).collect()
3452 }
3453 }
3454
3455 fn processors_for(
3456 &self,
3457 task: &dyn Task,
3458 ) -> Result<Vec<Arc<dyn TaskProcessor>>, crate::GenjaError> {
3459 Self::resolve_processors(
3460 self.processor_resolver.as_deref(),
3461 &self.processor_names_for(task),
3462 )
3463 }
3464
3465 fn processor_names_for<'a>(&'a self, task: &'a dyn Task) -> Vec<&'a str> {
3466 if std::ptr::eq(task, self.inner.as_ref()) && !self.processor_names.is_empty() {
3467 self.processor_names.iter().map(String::as_str).collect()
3468 } else {
3469 task.processor_names()
3470 }
3471 }
3472
3473 fn resolve_processors(
3474 processor_resolver: Option<&dyn TaskProcessorResolver>,
3475 processor_names: &[&str],
3476 ) -> Result<Vec<Arc<dyn TaskProcessor>>, crate::GenjaError> {
3477 let Some(processor_resolver) = processor_resolver else {
3478 if processor_names.is_empty() {
3479 return Ok(Vec::new());
3480 }
3481
3482 return Err(crate::GenjaError::PluginNotFound(
3483 processor_names[0].to_string(),
3484 ));
3485 };
3486
3487 processor_names
3488 .iter()
3489 .map(|name| {
3490 processor_resolver
3491 .resolve_task_processor(name)
3492 .ok_or_else(|| crate::GenjaError::PluginNotFound((*name).to_string()))
3493 })
3494 .collect()
3495 }
3496}
3497
3498impl TaskDefinition {
3499 /// Execute this task and all its sub-tasks recursively up to a maximum depth.
3500 ///
3501 /// This method starts the task execution by calling the task's `start()` method,
3502 /// then recursively executes all sub-tasks returned by `sub_tasks()`. The recursion
3503 /// is limited by the `max_depth` parameter to prevent infinite loops or excessive
3504 /// nesting.
3505 ///
3506 /// # Parameters
3507 ///
3508 /// * `max_depth` - The maximum depth of task nesting allowed. Depth is zero-based:
3509 /// the root task runs at depth `0`, its immediate sub-tasks at depth `1`, and so on.
3510 /// This means `max_depth = 0` allows only the root task, `max_depth = 1` allows
3511 /// the root task plus one level of sub-tasks, and so on.
3512 ///
3513 /// # Returns
3514 ///
3515 /// Inserts the provided host's result into the shared `TaskResults` tree and
3516 /// recursively does the same for any sub-tasks. The parent task result is recorded
3517 /// before sub-task execution starts.
3518 ///
3519 /// # Errors
3520 ///
3521 /// This method currently does not return an error for depth overflow. When task
3522 /// nesting exceeds `max_depth`, it records an internal failed host result for that
3523 /// task node and returns `Ok(())`.
3524 pub async fn start(
3525 &self,
3526 hostname: &str,
3527 host: &Host,
3528 results: &mut TaskResults,
3529 max_depth: usize,
3530 ) -> Result<(), crate::GenjaError> {
3531 Self::start_with_depth(
3532 Arc::clone(&self.inner),
3533 hostname,
3534 host,
3535 results,
3536 None,
3537 self.processor_resolver.as_deref(),
3538 self.processor_names(),
3539 None,
3540 0,
3541 max_depth,
3542 )
3543 .await
3544 }
3545
3546 /// Executes this task definition while ensuring task-scoped connections are opened.
3547 pub async fn start_with_connection_resolver(
3548 &self,
3549 hostname: &str,
3550 host: &Host,
3551 results: &mut TaskResults,
3552 connection_resolver: Option<&dyn TaskConnectionResolver>,
3553 max_depth: usize,
3554 ) -> Result<(), crate::GenjaError> {
3555 Self::start_with_depth(
3556 Arc::clone(&self.inner),
3557 hostname,
3558 host,
3559 results,
3560 connection_resolver,
3561 self.processor_resolver.as_deref(),
3562 self.processor_names(),
3563 None,
3564 0,
3565 max_depth,
3566 )
3567 .await
3568 }
3569
3570 /// Run aggregate task-start processors for this definition.
3571 pub fn process_task_start(&self, results: &mut TaskResults) -> Result<(), crate::GenjaError> {
3572 let context = TaskProcessorContext::new(self.name(), None::<&str>, 0, None::<&str>);
3573 for processor in self.processors_for(self.inner.as_ref())? {
3574 processor.on_task_start(&context, results)?;
3575 }
3576 Ok(())
3577 }
3578
3579 /// Run aggregate task-finish processors for this definition.
3580 pub fn process_task_finish(&self, results: &mut TaskResults) -> Result<(), crate::GenjaError> {
3581 let context = TaskProcessorContext::new(self.name(), None::<&str>, 0, None::<&str>);
3582 for processor in self.processors_for(self.inner.as_ref())? {
3583 processor.on_task_finish(&context, results)?;
3584 }
3585 Ok(())
3586 }
3587
3588 /// Recursively executes a task and its sub-tasks with depth tracking.
3589 ///
3590 /// This internal helper method performs the actual recursive task execution,
3591 /// tracking the current depth to enforce the maximum depth limit. It executes
3592 /// the task by calling its runtime-aware `start()` method, stores the result,
3593 /// then recursively processes all sub-tasks returned by `sub_tasks()`.
3594 ///
3595 /// Sub-tasks are executed in iteration order. Results are grouped by task name, so
3596 /// a sub-task named `"validate"` produces a single `TaskResults` node containing
3597 /// host results for every host on which that sub-task ran.
3598 ///
3599 /// The method ensures that task nesting doesn't exceed the specified maximum
3600 /// depth, preventing infinite recursion or excessive nesting that could lead
3601 /// to stack overflow or performance issues.
3602 ///
3603 /// # Parameters
3604 ///
3605 /// * `task` - A reference to the task to execute, provided as a trait object.
3606 /// This allows handling any type that implements the `Task` trait.
3607 ///
3608 /// * `hostname` - The name of the host on which the task is being executed.
3609 /// Used as the key when storing task results.
3610 ///
3611 /// * `host` - A reference to the `Host` object representing the target system.
3612 /// This is passed to the task's `start()` method for execution.
3613 ///
3614 /// * `results` - A mutable reference to the `TaskResults` structure where
3615 /// execution results for this task and its sub-tasks will be stored.
3616 ///
3617 /// * `depth` - The current depth in the task execution tree. The root task
3618 /// starts at depth 0, its immediate sub-tasks are at depth 1, and so on.
3619 ///
3620 /// * `max_depth` - The maximum allowed depth for task nesting. If `depth`
3621 /// exceeds this value, the method returns an error and stops execution.
3622 /// Because the check is `depth > max_depth`, a task at depth exactly equal
3623 /// to `max_depth` is still allowed to run.
3624 ///
3625 /// # Returns
3626 ///
3627 /// * `Ok(())` if the task and all its sub-tasks executed successfully within
3628 /// the depth limit, or if any depth overflow was captured as a failed host result.
3629 ///
3630 /// # Errors
3631 ///
3632 /// Depth overflow is handled by inserting a failed host result with
3633 /// [`TaskFailureKind::Internal`]. This helper only returns an error if a future
3634 /// implementation path introduces one explicitly.
3635 #[async_recursion]
3636 #[allow(clippy::too_many_arguments)]
3637 async fn start_with_depth(
3638 task: Arc<dyn Task>,
3639 hostname: &str,
3640 host: &Host,
3641 results: &mut TaskResults,
3642 connection_resolver: Option<&dyn TaskConnectionResolver>,
3643 processor_resolver: Option<&dyn TaskProcessorResolver>,
3644 processor_names: Vec<&str>,
3645 parent_task_name: Option<&str>,
3646 depth: usize,
3647 max_depth: usize,
3648 ) -> Result<(), crate::GenjaError> {
3649 if depth > max_depth {
3650 let started_at = SystemTime::now();
3651 let finished_at = started_at;
3652 let error =
3653 crate::GenjaError::Message(format!("max task depth exceeded: {}", max_depth));
3654 warn!(
3655 "max task depth exceeded for task '{}' at depth {} with max_depth {}",
3656 task.name(),
3657 depth,
3658 max_depth
3659 );
3660 results.record_execution_timing(started_at, finished_at);
3661 results.insert_host_result(
3662 hostname,
3663 HostTaskResult::failed(
3664 TaskFailure::new(error)
3665 .with_kind(TaskFailureKind::Internal)
3666 .with_started_at(started_at)
3667 .with_finished_at(finished_at)
3668 .with_duration_ns(0),
3669 ),
3670 );
3671 return Ok(());
3672 }
3673
3674 let started_at = SystemTime::now();
3675 let parent_task = parent_task_name.unwrap_or("none");
3676 debug!(
3677 "starting task '{}' for host '{}' parent_task='{}' depth={} max_depth={} has_connection={}",
3678 task.name(),
3679 hostname,
3680 parent_task,
3681 depth,
3682 max_depth,
3683 connection_resolver.is_some()
3684 );
3685 let processor_context =
3686 TaskProcessorContext::new(task.name(), parent_task_name, depth, Some(hostname));
3687 let processors = Self::resolve_processors(processor_resolver, &processor_names)?;
3688 for processor in &processors {
3689 processor.on_instance_start(&processor_context)?;
3690 }
3691
3692 let connection = if let Some(connection_resolver) = connection_resolver {
3693 match connection_resolver
3694 .resolve_task_connection(task.as_ref(), hostname)
3695 .await
3696 {
3697 Ok(connection) => connection,
3698 Err(error) => {
3699 let finished_at = SystemTime::now();
3700 let duration_ns = finished_at
3701 .duration_since(started_at)
3702 .map(|duration| duration.as_nanos())
3703 .unwrap_or(0);
3704 results.record_execution_timing(started_at, finished_at);
3705
3706 warn!(
3707 "task '{}' failed to open connection for host '{}': {}",
3708 task.name(),
3709 hostname,
3710 error
3711 );
3712
3713 let mut host_result = HostTaskResult::failed(
3714 TaskFailure::new(error)
3715 .with_kind(TaskFailureKind::Connection)
3716 .with_started_at(started_at)
3717 .with_finished_at(finished_at)
3718 .with_duration_ns(duration_ns),
3719 );
3720 for processor in &processors {
3721 processor.on_instance_finish(&processor_context, &mut host_result)?;
3722 }
3723 results.insert_host_result(hostname, host_result);
3724 return Ok(());
3725 }
3726 }
3727 } else {
3728 None
3729 };
3730
3731 let execution_context = TaskExecutionContext::new(depth, max_depth);
3732 let host_result = match task.execution_mode() {
3733 TaskExecutionMode::Async => {
3734 let runtime_context =
3735 TaskRuntimeContext::new(execution_context, connection.clone());
3736 task.start_async(host, &runtime_context).await
3737 }
3738 TaskExecutionMode::Blocking => {
3739 let blocking_context = BlockingTaskRuntimeContext::new(
3740 execution_context,
3741 connection.clone(),
3742 Handle::current(),
3743 );
3744 let task = Arc::clone(&task);
3745 let host = host.clone();
3746 match task::spawn_blocking(move || task.start(&host, &blocking_context)).await {
3747 Ok(result) => result,
3748 Err(err) => Err(TaskError::new(std::io::Error::other(format!(
3749 "blocking task join error: {err}"
3750 )))),
3751 }
3752 }
3753 };
3754 let finished_at = SystemTime::now();
3755 let duration_ns = finished_at
3756 .duration_since(started_at)
3757 .map(|duration| duration.as_nanos())
3758 .unwrap_or(0);
3759
3760 results.record_execution_timing(started_at, finished_at);
3761
3762 let host_result = match host_result {
3763 Ok(host_result) => host_result,
3764 Err(error) => {
3765 let failure = TaskFailure::from_task_error(error)
3766 .with_started_at(started_at)
3767 .with_finished_at(finished_at)
3768 .with_duration_ns(duration_ns);
3769 warn!(
3770 "task '{}' failed for host '{}': {}",
3771 task.name(),
3772 hostname,
3773 failure.message()
3774 );
3775 let duration_display = failure
3776 .duration_display()
3777 .unwrap_or_else(|| format_duration_display(duration_ns));
3778 info!(
3779 "finished task '{}' for host '{}' with status=failed duration_ms={} duration={}",
3780 task.name(),
3781 hostname,
3782 duration_ns / 1_000_000,
3783 duration_display
3784 );
3785 let mut host_result = HostTaskResult::failed(failure);
3786 for processor in &processors {
3787 processor.on_instance_finish(&processor_context, &mut host_result)?;
3788 }
3789 results.insert_host_result(hostname, host_result);
3790 return Ok(());
3791 }
3792 };
3793
3794 if let Some(failure) = host_result.failure() {
3795 warn!(
3796 "task '{}' failed for host '{}': {}",
3797 task.name(),
3798 hostname,
3799 failure.message()
3800 );
3801 }
3802
3803 if let Some(skip) = host_result.skipped_detail() {
3804 info!(
3805 "task '{}' skipped for host '{}' reason='{}' message='{}'",
3806 task.name(),
3807 hostname,
3808 skip.reason().unwrap_or("none"),
3809 skip.message().unwrap_or("")
3810 );
3811 }
3812
3813 let status = if host_result.is_passed() {
3814 "passed"
3815 } else if host_result.is_failed() {
3816 "failed"
3817 } else {
3818 "skipped"
3819 };
3820
3821 let mut host_result =
3822 host_result.with_execution_timing(started_at, finished_at, duration_ns);
3823 let duration_display = match &host_result {
3824 HostTaskResult::Passed(success) => success
3825 .duration_display()
3826 .unwrap_or_else(|| format_duration_display(duration_ns)),
3827 HostTaskResult::Failed(failure) => failure
3828 .duration_display()
3829 .unwrap_or_else(|| format_duration_display(duration_ns)),
3830 HostTaskResult::Skipped(_) => format_duration_display(duration_ns),
3831 };
3832
3833 info!(
3834 "finished task '{}' for host '{}' with status={} duration_ms={} duration={}",
3835 task.name(),
3836 hostname,
3837 status,
3838 duration_ns / 1_000_000,
3839 duration_display
3840 );
3841
3842 for processor in &processors {
3843 processor.on_instance_finish(&processor_context, &mut host_result)?;
3844 }
3845 results.insert_host_result(hostname, host_result);
3846
3847 for sub in task.sub_tasks() {
3848 let sub_task_name = sub.name().to_string();
3849 if results.sub_task(&sub_task_name).is_none() {
3850 results.insert_sub_task(sub_task_name.clone(), TaskResults::new(&sub_task_name));
3851 }
3852 let sub_results = results
3853 .sub_task_mut(&sub_task_name)
3854 .expect("sub task results should exist after insertion");
3855 let sub_processor_names = sub.processor_names();
3856 let sub_processors =
3857 Self::resolve_processors(processor_resolver, &sub_processor_names)?;
3858 let mut sub_task_started = false;
3859 if !sub_processors.is_empty()
3860 && sub_results.hosts().is_empty()
3861 && sub_results.sub_tasks().is_empty()
3862 {
3863 let sub_context = TaskProcessorContext::new(
3864 sub_task_name.as_str(),
3865 Some(task.name()),
3866 depth + 1,
3867 None::<&str>,
3868 );
3869 for processor in &sub_processors {
3870 processor.on_task_start(&sub_context, sub_results)?;
3871 }
3872 sub_task_started = true;
3873 }
3874 Self::start_with_depth(
3875 Arc::clone(&sub),
3876 hostname,
3877 host,
3878 sub_results,
3879 connection_resolver,
3880 processor_resolver,
3881 sub_processor_names,
3882 Some(task.name()),
3883 depth + 1,
3884 max_depth,
3885 )
3886 .await?;
3887 if sub_task_started {
3888 let sub_context = TaskProcessorContext::new(
3889 sub_task_name.as_str(),
3890 Some(task.name()),
3891 depth + 1,
3892 None::<&str>,
3893 );
3894 for processor in &sub_processors {
3895 processor.on_task_finish(&sub_context, sub_results)?;
3896 }
3897 }
3898 }
3899
3900 Ok(())
3901 }
3902}
3903
3904impl TaskInfo for TaskDefinition {
3905 /// Returns the name of the task.
3906 ///
3907 /// This method delegates to the inner task's `name()` implementation, providing
3908 /// access to the task's identifier through the `TaskDefinition` wrapper.
3909 ///
3910 /// # Returns
3911 ///
3912 /// A string slice containing the task's name.
3913 fn name(&self) -> &str {
3914 self.inner.name()
3915 }
3916
3917 /// Returns the name of the plugin associated with this task.
3918 ///
3919 /// This method delegates to the inner task's `connection_plugin_name()` implementation,
3920 /// providing access to the plugin identifier that will handle the task's execution.
3921 ///
3922 /// # Returns
3923 ///
3924 /// A string slice containing the connection plugin's name (e.g., "ssh", "netconf", "restconf").
3925 fn connection_plugin_name(&self) -> Option<&str> {
3926 self.inner.connection_plugin_name()
3927 }
3928
3929 /// Builds a connection key for the specified host.
3930 ///
3931 /// This method delegates to the inner task's `get_connection_key()` implementation,
3932 /// constructing a unique identifier that combines the hostname with the plugin name
3933 /// to identify the connection to be used for task execution.
3934 ///
3935 /// # Parameters
3936 ///
3937 /// * `hostname` - The name of the host for which to build the connection key.
3938 ///
3939 /// # Returns
3940 ///
3941 /// An optional `ConnectionKey` that uniquely identifies the connection to the
3942 /// specified host when this task declares a connection plugin.
3943 fn get_connection_key(&self, hostname: &str) -> Option<crate::inventory::ConnectionKey> {
3944 self.inner.get_connection_key(hostname)
3945 }
3946
3947 /// Returns the task's options payload, if available.
3948 ///
3949 /// This method delegates to the inner task's `options()` implementation, providing
3950 /// access to any structured configuration or parameters associated with the task.
3951 ///
3952 /// # Returns
3953 ///
3954 /// `Some(&Value)` if the task has options configured, `None` otherwise.
3955 fn options(&self) -> Option<&Value> {
3956 self.inner.options()
3957 }
3958
3959 fn processor_names(&self) -> Vec<&str> {
3960 self.processor_names()
3961 }
3962}
3963
3964/// A collection of task definitions that can be executed together.
3965///
3966/// `Tasks` is a wrapper around a vector of [`TaskDefinition`] instances, providing
3967/// a convenient way to manage and organize multiple tasks. It implements `Deref` and
3968/// `DerefMut` to allow direct access to the underlying vector's methods.
3969///
3970/// This type is particularly useful when building a sequence of tasks to be executed
3971/// as part of a larger workflow or playbook. Tasks can be added individually using
3972/// the [`add_task`](Tasks::add_task) method.
3973///
3974/// # Example
3975///
3976/// ```rust
3977/// use genja_core::genja_task;
3978/// use genja_core::inventory::Host;
3979/// use genja_core::task::{HostTaskResult, Task, TaskRuntimeContext, TaskSuccess, Tasks};
3980///
3981/// struct MyTask;
3982///
3983/// #[genja_task(name = "task")]
3984/// impl MyTask {
3985/// async fn start_async(
3986/// &self,
3987/// _host: &Host,
3988/// _context: &TaskRuntimeContext,
3989/// ) -> Result<HostTaskResult, genja_core::task::TaskError> {
3990/// Ok(HostTaskResult::passed(TaskSuccess::new()))
3991/// }
3992/// }
3993///
3994/// let mut tasks = Tasks::new();
3995/// tasks.add_task(MyTask);
3996/// tasks.add_task(MyTask);
3997/// assert_eq!(tasks.len(), 2);
3998/// ```
3999#[derive(Default)]
4000pub struct Tasks(Vec<TaskDefinition>);
4001
4002impl Tasks {
4003 pub fn new() -> Self {
4004 Self(Vec::new())
4005 }
4006
4007 pub fn add_task<T: Task + 'static>(&mut self, task: T) {
4008 self.0.push(TaskDefinition::new(task));
4009 }
4010}
4011
4012impl Deref for Tasks {
4013 type Target = Vec<TaskDefinition>;
4014
4015 fn deref(&self) -> &Self::Target {
4016 &self.0
4017 }
4018}
4019
4020impl DerefMut for Tasks {
4021 fn deref_mut(&mut self) -> &mut Self::Target {
4022 &mut self.0
4023 }
4024}
4025
4026#[cfg(test)]
4027mod tests {
4028 use super::*;
4029 use crate::inventory::{
4030 BaseBuilderHost, Connection, ConnectionKey, Host, ResolvedConnectionParams,
4031 };
4032 use async_trait::async_trait;
4033 use log::{LevelFilter, Log, Metadata, Record};
4034 use serde_json::json;
4035 use std::fmt;
4036 use std::future::Future;
4037 use std::sync::atomic::{AtomicUsize, Ordering};
4038 use std::sync::{Arc, Mutex, OnceLock};
4039 use tokio::runtime::Builder;
4040
4041 fn run_async<F: Future>(future: F) -> F::Output {
4042 Builder::new_current_thread()
4043 .enable_all()
4044 .build()
4045 .expect("test runtime should build")
4046 .block_on(future)
4047 }
4048
/// Unit error type used by the tests to build task failures.
#[derive(Debug)]
struct TestTaskFailureError;

impl fmt::Display for TestTaskFailureError {
    /// Always renders the fixed message `task failure test error`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("task failure test error")
    }
}

impl Error for TestTaskFailureError {}
4059
/// Failure payload that implements `Display` but not `Error`; the tests pass
/// it to `TaskFailure::capture`.
#[derive(Debug)]
struct ExternalFailurePayload {
    /// Numeric code echoed in the rendered message.
    code: u16,
}

impl fmt::Display for ExternalFailurePayload {
    /// Renders as `external failure code <code>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { code } = self;
        write!(f, "external failure code {}", code)
    }
}
4070
4071 struct TestTask {
4072 name: &'static str,
4073 subs: Vec<Arc<dyn Task>>,
4074 counter: Arc<AtomicUsize>,
4075 }
4076
4077 struct ProcessorTask {
4078 name: &'static str,
4079 processors: Vec<String>,
4080 }
4081
4082 struct CountingProcessor {
4083 calls: Arc<AtomicUsize>,
4084 }
4085
4086 struct TestProcessorResolver {
4087 processor: Arc<dyn TaskProcessor>,
4088 }
4089
4090 struct FailingTask;
4091
4092 struct SkippingTask;
4093
4094 #[derive(Debug)]
4095 struct TestConnection {
4096 key: ConnectionKey,
4097 alive: bool,
4098 }
4099
4100 #[async_trait]
4101 impl Connection for TestConnection {
4102 fn create(&self, key: &ConnectionKey) -> Box<dyn Connection> {
4103 Box::new(Self {
4104 key: key.clone(),
4105 alive: false,
4106 })
4107 }
4108
4109 fn is_alive(&self) -> bool {
4110 self.alive
4111 }
4112
4113 async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
4114 self.alive = true;
4115 Ok(())
4116 }
4117
4118 fn close(&mut self) -> ConnectionKey {
4119 self.alive = false;
4120 self.key.clone()
4121 }
4122 }
4123
4124 impl TaskInfo for TestTask {
4125 fn name(&self) -> &str {
4126 self.name
4127 }
4128
4129 fn connection_plugin_name(&self) -> Option<&str> {
4130 Some("ssh")
4131 }
4132
4133 fn options(&self) -> Option<&Value> {
4134 None
4135 }
4136 }
4137
4138 #[async_trait]
4139 impl Task for TestTask {
4140 async fn start_async(
4141 &self,
4142 _host: &Host,
4143 _context: &TaskRuntimeContext,
4144 ) -> Result<HostTaskResult, TaskError> {
4145 self.counter.fetch_add(1, Ordering::SeqCst);
4146 Ok(HostTaskResult::passed(TaskSuccess::new()))
4147 }
4148
4149 fn sub_tasks(&self) -> Vec<Arc<dyn Task>> {
4150 self.subs.clone()
4151 }
4152
4153 fn execution_mode(&self) -> TaskExecutionMode {
4154 TaskExecutionMode::Async
4155 }
4156 }
4157
4158 impl TaskInfo for ProcessorTask {
4159 fn name(&self) -> &str {
4160 self.name
4161 }
4162
4163 fn connection_plugin_name(&self) -> Option<&str> {
4164 Some("ssh")
4165 }
4166
4167 fn options(&self) -> Option<&Value> {
4168 None
4169 }
4170
4171 fn processor_names(&self) -> Vec<&str> {
4172 self.processors.iter().map(String::as_str).collect()
4173 }
4174 }
4175
4176 #[async_trait]
4177 impl Task for ProcessorTask {
4178 async fn start_async(
4179 &self,
4180 _host: &Host,
4181 _context: &TaskRuntimeContext,
4182 ) -> Result<HostTaskResult, TaskError> {
4183 Ok(HostTaskResult::passed(TaskSuccess::new()))
4184 }
4185
4186 fn execution_mode(&self) -> TaskExecutionMode {
4187 TaskExecutionMode::Async
4188 }
4189 }
4190
4191 impl TaskProcessor for CountingProcessor {
4192 fn on_task_start(
4193 &self,
4194 _context: &TaskProcessorContext,
4195 _results: &mut TaskResults,
4196 ) -> Result<(), crate::GenjaError> {
4197 self.calls.fetch_add(1, Ordering::SeqCst);
4198 Ok(())
4199 }
4200
4201 fn on_task_finish(
4202 &self,
4203 _context: &TaskProcessorContext,
4204 _results: &mut TaskResults,
4205 ) -> Result<(), crate::GenjaError> {
4206 self.calls.fetch_add(1, Ordering::SeqCst);
4207 Ok(())
4208 }
4209
4210 fn on_instance_start(
4211 &self,
4212 _context: &TaskProcessorContext,
4213 ) -> Result<(), crate::GenjaError> {
4214 self.calls.fetch_add(1, Ordering::SeqCst);
4215 Ok(())
4216 }
4217
4218 fn on_instance_finish(
4219 &self,
4220 _context: &TaskProcessorContext,
4221 _result: &mut HostTaskResult,
4222 ) -> Result<(), crate::GenjaError> {
4223 self.calls.fetch_add(1, Ordering::SeqCst);
4224 Ok(())
4225 }
4226 }
4227
4228 impl TaskProcessorResolver for TestProcessorResolver {
4229 fn resolve_task_processor(&self, name: &str) -> Option<Arc<dyn TaskProcessor>> {
4230 (name == "selected").then(|| Arc::clone(&self.processor))
4231 }
4232 }
4233
4234 impl TaskInfo for FailingTask {
4235 fn name(&self) -> &str {
4236 "failing"
4237 }
4238
4239 fn connection_plugin_name(&self) -> Option<&str> {
4240 Some("ssh")
4241 }
4242
4243 fn options(&self) -> Option<&Value> {
4244 None
4245 }
4246 }
4247
4248 #[async_trait]
4249 impl Task for FailingTask {
4250 async fn start_async(
4251 &self,
4252 _host: &Host,
4253 _context: &TaskRuntimeContext,
4254 ) -> Result<HostTaskResult, TaskError> {
4255 Ok(HostTaskResult::failed(TaskFailure::new(
4256 TestTaskFailureError,
4257 )))
4258 }
4259
4260 fn execution_mode(&self) -> TaskExecutionMode {
4261 TaskExecutionMode::Async
4262 }
4263 }
4264
4265 impl TaskInfo for SkippingTask {
4266 fn name(&self) -> &str {
4267 "skipping"
4268 }
4269
4270 fn connection_plugin_name(&self) -> Option<&str> {
4271 Some("ssh")
4272 }
4273
4274 fn options(&self) -> Option<&Value> {
4275 None
4276 }
4277 }
4278
4279 #[async_trait]
4280 impl Task for SkippingTask {
4281 async fn start_async(
4282 &self,
4283 _host: &Host,
4284 _context: &TaskRuntimeContext,
4285 ) -> Result<HostTaskResult, TaskError> {
4286 Ok(HostTaskResult::Skipped(
4287 TaskSkip::new().with_reason("filtered"),
4288 ))
4289 }
4290
4291 fn execution_mode(&self) -> TaskExecutionMode {
4292 TaskExecutionMode::Async
4293 }
4294 }
4295
4296 #[derive(Default)]
4297 struct TestLogger {
4298 entries: Mutex<Vec<String>>,
4299 }
4300
4301 impl TestLogger {
4302 fn clear(&self) {
4303 self.entries
4304 .lock()
4305 .expect("logger lock should not be poisoned")
4306 .clear();
4307 }
4308
4309 fn entries(&self) -> Vec<String> {
4310 self.entries
4311 .lock()
4312 .expect("logger lock should not be poisoned")
4313 .clone()
4314 }
4315 }
4316
4317 impl Log for TestLogger {
4318 fn enabled(&self, _metadata: &Metadata<'_>) -> bool {
4319 true
4320 }
4321
4322 fn log(&self, record: &Record<'_>) {
4323 if self.enabled(record.metadata()) {
4324 self.entries
4325 .lock()
4326 .expect("logger lock should not be poisoned")
4327 .push(format!("{} {}", record.level(), record.args()));
4328 }
4329 }
4330
4331 fn flush(&self) {}
4332 }
4333
4334 fn test_logger() -> &'static TestLogger {
4335 static LOGGER: OnceLock<&'static TestLogger> = OnceLock::new();
4336 LOGGER.get_or_init(|| {
4337 let logger = Box::leak(Box::new(TestLogger::default()));
4338 let _ = log::set_logger(logger);
4339 log::set_max_level(LevelFilter::Debug);
4340 logger
4341 })
4342 }
4343
/// Returns the process-wide mutex that serialises tests which inspect the
/// shared test logger.
///
/// Every call returns a reference to the same mutex.
fn log_lock() -> &'static Mutex<()> {
    static LOCK: Mutex<()> = Mutex::new(());
    &LOCK
}
4348
4349 fn chain(depth: usize, counter: Arc<AtomicUsize>) -> Arc<dyn Task> {
4350 if depth == 1 {
4351 return Arc::new(TestTask {
4352 name: "leaf",
4353 subs: Vec::new(),
4354 counter,
4355 });
4356 }
4357
4358 let child = chain(depth - 1, counter.clone());
4359 Arc::new(TestTask {
4360 name: "node",
4361 subs: vec![child],
4362 counter,
4363 })
4364 }
4365
/// A four-level tree (root -> node -> node -> leaf) runs completely when the
/// depth limit is 4, and both the root and its first sub-task record timing.
#[test]
fn start_runs_within_max_depth() {
    let executions = Arc::new(AtomicUsize::new(0));
    let subtree = chain(3, Arc::clone(&executions));
    let definition = TaskDefinition::new(TestTask {
        name: "root",
        subs: vec![subtree],
        counter: Arc::clone(&executions),
    });
    let router = Host::builder().hostname("router1").build();
    let mut results = TaskResults::new("root");

    let outcome = run_async(definition.start("router1", &router, &mut results, 4));
    outcome.expect("start should succeed");

    // Root plus the three chained tasks.
    assert_eq!(executions.load(Ordering::SeqCst), 4);
    assert!(results.host_result("router1").is_some());
    assert!(results.sub_task("node").is_some());

    assert!(results.started_at().is_some());
    assert!(results.finished_at().is_some());
    assert!(results.duration_display().is_some());

    let node_results = results
        .sub_task("node")
        .expect("sub-task results should exist after execution");
    assert!(node_results.started_at().is_some());
    assert!(node_results.finished_at().is_some());
    assert!(node_results.duration_display().is_some());
}
4392
/// With a depth limit of 4, the task at depth 5 is not executed; instead an
/// internal failure with zero duration is stored for it.
#[test]
fn start_captures_host_failure_when_depth_exceeds_limit() {
    let executions = Arc::new(AtomicUsize::new(0));
    let subtree = chain(5, Arc::clone(&executions));
    let definition = TaskDefinition::new(TestTask {
        name: "root",
        subs: vec![subtree],
        counter: Arc::clone(&executions),
    });
    let router = Host::builder().hostname("router1").build();
    let mut results = TaskResults::new("root");

    let outcome = run_async(definition.start("router1", &router, &mut results, 4));
    outcome.expect("start should capture depth overflow as a host failure");

    // Root and the four nodes ran; the leaf at depth 5 did not.
    assert_eq!(executions.load(Ordering::SeqCst), 5);

    // Descend root -> node -> node -> node -> node -> leaf.
    let path = [
        ("node", "first nested node should exist"),
        ("node", "second nested node should exist"),
        ("node", "third nested node should exist"),
        ("node", "fourth nested node should exist"),
        ("leaf", "leaf task should capture failure"),
    ];
    let mut deepest = &results;
    for (name, missing) in path {
        deepest = deepest.sub_task(name).expect(missing);
    }

    let failure = deepest
        .host_result("router1")
        .and_then(HostTaskResult::failure)
        .expect("depth overflow should be recorded as a host failure");
    assert!(failure.message().contains("max task depth exceeded"));
    assert!(matches!(failure.kind(), TaskFailureKind::Internal));
    assert!(failure.started_at().is_some());
    assert!(failure.finished_at().is_some());
    assert_eq!(failure.duration_ns(), Some(0));
}
4436
/// A passed host result comes back with start, finish and duration filled in.
#[test]
fn start_attaches_timing_to_passed_host_results() {
    let definition = TaskDefinition::new(TestTask {
        name: "root",
        subs: Vec::new(),
        counter: Arc::new(AtomicUsize::new(0)),
    });
    let router = Host::builder().hostname("router1").build();
    let mut results = TaskResults::new("root");

    let outcome = run_async(definition.start("router1", &router, &mut results, 0));
    outcome.expect("start should succeed");

    let success = results
        .host_result("router1")
        .and_then(HostTaskResult::success)
        .expect("host result should be passed");
    assert!(success.started_at().is_some());
    assert!(success.finished_at().is_some());
    assert!(success.duration_ns().is_some());
    assert!(success.duration_display().is_some());
}
4459
/// A failed host result returned by the task comes back with start, finish
/// and duration filled in.
#[test]
fn start_attaches_timing_to_failed_host_results() {
    let definition = TaskDefinition::new(FailingTask);
    let router = Host::builder().hostname("router1").build();
    let mut results = TaskResults::new("failing");

    let outcome = run_async(definition.start("router1", &router, &mut results, 0));
    outcome.expect("start should record a failed result");

    let failure = results
        .host_result("router1")
        .and_then(HostTaskResult::failure)
        .expect("host result should be failed");
    assert!(failure.started_at().is_some());
    assert!(failure.finished_at().is_some());
    assert!(failure.duration_ns().is_some());
    assert!(failure.duration_display().is_some());
}
4478
/// A skipped host result keeps the reason set by the task and gains no
/// message. Only reason and message are asserted here; timing is not checked.
#[test]
fn start_does_not_attach_timing_to_skipped_host_results() {
    let definition = TaskDefinition::new(SkippingTask);
    let router = Host::builder().hostname("router1").build();
    let mut results = TaskResults::new("skipping");

    let outcome = run_async(definition.start("router1", &router, &mut results, 0));
    outcome.expect("start should record a skipped result");

    let skip = results
        .host_result("router1")
        .and_then(HostTaskResult::skipped_detail)
        .expect("host result should be skipped");
    assert_eq!(skip.reason(), Some("filtered"));
    assert_eq!(skip.message(), None);
}
4495
/// Running a passing task logs a DEBUG start line and an INFO finish line
/// with `status=passed` and duration fields.
#[test]
fn start_logs_per_host_finish_for_passed_results() {
    // Serialise access to the shared logger across log-inspecting tests.
    let _guard = log_lock().lock().expect("log lock should not be poisoned");
    let logger = test_logger();
    logger.clear();

    let definition = TaskDefinition::new(TestTask {
        name: "root",
        subs: Vec::new(),
        counter: Arc::new(AtomicUsize::new(0)),
    });
    let router = Host::builder().hostname("router1").build();
    let mut results = TaskResults::new("root");

    let outcome = run_async(definition.start("router1", &router, &mut results, 0));
    outcome.expect("start should succeed");

    let entries = logger.entries();

    let start_logged = entries.iter().any(|entry| {
        entry.contains("DEBUG starting task 'root' for host 'router1' parent_task='none' depth=0")
    });
    assert!(start_logged);

    let finish_logged = entries.iter().any(|entry| {
        let has_prefix = entry.contains(
            "INFO finished task 'root' for host 'router1' with status=passed duration_ms=",
        );
        has_prefix && entry.contains(" duration=")
    });
    assert!(finish_logged);
}
4525
/// Running a failing task logs a WARN line with the failure message and an
/// INFO finish line with `status=failed` and duration fields.
#[test]
fn start_logs_per_host_failure_warning_and_finish() {
    // Serialise access to the shared logger across log-inspecting tests.
    let _guard = log_lock().lock().expect("log lock should not be poisoned");
    let logger = test_logger();
    logger.clear();

    let definition = TaskDefinition::new(FailingTask);
    let router = Host::builder().hostname("router1").build();
    let mut results = TaskResults::new("failing");

    let outcome = run_async(definition.start("router1", &router, &mut results, 0));
    outcome.expect("start should record a failed result");

    let entries = logger.entries();

    let warning_logged = entries.iter().any(|entry| {
        entry.as_str() == "WARN task 'failing' failed for host 'router1': task failure test error"
    });
    assert!(warning_logged);

    let finish_logged = entries.iter().any(|entry| {
        let has_prefix = entry.contains(
            "INFO finished task 'failing' for host 'router1' with status=failed duration_ms=",
        );
        has_prefix && entry.contains(" duration=")
    });
    assert!(finish_logged);
}
4549
/// Running a skipping task logs an INFO skip line with reason and empty
/// message, plus an INFO finish line with `status=skipped`.
#[test]
fn start_logs_per_host_skip_event_and_finish() {
    // Serialise access to the shared logger across log-inspecting tests.
    let _guard = log_lock().lock().expect("log lock should not be poisoned");
    let logger = test_logger();
    logger.clear();

    let definition = TaskDefinition::new(SkippingTask);
    let router = Host::builder().hostname("router1").build();
    let mut results = TaskResults::new("skipping");

    let outcome = run_async(definition.start("router1", &router, &mut results, 0));
    outcome.expect("start should record a skipped result");

    let entries = logger.entries();

    let skip_logged = entries.iter().any(|entry| {
        entry.as_str()
            == "INFO task 'skipping' skipped for host 'router1' reason='filtered' message=''"
    });
    assert!(skip_logged);

    let finish_logged = entries.iter().any(|entry| {
        let has_prefix = entry.contains(
            "INFO finished task 'skipping' for host 'router1' with status=skipped duration_ms=",
        );
        has_prefix && entry.contains(" duration=")
    });
    assert!(finish_logged);
}
4573
/// Every builder on `TaskFailure` is reflected by the matching accessor, and
/// the original error can be recovered by downcasting.
#[test]
fn task_failure_preserves_metadata_and_supports_downcast() {
    let ssh_message = TaskMessage::new(MessageLevel::Error, "ssh session failed");
    let failure = TaskFailure::new(TestTaskFailureError)
        .with_kind(TaskFailureKind::Connection)
        .with_retryable(true)
        .with_details(json!({"port": 22}))
        .with_warning("intermittent reachability")
        .with_message(ssh_message);

    // Message and error text both come from the wrapped error's `Display`.
    assert_eq!(failure.message(), "task failure test error");
    assert_eq!(failure.error().to_string(), "task failure test error");

    assert!(matches!(failure.kind(), TaskFailureKind::Connection));
    assert!(failure.retryable());
    assert_eq!(failure.details(), Some(&json!({"port": 22})));
    assert_eq!(failure.warnings(), ["intermittent reachability"]);
    assert_eq!(failure.messages()[0].text(), "ssh session failed");

    let error_type = failure.error_type();
    assert!(error_type.ends_with("task::tests::TestTaskFailureError"));
    assert!(failure.downcast_ref::<TestTaskFailureError>().is_some());
}
4597
/// `TaskFailure::capture` accepts a payload that is only `Display` (not
/// `Error`) and still exposes its message, type name and downcast.
#[test]
fn task_failure_capture_supports_non_error_payloads() {
    let payload = ExternalFailurePayload { code: 42 };
    let failure = TaskFailure::capture(payload).with_kind(TaskFailureKind::Internal);

    assert_eq!(failure.message(), "external failure code 42");

    let rendered_error = failure.error().to_string();
    assert!(rendered_error.contains("external failure code 42"));

    let error_type = failure.error_type();
    assert!(error_type.ends_with("task::tests::ExternalFailurePayload"));

    let recovered = failure
        .downcast_ref::<ExternalFailurePayload>()
        .expect("captured payload should be downcastable");
    assert_eq!(recovered.code, 42);
}
4620
/// Unit error that `ErroringTask` returns from `start_async`.
#[derive(Debug)]
struct ExternalTaskError;

impl fmt::Display for ExternalTaskError {
    /// Always renders the fixed message `external task error`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("external task error")
    }
}

impl Error for ExternalTaskError {}
4631
4632 struct ErroringTask;
4633
4634 impl TaskInfo for ErroringTask {
4635 fn name(&self) -> &str {
4636 "erroring"
4637 }
4638
4639 fn connection_plugin_name(&self) -> Option<&str> {
4640 Some("ssh")
4641 }
4642
4643 fn options(&self) -> Option<&Value> {
4644 None
4645 }
4646 }
4647
4648 #[async_trait]
4649 impl Task for ErroringTask {
4650 async fn start_async(
4651 &self,
4652 _host: &Host,
4653 _context: &TaskRuntimeContext,
4654 ) -> Result<HostTaskResult, TaskError> {
4655 Err(TaskError::new(ExternalTaskError))
4656 }
4657
4658 fn execution_mode(&self) -> TaskExecutionMode {
4659 TaskExecutionMode::Async
4660 }
4661 }
4662
/// An `Err` returned by the task body is stored as a failed host result of
/// kind `External` that keeps the error's message and type name.
#[test]
fn start_captures_task_errors_as_external_failures() {
    let definition = TaskDefinition::new(ErroringTask);
    let router = Host::builder().hostname("router1").build();
    let mut results = TaskResults::new("erroring");

    let outcome = run_async(definition.start("router1", &router, &mut results, 0));
    outcome.expect("start should capture task error as host failure");

    let failure = results
        .host_result("router1")
        .and_then(HostTaskResult::failure)
        .expect("task error should be recorded as failure");
    assert_eq!(failure.message(), "external task error");
    assert!(matches!(failure.kind(), TaskFailureKind::External));

    let error_type = failure.error_type();
    assert!(error_type.ends_with("task::tests::ExternalTaskError"));
}
4684
/// Every builder on `TaskSuccess` is reflected by the matching accessor.
#[test]
fn task_success_builders_expose_extended_metadata() {
    let started_at = SystemTime::UNIX_EPOCH;
    let finished_at = SystemTime::UNIX_EPOCH
        .checked_add(std::time::Duration::from_secs(2))
        .expect("valid timestamp");
    let commit_message =
        TaskMessage::new(MessageLevel::Info, "commit complete").with_code("commit_ok");

    let success = TaskSuccess::new()
        .with_result(json!({"ok": true}))
        .with_changed(true)
        .with_diff("updated config")
        .with_summary("task completed")
        .with_warning("minor drift")
        .with_message(commit_message)
        .with_metadata(json!({"version": 1}))
        .with_started_at(started_at)
        .with_finished_at(finished_at)
        .with_duration_ms(2000);

    // Payload and change tracking.
    assert_eq!(success.result(), Some(&json!({"ok": true})));
    assert!(success.changed());
    assert_eq!(success.diff(), Some("updated config"));
    assert_eq!(success.summary(), Some("task completed"));

    // Warnings and structured messages.
    assert_eq!(success.warnings(), ["minor drift"]);
    let first_message = &success.messages()[0];
    assert_eq!(first_message.text(), "commit complete");
    assert_eq!(first_message.code(), Some("commit_ok"));
    assert!(matches!(first_message.level(), MessageLevel::Info));

    // Metadata and timing.
    assert_eq!(success.metadata(), Some(&json!({"version": 1})));
    assert_eq!(success.started_at(), Some(started_at));
    assert_eq!(success.finished_at(), Some(finished_at));
    assert_eq!(success.duration_ms(), Some(2000));
}
4718
/// Checks that skip details are readable from a `HostTaskResult`, both when
/// the `Skipped` variant is built by hand and when it comes from
/// `HostTaskResult::skipped_with_reason`.
#[test]
fn task_skip_and_host_task_result_expose_skip_metadata() {
    let detail = TaskSkip::new()
        .with_reason("filtered")
        .with_message("host excluded by selector");
    let explicit = HostTaskResult::Skipped(detail);

    assert!(explicit.is_skipped());

    let reason = explicit.skipped_detail().and_then(TaskSkip::reason);
    assert_eq!(reason, Some("filtered"));

    let message = explicit.skipped_detail().and_then(TaskSkip::message);
    assert_eq!(message, Some("host excluded by selector"));

    let via_helper = HostTaskResult::skipped_with_reason("parent_failed");
    let helper_reason = via_helper.skipped_detail().and_then(TaskSkip::reason);
    assert_eq!(helper_reason, Some("parent_failed"));
}
4745
/// Builds a warning-level `TaskMessage` with a code and timestamp and reads
/// all four fields back.
#[test]
fn task_message_builders_expose_message_metadata() {
    use std::time::Duration;

    let one_second_in = SystemTime::UNIX_EPOCH
        .checked_add(Duration::from_secs(1))
        .expect("valid timestamp");

    let warning = TaskMessage::new(MessageLevel::Warning, "latency threshold exceeded")
        .with_code("latency_warn")
        .with_timestamp(one_second_in);

    assert!(matches!(warning.level(), MessageLevel::Warning));
    assert_eq!(warning.text(), "latency threshold exceeded");
    assert_eq!(warning.code(), Some("latency_warn"));
    assert_eq!(warning.timestamp(), Some(one_second_in));
}
4760
/// Exercises the `TaskResults` builders for summary and timing, then checks
/// the raw accessors, the display helpers, and both JSON renderings: the
/// human form is expected to carry a formatted `duration` and no numeric
/// duration fields, the raw form a numeric `duration_ns`.
#[test]
fn task_results_builders_expose_summary_and_timing_metadata() {
    use std::time::Duration;

    let begin = SystemTime::UNIX_EPOCH;
    let end = SystemTime::UNIX_EPOCH
        .checked_add(Duration::from_secs(3))
        .expect("valid timestamp");

    let deploy = TaskResults::new("deploy")
        .with_summary("deploy finished")
        .with_started_at(begin)
        .with_finished_at(end)
        .with_duration_ms(3000);

    // Raw accessors.
    assert_eq!(deploy.task_name(), "deploy");
    assert_eq!(deploy.summary(), Some("deploy finished"));
    assert_eq!(deploy.started_at(), Some(begin));
    assert_eq!(deploy.finished_at(), Some(end));
    assert_eq!(deploy.duration_ns(), Some(3_000_000_000));
    assert_eq!(deploy.duration_ms(), Some(3000));

    // Display helpers.
    assert_eq!(
        deploy.started_at_display(),
        Some(String::from("1970-01-01T00:00:00Z"))
    );
    assert_eq!(
        deploy.finished_at_display(),
        Some(String::from("1970-01-01T00:00:03Z"))
    );
    assert_eq!(deploy.duration_display(), Some(String::from("3s")));

    // Human-readable JSON.
    let human = deploy
        .to_json_string()
        .expect("human json should serialize");
    assert!(human.contains("\"started_at\":\"1970-01-01T00:00:00Z\""));
    assert!(human.contains("\"finished_at\":\"1970-01-01T00:00:03Z\""));
    assert!(human.contains("\"duration\":\"3s\""));
    assert!(!human.contains("\"duration_ns\":"));
    assert!(!human.contains("\"duration_ms\":"));

    // Raw JSON.
    let raw = deploy
        .to_raw_json_string()
        .expect("raw json should serialize");
    assert!(!raw.contains("\"duration\":\"3s\""));
    assert!(raw.contains("\"duration_ns\":3000000000"));
}
4804
/// Nests one `TaskResults` inside another and expects the human JSON to
/// contain the child under `sub_tasks` with its own formatted duration.
#[test]
fn task_results_human_json_serializes_recursive_sub_tasks() {
    let nested = TaskResults::new("child").with_duration_ms(250);
    let mut parent = TaskResults::new("root").with_duration_ms(2000);
    parent.insert_sub_task("child", nested);

    let rendered = parent
        .to_json_string()
        .expect("human json should serialize recursively");

    assert!(rendered.contains("\"task_name\":\"root\""));
    assert!(rendered.contains("\"sub_tasks\":{\"child\":{\"task_name\":\"child\""));
    assert!(rendered.contains("\"duration\":\"2s\""));
    assert!(rendered.contains("\"duration\":\"250ms\""));
}
4820
/// Executes a root task that owns a `chain(2, ..)` sub-task tree and expects
/// the human JSON to contain a `node` sub-task entry along with
/// `started_at`, `finished_at`, and `duration` fields.
#[test]
fn sub_task_results_human_json_includes_aggregate_timing() {
    const HOSTNAME: &str = "router1";

    let counter = Arc::new(AtomicUsize::new(0));
    let sub_tree = chain(2, Arc::clone(&counter));
    let definition = TaskDefinition::new(TestTask {
        name: "root",
        subs: vec![sub_tree],
        counter,
    });
    let target = Host::builder().hostname(HOSTNAME).build();
    let mut collected = TaskResults::new("root");

    run_async(definition.start(HOSTNAME, &target, &mut collected, 3))
        .expect("start should succeed");

    let rendered = collected
        .to_json_string()
        .expect("human json should serialize sub-task timing");

    assert!(rendered.contains("\"sub_tasks\":{\"node\":{"));
    assert!(rendered.contains("\"started_at\":\""));
    assert!(rendered.contains("\"finished_at\":\""));
    assert!(rendered.contains("\"duration\":\""));
}
4844
/// Runs a root `TestTask` whose only sub-task is a `ProcessorTask` selecting
/// the `"selected"` processor, with a resolver that hands out a counting
/// processor. Expects exactly four processor calls and results for both the
/// root host and the `child` sub-task.
#[test]
fn processors_are_selected_per_task_not_inherited_by_sub_tasks() {
    const HOSTNAME: &str = "router1";

    let executions = Arc::new(AtomicUsize::new(0));
    let hook_calls = Arc::new(AtomicUsize::new(0));

    let child = Arc::new(ProcessorTask {
        name: "child",
        processors: vec![String::from("selected")],
    });
    let parent = TestTask {
        name: "root",
        subs: vec![child],
        counter: executions,
    };

    let counting = Arc::new(CountingProcessor {
        calls: Arc::clone(&hook_calls),
    });
    let resolver = Arc::new(TestProcessorResolver {
        processor: counting,
    });

    let definition = TaskDefinition::new(parent).with_processor_resolver(resolver);
    let target = Host::builder().hostname(HOSTNAME).build();
    let mut collected = TaskResults::new("root");

    run_async(definition.start(HOSTNAME, &target, &mut collected, 3))
        .expect("task execution should succeed");

    assert_eq!(hook_calls.load(Ordering::SeqCst), 4);
    assert!(collected.host_result(HOSTNAME).is_some());
    assert!(collected.sub_task("child").is_some());
}
4873
/// Stores one passed, one failed, and one skipped host result and checks how
/// the human JSON renders each: timestamps and a formatted `duration` for
/// the timed results, only the reason for the skipped one, and no numeric
/// `duration_ns` / `duration_ms` fields anywhere.
#[test]
fn task_results_human_json_formats_host_timing_uniformly() {
    use std::time::Duration;

    let begin = SystemTime::UNIX_EPOCH;
    let end = SystemTime::UNIX_EPOCH
        .checked_add(Duration::from_millis(2))
        .expect("valid timestamp");

    let mut deploy = TaskResults::new("deploy");

    let passed = TaskSuccess::new()
        .with_summary("ok")
        .with_started_at(begin)
        .with_finished_at(end)
        .with_duration_ns(2_000_000);
    deploy.insert_host_result("router1", HostTaskResult::passed(passed));

    let failed = TaskFailure::new(TestTaskFailureError)
        .with_started_at(begin)
        .with_finished_at(end)
        .with_duration_ns(250_000);
    deploy.insert_host_result("router2", HostTaskResult::failed(failed));

    let skipped = TaskSkip::new().with_reason("filtered");
    deploy.insert_host_result("router3", HostTaskResult::Skipped(skipped));

    let rendered = deploy
        .to_json_string()
        .expect("human json should serialize host timing");

    // Passed host.
    assert!(rendered.contains("\"router1\":{\"Passed\":{"));
    assert!(rendered.contains("\"summary\":\"ok\""));
    assert!(rendered.contains("\"started_at\":\"1970-01-01T00:00:00Z\""));
    assert!(rendered.contains("\"finished_at\":\"1970-01-01T00:00:00Z\""));
    assert!(rendered.contains("\"duration\":\"2ms\""));

    // Failed host.
    assert!(rendered.contains("\"router2\":{\"Failed\":"));
    assert!(rendered.contains("\"duration\":\"250us\""));

    // Skipped host.
    assert!(rendered.contains("\"router3\":{\"Skipped\":{\"reason\":\"filtered\""));
    assert!(!rendered.contains("\"router3\":{\"Skipped\":{\"started_at\""));

    // Numeric duration fields must not appear in the human form.
    assert!(!rendered.contains("\"duration_ns\""));
    assert!(!rendered.contains("\"duration_ms\""));
}
4921
/// Checks `duration_display` for nanosecond, microsecond, millisecond, and
/// second magnitudes, and that `duration_ms` reports `0` for durations below
/// one millisecond while `duration_ns` keeps the exact value.
#[test]
fn task_results_duration_display_preserves_sub_millisecond_precision() {
    let quarter_ms = TaskResults::new("micros").with_duration_ns(250_000);
    let tiny = TaskResults::new("nanos").with_duration_ns(250);
    let fractional_ms = TaskResults::new("millis").with_duration_ns(2_500_000);
    let fractional_s = TaskResults::new("seconds").with_duration_ns(1_500_587_737);

    assert_eq!(quarter_ms.duration_ns(), Some(250_000));
    assert_eq!(quarter_ms.duration_ms(), Some(0));
    assert_eq!(quarter_ms.duration_display(), Some(String::from("250us")));

    assert_eq!(tiny.duration_ns(), Some(250));
    assert_eq!(tiny.duration_ms(), Some(0));
    assert_eq!(tiny.duration_display(), Some(String::from("250ns")));

    assert_eq!(fractional_ms.duration_display(), Some(String::from("2.5ms")));
    assert_eq!(fractional_s.duration_display(), Some(String::from("1.5s")));
}
4940
/// Builds a `TaskRuntimeContext` with a live test connection and checks the
/// depth accessors, the connection presence helpers, and that
/// `with_connection` passes the connection to the closure.
#[test]
fn task_runtime_context_helpers_expose_execution_and_connection() {
    let execution = TaskExecutionContext::new(2, 5);
    // The key is only needed by the connection, so it is built in place
    // rather than cloned from a local that is never read again.
    let connection = Arc::new(tokio::sync::Mutex::new(TestConnection {
        key: ConnectionKey::new("router1", "ssh"),
        alive: true,
    }));
    // `execution` is compared against the context below, so the context gets a clone.
    let context = TaskRuntimeContext::new(execution.clone(), Some(connection));

    assert_eq!(context.execution(), &execution);
    assert_eq!(context.current_depth(), 2);
    assert_eq!(context.max_depth(), 5);
    assert!(context.has_connection());
    assert!(context.connection().is_some());

    let is_alive = context
        .with_connection(|connection| Ok(connection.is_alive()))
        .expect("connection helper should not fail");
    assert_eq!(is_alive, Some(true));
}
4962
/// With no connection attached, `with_connection` is expected to succeed
/// with `None` and `has_connection` to report `false`.
#[test]
fn task_runtime_context_with_connection_returns_none_without_connection() {
    let execution = TaskExecutionContext::new(0, 1);
    let detached = TaskRuntimeContext::new(execution, None);

    let probe = detached.with_connection(|conn| Ok(conn.is_alive()));
    let alive = probe.expect("missing connection should not fail");

    assert_eq!(alive, None);
    assert!(!detached.has_connection());
}
4974
/// Assembles a three-level result tree by hand (`deploy` -> `validate` ->
/// `collect_logs`) and checks host lookups, nested sub-task lookups, and the
/// per-level host counts reported by `task_summary`.
#[test]
fn task_results_store_recursive_host_and_sub_task_results() {
    // Level 1 (`deploy`): router1 passes with rich metadata, router2 fails.
    let deploy_success = TaskSuccess::new()
        .with_result(json!({"deployed": true}))
        .with_changed(true)
        .with_summary("config deployed")
        .with_warning("candidate config had comments")
        .with_message(TaskMessage::new(
            MessageLevel::Info,
            "candidate config committed",
        ))
        .with_metadata(json!({"version": "1.2.3"}));
    let deploy_failure = TaskFailure::new(TestTaskFailureError)
        .with_kind(TaskFailureKind::Connection)
        .with_retryable(true);

    let mut deploy = TaskResults::new("deploy").with_summary("deploy completed");
    deploy.insert_host_result("router1", HostTaskResult::passed(deploy_success));
    deploy.insert_host_result("router2", HostTaskResult::failed(deploy_failure));

    // Level 2 (`validate`): router1 passes, router2 is skipped with a message.
    let validate_skip = TaskSkip::new()
        .with_reason("parent_failed")
        .with_message("validation skipped because deploy failed");

    let mut validate_level = TaskResults::new("validate");
    validate_level.insert_host_result(
        "router1",
        HostTaskResult::passed(TaskSuccess::new().with_result(json!({"valid": true}))),
    );
    validate_level.insert_host_result("router2", HostTaskResult::Skipped(validate_skip));

    // Level 3 (`collect_logs`): router1 passes with a diff, router2 is skipped.
    let mut logs_level = TaskResults::new("collect_logs");
    logs_level.insert_host_result(
        "router1",
        HostTaskResult::passed(TaskSuccess::new().with_diff("captured logs")),
    );
    logs_level.insert_host_result(
        "router2",
        HostTaskResult::skipped_with_reason("parent_failed"),
    );

    // Attach the innermost level first so each parent takes its finished child.
    validate_level.insert_sub_task("collect_logs", logs_level);
    deploy.insert_sub_task("validate", validate_level);

    // Root-level host classification.
    assert_eq!(deploy.task_name(), "deploy");

    let passed_names = deploy
        .passed_hosts()
        .into_iter()
        .map(|host| host.as_str())
        .collect::<Vec<_>>();
    assert_eq!(passed_names, vec!["router1"]);

    let failed_names = deploy
        .failed_hosts()
        .into_iter()
        .map(|host| host.as_str())
        .collect::<Vec<_>>();
    assert_eq!(failed_names, vec!["router2"]);

    // Root and `validate` lookups.
    let validate_results = deploy
        .sub_task("validate")
        .expect("validate sub task should exist");
    assert_eq!(validate_results.task_name(), "validate");
    assert_eq!(deploy.summary(), Some("deploy completed"));

    let router1_summary = deploy
        .host_result("router1")
        .and_then(HostTaskResult::success)
        .and_then(TaskSuccess::summary);
    assert_eq!(router1_summary, Some("config deployed"));

    let router1_warning_count = deploy
        .host_result("router1")
        .and_then(HostTaskResult::success)
        .map(|success| success.warnings().len());
    assert_eq!(router1_warning_count, Some(1));

    let router2_validate = validate_results
        .host_result("router2")
        .expect("router2 validate result should exist");
    assert!(router2_validate.is_skipped());

    let router2_skip_reason = validate_results
        .host_result("router2")
        .and_then(HostTaskResult::skipped_detail)
        .and_then(TaskSkip::reason);
    assert_eq!(router2_skip_reason, Some("parent_failed"));

    // `collect_logs` lookups.
    let logs_results = validate_results
        .sub_task("collect_logs")
        .expect("collect_logs sub task should exist");
    assert_eq!(logs_results.task_name(), "collect_logs");

    let router1_diff = logs_results
        .host_result("router1")
        .and_then(HostTaskResult::success)
        .and_then(TaskSuccess::diff);
    assert_eq!(router1_diff, Some("captured logs"));

    let router2_retryable = deploy
        .host_result("router2")
        .and_then(HostTaskResult::failure)
        .map(TaskFailure::retryable);
    assert_eq!(router2_retryable, Some(true));

    // Summary tree: root level.
    let deploy_summary = deploy.task_summary();
    assert_eq!(deploy_summary.task_name(), "deploy");
    assert_eq!(deploy_summary.hosts().passed(), 1);
    assert_eq!(deploy_summary.hosts().failed(), 1);
    assert_eq!(deploy_summary.hosts().skipped(), 0);
    assert_eq!(deploy_summary.hosts().total(), 2);
    assert_eq!(deploy_summary.duration_ms(), None);

    // Summary tree: `validate` level.
    let validate_summary = deploy_summary
        .sub_tasks()
        .get("validate")
        .expect("validate summary should exist");
    assert_eq!(validate_summary.task_name(), "validate");
    assert_eq!(validate_summary.hosts().passed(), 1);
    assert_eq!(validate_summary.hosts().failed(), 0);
    assert_eq!(validate_summary.hosts().skipped(), 1);
    assert_eq!(validate_summary.duration_ms(), None);

    // Summary tree: `collect_logs` level.
    let logs_summary = validate_summary
        .sub_tasks()
        .get("collect_logs")
        .expect("collect_logs summary should exist");
    assert_eq!(logs_summary.task_name(), "collect_logs");
    assert_eq!(logs_summary.hosts().passed(), 1);
    assert_eq!(logs_summary.hosts().failed(), 0);
    assert_eq!(logs_summary.hosts().skipped(), 1);
}
5122}