use crate::error::CanoError;
use crate::resource::Resources;
use crate::task::TaskConfig;
use cano_macros::node;
use std::borrow::Cow;
use std::fmt;
use std::hash::Hash;
#[cfg(feature = "tracing")]
use tracing::Instrument;
/// Type-erased phase result: a boxed `Any + Send + Sync` value on success, or a [`CanoError`].
///
/// [`DynNode`] uses this alias for both its `PrepResult` and its `ExecResult`.
pub type DefaultNodeResult = Result<Box<dyn std::any::Any + Send + Sync>, CanoError>;
/// A three-phase unit of work (`prep` -> `exec` -> `post`) that resolves to a `TState`.
///
/// Implementors supply the three phases; [`Node::run`] and [`Node::run_with_retries`] are
/// provided and drive the phases according to the node's [`TaskConfig`].
///
/// # Type parameters
///
/// * `TState` - value returned by a successful run.
/// * `TResourceKey` - key type of the [`Resources`] handed to `prep` and `post`; defaults to
///   `Cow<'static, str>`.
#[node]
pub trait Node<TState, TResourceKey = Cow<'static, str>>: Send + Sync
where
    TState: Clone + fmt::Debug + Send + Sync + 'static,
    TResourceKey: Hash + Eq + Send + Sync + 'static,
{
    /// Value produced by [`Node::prep`] and consumed by [`Node::exec`].
    type PrepResult: Send + Sync;

    /// Value produced by [`Node::exec`] and consumed by [`Node::post`].
    type ExecResult: Send + Sync;

    /// Configuration read by [`Node::run`]; the provided implementation returns
    /// `TaskConfig::default()`.
    fn config(&self) -> TaskConfig {
        TaskConfig::default()
    }

    /// First phase: produces the input for [`Node::exec`].
    ///
    /// # Errors
    ///
    /// An `Err` ends the current attempt in [`Node::run_with_retries`] without calling `exec`
    /// or `post`.
    async fn prep(&self, res: &Resources<TResourceKey>) -> Result<Self::PrepResult, CanoError>;

    /// Second phase: consumes the `prep` output.
    ///
    /// The signature has no error channel; anything that can fail has to be carried inside
    /// `Self::ExecResult` and handled by [`Node::post`].
    async fn exec(&self, prep_res: Self::PrepResult) -> Self::ExecResult;

    /// Final phase: turns the `exec` output into the resulting `TState`.
    ///
    /// # Errors
    ///
    /// An `Err` ends the current attempt in [`Node::run_with_retries`].
    async fn post(
        &self,
        res: &Resources<TResourceKey>,
        exec_res: Self::ExecResult,
    ) -> Result<TState, CanoError>;

    /// Runs the node using the configuration returned by [`Node::config`].
    ///
    /// # Errors
    ///
    /// Whatever [`Node::run_with_retries`] returns.
    async fn run(&self, res: &Resources<TResourceKey>) -> Result<TState, CanoError> {
        let config = self.config();
        self.run_with_retries(res, &config).await
    }

    /// Runs `prep` -> `exec` -> `post` in a loop until `post` succeeds or the attempts allowed
    /// by `config.retry_mode` are used up.
    ///
    /// Every attempt starts again from `prep`; a failure in either `prep` or `post` consumes
    /// one attempt. Between attempts the task sleeps for the delay reported by
    /// `config.retry_mode.delay_for_attempt`, when it reports one.
    ///
    /// With the `tracing` feature enabled the loop runs inside a `node_execution` span and each
    /// phase inside its own `node_prep` / `node_exec` / `node_post` span.
    ///
    /// # Errors
    ///
    /// * When `max_attempts()` is at most 1, the error of the failing phase is returned
    ///   unchanged.
    /// * Otherwise, once the attempts are exhausted, an error built with
    ///   `CanoError::retry_exhausted` is returned; its message names the failing phase, the
    ///   number of attempts, and the last error.
    async fn run_with_retries(
        &self,
        res: &Resources<TResourceKey>,
        config: &TaskConfig,
    ) -> Result<TState, CanoError> {
        // The whole retry loop is built as a single future so that it can be instrumented as
        // a unit below.
        let attempts = async move {
            let max_attempts = config.retry_mode.max_attempts();
            let mut attempt = 0;

            #[cfg(feature = "tracing")]
            tracing::debug!(
                max_attempts = max_attempts,
                "Starting node execution with retry logic"
            );

            loop {
                #[cfg(feature = "tracing")]
                tracing::debug!(attempt = attempt, "Starting node execution attempt");

                #[cfg(feature = "tracing")]
                let prep_result = {
                    let prep_span = tracing::debug_span!("node_prep", attempt = attempt);
                    self.prep(res).instrument(prep_span).await
                };
                #[cfg(not(feature = "tracing"))]
                let prep_result = self.prep(res).await;

                match prep_result {
                    Ok(prep_res) => {
                        #[cfg(feature = "tracing")]
                        tracing::debug!(attempt = attempt, "Prep phase completed successfully");

                        #[cfg(feature = "tracing")]
                        let exec_res = {
                            let exec_span = tracing::debug_span!("node_exec", attempt = attempt);
                            async { self.exec(prep_res).await }
                                .instrument(exec_span)
                                .await
                        };
                        #[cfg(not(feature = "tracing"))]
                        let exec_res = self.exec(prep_res).await;

                        #[cfg(feature = "tracing")]
                        tracing::debug!(attempt = attempt, "Exec phase completed");

                        #[cfg(feature = "tracing")]
                        let post_result = {
                            let post_span = tracing::debug_span!("node_post", attempt = attempt);
                            self.post(res, exec_res).instrument(post_span).await
                        };
                        #[cfg(not(feature = "tracing"))]
                        let post_result = self.post(res, exec_res).await;

                        match post_result {
                            Ok(result) => {
                                #[cfg(feature = "tracing")]
                                tracing::info!(attempt = attempt, final_result = ?result, "Node execution completed successfully");
                                return Ok(result);
                            }
                            Err(e) => {
                                // From here on `attempt` is the number of attempts made so far.
                                attempt += 1;
                                #[cfg(feature = "tracing")]
                                tracing::warn!(attempt = attempt, error = ?e, max_attempts = max_attempts, "Post phase failed");

                                if attempt >= max_attempts {
                                    #[cfg(feature = "tracing")]
                                    tracing::error!(attempt = attempt, error = ?e, "Node execution failed after maximum attempts");
                                    // Without retries the caller gets the original error
                                    // variant rather than a wrapped one.
                                    if max_attempts <= 1 {
                                        return Err(e);
                                    }
                                    return Err(CanoError::retry_exhausted(format!(
                                        "Node post phase failed after {} attempt(s): {}",
                                        attempt, e
                                    )));
                                } else if let Some(delay) =
                                    config.retry_mode.delay_for_attempt(attempt - 1)
                                {
                                    #[cfg(feature = "tracing")]
                                    tracing::debug!(
                                        attempt = attempt,
                                        delay_ms = delay.as_millis(),
                                        "Retrying after delay"
                                    );
                                    tokio::time::sleep(delay).await;
                                }
                            }
                        }
                    }
                    Err(e) => {
                        // From here on `attempt` is the number of attempts made so far.
                        attempt += 1;
                        #[cfg(feature = "tracing")]
                        tracing::warn!(attempt = attempt, error = ?e, max_attempts = max_attempts, "Prep phase failed");

                        if attempt >= max_attempts {
                            #[cfg(feature = "tracing")]
                            tracing::error!(attempt = attempt, error = ?e, "Node execution failed after maximum attempts");
                            // Without retries the caller gets the original error variant
                            // rather than a wrapped one.
                            if max_attempts <= 1 {
                                return Err(e);
                            }
                            return Err(CanoError::retry_exhausted(format!(
                                "Node prep phase failed after {} attempt(s): {}",
                                attempt, e
                            )));
                        } else if let Some(delay) =
                            config.retry_mode.delay_for_attempt(attempt - 1)
                        {
                            #[cfg(feature = "tracing")]
                            tracing::debug!(
                                attempt = attempt,
                                delay_ms = delay.as_millis(),
                                "Retrying after delay"
                            );
                            tokio::time::sleep(delay).await;
                        }
                    }
                }
            }
        };

        // Attach the span with `Instrument` instead of holding a `Span::enter()` guard: a guard
        // kept alive across `.await` stays entered while the task is suspended (so unrelated
        // work polled on the same thread is attributed to this span), and the guard is `!Send`,
        // which would make this future `!Send` as well.
        #[cfg(feature = "tracing")]
        let attempts = attempts.instrument(tracing::info_span!("node_execution"));

        attempts.await
    }
}
/// Trait-object form of [`Node`] whose `PrepResult` and `ExecResult` are both
/// [`DefaultNodeResult`].
///
/// The resource key type defaults to `Cow<'static, str>`, the same default as [`Node`].
pub type DynNode<TState, TResourceKey = Cow<'static, str>> = dyn Node<TState, TResourceKey, PrepResult = DefaultNodeResult, ExecResult = DefaultNodeResult>
+ Send
+ Sync;
/// Reference-counted (`std::sync::Arc`) handle to a [`DynNode`].
pub type NodeObject<TState, TResourceKey = Cow<'static, str>> =
std::sync::Arc<DynNode<TState, TResourceKey>>;
#[cfg(test)]
mod tests {
use super::*;
use crate::resource::Resources;
use crate::task::RetryMode;
use cano_macros::node;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use tokio;
/// State type returned by the test nodes in this module.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum TestAction {
// Not constructed by any test in this module; the attribute silences the dead-code lint.
#[allow(dead_code)]
Continue,
/// Returned by the test nodes when their `post` phase succeeds.
Complete,
/// Returned by `SimpleSuccessNode::post` when `exec` reported `false`.
Error,
// Not constructed by any test in this module; the attribute silences the dead-code lint.
#[allow(dead_code)]
Retry,
}
struct SimpleSuccessNode {
execution_count: Arc<AtomicU32>,
}
impl SimpleSuccessNode {
fn new() -> Self {
Self {
execution_count: Arc::new(AtomicU32::new(0)),
}
}
fn execution_count(&self) -> u32 {
self.execution_count.load(Ordering::SeqCst)
}
}
#[node]
impl Node<TestAction> for SimpleSuccessNode {
type PrepResult = String;
type ExecResult = bool;
async fn prep(&self, _res: &Resources) -> Result<Self::PrepResult, CanoError> {
Ok("prepared".to_string())
}
async fn exec(&self, prep_res: Self::PrepResult) -> Self::ExecResult {
self.execution_count.fetch_add(1, Ordering::SeqCst);
prep_res == "prepared"
}
async fn post(
&self,
_res: &Resources,
exec_res: Self::ExecResult,
) -> Result<TestAction, CanoError> {
if exec_res {
Ok(TestAction::Complete)
} else {
Ok(TestAction::Error)
}
}
}
/// Test node whose `prep` phase always fails with a caller-supplied message.
struct PrepFailureNode {
    error_message: String,
}

impl PrepFailureNode {
    /// Stores an owned copy of `error_message` for later use by `prep`.
    fn new(error_message: &str) -> Self {
        let error_message = String::from(error_message);
        Self { error_message }
    }
}

#[node]
impl Node<TestAction> for PrepFailureNode {
    type PrepResult = String;
    type ExecResult = bool;

    /// Always fails with a preparation error carrying the stored message.
    async fn prep(&self, _res: &Resources) -> Result<Self::PrepResult, CanoError> {
        let failure = CanoError::preparation(&self.error_message);
        Err(failure)
    }

    /// Returns `true` unconditionally.
    async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {
        true
    }

    /// Returns `Complete` unconditionally.
    async fn post(
        &self,
        _res: &Resources,
        _exec_res: Self::ExecResult,
    ) -> Result<TestAction, CanoError> {
        Ok(TestAction::Complete)
    }
}
/// Test node whose `prep` and `exec` are no-ops and whose `post` phase always fails.
struct PostFailureNode;

#[node]
impl Node<TestAction> for PostFailureNode {
    type PrepResult = ();
    type ExecResult = ();

    /// Succeeds with the unit value.
    async fn prep(&self, _resources: &Resources) -> Result<Self::PrepResult, CanoError> {
        Ok(())
    }

    /// Does nothing.
    async fn exec(&self, _prepared: Self::PrepResult) -> Self::ExecResult {}

    /// Always fails with a node-execution error.
    async fn post(
        &self,
        _resources: &Resources,
        _executed: Self::ExecResult,
    ) -> Result<TestAction, CanoError> {
        let failure = CanoError::node_execution("Post phase failure");
        Err(failure)
    }
}
/// Checks that a fully successful node yields `Complete` after a single `exec` call.
#[tokio::test]
async fn test_simple_node_execution() {
    let resources = Resources::new();
    let node = SimpleSuccessNode::new();

    let final_state = node.run(&resources).await.unwrap();

    assert_eq!(final_state, TestAction::Complete);
    assert_eq!(node.execution_count(), 1);
}
/// Drives `prep`, `exec` and `post` by hand and checks the value handed from phase to phase.
#[tokio::test]
async fn test_node_lifecycle_phases() {
    let resources = Resources::new();
    let node = SimpleSuccessNode::new();

    let prepared = node.prep(&resources).await.unwrap();
    assert_eq!(prepared, "prepared");

    let executed = node.exec(prepared).await;
    assert!(executed);

    let next_state = node.post(&resources, executed).await.unwrap();
    assert_eq!(next_state, TestAction::Complete);
}
/// Checks that a failing `prep` makes `run` fail and that the original message survives.
#[tokio::test]
async fn test_prep_phase_failure() {
    let resources = Resources::new();
    let node = PrepFailureNode::new("Test prep failure");

    let outcome = node.run(&resources).await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Test prep failure"));
}
/// Checks that a failing `post` makes `run` fail and that the original message survives.
#[tokio::test]
async fn test_post_phase_failure() {
    let resources = Resources::new();
    let node = PostFailureNode;

    let outcome = node.run(&resources).await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Post phase failure"));
}
/// Asserts that `TaskConfig::new()` reports four attempts.
#[test]
fn test_node_config_creation() {
    let attempts = TaskConfig::new().retry_mode.max_attempts();
    assert_eq!(attempts, 4);
}
/// Asserts that `TaskConfig::default()` reports four attempts.
#[test]
fn test_node_config_default() {
    let attempts = TaskConfig::default().retry_mode.max_attempts();
    assert_eq!(attempts, 4);
}
/// Asserts that `TaskConfig::minimal()` reports a single attempt.
#[test]
fn test_node_config_minimal() {
    let attempts = TaskConfig::minimal().retry_mode.max_attempts();
    assert_eq!(attempts, 1);
}
/// Asserts that five fixed retries add up to six attempts in total.
#[test]
fn test_node_config_with_fixed_retry() {
    let delay = Duration::from_millis(100);
    let attempts = TaskConfig::new()
        .with_fixed_retry(5, delay)
        .retry_mode
        .max_attempts();
    assert_eq!(attempts, 6);
}
/// Asserts that ten fixed retries configured through the builder add up to eleven attempts.
#[test]
fn test_node_config_builder_pattern() {
    let delay = Duration::from_secs(1);
    let attempts = TaskConfig::new()
        .with_fixed_retry(10, delay)
        .retry_mode
        .max_attempts();
    assert_eq!(attempts, 11);
}
/// Runs the same node five times and checks that the `exec` counter grows by one per run.
#[tokio::test]
async fn test_multiple_node_executions() {
    let resources = Resources::new();
    let node = SimpleSuccessNode::new();

    for expected_runs in 1..=5 {
        let final_state = node.run(&resources).await.unwrap();
        assert_eq!(final_state, TestAction::Complete);
        assert_eq!(node.execution_count(), expected_runs);
    }
}
/// Checks that an error raised in `prep` or in `post` reaches the caller of `run`.
#[tokio::test]
async fn test_error_propagation() {
    let resources = Resources::new();

    // Failure raised by the first phase.
    let prep_outcome = PrepFailureNode::new("Prep failed").run(&resources).await;
    assert!(prep_outcome.is_err());

    // Failure raised by the last phase.
    let post_outcome = PostFailureNode.run(&resources).await;
    assert!(post_outcome.is_err());
}
/// Spawns ten tasks that run one shared node and checks that every run completes and is counted.
#[tokio::test]
async fn test_concurrent_node_execution() {
    use tokio::task;

    let node = Arc::new(SimpleSuccessNode::new());
    let res = Arc::new(Resources::new());

    // `collect` drives the iterator eagerly, so all ten tasks are spawned before any is awaited.
    let handles: Vec<_> = (0..10)
        .map(|_| {
            let shared_node = Arc::clone(&node);
            let shared_res = Arc::clone(&res);
            task::spawn(async move { shared_node.run(&*shared_res).await })
        })
        .collect();

    let mut success_count = 0;
    for handle in handles {
        let outcome = handle.await.unwrap();
        if matches!(outcome, Ok(TestAction::Complete)) {
            success_count += 1;
        }
    }

    assert_eq!(success_count, 10);
    assert_eq!(node.execution_count(), 10);
}
/// Checks that the retry budget from `config()` decides whether a twice-failing node succeeds.
#[tokio::test]
async fn test_node_config_retry_behavior() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Node whose `prep` fails on its first two calls and succeeds from the third call on.
    struct RetryNode {
        attempt_count: Arc<AtomicUsize>,
        max_retries: usize,
    }

    impl RetryNode {
        fn new(max_retries: usize) -> Self {
            let attempt_count = Arc::new(AtomicUsize::new(0));
            Self {
                attempt_count,
                max_retries,
            }
        }

        /// Number of `prep` calls made so far.
        fn attempt_count(&self) -> usize {
            self.attempt_count.load(Ordering::SeqCst)
        }
    }

    #[node]
    impl Node<TestAction> for RetryNode {
        type PrepResult = ();
        type ExecResult = ();

        fn config(&self) -> TaskConfig {
            let delay = Duration::from_millis(1);
            TaskConfig::new().with_fixed_retry(self.max_retries, delay)
        }

        async fn prep(&self, _res: &Resources) -> Result<Self::PrepResult, CanoError> {
            // `fetch_add` returns the previous value, so `+ 1` gives the 1-based call number.
            let attempt = self.attempt_count.fetch_add(1, Ordering::SeqCst) + 1;
            if attempt >= 3 {
                Ok(())
            } else {
                Err(CanoError::preparation("Simulated failure"))
            }
        }

        async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {}

        async fn post(
            &self,
            _res: &Resources,
            _exec_res: Self::ExecResult,
        ) -> Result<TestAction, CanoError> {
            Ok(TestAction::Complete)
        }
    }

    let resources = Resources::new();

    // Five retries are enough: the third `prep` call succeeds.
    let node_success = RetryNode::new(5);
    let final_state = node_success.run(&resources).await.unwrap();
    assert_eq!(final_state, TestAction::Complete);
    assert_eq!(node_success.attempt_count(), 3);

    // One retry allows only two `prep` calls, both of which fail.
    let node_failure = RetryNode::new(1);
    let outcome = node_failure.run(&resources).await;
    assert!(outcome.is_err());
    assert_eq!(node_failure.attempt_count(), 2);
}
/// Checks that a node overriding `config()` with `TaskConfig::minimal()` runs and reports one attempt.
#[tokio::test]
async fn test_node_config_variants() {
    /// Node with no-op phases and the minimal configuration.
    struct MinimalNode;

    #[node]
    impl Node<TestAction> for MinimalNode {
        type PrepResult = ();
        type ExecResult = ();

        fn config(&self) -> TaskConfig {
            TaskConfig::minimal()
        }

        async fn prep(&self, _res: &Resources) -> Result<Self::PrepResult, CanoError> {
            Ok(())
        }

        async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {}

        async fn post(
            &self,
            _res: &Resources,
            _exec_res: Self::ExecResult,
        ) -> Result<TestAction, CanoError> {
            Ok(TestAction::Complete)
        }
    }

    let resources = Resources::new();
    let minimal_node = MinimalNode;

    let final_state = minimal_node.run(&resources).await.unwrap();
    assert_eq!(final_state, TestAction::Complete);

    let attempts = minimal_node.config().retry_mode.max_attempts();
    assert_eq!(attempts, 1);
}
/// Asserts that `RetryMode::None` reports one attempt and no delay for attempts 0 and 1.
#[test]
fn test_retry_mode_none() {
    let retry_mode = RetryMode::None;
    assert_eq!(retry_mode.max_attempts(), 1);

    for attempt in 0..2 {
        assert_eq!(retry_mode.delay_for_attempt(attempt), None);
    }
}
/// Asserts that a fixed mode with three retries reports the same delay for attempts 0 to 2
/// and no delay afterwards.
#[test]
fn test_retry_mode_fixed() {
    let retry_mode = RetryMode::fixed(3, Duration::from_millis(100));
    assert_eq!(retry_mode.max_attempts(), 4);

    // Attempts that still have a retry left.
    for attempt in 0..3 {
        assert_eq!(
            retry_mode.delay_for_attempt(attempt),
            Some(Duration::from_millis(100))
        );
    }

    // Attempts past the retry budget.
    for attempt in 3..5 {
        assert_eq!(retry_mode.delay_for_attempt(attempt), None);
    }
}
/// Asserts the attempt count of `RetryMode::exponential(3)` and that consecutive delays never
/// fall below half of the previous one.
#[test]
fn test_retry_mode_exponential_basic() {
    let retry_mode = RetryMode::exponential(3);
    assert_eq!(retry_mode.max_attempts(), 4);

    let first = retry_mode.delay_for_attempt(0).unwrap();
    let second = retry_mode.delay_for_attempt(1).unwrap();
    let third = retry_mode.delay_for_attempt(2).unwrap();

    // Loose lower bounds only, rather than exact delay values.
    assert!(second.as_millis() >= first.as_millis() / 2);
    assert!(third.as_millis() >= second.as_millis() / 2);

    for attempt in 3..5 {
        assert_eq!(retry_mode.delay_for_attempt(attempt), None);
    }
}
/// Asserts the exact delays (50 ms, 150 ms, then none) of a custom exponential mode.
#[test]
fn test_retry_mode_exponential_custom() {
    let retry_mode = RetryMode::exponential_custom(
        2,
        Duration::from_millis(50),
        3.0,
        Duration::from_secs(5),
        0.0,
    );
    assert_eq!(retry_mode.max_attempts(), 3);

    let expected_delays = [
        (0, Some(Duration::from_millis(50))),
        (1, Some(Duration::from_millis(150))),
        (2, None),
    ];
    for (attempt, expected) in expected_delays {
        assert_eq!(retry_mode.delay_for_attempt(attempt), expected);
    }
}
/// Asserts that delays of a custom exponential mode stop growing at 500 ms.
#[test]
fn test_retry_mode_exponential_max_delay_cap() {
    let retry_mode = RetryMode::exponential_custom(
        5,
        Duration::from_millis(100),
        10.0,
        Duration::from_millis(500),
        0.0,
    );

    let first = retry_mode.delay_for_attempt(0).unwrap();
    let second = retry_mode.delay_for_attempt(1).unwrap();
    let third = retry_mode.delay_for_attempt(2).unwrap();

    assert_eq!(first, Duration::from_millis(100));
    assert_eq!(second, Duration::from_millis(500));
    assert_eq!(third, Duration::from_millis(500));
}
/// Samples the first-attempt delay twenty times and asserts every sample lies in 50..=150 ms.
#[test]
fn test_retry_mode_exponential_jitter_bounds() {
    let retry_mode = RetryMode::exponential_custom(
        3,
        Duration::from_millis(100),
        2.0,
        Duration::from_secs(30),
        0.5,
    );

    let samples: Vec<u128> = (0..20)
        .filter_map(|_| retry_mode.delay_for_attempt(0))
        .map(|delay| delay.as_millis())
        .collect();

    let shortest = samples.iter().min().unwrap();
    let longest = samples.iter().max().unwrap();
    assert!(*shortest >= 50);
    assert!(*longest <= 150);
}
/// Asserts that jitter arguments of -0.5 and 1.5 still yield a delay for the first attempt.
#[test]
fn test_retry_mode_jitter_clamping() {
    // Both modes are built before either one is queried.
    let modes = [-0.5, 1.5].map(|jitter| {
        RetryMode::exponential_custom(
            1,
            Duration::from_millis(100),
            2.0,
            Duration::from_secs(30),
            jitter,
        )
    });

    for mode in &modes {
        assert!(mode.delay_for_attempt(0).is_some());
    }
}
/// Asserts that the default retry mode reports four attempts, with delays for attempts 0 to 2 only.
#[test]
fn test_retry_mode_default() {
    let retry_mode = RetryMode::default();
    assert_eq!(retry_mode.max_attempts(), 4);

    for attempt in 0..3 {
        assert!(retry_mode.delay_for_attempt(attempt).is_some());
    }
    assert!(retry_mode.delay_for_attempt(3).is_none());
}
/// Asserts attempt counts of the `fixed` / `exponential` constructors and the field values
/// that `exponential` fills in.
#[test]
fn test_retry_mode_builder_methods() {
    let fixed = RetryMode::fixed(2, Duration::from_millis(200));
    assert_eq!(fixed.max_attempts(), 3);

    let exponential = RetryMode::exponential(5);
    assert_eq!(exponential.max_attempts(), 6);

    match exponential {
        RetryMode::ExponentialBackoff {
            base_delay,
            multiplier,
            max_delay,
            jitter,
            ..
        } => {
            assert_eq!(base_delay, Duration::from_millis(100));
            assert_eq!(multiplier, 2.0);
            assert_eq!(max_delay, Duration::from_secs(30));
            assert_eq!(jitter, 0.1);
        }
        _ => panic!("Expected ExponentialBackoff variant"),
    }
}
/// Checks how many `prep` calls `run` makes for a node that fails a configurable number of times.
#[tokio::test]
async fn test_retry_mode_in_node_execution() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Node whose `prep` fails for its first `fail_count` calls and succeeds afterwards.
    struct FailNTimesNode {
        fail_count: usize,
        attempt_counter: Arc<AtomicUsize>,
    }

    impl FailNTimesNode {
        fn new(fail_count: usize) -> Self {
            let attempt_counter = Arc::new(AtomicUsize::new(0));
            Self {
                fail_count,
                attempt_counter,
            }
        }

        /// Number of `prep` calls made so far.
        fn attempt_count(&self) -> usize {
            self.attempt_counter.load(Ordering::SeqCst)
        }
    }

    #[node]
    impl Node<TestAction> for FailNTimesNode {
        type PrepResult = ();
        type ExecResult = ();

        fn config(&self) -> TaskConfig {
            let delay = Duration::from_millis(1);
            TaskConfig::new().with_fixed_retry(5, delay)
        }

        async fn prep(&self, _res: &Resources) -> Result<Self::PrepResult, CanoError> {
            // `fetch_add` returns the previous value, i.e. the number of earlier calls.
            let earlier_calls = self.attempt_counter.fetch_add(1, Ordering::SeqCst);
            if earlier_calls >= self.fail_count {
                Ok(())
            } else {
                Err(CanoError::preparation("Simulated failure"))
            }
        }

        async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {}

        async fn post(
            &self,
            _res: &Resources,
            _exec_res: Self::ExecResult,
        ) -> Result<TestAction, CanoError> {
            Ok(TestAction::Complete)
        }
    }

    let resources = Resources::new();

    // Two failures fit in the budget: the third `prep` call succeeds.
    let recovering = FailNTimesNode::new(2);
    let final_state = recovering.run(&resources).await.unwrap();
    assert_eq!(final_state, TestAction::Complete);
    assert_eq!(recovering.attempt_count(), 3);

    // Ten failures exceed the budget: `run` stops after six `prep` calls.
    let exhausted = FailNTimesNode::new(10);
    let outcome = exhausted.run(&resources).await;
    assert!(outcome.is_err());
    assert_eq!(exhausted.attempt_count(), 6);
}
/// Checks that a node failing on every attempt spends at least 90 ms in `run` when configured
/// with two fixed 50 ms retries.
#[tokio::test]
async fn test_retry_mode_timing() {
    use std::time::Instant;

    /// Node whose `prep` fails on every call.
    struct AlwaysFailNode;

    #[node]
    impl Node<TestAction> for AlwaysFailNode {
        type PrepResult = ();
        type ExecResult = ();

        fn config(&self) -> TaskConfig {
            let delay = Duration::from_millis(50);
            TaskConfig::new().with_fixed_retry(2, delay)
        }

        async fn prep(&self, _res: &Resources) -> Result<Self::PrepResult, CanoError> {
            Err(CanoError::preparation("Always fails"))
        }

        async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {}

        async fn post(
            &self,
            _res: &Resources,
            _exec_res: Self::ExecResult,
        ) -> Result<TestAction, CanoError> {
            Ok(TestAction::Complete)
        }
    }

    let resources = Resources::new();
    let node = AlwaysFailNode;

    let started = Instant::now();
    let outcome = node.run(&resources).await;
    let elapsed = started.elapsed();

    assert!(outcome.is_err());
    assert!(elapsed >= Duration::from_millis(90));
}
/// Checks that a retry caused by a failing `post` starts over from `prep`.
#[tokio::test]
async fn test_retry_reruns_all_phases() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Node that counts the calls to each phase; its `post` fails on the first call only.
    struct CountedNode {
        prep_counter: Arc<AtomicUsize>,
        exec_counter: Arc<AtomicUsize>,
        post_counter: Arc<AtomicUsize>,
    }

    #[node]
    impl Node<TestAction> for CountedNode {
        type PrepResult = ();
        type ExecResult = ();

        fn config(&self) -> TaskConfig {
            let delay = Duration::from_millis(1);
            TaskConfig::new().with_fixed_retry(2, delay)
        }

        async fn prep(&self, _res: &Resources) -> Result<Self::PrepResult, CanoError> {
            self.prep_counter.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {
            self.exec_counter.fetch_add(1, Ordering::SeqCst);
        }

        async fn post(
            &self,
            _res: &Resources,
            _exec_res: Self::ExecResult,
        ) -> Result<TestAction, CanoError> {
            // `fetch_add` returns the previous value, so `+ 1` gives the 1-based call number.
            let call_number = self.post_counter.fetch_add(1, Ordering::SeqCst) + 1;
            if call_number >= 2 {
                Ok(TestAction::Complete)
            } else {
                Err(CanoError::node_execution("post fails first time"))
            }
        }
    }

    let prep_counter = Arc::new(AtomicUsize::new(0));
    let exec_counter = Arc::new(AtomicUsize::new(0));
    let post_counter = Arc::new(AtomicUsize::new(0));

    let node = CountedNode {
        prep_counter: Arc::clone(&prep_counter),
        exec_counter: Arc::clone(&exec_counter),
        post_counter: Arc::clone(&post_counter),
    };
    let resources = Resources::new();

    let final_state = node.run(&resources).await.unwrap();
    assert_eq!(final_state, TestAction::Complete);

    let expectations = [
        (&prep_counter, "prep ran twice"),
        (&exec_counter, "exec ran twice"),
        (&post_counter, "post ran twice"),
    ];
    for (counter, label) in expectations {
        assert_eq!(counter.load(Ordering::SeqCst), 2, "{}", label);
    }
}
/// Checks that exhausting a two-retry budget takes three `prep` calls and surfaces a
/// `RetryExhausted` error.
#[tokio::test]
async fn test_node_retry_exhausted_error_type() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Node whose `prep` counts its calls and fails on every one of them.
    struct AlwaysFailNode {
        attempt_counter: Arc<AtomicUsize>,
    }

    #[node]
    impl Node<TestAction> for AlwaysFailNode {
        type PrepResult = ();
        type ExecResult = ();

        fn config(&self) -> TaskConfig {
            let delay = Duration::from_millis(1);
            TaskConfig::new().with_fixed_retry(2, delay)
        }

        async fn prep(&self, _res: &Resources) -> Result<Self::PrepResult, CanoError> {
            self.attempt_counter.fetch_add(1, Ordering::SeqCst);
            Err(CanoError::preparation("always fails"))
        }

        async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {}

        async fn post(
            &self,
            _res: &Resources,
            _exec_res: Self::ExecResult,
        ) -> Result<TestAction, CanoError> {
            Ok(TestAction::Complete)
        }
    }

    let resources = Resources::new();
    let node = AlwaysFailNode {
        attempt_counter: Arc::new(AtomicUsize::new(0)),
    };

    let outcome = node.run(&resources).await;
    assert!(outcome.is_err());

    let prep_calls = node.attempt_counter.load(Ordering::SeqCst);
    assert_eq!(prep_calls, 3);

    let err = outcome.unwrap_err();
    assert!(
        matches!(err, CanoError::RetryExhausted(_)),
        "expected RetryExhausted after retry exhaustion, got: {err}"
    );
}
/// Checks that with `TaskConfig::minimal()` the original error variant of a failing `prep`
/// or `post` reaches the caller unwrapped.
#[tokio::test]
async fn test_node_no_retry_preserves_error_variant() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Node whose `prep` counts its calls and always fails.
    struct PrepFailNode {
        attempt_counter: Arc<AtomicUsize>,
    }

    #[node]
    impl Node<TestAction> for PrepFailNode {
        type PrepResult = ();
        type ExecResult = ();

        fn config(&self) -> TaskConfig {
            TaskConfig::minimal()
        }

        async fn prep(&self, _res: &Resources) -> Result<Self::PrepResult, CanoError> {
            self.attempt_counter.fetch_add(1, Ordering::SeqCst);
            Err(CanoError::preparation("prep boom"))
        }

        async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {}

        async fn post(
            &self,
            _res: &Resources,
            _exec_res: Self::ExecResult,
        ) -> Result<TestAction, CanoError> {
            Ok(TestAction::Complete)
        }
    }

    /// Node whose `post` always fails.
    struct PostFailNode;

    #[node]
    impl Node<TestAction> for PostFailNode {
        type PrepResult = ();
        type ExecResult = ();

        fn config(&self) -> TaskConfig {
            TaskConfig::minimal()
        }

        async fn prep(&self, _res: &Resources) -> Result<Self::PrepResult, CanoError> {
            Ok(())
        }

        async fn exec(&self, _prep_res: Self::PrepResult) -> Self::ExecResult {}

        async fn post(
            &self,
            _res: &Resources,
            _exec_res: Self::ExecResult,
        ) -> Result<TestAction, CanoError> {
            Err(CanoError::node_execution("post boom"))
        }
    }

    let res = Resources::new();

    // Failure in `prep`: a single call, and the `Preparation` variant is kept.
    let prep_node = PrepFailNode {
        attempt_counter: Arc::new(AtomicUsize::new(0)),
    };
    let err = prep_node.run(&res).await.unwrap_err();
    assert_eq!(prep_node.attempt_counter.load(Ordering::SeqCst), 1);
    assert!(
        matches!(err, CanoError::Preparation(_)),
        "expected original Preparation variant when retries disabled, got: {err}"
    );
    assert!(err.to_string().contains("prep boom"));

    // Failure in `post`: the `NodeExecution` variant is kept.
    let err = PostFailNode.run(&res).await.unwrap_err();
    assert!(
        matches!(err, CanoError::NodeExecution(_)),
        "expected original NodeExecution variant when retries disabled, got: {err}"
    );
    assert!(err.to_string().contains("post boom"));
}
}