use crate::inventory::{Connection, Host};
use crate::types::{CustomTreeMap, NatString};
use async_recursion::async_recursion;
use async_trait::async_trait;
use log::{debug, info, warn};
use serde::Serialize;
use serde_json::Value;
use std::any::{Any, type_name};
use std::error::Error;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
use tokio::task;
#[derive(Clone)]
pub struct TaskError {
error: Arc<dyn Error + Send + Sync + 'static>,
error_type: String,
source: Option<Arc<dyn Any + Send + Sync + 'static>>,
}
type TaskFailureSource = Arc<dyn Any + Send + Sync + 'static>;
impl TaskError {
pub fn new<E>(error: E) -> Self
where
E: Error + Send + Sync + 'static,
{
let error = Arc::new(error);
Self {
error_type: type_name::<E>().to_string(),
source: Some(error.clone()),
error,
}
}
pub fn from_arc(error: Arc<dyn Error + Send + Sync + 'static>) -> Self {
Self {
error,
error_type: "dyn core::error::Error".to_string(),
source: None,
}
}
pub fn error(&self) -> &(dyn Error + Send + Sync + 'static) {
self.error.as_ref()
}
pub fn error_type(&self) -> &str {
&self.error_type
}
pub fn downcast_ref<E>(&self) -> Option<&E>
where
E: 'static,
{
self.source
.as_ref()
.and_then(|source| source.downcast_ref::<E>())
}
}
impl fmt::Debug for TaskError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TaskError")
.field("error_type", &self.error_type)
.field("message", &self.error.to_string())
.finish()
}
}
impl fmt::Display for TaskError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.error)
}
}
impl Error for TaskError {}
#[derive(Debug)]
struct CapturedTaskFailure {
message: String,
}
impl fmt::Display for CapturedTaskFailure {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.message)
}
}
impl Error for CapturedTaskFailure {}
fn format_timestamp_display(timestamp: SystemTime) -> String {
humantime::format_rfc3339_seconds(timestamp).to_string()
}
fn format_duration_display(duration_ns: u128) -> String {
if duration_ns < 1_000 {
return format!("{duration_ns}ns");
}
if duration_ns < 1_000_000 {
return format_decimal_unit(duration_ns as f64 / 1_000.0, "us");
}
if duration_ns < 1_000_000_000 {
return format_decimal_unit(duration_ns as f64 / 1_000_000.0, "ms");
}
if duration_ns < 60_000_000_000 {
return format_decimal_unit(duration_ns as f64 / 1_000_000_000.0, "s");
}
if duration_ns < 3_600_000_000_000 {
return format_decimal_unit(duration_ns as f64 / 60_000_000_000.0, "m");
}
format_decimal_unit(duration_ns as f64 / 3_600_000_000_000.0, "h")
}
fn format_decimal_unit(value: f64, unit: &str) -> String {
let precision = if value >= 100.0 {
0
} else if value >= 10.0 {
1
} else {
2
};
let formatted = format!("{value:.precision$}");
let trimmed = if let Some((whole, fractional)) = formatted.split_once('.') {
let fractional = fractional.trim_end_matches('0');
if fractional.is_empty() {
whole.to_string()
} else {
format!("{whole}.{fractional}")
}
} else {
formatted
};
format!("{trimmed}{unit}")
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct TaskResults {
task_name: String,
started_at: Option<SystemTime>,
finished_at: Option<SystemTime>,
duration_ns: Option<u128>,
duration_ms: Option<u128>,
summary: Option<String>,
hosts: CustomTreeMap<HostTaskResult>,
sub_tasks: CustomTreeMap<TaskResults>,
}
#[derive(Serialize)]
struct TaskResultsHumanJson<'a> {
task_name: &'a str,
started_at: Option<String>,
finished_at: Option<String>,
duration: Option<String>,
summary: Option<&'a str>,
hosts: CustomTreeMap<HostTaskResultHumanJson<'a>>,
sub_tasks: CustomTreeMap<TaskResultsHumanJson<'a>>,
}
#[derive(Serialize)]
enum HostTaskResultHumanJson<'a> {
Passed(TaskSuccessHumanJson<'a>),
Failed(TaskFailureHumanJson<'a>),
Skipped(TaskSkipHumanJson<'a>),
}
#[derive(Serialize)]
struct TaskSuccessHumanJson<'a> {
result: Option<&'a Value>,
changed: bool,
diff: Option<&'a str>,
summary: Option<&'a str>,
warnings: &'a [String],
messages: &'a [TaskMessage],
metadata: Option<&'a Value>,
started_at: Option<String>,
finished_at: Option<String>,
duration: Option<String>,
}
#[derive(Serialize)]
struct TaskFailureHumanJson<'a> {
kind: &'a TaskFailureKind,
error_type: &'a str,
message: &'a str,
retryable: bool,
details: Option<&'a Value>,
warnings: &'a [String],
messages: &'a [TaskMessage],
started_at: Option<String>,
finished_at: Option<String>,
duration: Option<String>,
}
#[derive(Serialize)]
struct TaskSkipHumanJson<'a> {
reason: Option<&'a str>,
message: Option<&'a str>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Default)]
pub struct TaskHostSummary {
passed: usize,
failed: usize,
skipped: usize,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct TaskResultsSummary {
task_name: String,
hosts: TaskHostSummary,
duration_ns: Option<u128>,
sub_tasks: CustomTreeMap<TaskResultsSummary>,
}
impl TaskHostSummary {
pub fn new(passed: usize, failed: usize, skipped: usize) -> Self {
Self {
passed,
failed,
skipped,
}
}
pub fn passed(&self) -> usize {
self.passed
}
pub fn failed(&self) -> usize {
self.failed
}
pub fn skipped(&self) -> usize {
self.skipped
}
pub fn total(&self) -> usize {
self.passed + self.failed + self.skipped
}
}
impl TaskResultsSummary {
pub fn task_name(&self) -> &str {
&self.task_name
}
pub fn hosts(&self) -> TaskHostSummary {
self.hosts
}
pub fn duration_ms(&self) -> Option<u128> {
self.duration_ns.map(|duration_ns| duration_ns / 1_000_000)
}
pub fn duration_display(&self) -> Option<String> {
self.duration_ns.map(format_duration_display)
}
pub fn sub_tasks(&self) -> &CustomTreeMap<TaskResultsSummary> {
&self.sub_tasks
}
}
impl<'a> From<&'a HostTaskResult> for HostTaskResultHumanJson<'a> {
fn from(result: &'a HostTaskResult) -> Self {
match result {
HostTaskResult::Passed(success) => Self::Passed(TaskSuccessHumanJson::from(success)),
HostTaskResult::Failed(failure) => Self::Failed(TaskFailureHumanJson::from(failure)),
HostTaskResult::Skipped(skip) => Self::Skipped(TaskSkipHumanJson::from(skip)),
}
}
}
impl<'a> From<&'a TaskSuccess> for TaskSuccessHumanJson<'a> {
fn from(success: &'a TaskSuccess) -> Self {
Self {
result: success.result(),
changed: success.changed(),
diff: success.diff(),
summary: success.summary(),
warnings: success.warnings(),
messages: success.messages(),
metadata: success.metadata(),
started_at: success.started_at_display(),
finished_at: success.finished_at_display(),
duration: success.duration_display(),
}
}
}
impl<'a> From<&'a TaskFailure> for TaskFailureHumanJson<'a> {
fn from(failure: &'a TaskFailure) -> Self {
Self {
kind: failure.kind(),
error_type: failure.error_type(),
message: failure.message(),
retryable: failure.retryable(),
details: failure.details(),
warnings: failure.warnings(),
messages: failure.messages(),
started_at: failure.started_at_display(),
finished_at: failure.finished_at_display(),
duration: failure.duration_display(),
}
}
}
impl<'a> From<&'a TaskSkip> for TaskSkipHumanJson<'a> {
fn from(skip: &'a TaskSkip) -> Self {
Self {
reason: skip.reason(),
message: skip.message(),
}
}
}
impl<'a> From<&'a TaskResults> for TaskResultsHumanJson<'a> {
fn from(results: &'a TaskResults) -> Self {
let mut hosts = CustomTreeMap::new();
for (hostname, host_result) in results.hosts().iter() {
hosts.insert(hostname, HostTaskResultHumanJson::from(host_result));
}
let mut sub_tasks = CustomTreeMap::new();
for (task_name, task_results) in results.sub_tasks().iter() {
sub_tasks.insert(task_name, TaskResultsHumanJson::from(task_results));
}
Self {
task_name: results.task_name(),
started_at: results.started_at_display(),
finished_at: results.finished_at_display(),
duration: results.duration_display(),
summary: results.summary(),
hosts,
sub_tasks,
}
}
}
impl TaskResults {
pub fn new(task_name: impl Into<String>) -> Self {
Self {
task_name: task_name.into(),
started_at: None,
finished_at: None,
duration_ns: None,
duration_ms: None,
summary: None,
hosts: CustomTreeMap::new(),
sub_tasks: CustomTreeMap::new(),
}
}
pub fn task_name(&self) -> &str {
&self.task_name
}
pub fn with_started_at(mut self, started_at: SystemTime) -> Self {
self.started_at = Some(started_at);
self
}
pub fn with_finished_at(mut self, finished_at: SystemTime) -> Self {
self.finished_at = Some(finished_at);
self
}
pub fn with_duration_ms(mut self, duration_ms: u128) -> Self {
self.duration_ns = Some(duration_ms.saturating_mul(1_000_000));
self.duration_ms = Some(duration_ms);
self
}
pub fn with_duration_ns(mut self, duration_ns: u128) -> Self {
self.duration_ns = Some(duration_ns);
self.duration_ms = Some(duration_ns / 1_000_000);
self
}
pub fn with_summary(mut self, summary: impl Into<String>) -> Self {
self.summary = Some(summary.into());
self
}
pub fn merge(&mut self, other: TaskResults) {
let mut other = other;
debug_assert_eq!(self.task_name, other.task_name);
if let (Some(started_at), Some(finished_at)) = (other.started_at, other.finished_at) {
self.record_execution_timing(started_at, finished_at);
} else {
if self.started_at.is_none() {
self.started_at = other.started_at;
}
if self.finished_at.is_none() {
self.finished_at = other.finished_at;
}
if self.duration_ns.is_none() {
self.duration_ns = other.duration_ns;
}
if self.duration_ms.is_none() {
self.duration_ms = other.duration_ms;
}
}
if self.summary.is_none() {
self.summary = other.summary;
}
for (hostname, result) in std::mem::take(&mut other.hosts).into_iter() {
self.insert_host_result(hostname, result);
}
for (task_name, sub_results) in std::mem::take(&mut other.sub_tasks).into_iter() {
if let Some(existing) = self.sub_task_mut(task_name.as_str()) {
existing.merge(sub_results);
} else {
self.insert_sub_task(task_name, sub_results);
}
}
}
fn record_execution_timing(&mut self, started_at: SystemTime, finished_at: SystemTime) {
if self.started_at.is_none_or(|current| started_at < current) {
self.started_at = Some(started_at);
}
if self.finished_at.is_none_or(|current| finished_at > current) {
self.finished_at = Some(finished_at);
}
if let (Some(started_at), Some(finished_at)) = (self.started_at, self.finished_at) {
let duration_ns = finished_at
.duration_since(started_at)
.map(|duration| duration.as_nanos())
.unwrap_or(0);
self.duration_ns = Some(duration_ns);
self.duration_ms = Some(duration_ns / 1_000_000);
}
}
pub fn started_at(&self) -> Option<SystemTime> {
self.started_at
}
pub fn finished_at(&self) -> Option<SystemTime> {
self.finished_at
}
pub fn duration_ms(&self) -> Option<u128> {
self.duration_ns
.map(|duration_ns| duration_ns / 1_000_000)
.or(self.duration_ms)
}
pub fn duration_ns(&self) -> Option<u128> {
self.duration_ns.or_else(|| {
self.duration_ms
.map(|duration_ms| duration_ms.saturating_mul(1_000_000))
})
}
pub fn started_at_display(&self) -> Option<String> {
self.started_at.map(format_timestamp_display)
}
pub fn finished_at_display(&self) -> Option<String> {
self.finished_at.map(format_timestamp_display)
}
pub fn duration_display(&self) -> Option<String> {
self.duration_ns().map(format_duration_display)
}
pub fn to_json_string(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(&TaskResultsHumanJson::from(self))
}
pub fn to_pretty_json_string(&self) -> Result<String, serde_json::Error> {
serde_json::to_string_pretty(&TaskResultsHumanJson::from(self))
}
pub fn to_raw_json_string(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
pub fn to_raw_pretty_json_string(&self) -> Result<String, serde_json::Error> {
serde_json::to_string_pretty(self)
}
pub fn summary(&self) -> Option<&str> {
self.summary.as_deref()
}
pub fn insert_host_result<K>(&mut self, hostname: K, result: HostTaskResult)
where
K: Into<NatString>,
{
self.hosts.insert(hostname.into(), result);
}
pub fn host_result(&self, hostname: &str) -> Option<&HostTaskResult> {
self.hosts.get(hostname)
}
pub fn host_result_mut(&mut self, hostname: &str) -> Option<&mut HostTaskResult> {
self.hosts.get_mut(hostname)
}
pub fn hosts(&self) -> &CustomTreeMap<HostTaskResult> {
&self.hosts
}
pub fn insert_sub_task<K>(&mut self, task_name: K, results: TaskResults)
where
K: Into<NatString>,
{
self.sub_tasks.insert(task_name.into(), results);
}
pub fn sub_task(&self, task_name: &str) -> Option<&TaskResults> {
self.sub_tasks.get(task_name)
}
pub fn sub_task_mut(&mut self, task_name: &str) -> Option<&mut TaskResults> {
self.sub_tasks.get_mut(task_name)
}
pub fn sub_tasks(&self) -> &CustomTreeMap<TaskResults> {
&self.sub_tasks
}
pub fn passed_hosts(&self) -> Vec<&NatString> {
self.hosts
.iter()
.filter_map(|(host, result)| result.is_passed().then_some(host))
.collect()
}
pub fn failed_hosts(&self) -> Vec<&NatString> {
self.hosts
.iter()
.filter_map(|(host, result)| result.is_failed().then_some(host))
.collect()
}
pub fn skipped_hosts(&self) -> Vec<&NatString> {
self.hosts
.iter()
.filter_map(|(host, result)| result.is_skipped().then_some(host))
.collect()
}
pub fn host_summary(&self) -> TaskHostSummary {
TaskHostSummary::new(
self.passed_hosts().len(),
self.failed_hosts().len(),
self.skipped_hosts().len(),
)
}
pub fn task_summary(&self) -> TaskResultsSummary {
let mut sub_tasks = CustomTreeMap::new();
for (task_name, task_results) in self.sub_tasks().iter() {
sub_tasks.insert(task_name, task_results.task_summary());
}
TaskResultsSummary {
task_name: self.task_name.clone(),
hosts: self.host_summary(),
duration_ns: self.duration_ns(),
sub_tasks,
}
}
}
#[derive(Debug, Clone, Serialize)]
pub enum HostTaskResult {
Passed(TaskSuccess),
Failed(TaskFailure),
Skipped(TaskSkip),
}
impl HostTaskResult {
pub fn passed(result: TaskSuccess) -> Self {
Self::Passed(result)
}
pub fn failed(failure: TaskFailure) -> Self {
Self::Failed(failure)
}
pub fn skipped() -> Self {
Self::Skipped(TaskSkip::default())
}
pub fn skipped_with_reason(reason: impl Into<String>) -> Self {
Self::Skipped(TaskSkip::new().with_reason(reason))
}
pub fn is_passed(&self) -> bool {
matches!(self, Self::Passed(_))
}
pub fn is_failed(&self) -> bool {
matches!(self, Self::Failed(_))
}
pub fn is_skipped(&self) -> bool {
matches!(self, Self::Skipped(_))
}
pub fn success(&self) -> Option<&TaskSuccess> {
match self {
Self::Passed(success) => Some(success),
Self::Failed(_) | Self::Skipped(_) => None,
}
}
pub fn failure(&self) -> Option<&TaskFailure> {
match self {
Self::Failed(failure) => Some(failure),
Self::Passed(_) | Self::Skipped(_) => None,
}
}
pub fn skipped_detail(&self) -> Option<&TaskSkip> {
match self {
Self::Skipped(skip) => Some(skip),
Self::Passed(_) | Self::Failed(_) => None,
}
}
fn with_execution_timing(
self,
started_at: SystemTime,
finished_at: SystemTime,
duration_ns: u128,
) -> Self {
match self {
Self::Passed(success) => Self::Passed(
success
.with_started_at(started_at)
.with_finished_at(finished_at)
.with_duration_ns(duration_ns),
),
Self::Failed(failure) => Self::Failed(
failure
.with_started_at(started_at)
.with_finished_at(finished_at)
.with_duration_ns(duration_ns),
),
Self::Skipped(skip) => Self::Skipped(skip),
}
}
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct TaskSuccess {
result: Option<Value>,
changed: bool,
diff: Option<String>,
summary: Option<String>,
warnings: Vec<String>,
messages: Vec<TaskMessage>,
metadata: Option<Value>,
started_at: Option<SystemTime>,
finished_at: Option<SystemTime>,
duration_ns: Option<u128>,
duration_ms: Option<u128>,
}
impl TaskSuccess {
pub fn new() -> Self {
Self::default()
}
pub fn with_result(mut self, result: Value) -> Self {
self.result = Some(result);
self
}
pub fn with_changed(mut self, changed: bool) -> Self {
self.changed = changed;
self
}
pub fn with_diff(mut self, diff: impl Into<String>) -> Self {
self.diff = Some(diff.into());
self
}
pub fn with_summary(mut self, summary: impl Into<String>) -> Self {
self.summary = Some(summary.into());
self
}
pub fn with_warning(mut self, warning: impl Into<String>) -> Self {
self.warnings.push(warning.into());
self
}
pub fn with_message(mut self, message: TaskMessage) -> Self {
self.messages.push(message);
self
}
pub fn with_metadata(mut self, metadata: Value) -> Self {
self.metadata = Some(metadata);
self
}
pub fn with_started_at(mut self, started_at: SystemTime) -> Self {
self.started_at = Some(started_at);
self
}
pub fn with_finished_at(mut self, finished_at: SystemTime) -> Self {
self.finished_at = Some(finished_at);
self
}
pub fn with_duration_ms(mut self, duration_ms: u128) -> Self {
self.duration_ns = Some(duration_ms.saturating_mul(1_000_000));
self.duration_ms = Some(duration_ms);
self
}
pub fn with_duration_ns(mut self, duration_ns: u128) -> Self {
self.duration_ns = Some(duration_ns);
self.duration_ms = Some(duration_ns / 1_000_000);
self
}
pub fn result(&self) -> Option<&Value> {
self.result.as_ref()
}
pub fn changed(&self) -> bool {
self.changed
}
pub fn diff(&self) -> Option<&str> {
self.diff.as_deref()
}
pub fn summary(&self) -> Option<&str> {
self.summary.as_deref()
}
pub fn warnings(&self) -> &[String] {
&self.warnings
}
pub fn messages(&self) -> &[TaskMessage] {
&self.messages
}
pub fn metadata(&self) -> Option<&Value> {
self.metadata.as_ref()
}
pub fn started_at(&self) -> Option<SystemTime> {
self.started_at
}
pub fn finished_at(&self) -> Option<SystemTime> {
self.finished_at
}
pub fn duration_ms(&self) -> Option<u128> {
self.duration_ns
.map(|duration_ns| duration_ns / 1_000_000)
.or(self.duration_ms)
}
pub fn duration_ns(&self) -> Option<u128> {
self.duration_ns.or_else(|| {
self.duration_ms
.map(|duration_ms| duration_ms.saturating_mul(1_000_000))
})
}
pub fn started_at_display(&self) -> Option<String> {
self.started_at.map(format_timestamp_display)
}
pub fn finished_at_display(&self) -> Option<String> {
self.finished_at.map(format_timestamp_display)
}
pub fn duration_display(&self) -> Option<String> {
self.duration_ns().map(format_duration_display)
}
}
#[derive(Debug, Clone, Serialize)]
pub struct TaskFailure {
#[serde(skip)]
error: TaskError,
#[serde(skip)]
source: Option<TaskFailureSource>,
kind: TaskFailureKind,
error_type: String,
message: String,
retryable: bool,
details: Option<Value>,
warnings: Vec<String>,
messages: Vec<TaskMessage>,
started_at: Option<SystemTime>,
finished_at: Option<SystemTime>,
duration_ns: Option<u128>,
duration_ms: Option<u128>,
}
impl TaskFailure {
pub fn new<E>(error: E) -> Self
where
E: Error + Send + Sync + 'static,
{
let source = Arc::new(error);
let message = source.to_string();
let error_type = type_name::<E>().to_string();
Self {
kind: TaskFailureKind::Internal,
error_type: error_type.clone(),
message,
error: TaskError {
error: source.clone(),
error_type,
source: Some(source.clone()),
},
source: Some(source),
retryable: false,
details: None,
warnings: Vec::new(),
messages: Vec::new(),
started_at: None,
finished_at: None,
duration_ns: None,
duration_ms: None,
}
}
pub fn capture<E>(error: E) -> Self
where
E: fmt::Debug + fmt::Display + Send + Sync + 'static,
{
let source = Arc::new(error);
let message = source.to_string();
let error_type = type_name::<E>().to_string();
let wrapped_error = Arc::new(CapturedTaskFailure {
message: message.clone(),
});
Self {
kind: TaskFailureKind::Internal,
error_type: error_type.clone(),
message,
error: TaskError {
error: wrapped_error,
error_type,
source: Some(source.clone()),
},
source: Some(source),
retryable: false,
details: None,
warnings: Vec::new(),
messages: Vec::new(),
started_at: None,
finished_at: None,
duration_ns: None,
duration_ms: None,
}
}
pub fn from_task_error(error: TaskError) -> Self {
Self {
kind: TaskFailureKind::External,
error_type: error.error_type().to_string(),
message: error.to_string(),
error: error.clone(),
source: error.source,
retryable: false,
details: None,
warnings: Vec::new(),
messages: Vec::new(),
started_at: None,
finished_at: None,
duration_ns: None,
duration_ms: None,
}
}
pub fn with_kind(mut self, kind: TaskFailureKind) -> Self {
self.kind = kind;
self
}
pub fn with_retryable(mut self, retryable: bool) -> Self {
self.retryable = retryable;
self
}
pub fn with_details(mut self, details: Value) -> Self {
self.details = Some(details);
self
}
pub fn with_warning(mut self, warning: impl Into<String>) -> Self {
self.warnings.push(warning.into());
self
}
pub fn with_message(mut self, message: TaskMessage) -> Self {
self.messages.push(message);
self
}
pub fn with_started_at(mut self, started_at: SystemTime) -> Self {
self.started_at = Some(started_at);
self
}
pub fn with_finished_at(mut self, finished_at: SystemTime) -> Self {
self.finished_at = Some(finished_at);
self
}
pub fn with_duration_ms(mut self, duration_ms: u128) -> Self {
self.duration_ns = Some(duration_ms.saturating_mul(1_000_000));
self.duration_ms = Some(duration_ms);
self
}
pub fn with_duration_ns(mut self, duration_ns: u128) -> Self {
self.duration_ns = Some(duration_ns);
self.duration_ms = Some(duration_ns / 1_000_000);
self
}
pub fn downcast_ref<E>(&self) -> Option<&E>
where
E: 'static,
{
self.source
.as_ref()
.and_then(|source| source.downcast_ref::<E>())
}
pub fn error(&self) -> &(dyn Error + Send + Sync + 'static) {
self.error.error()
}
pub fn error_type(&self) -> &str {
&self.error_type
}
pub fn message(&self) -> &str {
&self.message
}
pub fn kind(&self) -> &TaskFailureKind {
&self.kind
}
pub fn retryable(&self) -> bool {
self.retryable
}
pub fn details(&self) -> Option<&Value> {
self.details.as_ref()
}
pub fn warnings(&self) -> &[String] {
&self.warnings
}
pub fn messages(&self) -> &[TaskMessage] {
&self.messages
}
pub fn started_at(&self) -> Option<SystemTime> {
self.started_at
}
pub fn finished_at(&self) -> Option<SystemTime> {
self.finished_at
}
pub fn duration_ms(&self) -> Option<u128> {
self.duration_ns
.map(|duration_ns| duration_ns / 1_000_000)
.or(self.duration_ms)
}
pub fn duration_ns(&self) -> Option<u128> {
self.duration_ns.or_else(|| {
self.duration_ms
.map(|duration_ms| duration_ms.saturating_mul(1_000_000))
})
}
pub fn started_at_display(&self) -> Option<String> {
self.started_at.map(format_timestamp_display)
}
pub fn finished_at_display(&self) -> Option<String> {
self.finished_at.map(format_timestamp_display)
}
pub fn duration_display(&self) -> Option<String> {
self.duration_ns().map(format_duration_display)
}
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct TaskSkip {
reason: Option<String>,
message: Option<String>,
}
impl TaskSkip {
pub fn new() -> Self {
Self::default()
}
pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
self.reason = Some(reason.into());
self
}
pub fn with_message(mut self, message: impl Into<String>) -> Self {
self.message = Some(message.into());
self
}
pub fn reason(&self) -> Option<&str> {
self.reason.as_deref()
}
pub fn message(&self) -> Option<&str> {
self.message.as_deref()
}
}
#[derive(Debug, Clone, Serialize)]
pub struct TaskMessage {
level: MessageLevel,
text: String,
code: Option<String>,
timestamp: Option<SystemTime>,
}
impl TaskMessage {
pub fn new(level: MessageLevel, text: impl Into<String>) -> Self {
Self {
level,
text: text.into(),
code: None,
timestamp: None,
}
}
pub fn with_code(mut self, code: impl Into<String>) -> Self {
self.code = Some(code.into());
self
}
pub fn with_timestamp(mut self, timestamp: SystemTime) -> Self {
self.timestamp = Some(timestamp);
self
}
pub fn level(&self) -> &MessageLevel {
&self.level
}
pub fn text(&self) -> &str {
&self.text
}
pub fn code(&self) -> Option<&str> {
self.code.as_deref()
}
pub fn timestamp(&self) -> Option<SystemTime> {
self.timestamp
}
}
#[derive(Debug, Clone, Serialize)]
pub enum MessageLevel {
Info,
Warning,
Error,
Debug,
}
#[derive(Debug, Clone, Serialize)]
pub enum TaskFailureKind {
Connection,
Authentication,
Validation,
Timeout,
Command,
Unsupported,
Internal,
External,
}
pub trait TaskInfo {
fn name(&self) -> &str;
fn connection_plugin_name(&self) -> Option<&str> {
None
}
fn get_connection_key(&self, hostname: &str) -> Option<crate::inventory::ConnectionKey> {
self.connection_plugin_name()
.map(|plugin_name| crate::inventory::ConnectionKey::new(hostname, plugin_name))
}
fn options(&self) -> Option<&Value> {
None
}
fn processor_names(&self) -> Vec<&str> {
Vec::new()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskExecutionMode {
Blocking,
Async,
}
#[async_trait]
pub trait Task: TaskInfo + Send + Sync {
fn start(
&self,
host: &Host,
context: &BlockingTaskRuntimeContext,
) -> Result<HostTaskResult, TaskError> {
let _ = (host, context);
Err(TaskError::new(std::io::Error::other(
"blocking start() not implemented",
)))
}
async fn start_async(
&self,
host: &Host,
context: &TaskRuntimeContext,
) -> Result<HostTaskResult, TaskError> {
let _ = (host, context);
Err(TaskError::new(std::io::Error::other(
"async start_async() not implemented",
)))
}
fn sub_tasks(&self) -> Vec<Arc<dyn Task>> {
Vec::new()
}
fn execution_mode(&self) -> TaskExecutionMode;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskExecutionContext {
current_depth: usize,
max_depth: usize,
}
impl TaskExecutionContext {
pub fn new(current_depth: usize, max_depth: usize) -> Self {
Self {
current_depth,
max_depth,
}
}
pub fn current_depth(&self) -> usize {
self.current_depth
}
pub fn max_depth(&self) -> usize {
self.max_depth
}
}
#[derive(Debug, Clone)]
pub struct TaskRuntimeContext {
execution: TaskExecutionContext,
connection: Option<Arc<Mutex<dyn Connection>>>,
}
impl TaskRuntimeContext {
pub fn new(
execution: TaskExecutionContext,
connection: Option<Arc<Mutex<dyn Connection>>>,
) -> Self {
Self {
execution,
connection,
}
}
pub fn execution(&self) -> &TaskExecutionContext {
&self.execution
}
pub fn current_depth(&self) -> usize {
self.execution.current_depth()
}
pub fn max_depth(&self) -> usize {
self.execution.max_depth()
}
pub fn connection(&self) -> Option<&Arc<Mutex<dyn Connection>>> {
self.connection.as_ref()
}
pub fn has_connection(&self) -> bool {
self.connection.is_some()
}
pub fn with_connection<R>(
&self,
f: impl FnOnce(&mut dyn Connection) -> Result<R, TaskError>,
) -> Result<Option<R>, TaskError> {
let Some(connection) = &self.connection else {
return Ok(None);
};
let mut guard = connection.blocking_lock();
f(&mut *guard).map(Some)
}
pub async fn execute_command(&self, command: &str) -> Result<Option<String>, TaskError> {
let Some(connection) = &self.connection else {
return Ok(None);
};
let mut guard = connection.lock().await;
guard
.execute_command(command)
.await
.map(Some)
.map_err(|err| TaskError::new(std::io::Error::other(err)))
}
}
#[derive(Clone)]
pub struct BlockingTaskConnection {
inner: Arc<Mutex<dyn Connection>>,
runtime_handle: Handle,
}
impl fmt::Debug for BlockingTaskConnection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BlockingTaskConnection").finish()
}
}
impl BlockingTaskConnection {
pub fn new(inner: Arc<Mutex<dyn Connection>>, runtime_handle: Handle) -> Self {
Self {
inner,
runtime_handle,
}
}
pub fn with_connection<R>(
&self,
f: impl FnOnce(&mut dyn Connection) -> Result<R, TaskError>,
) -> Result<R, TaskError> {
let mut guard = self.inner.blocking_lock();
f(&mut *guard)
}
pub fn execute_command(&self, command: &str) -> Result<String, TaskError> {
let command = command.to_string();
let connection = Arc::clone(&self.inner);
self.runtime_handle
.block_on(async move {
let mut guard = connection.lock().await;
guard.execute_command(&command).await
})
.map_err(|err| TaskError::new(std::io::Error::other(err)))
}
}
#[derive(Debug, Clone)]
pub struct BlockingTaskRuntimeContext {
execution: TaskExecutionContext,
connection: Option<BlockingTaskConnection>,
}
impl BlockingTaskRuntimeContext {
pub fn new(
execution: TaskExecutionContext,
connection: Option<Arc<Mutex<dyn Connection>>>,
runtime_handle: Handle,
) -> Self {
Self {
execution,
connection: connection
.map(|connection| BlockingTaskConnection::new(connection, runtime_handle)),
}
}
pub fn execution(&self) -> &TaskExecutionContext {
&self.execution
}
pub fn current_depth(&self) -> usize {
self.execution.current_depth()
}
pub fn max_depth(&self) -> usize {
self.execution.max_depth()
}
pub fn connection(&self) -> Option<&BlockingTaskConnection> {
self.connection.as_ref()
}
pub fn has_connection(&self) -> bool {
self.connection.is_some()
}
pub fn with_connection<R>(
&self,
f: impl FnOnce(&mut dyn Connection) -> Result<R, TaskError>,
) -> Result<Option<R>, TaskError> {
let Some(connection) = &self.connection else {
return Ok(None);
};
connection.with_connection(f).map(Some)
}
pub fn execute_command(&self, command: &str) -> Result<Option<String>, TaskError> {
let Some(connection) = &self.connection else {
return Ok(None);
};
connection.execute_command(command).map(Some)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskProcessorContext {
task_name: String,
parent_task_name: Option<String>,
depth: usize,
hostname: Option<String>,
}
impl TaskProcessorContext {
pub fn new(
task_name: impl Into<String>,
parent_task_name: Option<impl Into<String>>,
depth: usize,
hostname: Option<impl Into<String>>,
) -> Self {
Self {
task_name: task_name.into(),
parent_task_name: parent_task_name.map(Into::into),
depth,
hostname: hostname.map(Into::into),
}
}
pub fn task_name(&self) -> &str {
&self.task_name
}
pub fn parent_task_name(&self) -> Option<&str> {
self.parent_task_name.as_deref()
}
pub fn depth(&self) -> usize {
self.depth
}
pub fn hostname(&self) -> Option<&str> {
self.hostname.as_deref()
}
pub fn is_sub_task(&self) -> bool {
self.parent_task_name.is_some()
}
}
pub trait TaskProcessor: Send + Sync {
fn on_task_start(
&self,
_context: &TaskProcessorContext,
_results: &mut TaskResults,
) -> Result<(), crate::GenjaError> {
Ok(())
}
fn on_task_finish(
&self,
_context: &TaskProcessorContext,
_results: &mut TaskResults,
) -> Result<(), crate::GenjaError> {
Ok(())
}
fn on_instance_start(&self, _context: &TaskProcessorContext) -> Result<(), crate::GenjaError> {
Ok(())
}
fn on_instance_finish(
&self,
_context: &TaskProcessorContext,
_result: &mut HostTaskResult,
) -> Result<(), crate::GenjaError> {
Ok(())
}
}
impl fmt::Debug for dyn TaskProcessor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TaskProcessor")
}
}
pub trait TaskProcessorResolver: Send + Sync {
fn resolve_task_processor(&self, name: &str) -> Option<Arc<dyn TaskProcessor>>;
}
#[async_trait]
pub trait TaskConnectionResolver: Send + Sync {
async fn resolve_task_connection(
&self,
task: &dyn Task,
hostname: &str,
) -> Result<Option<Arc<Mutex<dyn Connection>>>, crate::GenjaError>;
}
impl fmt::Debug for dyn TaskProcessorResolver {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TaskProcessorResolver")
}
}
impl fmt::Debug for dyn TaskConnectionResolver {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TaskConnectionResolver")
}
}
#[derive(Clone)]
pub struct TaskDefinition {
inner: Arc<dyn Task>,
processor_resolver: Option<Arc<dyn TaskProcessorResolver>>,
processor_names: Arc<Vec<String>>,
}
impl TaskDefinition {
pub fn new<T: Task + 'static>(task: T) -> Self {
Self {
inner: Arc::new(task),
processor_resolver: None,
processor_names: Arc::new(Vec::new()),
}
}
pub fn as_task(&self) -> &dyn Task {
self.inner.as_ref()
}
pub fn with_processor(mut self, processor_name: impl Into<String>) -> Self {
Arc::make_mut(&mut self.processor_names).push(processor_name.into());
self
}
pub fn with_processors<I, S>(mut self, processor_names: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
Arc::make_mut(&mut self.processor_names)
.extend(processor_names.into_iter().map(Into::into));
self
}
pub fn with_processor_resolver(
mut self,
processor_resolver: Arc<dyn TaskProcessorResolver>,
) -> Self {
self.processor_resolver = Some(processor_resolver);
self
}
pub fn processor_names(&self) -> Vec<&str> {
if self.processor_names.is_empty() {
self.inner.processor_names()
} else {
self.processor_names.iter().map(String::as_str).collect()
}
}
fn processors_for(
&self,
task: &dyn Task,
) -> Result<Vec<Arc<dyn TaskProcessor>>, crate::GenjaError> {
Self::resolve_processors(
self.processor_resolver.as_deref(),
&self.processor_names_for(task),
)
}
fn processor_names_for<'a>(&'a self, task: &'a dyn Task) -> Vec<&'a str> {
if std::ptr::eq(task, self.inner.as_ref()) && !self.processor_names.is_empty() {
self.processor_names.iter().map(String::as_str).collect()
} else {
task.processor_names()
}
}
fn resolve_processors(
processor_resolver: Option<&dyn TaskProcessorResolver>,
processor_names: &[&str],
) -> Result<Vec<Arc<dyn TaskProcessor>>, crate::GenjaError> {
let Some(processor_resolver) = processor_resolver else {
if processor_names.is_empty() {
return Ok(Vec::new());
}
return Err(crate::GenjaError::PluginNotFound(
processor_names[0].to_string(),
));
};
processor_names
.iter()
.map(|name| {
processor_resolver
.resolve_task_processor(name)
.ok_or_else(|| crate::GenjaError::PluginNotFound((*name).to_string()))
})
.collect()
}
}
impl TaskDefinition {
pub async fn start(
&self,
hostname: &str,
host: &Host,
results: &mut TaskResults,
max_depth: usize,
) -> Result<(), crate::GenjaError> {
Self::start_with_depth(
Arc::clone(&self.inner),
hostname,
host,
results,
None,
self.processor_resolver.as_deref(),
self.processor_names(),
None,
0,
max_depth,
)
.await
}
pub async fn start_with_connection_resolver(
&self,
hostname: &str,
host: &Host,
results: &mut TaskResults,
connection_resolver: Option<&dyn TaskConnectionResolver>,
max_depth: usize,
) -> Result<(), crate::GenjaError> {
Self::start_with_depth(
Arc::clone(&self.inner),
hostname,
host,
results,
connection_resolver,
self.processor_resolver.as_deref(),
self.processor_names(),
None,
0,
max_depth,
)
.await
}
pub fn process_task_start(&self, results: &mut TaskResults) -> Result<(), crate::GenjaError> {
let context = TaskProcessorContext::new(self.name(), None::<&str>, 0, None::<&str>);
for processor in self.processors_for(self.inner.as_ref())? {
processor.on_task_start(&context, results)?;
}
Ok(())
}
pub fn process_task_finish(&self, results: &mut TaskResults) -> Result<(), crate::GenjaError> {
let context = TaskProcessorContext::new(self.name(), None::<&str>, 0, None::<&str>);
for processor in self.processors_for(self.inner.as_ref())? {
processor.on_task_finish(&context, results)?;
}
Ok(())
}
#[async_recursion]
#[allow(clippy::too_many_arguments)]
async fn start_with_depth(
task: Arc<dyn Task>,
hostname: &str,
host: &Host,
results: &mut TaskResults,
connection_resolver: Option<&dyn TaskConnectionResolver>,
processor_resolver: Option<&dyn TaskProcessorResolver>,
processor_names: Vec<&str>,
parent_task_name: Option<&str>,
depth: usize,
max_depth: usize,
) -> Result<(), crate::GenjaError> {
if depth > max_depth {
let started_at = SystemTime::now();
let finished_at = started_at;
let error =
crate::GenjaError::Message(format!("max task depth exceeded: {}", max_depth));
warn!(
"max task depth exceeded for task '{}' at depth {} with max_depth {}",
task.name(),
depth,
max_depth
);
results.record_execution_timing(started_at, finished_at);
results.insert_host_result(
hostname,
HostTaskResult::failed(
TaskFailure::new(error)
.with_kind(TaskFailureKind::Internal)
.with_started_at(started_at)
.with_finished_at(finished_at)
.with_duration_ns(0),
),
);
return Ok(());
}
let started_at = SystemTime::now();
let parent_task = parent_task_name.unwrap_or("none");
debug!(
"starting task '{}' for host '{}' parent_task='{}' depth={} max_depth={} has_connection={}",
task.name(),
hostname,
parent_task,
depth,
max_depth,
connection_resolver.is_some()
);
let processor_context =
TaskProcessorContext::new(task.name(), parent_task_name, depth, Some(hostname));
let processors = Self::resolve_processors(processor_resolver, &processor_names)?;
for processor in &processors {
processor.on_instance_start(&processor_context)?;
}
let connection = if let Some(connection_resolver) = connection_resolver {
match connection_resolver
.resolve_task_connection(task.as_ref(), hostname)
.await
{
Ok(connection) => connection,
Err(error) => {
let finished_at = SystemTime::now();
let duration_ns = finished_at
.duration_since(started_at)
.map(|duration| duration.as_nanos())
.unwrap_or(0);
results.record_execution_timing(started_at, finished_at);
warn!(
"task '{}' failed to open connection for host '{}': {}",
task.name(),
hostname,
error
);
let mut host_result = HostTaskResult::failed(
TaskFailure::new(error)
.with_kind(TaskFailureKind::Connection)
.with_started_at(started_at)
.with_finished_at(finished_at)
.with_duration_ns(duration_ns),
);
for processor in &processors {
processor.on_instance_finish(&processor_context, &mut host_result)?;
}
results.insert_host_result(hostname, host_result);
return Ok(());
}
}
} else {
None
};
let execution_context = TaskExecutionContext::new(depth, max_depth);
let host_result = match task.execution_mode() {
TaskExecutionMode::Async => {
let runtime_context =
TaskRuntimeContext::new(execution_context, connection.clone());
task.start_async(host, &runtime_context).await
}
TaskExecutionMode::Blocking => {
let blocking_context = BlockingTaskRuntimeContext::new(
execution_context,
connection.clone(),
Handle::current(),
);
let task = Arc::clone(&task);
let host = host.clone();
match task::spawn_blocking(move || task.start(&host, &blocking_context)).await {
Ok(result) => result,
Err(err) => Err(TaskError::new(std::io::Error::other(format!(
"blocking task join error: {err}"
)))),
}
}
};
let finished_at = SystemTime::now();
let duration_ns = finished_at
.duration_since(started_at)
.map(|duration| duration.as_nanos())
.unwrap_or(0);
results.record_execution_timing(started_at, finished_at);
let host_result = match host_result {
Ok(host_result) => host_result,
Err(error) => {
let failure = TaskFailure::from_task_error(error)
.with_started_at(started_at)
.with_finished_at(finished_at)
.with_duration_ns(duration_ns);
warn!(
"task '{}' failed for host '{}': {}",
task.name(),
hostname,
failure.message()
);
let duration_display = failure
.duration_display()
.unwrap_or_else(|| format_duration_display(duration_ns));
info!(
"finished task '{}' for host '{}' with status=failed duration_ms={} duration={}",
task.name(),
hostname,
duration_ns / 1_000_000,
duration_display
);
let mut host_result = HostTaskResult::failed(failure);
for processor in &processors {
processor.on_instance_finish(&processor_context, &mut host_result)?;
}
results.insert_host_result(hostname, host_result);
return Ok(());
}
};
if let Some(failure) = host_result.failure() {
warn!(
"task '{}' failed for host '{}': {}",
task.name(),
hostname,
failure.message()
);
}
if let Some(skip) = host_result.skipped_detail() {
info!(
"task '{}' skipped for host '{}' reason='{}' message='{}'",
task.name(),
hostname,
skip.reason().unwrap_or("none"),
skip.message().unwrap_or("")
);
}
let status = if host_result.is_passed() {
"passed"
} else if host_result.is_failed() {
"failed"
} else {
"skipped"
};
let mut host_result =
host_result.with_execution_timing(started_at, finished_at, duration_ns);
let duration_display = match &host_result {
HostTaskResult::Passed(success) => success
.duration_display()
.unwrap_or_else(|| format_duration_display(duration_ns)),
HostTaskResult::Failed(failure) => failure
.duration_display()
.unwrap_or_else(|| format_duration_display(duration_ns)),
HostTaskResult::Skipped(_) => format_duration_display(duration_ns),
};
info!(
"finished task '{}' for host '{}' with status={} duration_ms={} duration={}",
task.name(),
hostname,
status,
duration_ns / 1_000_000,
duration_display
);
for processor in &processors {
processor.on_instance_finish(&processor_context, &mut host_result)?;
}
results.insert_host_result(hostname, host_result);
for sub in task.sub_tasks() {
let sub_task_name = sub.name().to_string();
if results.sub_task(&sub_task_name).is_none() {
results.insert_sub_task(sub_task_name.clone(), TaskResults::new(&sub_task_name));
}
let sub_results = results
.sub_task_mut(&sub_task_name)
.expect("sub task results should exist after insertion");
let sub_processor_names = sub.processor_names();
let sub_processors =
Self::resolve_processors(processor_resolver, &sub_processor_names)?;
let mut sub_task_started = false;
if !sub_processors.is_empty()
&& sub_results.hosts().is_empty()
&& sub_results.sub_tasks().is_empty()
{
let sub_context = TaskProcessorContext::new(
sub_task_name.as_str(),
Some(task.name()),
depth + 1,
None::<&str>,
);
for processor in &sub_processors {
processor.on_task_start(&sub_context, sub_results)?;
}
sub_task_started = true;
}
Self::start_with_depth(
Arc::clone(&sub),
hostname,
host,
sub_results,
connection_resolver,
processor_resolver,
sub_processor_names,
Some(task.name()),
depth + 1,
max_depth,
)
.await?;
if sub_task_started {
let sub_context = TaskProcessorContext::new(
sub_task_name.as_str(),
Some(task.name()),
depth + 1,
None::<&str>,
);
for processor in &sub_processors {
processor.on_task_finish(&sub_context, sub_results)?;
}
}
}
Ok(())
}
}
impl TaskInfo for TaskDefinition {
fn name(&self) -> &str {
self.inner.name()
}
fn connection_plugin_name(&self) -> Option<&str> {
self.inner.connection_plugin_name()
}
fn get_connection_key(&self, hostname: &str) -> Option<crate::inventory::ConnectionKey> {
self.inner.get_connection_key(hostname)
}
fn options(&self) -> Option<&Value> {
self.inner.options()
}
fn processor_names(&self) -> Vec<&str> {
self.processor_names()
}
}
#[derive(Default)]
pub struct Tasks(Vec<TaskDefinition>);
impl Tasks {
pub fn new() -> Self {
Self(Vec::new())
}
pub fn add_task<T: Task + 'static>(&mut self, task: T) {
self.0.push(TaskDefinition::new(task));
}
}
impl Deref for Tasks {
type Target = Vec<TaskDefinition>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for Tasks {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::inventory::{
BaseBuilderHost, Connection, ConnectionKey, Host, ResolvedConnectionParams,
};
use async_trait::async_trait;
use log::{LevelFilter, Log, Metadata, Record};
use serde_json::json;
use std::fmt;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use tokio::runtime::Builder;
fn run_async<F: Future>(future: F) -> F::Output {
Builder::new_current_thread()
.enable_all()
.build()
.expect("test runtime should build")
.block_on(future)
}
#[derive(Debug)]
struct TestTaskFailureError;
impl fmt::Display for TestTaskFailureError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "task failure test error")
}
}
impl Error for TestTaskFailureError {}
#[derive(Debug)]
struct ExternalFailurePayload {
code: u16,
}
impl fmt::Display for ExternalFailurePayload {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "external failure code {}", self.code)
}
}
struct TestTask {
name: &'static str,
subs: Vec<Arc<dyn Task>>,
counter: Arc<AtomicUsize>,
}
struct ProcessorTask {
name: &'static str,
processors: Vec<String>,
}
struct CountingProcessor {
calls: Arc<AtomicUsize>,
}
struct TestProcessorResolver {
processor: Arc<dyn TaskProcessor>,
}
struct FailingTask;
struct SkippingTask;
#[derive(Debug)]
struct TestConnection {
key: ConnectionKey,
alive: bool,
}
#[async_trait]
impl Connection for TestConnection {
fn create(&self, key: &ConnectionKey) -> Box<dyn Connection> {
Box::new(Self {
key: key.clone(),
alive: false,
})
}
fn is_alive(&self) -> bool {
self.alive
}
async fn open(&mut self, _params: &ResolvedConnectionParams) -> Result<(), String> {
self.alive = true;
Ok(())
}
fn close(&mut self) -> ConnectionKey {
self.alive = false;
self.key.clone()
}
}
impl TaskInfo for TestTask {
fn name(&self) -> &str {
self.name
}
fn connection_plugin_name(&self) -> Option<&str> {
Some("ssh")
}
fn options(&self) -> Option<&Value> {
None
}
}
#[async_trait]
impl Task for TestTask {
async fn start_async(
&self,
_host: &Host,
_context: &TaskRuntimeContext,
) -> Result<HostTaskResult, TaskError> {
self.counter.fetch_add(1, Ordering::SeqCst);
Ok(HostTaskResult::passed(TaskSuccess::new()))
}
fn sub_tasks(&self) -> Vec<Arc<dyn Task>> {
self.subs.clone()
}
fn execution_mode(&self) -> TaskExecutionMode {
TaskExecutionMode::Async
}
}
impl TaskInfo for ProcessorTask {
fn name(&self) -> &str {
self.name
}
fn connection_plugin_name(&self) -> Option<&str> {
Some("ssh")
}
fn options(&self) -> Option<&Value> {
None
}
fn processor_names(&self) -> Vec<&str> {
self.processors.iter().map(String::as_str).collect()
}
}
#[async_trait]
impl Task for ProcessorTask {
async fn start_async(
&self,
_host: &Host,
_context: &TaskRuntimeContext,
) -> Result<HostTaskResult, TaskError> {
Ok(HostTaskResult::passed(TaskSuccess::new()))
}
fn execution_mode(&self) -> TaskExecutionMode {
TaskExecutionMode::Async
}
}
impl TaskProcessor for CountingProcessor {
fn on_task_start(
&self,
_context: &TaskProcessorContext,
_results: &mut TaskResults,
) -> Result<(), crate::GenjaError> {
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_task_finish(
&self,
_context: &TaskProcessorContext,
_results: &mut TaskResults,
) -> Result<(), crate::GenjaError> {
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_instance_start(
&self,
_context: &TaskProcessorContext,
) -> Result<(), crate::GenjaError> {
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_instance_finish(
&self,
_context: &TaskProcessorContext,
_result: &mut HostTaskResult,
) -> Result<(), crate::GenjaError> {
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
impl TaskProcessorResolver for TestProcessorResolver {
fn resolve_task_processor(&self, name: &str) -> Option<Arc<dyn TaskProcessor>> {
(name == "selected").then(|| Arc::clone(&self.processor))
}
}
impl TaskInfo for FailingTask {
fn name(&self) -> &str {
"failing"
}
fn connection_plugin_name(&self) -> Option<&str> {
Some("ssh")
}
fn options(&self) -> Option<&Value> {
None
}
}
#[async_trait]
impl Task for FailingTask {
async fn start_async(
&self,
_host: &Host,
_context: &TaskRuntimeContext,
) -> Result<HostTaskResult, TaskError> {
Ok(HostTaskResult::failed(TaskFailure::new(
TestTaskFailureError,
)))
}
fn execution_mode(&self) -> TaskExecutionMode {
TaskExecutionMode::Async
}
}
impl TaskInfo for SkippingTask {
fn name(&self) -> &str {
"skipping"
}
fn connection_plugin_name(&self) -> Option<&str> {
Some("ssh")
}
fn options(&self) -> Option<&Value> {
None
}
}
#[async_trait]
impl Task for SkippingTask {
async fn start_async(
&self,
_host: &Host,
_context: &TaskRuntimeContext,
) -> Result<HostTaskResult, TaskError> {
Ok(HostTaskResult::Skipped(
TaskSkip::new().with_reason("filtered"),
))
}
fn execution_mode(&self) -> TaskExecutionMode {
TaskExecutionMode::Async
}
}
#[derive(Default)]
struct TestLogger {
entries: Mutex<Vec<String>>,
}
impl TestLogger {
fn clear(&self) {
self.entries
.lock()
.expect("logger lock should not be poisoned")
.clear();
}
fn entries(&self) -> Vec<String> {
self.entries
.lock()
.expect("logger lock should not be poisoned")
.clone()
}
}
impl Log for TestLogger {
fn enabled(&self, _metadata: &Metadata<'_>) -> bool {
true
}
fn log(&self, record: &Record<'_>) {
if self.enabled(record.metadata()) {
self.entries
.lock()
.expect("logger lock should not be poisoned")
.push(format!("{} {}", record.level(), record.args()));
}
}
fn flush(&self) {}
}
fn test_logger() -> &'static TestLogger {
static LOGGER: OnceLock<&'static TestLogger> = OnceLock::new();
LOGGER.get_or_init(|| {
let logger = Box::leak(Box::new(TestLogger::default()));
let _ = log::set_logger(logger);
log::set_max_level(LevelFilter::Debug);
logger
})
}
fn log_lock() -> &'static Mutex<()> {
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| Mutex::new(()))
}
fn chain(depth: usize, counter: Arc<AtomicUsize>) -> Arc<dyn Task> {
if depth == 1 {
return Arc::new(TestTask {
name: "leaf",
subs: Vec::new(),
counter,
});
}
let child = chain(depth - 1, counter.clone());
Arc::new(TestTask {
name: "node",
subs: vec![child],
counter,
})
}
#[test]
fn start_runs_within_max_depth() {
let counter = Arc::new(AtomicUsize::new(0));
let root = chain(3, counter.clone());
let task = TaskDefinition::new(TestTask {
name: "root",
subs: vec![root],
counter: counter.clone(),
});
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("root");
run_async(task.start("router1", &host, &mut results, 4)).expect("start should succeed");
assert_eq!(counter.load(Ordering::SeqCst), 4);
assert!(results.host_result("router1").is_some());
assert!(results.sub_task("node").is_some());
assert!(results.started_at().is_some());
assert!(results.finished_at().is_some());
assert!(results.duration_display().is_some());
let node_results = results
.sub_task("node")
.expect("sub-task results should exist after execution");
assert!(node_results.started_at().is_some());
assert!(node_results.finished_at().is_some());
assert!(node_results.duration_display().is_some());
}
#[test]
fn start_captures_host_failure_when_depth_exceeds_limit() {
let counter = Arc::new(AtomicUsize::new(0));
let root = chain(5, counter.clone());
let task = TaskDefinition::new(TestTask {
name: "root",
subs: vec![root],
counter: counter.clone(),
});
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("root");
run_async(task.start("router1", &host, &mut results, 4))
.expect("start should capture depth overflow as a host failure");
assert_eq!(counter.load(Ordering::SeqCst), 5);
let level_one = results
.sub_task("node")
.expect("first nested node should exist");
let level_two = level_one
.sub_task("node")
.expect("second nested node should exist");
let level_three = level_two
.sub_task("node")
.expect("third nested node should exist");
let level_four = level_three
.sub_task("node")
.expect("fourth nested node should exist");
let level_five = level_four
.sub_task("leaf")
.expect("leaf task should capture failure");
let failure = level_five
.host_result("router1")
.and_then(HostTaskResult::failure)
.expect("depth overflow should be recorded as a host failure");
assert!(failure.message().contains("max task depth exceeded"));
assert!(matches!(failure.kind(), TaskFailureKind::Internal));
assert!(failure.started_at().is_some());
assert!(failure.finished_at().is_some());
assert_eq!(failure.duration_ns(), Some(0));
}
#[test]
fn start_attaches_timing_to_passed_host_results() {
let counter = Arc::new(AtomicUsize::new(0));
let task = TaskDefinition::new(TestTask {
name: "root",
subs: Vec::new(),
counter,
});
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("root");
run_async(task.start("router1", &host, &mut results, 0)).expect("start should succeed");
let success = results
.host_result("router1")
.and_then(HostTaskResult::success)
.expect("host result should be passed");
assert!(success.started_at().is_some());
assert!(success.finished_at().is_some());
assert!(success.duration_ns().is_some());
assert!(success.duration_display().is_some());
}
#[test]
fn start_attaches_timing_to_failed_host_results() {
let task = TaskDefinition::new(FailingTask);
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("failing");
run_async(task.start("router1", &host, &mut results, 0))
.expect("start should record a failed result");
let failure = results
.host_result("router1")
.and_then(HostTaskResult::failure)
.expect("host result should be failed");
assert!(failure.started_at().is_some());
assert!(failure.finished_at().is_some());
assert!(failure.duration_ns().is_some());
assert!(failure.duration_display().is_some());
}
#[test]
fn start_does_not_attach_timing_to_skipped_host_results() {
let task = TaskDefinition::new(SkippingTask);
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("skipping");
run_async(task.start("router1", &host, &mut results, 0))
.expect("start should record a skipped result");
let skip = results
.host_result("router1")
.and_then(HostTaskResult::skipped_detail)
.expect("host result should be skipped");
assert_eq!(skip.reason(), Some("filtered"));
assert_eq!(skip.message(), None);
}
#[test]
fn start_logs_per_host_finish_for_passed_results() {
let _guard = log_lock().lock().expect("log lock should not be poisoned");
let logger = test_logger();
logger.clear();
let counter = Arc::new(AtomicUsize::new(0));
let task = TaskDefinition::new(TestTask {
name: "root",
subs: Vec::new(),
counter,
});
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("root");
run_async(task.start("router1", &host, &mut results, 0)).expect("start should succeed");
let entries = logger.entries();
assert!(entries.iter().any(|entry| {
entry.contains(
"DEBUG starting task 'root' for host 'router1' parent_task='none' depth=0",
)
}));
assert!(entries.iter().any(|entry| {
entry.contains(
"INFO finished task 'root' for host 'router1' with status=passed duration_ms=",
) && entry.contains(" duration=")
}));
}
#[test]
fn start_logs_per_host_failure_warning_and_finish() {
let _guard = log_lock().lock().expect("log lock should not be poisoned");
let logger = test_logger();
logger.clear();
let task = TaskDefinition::new(FailingTask);
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("failing");
run_async(task.start("router1", &host, &mut results, 0))
.expect("start should record a failed result");
let entries = logger.entries();
assert!(entries.iter().any(|entry| {
entry == "WARN task 'failing' failed for host 'router1': task failure test error"
}));
assert!(entries.iter().any(|entry| {
entry.contains(
"INFO finished task 'failing' for host 'router1' with status=failed duration_ms=",
) && entry.contains(" duration=")
}));
}
#[test]
fn start_logs_per_host_skip_event_and_finish() {
let _guard = log_lock().lock().expect("log lock should not be poisoned");
let logger = test_logger();
logger.clear();
let task = TaskDefinition::new(SkippingTask);
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("skipping");
run_async(task.start("router1", &host, &mut results, 0))
.expect("start should record a skipped result");
let entries = logger.entries();
assert!(entries.iter().any(|entry| {
entry == "INFO task 'skipping' skipped for host 'router1' reason='filtered' message=''"
}));
assert!(entries.iter().any(|entry| {
entry.contains(
"INFO finished task 'skipping' for host 'router1' with status=skipped duration_ms=",
) && entry.contains(" duration=")
}));
}
#[test]
fn task_failure_preserves_metadata_and_supports_downcast() {
let failure = TaskFailure::new(TestTaskFailureError)
.with_kind(TaskFailureKind::Connection)
.with_retryable(true)
.with_details(json!({"port": 22}))
.with_warning("intermittent reachability")
.with_message(TaskMessage::new(MessageLevel::Error, "ssh session failed"));
assert_eq!(failure.message(), "task failure test error");
assert_eq!(failure.error().to_string(), "task failure test error");
assert!(matches!(failure.kind(), TaskFailureKind::Connection));
assert!(failure.retryable());
assert_eq!(failure.details(), Some(&json!({"port": 22})));
assert_eq!(failure.warnings(), ["intermittent reachability"]);
assert_eq!(failure.messages()[0].text(), "ssh session failed");
assert!(
failure
.error_type()
.ends_with("task::tests::TestTaskFailureError")
);
assert!(failure.downcast_ref::<TestTaskFailureError>().is_some());
}
#[test]
fn task_failure_capture_supports_non_error_payloads() {
let failure = TaskFailure::capture(ExternalFailurePayload { code: 42 })
.with_kind(TaskFailureKind::Internal);
assert_eq!(failure.message(), "external failure code 42");
assert!(
failure
.error()
.to_string()
.contains("external failure code 42")
);
assert!(
failure
.error_type()
.ends_with("task::tests::ExternalFailurePayload")
);
let payload = failure
.downcast_ref::<ExternalFailurePayload>()
.expect("captured payload should be downcastable");
assert_eq!(payload.code, 42);
}
#[derive(Debug)]
struct ExternalTaskError;
impl fmt::Display for ExternalTaskError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "external task error")
}
}
impl Error for ExternalTaskError {}
struct ErroringTask;
impl TaskInfo for ErroringTask {
fn name(&self) -> &str {
"erroring"
}
fn connection_plugin_name(&self) -> Option<&str> {
Some("ssh")
}
fn options(&self) -> Option<&Value> {
None
}
}
#[async_trait]
impl Task for ErroringTask {
async fn start_async(
&self,
_host: &Host,
_context: &TaskRuntimeContext,
) -> Result<HostTaskResult, TaskError> {
Err(TaskError::new(ExternalTaskError))
}
fn execution_mode(&self) -> TaskExecutionMode {
TaskExecutionMode::Async
}
}
#[test]
fn start_captures_task_errors_as_external_failures() {
let task = TaskDefinition::new(ErroringTask);
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("erroring");
run_async(task.start("router1", &host, &mut results, 0))
.expect("start should capture task error as host failure");
let failure = results
.host_result("router1")
.and_then(HostTaskResult::failure)
.expect("task error should be recorded as failure");
assert_eq!(failure.message(), "external task error");
assert!(matches!(failure.kind(), TaskFailureKind::External));
assert!(
failure
.error_type()
.ends_with("task::tests::ExternalTaskError")
);
}
#[test]
fn task_success_builders_expose_extended_metadata() {
let started_at = SystemTime::UNIX_EPOCH;
let finished_at = SystemTime::UNIX_EPOCH
.checked_add(std::time::Duration::from_secs(2))
.expect("valid timestamp");
let success = TaskSuccess::new()
.with_result(json!({"ok": true}))
.with_changed(true)
.with_diff("updated config")
.with_summary("task completed")
.with_warning("minor drift")
.with_message(
TaskMessage::new(MessageLevel::Info, "commit complete").with_code("commit_ok"),
)
.with_metadata(json!({"version": 1}))
.with_started_at(started_at)
.with_finished_at(finished_at)
.with_duration_ms(2000);
assert_eq!(success.result(), Some(&json!({"ok": true})));
assert!(success.changed());
assert_eq!(success.diff(), Some("updated config"));
assert_eq!(success.summary(), Some("task completed"));
assert_eq!(success.warnings(), ["minor drift"]);
assert_eq!(success.messages()[0].text(), "commit complete");
assert_eq!(success.messages()[0].code(), Some("commit_ok"));
assert!(matches!(success.messages()[0].level(), MessageLevel::Info));
assert_eq!(success.metadata(), Some(&json!({"version": 1})));
assert_eq!(success.started_at(), Some(started_at));
assert_eq!(success.finished_at(), Some(finished_at));
assert_eq!(success.duration_ms(), Some(2000));
}
#[test]
fn task_skip_and_host_task_result_expose_skip_metadata() {
let skipped = HostTaskResult::Skipped(
TaskSkip::new()
.with_reason("filtered")
.with_message("host excluded by selector"),
);
assert!(skipped.is_skipped());
assert_eq!(
skipped.skipped_detail().and_then(TaskSkip::reason),
Some("filtered")
);
assert_eq!(
skipped.skipped_detail().and_then(TaskSkip::message),
Some("host excluded by selector")
);
let skipped_with_reason = HostTaskResult::skipped_with_reason("parent_failed");
assert_eq!(
skipped_with_reason
.skipped_detail()
.and_then(TaskSkip::reason),
Some("parent_failed")
);
}
#[test]
fn task_message_builders_expose_message_metadata() {
let timestamp = SystemTime::UNIX_EPOCH
.checked_add(std::time::Duration::from_secs(1))
.expect("valid timestamp");
let message = TaskMessage::new(MessageLevel::Warning, "latency threshold exceeded")
.with_code("latency_warn")
.with_timestamp(timestamp);
assert!(matches!(message.level(), MessageLevel::Warning));
assert_eq!(message.text(), "latency threshold exceeded");
assert_eq!(message.code(), Some("latency_warn"));
assert_eq!(message.timestamp(), Some(timestamp));
}
#[test]
fn task_results_builders_expose_summary_and_timing_metadata() {
let started_at = SystemTime::UNIX_EPOCH;
let finished_at = SystemTime::UNIX_EPOCH
.checked_add(std::time::Duration::from_secs(3))
.expect("valid timestamp");
let results = TaskResults::new("deploy")
.with_summary("deploy finished")
.with_started_at(started_at)
.with_finished_at(finished_at)
.with_duration_ms(3000);
assert_eq!(results.task_name(), "deploy");
assert_eq!(results.summary(), Some("deploy finished"));
assert_eq!(results.started_at(), Some(started_at));
assert_eq!(results.finished_at(), Some(finished_at));
assert_eq!(results.duration_ns(), Some(3_000_000_000));
assert_eq!(results.duration_ms(), Some(3000));
assert_eq!(
results.started_at_display(),
Some("1970-01-01T00:00:00Z".to_string())
);
assert_eq!(
results.finished_at_display(),
Some("1970-01-01T00:00:03Z".to_string())
);
assert_eq!(results.duration_display(), Some("3s".to_string()));
let json = results
.to_json_string()
.expect("human json should serialize");
assert!(json.contains("\"started_at\":\"1970-01-01T00:00:00Z\""));
assert!(json.contains("\"finished_at\":\"1970-01-01T00:00:03Z\""));
assert!(json.contains("\"duration\":\"3s\""));
assert!(!json.contains("\"duration_ns\":"));
assert!(!json.contains("\"duration_ms\":"));
let raw_json = results
.to_raw_json_string()
.expect("raw json should serialize");
assert!(!raw_json.contains("\"duration\":\"3s\""));
assert!(raw_json.contains("\"duration_ns\":3000000000"));
}
#[test]
fn task_results_human_json_serializes_recursive_sub_tasks() {
let child = TaskResults::new("child").with_duration_ms(250);
let mut root = TaskResults::new("root").with_duration_ms(2000);
root.insert_sub_task("child", child);
let json = root
.to_json_string()
.expect("human json should serialize recursively");
assert!(json.contains("\"task_name\":\"root\""));
assert!(json.contains("\"sub_tasks\":{\"child\":{\"task_name\":\"child\""));
assert!(json.contains("\"duration\":\"2s\""));
assert!(json.contains("\"duration\":\"250ms\""));
}
#[test]
fn sub_task_results_human_json_includes_aggregate_timing() {
let counter = Arc::new(AtomicUsize::new(0));
let root = chain(2, counter.clone());
let task = TaskDefinition::new(TestTask {
name: "root",
subs: vec![root],
counter,
});
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("root");
run_async(task.start("router1", &host, &mut results, 3)).expect("start should succeed");
let json = results
.to_json_string()
.expect("human json should serialize sub-task timing");
assert!(json.contains("\"sub_tasks\":{\"node\":{"));
assert!(json.contains("\"started_at\":\""));
assert!(json.contains("\"finished_at\":\""));
assert!(json.contains("\"duration\":\""));
}
#[test]
fn processors_are_selected_per_task_not_inherited_by_sub_tasks() {
let task_counter = Arc::new(AtomicUsize::new(0));
let processor_calls = Arc::new(AtomicUsize::new(0));
let child = Arc::new(ProcessorTask {
name: "child",
processors: vec!["selected".to_string()],
});
let root = TestTask {
name: "root",
subs: vec![child],
counter: task_counter,
};
let processor = Arc::new(CountingProcessor {
calls: Arc::clone(&processor_calls),
});
let resolver = Arc::new(TestProcessorResolver { processor });
let task = TaskDefinition::new(root).with_processor_resolver(resolver);
let host = Host::builder().hostname("router1").build();
let mut results = TaskResults::new("root");
run_async(task.start("router1", &host, &mut results, 3))
.expect("task execution should succeed");
assert_eq!(processor_calls.load(Ordering::SeqCst), 4);
assert!(results.host_result("router1").is_some());
assert!(results.sub_task("child").is_some());
}
#[test]
fn task_results_human_json_formats_host_timing_uniformly() {
let started_at = SystemTime::UNIX_EPOCH;
let finished_at = SystemTime::UNIX_EPOCH
.checked_add(std::time::Duration::from_millis(2))
.expect("valid timestamp");
let mut results = TaskResults::new("deploy");
results.insert_host_result(
"router1",
HostTaskResult::passed(
TaskSuccess::new()
.with_summary("ok")
.with_started_at(started_at)
.with_finished_at(finished_at)
.with_duration_ns(2_000_000),
),
);
results.insert_host_result(
"router2",
HostTaskResult::failed(
TaskFailure::new(TestTaskFailureError)
.with_started_at(started_at)
.with_finished_at(finished_at)
.with_duration_ns(250_000),
),
);
results.insert_host_result(
"router3",
HostTaskResult::Skipped(TaskSkip::new().with_reason("filtered")),
);
let json = results
.to_json_string()
.expect("human json should serialize host timing");
assert!(json.contains("\"router1\":{\"Passed\":{"));
assert!(json.contains("\"summary\":\"ok\""));
assert!(json.contains("\"started_at\":\"1970-01-01T00:00:00Z\""));
assert!(json.contains("\"finished_at\":\"1970-01-01T00:00:00Z\""));
assert!(json.contains("\"duration\":\"2ms\""));
assert!(json.contains("\"router2\":{\"Failed\":"));
assert!(json.contains("\"duration\":\"250us\""));
assert!(json.contains("\"router3\":{\"Skipped\":{\"reason\":\"filtered\""));
assert!(!json.contains("\"router3\":{\"Skipped\":{\"started_at\""));
assert!(!json.contains("\"duration_ns\""));
assert!(!json.contains("\"duration_ms\""));
}
#[test]
fn task_results_duration_display_preserves_sub_millisecond_precision() {
let micros = TaskResults::new("micros").with_duration_ns(250_000);
let nanos = TaskResults::new("nanos").with_duration_ns(250);
let millis = TaskResults::new("millis").with_duration_ns(2_500_000);
let seconds = TaskResults::new("seconds").with_duration_ns(1_500_587_737);
assert_eq!(micros.duration_ns(), Some(250_000));
assert_eq!(micros.duration_ms(), Some(0));
assert_eq!(micros.duration_display(), Some("250us".to_string()));
assert_eq!(nanos.duration_ns(), Some(250));
assert_eq!(nanos.duration_ms(), Some(0));
assert_eq!(nanos.duration_display(), Some("250ns".to_string()));
assert_eq!(millis.duration_display(), Some("2.5ms".to_string()));
assert_eq!(seconds.duration_display(), Some("1.5s".to_string()));
}
#[test]
fn task_runtime_context_helpers_expose_execution_and_connection() {
let execution = TaskExecutionContext::new(2, 5);
let key = ConnectionKey::new("router1", "ssh");
let connection = Arc::new(tokio::sync::Mutex::new(TestConnection {
key: key.clone(),
alive: true,
}));
let context = TaskRuntimeContext::new(execution.clone(), Some(connection));
assert_eq!(context.execution(), &execution);
assert_eq!(context.current_depth(), 2);
assert_eq!(context.max_depth(), 5);
assert!(context.has_connection());
assert!(context.connection().is_some());
let is_alive = context
.with_connection(|connection| Ok(connection.is_alive()))
.expect("connection helper should not fail");
assert_eq!(is_alive, Some(true));
}
#[test]
fn task_runtime_context_with_connection_returns_none_without_connection() {
let context = TaskRuntimeContext::new(TaskExecutionContext::new(0, 1), None);
let result = context
.with_connection(|connection| Ok(connection.is_alive()))
.expect("missing connection should not fail");
assert_eq!(result, None);
assert!(!context.has_connection());
}
#[test]
fn task_results_store_recursive_host_and_sub_task_results() {
let mut root = TaskResults::new("deploy").with_summary("deploy completed");
root.insert_host_result(
"router1",
HostTaskResult::passed(
TaskSuccess::new()
.with_result(json!({"deployed": true}))
.with_changed(true)
.with_summary("config deployed")
.with_warning("candidate config had comments")
.with_message(TaskMessage::new(
MessageLevel::Info,
"candidate config committed",
))
.with_metadata(json!({"version": "1.2.3"})),
),
);
root.insert_host_result(
"router2",
HostTaskResult::failed(
TaskFailure::new(TestTaskFailureError)
.with_kind(TaskFailureKind::Connection)
.with_retryable(true),
),
);
let mut validate = TaskResults::new("validate");
validate.insert_host_result(
"router1",
HostTaskResult::passed(TaskSuccess::new().with_result(json!({"valid": true}))),
);
validate.insert_host_result(
"router2",
HostTaskResult::Skipped(
TaskSkip::new()
.with_reason("parent_failed")
.with_message("validation skipped because deploy failed"),
),
);
let mut collect_logs = TaskResults::new("collect_logs");
collect_logs.insert_host_result(
"router1",
HostTaskResult::passed(TaskSuccess::new().with_diff("captured logs")),
);
collect_logs.insert_host_result(
"router2",
HostTaskResult::skipped_with_reason("parent_failed"),
);
validate.insert_sub_task("collect_logs", collect_logs);
root.insert_sub_task("validate", validate);
assert_eq!(root.task_name(), "deploy");
assert_eq!(
root.passed_hosts()
.into_iter()
.map(|host| host.as_str())
.collect::<Vec<_>>(),
vec!["router1"]
);
assert_eq!(
root.failed_hosts()
.into_iter()
.map(|host| host.as_str())
.collect::<Vec<_>>(),
vec!["router2"]
);
let validate = root
.sub_task("validate")
.expect("validate sub task should exist");
assert_eq!(validate.task_name(), "validate");
assert_eq!(root.summary(), Some("deploy completed"));
assert_eq!(
root.host_result("router1")
.and_then(HostTaskResult::success)
.and_then(TaskSuccess::summary),
Some("config deployed")
);
assert_eq!(
root.host_result("router1")
.and_then(HostTaskResult::success)
.map(TaskSuccess::warnings)
.map(|warnings| warnings.len()),
Some(1)
);
assert!(
validate
.host_result("router2")
.expect("router2 validate result should exist")
.is_skipped()
);
assert_eq!(
validate
.host_result("router2")
.and_then(HostTaskResult::skipped_detail)
.and_then(TaskSkip::reason),
Some("parent_failed")
);
let collect_logs = validate
.sub_task("collect_logs")
.expect("collect_logs sub task should exist");
assert_eq!(collect_logs.task_name(), "collect_logs");
assert_eq!(
collect_logs
.host_result("router1")
.and_then(HostTaskResult::success)
.and_then(TaskSuccess::diff),
Some("captured logs")
);
assert_eq!(
root.host_result("router2")
.and_then(HostTaskResult::failure)
.map(TaskFailure::retryable),
Some(true)
);
let root_summary = root.task_summary();
assert_eq!(root_summary.task_name(), "deploy");
assert_eq!(root_summary.hosts().passed(), 1);
assert_eq!(root_summary.hosts().failed(), 1);
assert_eq!(root_summary.hosts().skipped(), 0);
assert_eq!(root_summary.hosts().total(), 2);
assert_eq!(root_summary.duration_ms(), None);
let validate_summary = root_summary
.sub_tasks()
.get("validate")
.expect("validate summary should exist");
assert_eq!(validate_summary.task_name(), "validate");
assert_eq!(validate_summary.hosts().passed(), 1);
assert_eq!(validate_summary.hosts().failed(), 0);
assert_eq!(validate_summary.hosts().skipped(), 1);
assert_eq!(validate_summary.duration_ms(), None);
let collect_logs_summary = validate_summary
.sub_tasks()
.get("collect_logs")
.expect("collect_logs summary should exist");
assert_eq!(collect_logs_summary.task_name(), "collect_logs");
assert_eq!(collect_logs_summary.hosts().passed(), 1);
assert_eq!(collect_logs_summary.hosts().failed(), 0);
assert_eq!(collect_logs_summary.hosts().skipped(), 1);
}
}