use crate::error::CanoError;
/// Hook points for observing a workflow run: state transitions, the task
/// lifecycle, retries, circuit-breaker rejections, checkpoints and resumes.
///
/// Every method ships with an empty default body, so an implementor only
/// overrides the hooks it is interested in. Hooks are plain synchronous `fn`s
/// that take `&self`; any bookkeeping an observer keeps therefore needs
/// interior mutability (for example a `Mutex`).
///
/// The `Send + Sync + 'static` bound allows an observer to be shared behind an
/// `Arc`. NOTE(review): the engine presumably invokes these hooks inline on the
/// execution path, so implementations should stay cheap — confirm against the
/// workflow runner.
pub trait WorkflowObserver: Send + Sync + 'static {
    /// Called when the workflow enters a state.
    ///
    /// `_state` is a textual rendering of the state value; presumably its
    /// `Debug` form (a state `S::Start` shows up as `"Start"`) — confirm.
    fn on_state_enter(&self, _state: &str) {}

    /// Called when the task identified by `_task_id` starts running.
    ///
    /// `_task_id` appears to be the task's `name()` — confirm in the engine.
    fn on_task_start(&self, _task_id: &str) {}

    /// Called when the task identified by `_task_id` finishes successfully.
    fn on_task_success(&self, _task_id: &str) {}

    /// Called when the task identified by `_task_id` ends in error; `_err` is
    /// the error it failed with.
    ///
    /// This fires both after retries have been used up and after a
    /// circuit-breaker rejection.
    fn on_task_failure(&self, _task_id: &str, _err: &CanoError) {}

    /// Called when a failed task is retried.
    ///
    /// `_attempt` is the retry number; with a two-retry policy the values
    /// observed are `1` and `2`.
    fn on_retry(&self, _task_id: &str, _attempt: u32) {}

    /// Called when the task's circuit breaker is open and rejects the run.
    fn on_circuit_open(&self, _task_id: &str) {}

    /// Called when a checkpoint numbered `_sequence` is recorded for the
    /// workflow `_workflow_id`.
    fn on_checkpoint(&self, _workflow_id: &str, _sequence: u64) {}

    /// Called when the workflow `_workflow_id` resumes from a checkpoint;
    /// `_sequence` is presumably the checkpoint resumed from — confirm.
    fn on_resume(&self, _workflow_id: &str, _sequence: u64) {}
}
/// [`WorkflowObserver`] that forwards every hook to the `tracing` crate as a
/// structured event. Only compiled with the `tracing` feature.
#[cfg(feature = "tracing")]
#[derive(Debug, Clone, Copy, Default)]
pub struct TracingObserver;

#[cfg(feature = "tracing")]
impl TracingObserver {
    /// Creates the observer. It carries no state, so this is equivalent to
    /// [`TracingObserver::default`].
    #[must_use]
    pub fn new() -> Self {
        TracingObserver
    }
}
/// Emits one `tracing` event per hook. Routine progress (state entry, task
/// start, checkpoints) is logged at `DEBUG`, and outcomes worth surfacing
/// (success, resume) at `INFO`. Degraded paths (retry, open circuit) go to
/// `WARN` and task failures to `ERROR`.
#[cfg(feature = "tracing")]
impl WorkflowObserver for TracingObserver {
    fn on_state_enter(&self, state_name: &str) {
        tracing::debug!(state = state_name, "workflow entered state");
    }

    fn on_task_start(&self, id: &str) {
        tracing::debug!(task_id = id, "task started");
    }

    fn on_task_success(&self, id: &str) {
        tracing::info!(task_id = id, "task succeeded");
    }

    fn on_task_failure(&self, id: &str, failure: &CanoError) {
        // `%` records the error through its `Display` impl.
        tracing::error!(task_id = id, error = %failure, "task failed");
    }

    fn on_retry(&self, id: &str, attempt_no: u32) {
        tracing::warn!(task_id = id, attempt = attempt_no, "task retry");
    }

    fn on_circuit_open(&self, id: &str) {
        tracing::warn!(task_id = id, "circuit breaker rejected task");
    }

    fn on_checkpoint(&self, wf_id: &str, seq: u64) {
        tracing::debug!(workflow_id = wf_id, sequence = seq, "checkpoint appended");
    }

    fn on_resume(&self, wf_id: &str, seq: u64) {
        tracing::info!(
            workflow_id = wf_id,
            sequence = seq,
            "workflow resumed from checkpoint"
        );
    }
}
/// [`WorkflowObserver`] that records hook activity through the crate's
/// `metrics` module. Only compiled with the `metrics` feature.
///
/// Like `TracingObserver`, this is a stateless unit struct, so it is `Copy`.
#[cfg(feature = "metrics")]
#[derive(Debug, Clone, Copy, Default)]
pub struct MetricsObserver;

#[cfg(feature = "metrics")]
impl MetricsObserver {
    /// Creates the observer. It carries no state, so this is equivalent to
    /// [`MetricsObserver::default`].
    #[must_use]
    pub fn new() -> Self {
        Self
    }
}
/// Maps each hook onto one helper in `crate::metrics`.
///
/// `on_task_start` keeps the trait's no-op default, so a task run is only
/// counted when it finishes (success or failure).
#[cfg(feature = "metrics")]
impl WorkflowObserver for MetricsObserver {
    fn on_state_enter(&self, state_name: &str) {
        crate::metrics::observed_state_enter(state_name);
    }

    fn on_task_success(&self, id: &str) {
        // `true` marks the run as successful; failures pass `false`.
        crate::metrics::observed_task_run(id, true);
    }

    fn on_task_failure(&self, id: &str, _failure: &CanoError) {
        crate::metrics::observed_task_run(id, false);
    }

    fn on_retry(&self, id: &str, _attempt_no: u32) {
        crate::metrics::observed_task_retry(id);
    }

    fn on_circuit_open(&self, id: &str) {
        crate::metrics::observed_circuit_open(id);
    }

    // Checkpoint / resume metrics are global: the workflow id and sequence
    // are not forwarded.
    fn on_checkpoint(&self, _wf_id: &str, _seq: u64) {
        crate::metrics::observed_checkpoint();
    }

    fn on_resume(&self, _wf_id: &str, _seq: u64) {
        crate::metrics::observed_resume();
    }
}
#[cfg(test)]
mod tests {
    use crate::prelude::*;
    use std::borrow::Cow;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Observer that appends one `hook:detail` line per callback, so tests can
    /// check both which hooks fired and in what order.
    #[derive(Default)]
    struct RecordingObserver {
        log: Mutex<Vec<String>>,
    }

    impl RecordingObserver {
        /// Snapshot of every event recorded so far, in arrival order.
        fn events(&self) -> Vec<String> {
            let guard = self.log.lock().unwrap();
            guard.to_vec()
        }

        fn record(&self, entry: String) {
            let mut guard = self.log.lock().unwrap();
            guard.push(entry);
        }
    }

    impl WorkflowObserver for RecordingObserver {
        fn on_state_enter(&self, state: &str) {
            self.record(format!("state_enter:{state}"));
        }

        fn on_task_start(&self, task_id: &str) {
            self.record(format!("task_start:{task_id}"));
        }

        fn on_task_success(&self, task_id: &str) {
            self.record(format!("task_success:{task_id}"));
        }

        fn on_task_failure(&self, task_id: &str, _err: &CanoError) {
            self.record(format!("task_failure:{task_id}"));
        }

        fn on_retry(&self, task_id: &str, attempt: u32) {
            self.record(format!("retry:{task_id}:{attempt}"));
        }

        fn on_circuit_open(&self, task_id: &str) {
            self.record(format!("circuit_open:{task_id}"));
        }
    }

    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    enum S {
        Start,
        Done,
    }

    /// Always succeeds and moves to `S::Done`.
    struct OkTask;

    #[task]
    impl Task<S> for OkTask {
        async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
            Ok(TaskResult::Single(S::Done))
        }

        fn name(&self) -> Cow<'static, str> {
            "OkTask".into()
        }
    }

    /// Always fails; configured with two fixed 1 ms retries.
    struct FailTask;

    #[task]
    impl Task<S> for FailTask {
        async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
            Err(CanoError::task_execution("boom"))
        }

        fn config(&self) -> TaskConfig {
            TaskConfig::new().with_fixed_retry(2, Duration::from_millis(1))
        }

        fn name(&self) -> Cow<'static, str> {
            "FailTask".into()
        }
    }

    /// Would succeed, but is guarded by the shared circuit breaker.
    struct CircuitTask {
        breaker: Arc<CircuitBreaker>,
    }

    #[task]
    impl Task<S> for CircuitTask {
        async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
            Ok(TaskResult::Single(S::Done))
        }

        fn config(&self) -> TaskConfig {
            TaskConfig::minimal().with_circuit_breaker(Arc::clone(&self.breaker))
        }

        fn name(&self) -> Cow<'static, str> {
            "CircuitTask".into()
        }
    }

    /// Index of `needle` in `events`; panics with the full log if it is absent.
    fn position(events: &[String], needle: &str) -> usize {
        match events.iter().position(|e| e == needle) {
            Some(idx) => idx,
            None => panic!("event {needle:?} not found in {events:?}"),
        }
    }

    /// Asserts that every entry of `expected` was recorded, dumping the log on failure.
    fn assert_all_recorded(events: &[String], expected: &[&str]) {
        for needle in expected {
            assert!(events.contains(&needle.to_string()), "{events:?}");
        }
    }

    #[tokio::test]
    async fn observer_fires_on_success_path() {
        let observer = Arc::new(RecordingObserver::default());
        let workflow = Workflow::bare()
            .register(S::Start, OkTask)
            .add_exit_state(S::Done)
            .with_observer(observer.clone());

        let reached = workflow.orchestrate(S::Start).await.unwrap();
        assert_eq!(reached, S::Done);

        let events = observer.events();
        let expected_order = [
            "state_enter:Start",
            "task_start:OkTask",
            "task_success:OkTask",
            "state_enter:Done",
        ];
        assert_all_recorded(&events, &expected_order);
        // Each event must come strictly before the next one in the list.
        for pair in expected_order.windows(2) {
            assert!(position(&events, pair[0]) < position(&events, pair[1]));
        }
    }

    #[tokio::test]
    async fn observer_fires_on_retry_and_failure() {
        let observer = Arc::new(RecordingObserver::default());
        let workflow = Workflow::bare()
            .register(S::Start, FailTask)
            .add_exit_state(S::Done)
            .with_observer(observer.clone());

        let outcome = workflow.orchestrate(S::Start).await;
        assert!(outcome.is_err());

        let events = observer.events();
        assert_all_recorded(
            &events,
            &[
                "task_start:FailTask",
                "retry:FailTask:1",
                "retry:FailTask:2",
                "task_failure:FailTask",
            ],
        );
        assert!(
            !events.contains(&"task_success:FailTask".to_string()),
            "{events:?}"
        );
    }

    #[tokio::test]
    async fn observer_fires_on_circuit_open() {
        let breaker = Arc::new(CircuitBreaker::new(CircuitPolicy {
            failure_threshold: 1,
            reset_timeout: Duration::from_secs(60),
            ..CircuitPolicy::default()
        }));
        // Trip the breaker up front: a single recorded failure meets the threshold.
        let permit = breaker.try_acquire().expect("closed breaker admits");
        breaker.record_failure(permit);

        let observer = Arc::new(RecordingObserver::default());
        let guarded = CircuitTask {
            breaker: Arc::clone(&breaker),
        };
        let workflow = Workflow::bare()
            .register(S::Start, guarded)
            .add_exit_state(S::Done)
            .with_observer(observer.clone());

        let err = workflow.orchestrate(S::Start).await.unwrap_err();
        assert!(matches!(err, CanoError::CircuitOpen(_)), "{err}");

        let events = observer.events();
        assert_all_recorded(
            &events,
            &["circuit_open:CircuitTask", "task_failure:CircuitTask"],
        );
    }

    #[tokio::test]
    async fn no_observer_is_zero_overhead_and_still_runs() {
        let workflow = Workflow::bare()
            .register(S::Start, OkTask)
            .add_exit_state(S::Done);
        let reached = workflow.orchestrate(S::Start).await.unwrap();
        assert_eq!(reached, S::Done);
    }

    #[test]
    fn default_task_name_is_type_name() {
        struct Anon;

        #[task]
        impl Task<S> for Anon {
            async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
                Ok(TaskResult::Single(S::Done))
            }
        }

        let name = Anon.name();
        assert!(name.contains("Anon"), "{}", name);
    }
}
#[cfg(all(test, feature = "metrics"))]
mod metrics_observer_tests {
    use super::*;
    use crate::metrics::test_support::*;
    use crate::prelude::*;
    use std::sync::Arc;

    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    enum S {
        Start,
        Mid,
        Done,
    }

    /// Task that unconditionally transitions to the wrapped state.
    struct GoTo(S);

    #[crate::task]
    impl Task<S> for GoTo {
        fn name(&self) -> std::borrow::Cow<'static, str> {
            std::borrow::Cow::Borrowed("GoTo")
        }

        async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
            Ok(TaskResult::Single(self.0.clone()))
        }
    }

    /// Start -> Mid -> Done: each state is entered once and `GoTo` completes twice.
    #[test]
    fn metrics_observer_emits_state_enters_and_task_runs() {
        let (res, rows) = run_with_recorder(|| async {
            Workflow::bare()
                .with_observer(Arc::new(MetricsObserver::new()))
                .register(S::Start, GoTo(S::Mid))
                .register(S::Mid, GoTo(S::Done))
                .add_exit_state(S::Done)
                .orchestrate(S::Start)
                .await
        });
        assert_eq!(res.unwrap(), S::Done);
        assert_eq!(
            counter(&rows, "cano_state_enters_total", &[("state", "Start")]),
            1
        );
        assert_eq!(
            counter(&rows, "cano_state_enters_total", &[("state", "Mid")]),
            1
        );
        assert_eq!(
            counter(&rows, "cano_state_enters_total", &[("state", "Done")]),
            1
        );
        assert_eq!(
            counter(
                &rows,
                "cano_observed_task_runs_total",
                &[("task", "GoTo"), ("outcome", "completed")]
            ),
            2
        );
    }

    /// A task that fails twice before succeeding should report two retries and
    /// exactly one completed run.
    #[test]
    fn metrics_observer_emits_retries() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        // The test keeps `calls` so the number of invocations can be checked
        // after the run; the task gets its own handle.
        let calls = Arc::new(AtomicUsize::new(0));
        let task_calls = Arc::clone(&calls);

        /// Fails on its first two invocations, succeeds on the third.
        struct Flaky(Arc<AtomicUsize>);

        #[crate::task]
        impl Task<S> for Flaky {
            fn name(&self) -> std::borrow::Cow<'static, str> {
                std::borrow::Cow::Borrowed("Flaky")
            }

            fn config(&self) -> TaskConfig {
                TaskConfig::new().with_fixed_retry(3, std::time::Duration::from_millis(1))
            }

            async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
                if self.0.fetch_add(1, Ordering::SeqCst) < 2 {
                    Err(CanoError::task_execution("transient"))
                } else {
                    Ok(TaskResult::Single(S::Done))
                }
            }
        }

        let (res, rows) = run_with_recorder(move || async move {
            Workflow::bare()
                .with_observer(Arc::new(MetricsObserver::new()))
                .register(S::Start, Flaky(task_calls))
                .add_exit_state(S::Done)
                .orchestrate(S::Start)
                .await
        });
        assert_eq!(res.unwrap(), S::Done);
        // Two transient failures followed by one success = three invocations,
        // which must agree with the two recorded retries below.
        assert_eq!(calls.load(Ordering::SeqCst), 3);
        assert_eq!(
            counter(&rows, "cano_task_retries_total", &[("task", "Flaky")]),
            2
        );
        assert_eq!(
            counter(
                &rows,
                "cano_observed_task_runs_total",
                &[("task", "Flaky"), ("outcome", "completed")]
            ),
            1
        );
    }
}