use crate::error::CanoError;
pub trait WorkflowObserver: Send + Sync + 'static {
fn on_state_enter(&self, _state: &str) {}
fn on_task_start(&self, _task_id: &str) {}
fn on_task_success(&self, _task_id: &str) {}
fn on_task_failure(&self, _task_id: &str, _err: &CanoError) {}
fn on_retry(&self, _task_id: &str, _attempt: u32) {}
fn on_circuit_open(&self, _task_id: &str) {}
fn on_checkpoint(&self, _workflow_id: &str, _sequence: u64) {}
fn on_resume(&self, _workflow_id: &str, _sequence: u64) {}
fn on_workflow_timeout(&self, _elapsed: std::time::Duration, _limit: std::time::Duration) {}
fn on_checkpoint_clear_failed(&self, _workflow_id: &str, _error: &crate::error::CanoError) {}
fn on_unknown_resume_state(
&self,
_workflow_id: &str,
_sequence: u64,
_unknown_state_label: &str,
) {
}
}
/// A [`WorkflowObserver`] that reports every workflow event through the `tracing` crate.
///
/// It is a stateless unit struct and exists only when the `tracing` cargo feature is on.
#[cfg(feature = "tracing")]
#[derive(Debug, Clone, Copy, Default)]
pub struct TracingObserver;

#[cfg(feature = "tracing")]
impl TracingObserver {
    /// Builds a tracing observer; identical to `TracingObserver::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
// Emits one `tracing` event per observer hook. Routine progress is logged at
// debug/info, while failures, retries, rejections and timeouts are logged at warn/error.
#[cfg(feature = "tracing")]
impl WorkflowObserver for TracingObserver {
    fn on_state_enter(&self, state: &str) {
        tracing::debug!(state, "workflow entered state");
    }

    fn on_task_start(&self, task_id: &str) {
        tracing::debug!(task_id, "task started");
    }

    fn on_task_success(&self, task_id: &str) {
        tracing::info!(task_id, "task succeeded");
    }

    fn on_task_failure(&self, task_id: &str, err: &CanoError) {
        // `%` records the error through its `Display` impl.
        tracing::error!(task_id, error = %err, "task failed");
    }

    fn on_retry(&self, task_id: &str, attempt: u32) {
        tracing::warn!(task_id, attempt, "task retry");
    }

    fn on_circuit_open(&self, task_id: &str) {
        tracing::warn!(task_id, "circuit breaker rejected task");
    }

    fn on_checkpoint(&self, workflow_id: &str, sequence: u64) {
        tracing::debug!(workflow_id, sequence, "checkpoint appended");
    }

    fn on_resume(&self, workflow_id: &str, sequence: u64) {
        tracing::info!(workflow_id, sequence, "workflow resumed from checkpoint");
    }

    fn on_workflow_timeout(&self, elapsed: std::time::Duration, limit: std::time::Duration) {
        // `Duration::as_millis` returns u128. Saturate at u64::MAX rather than
        // silently truncating with an `as` cast.
        let elapsed_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX);
        let limit_ms = u64::try_from(limit.as_millis()).unwrap_or(u64::MAX);
        tracing::warn!(elapsed_ms, limit_ms, "workflow total timeout exceeded");
    }

    fn on_checkpoint_clear_failed(&self, workflow_id: &str, error: &CanoError) {
        tracing::warn!(workflow_id, error = %error, "checkpoint log clear failed");
    }

    fn on_unknown_resume_state(&self, workflow_id: &str, sequence: u64, unknown_state_label: &str) {
        tracing::warn!(
            workflow_id,
            sequence,
            unknown_state_label,
            "checkpoint row references a state not registered on the current workflow — dropped from rehydrated path"
        );
    }
}
/// A [`WorkflowObserver`] that records workflow events as metrics through the
/// recorder functions in `crate::metrics`.
///
/// It is a stateless unit struct and exists only when the `metrics` cargo feature is on.
/// Like `TracingObserver`, it is `Copy`.
#[cfg(feature = "metrics")]
#[derive(Debug, Clone, Copy, Default)]
pub struct MetricsObserver;

#[cfg(feature = "metrics")]
impl MetricsObserver {
    /// Builds a metrics observer; identical to `MetricsObserver::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self
    }
}
// Forwards each observer hook to its recorder function in `crate::metrics`.
// `on_task_start` is left at the trait's no-op default, so a task run is counted
// only once it finishes (successfully or not).
#[cfg(feature = "metrics")]
impl WorkflowObserver for MetricsObserver {
    fn on_state_enter(&self, state_label: &str) {
        crate::metrics::observed_state_enter(state_label);
    }

    fn on_task_success(&self, task: &str) {
        // `true` = successful run.
        crate::metrics::observed_task_run(task, true);
    }

    fn on_task_failure(&self, task: &str, _cause: &CanoError) {
        // `false` = failed run; the error itself is not part of the metric.
        crate::metrics::observed_task_run(task, false);
    }

    fn on_retry(&self, task: &str, _nth: u32) {
        crate::metrics::observed_task_retry(task);
    }

    fn on_circuit_open(&self, task: &str) {
        crate::metrics::observed_circuit_open(task);
    }

    fn on_checkpoint(&self, _wf: &str, _seq: u64) {
        crate::metrics::observed_checkpoint();
    }

    fn on_resume(&self, _wf: &str, _seq: u64) {
        crate::metrics::observed_resume();
    }

    fn on_workflow_timeout(&self, spent: std::time::Duration, budget: std::time::Duration) {
        crate::metrics::observed_workflow_timeout(spent, budget);
    }

    fn on_checkpoint_clear_failed(&self, _wf: &str, _cause: &CanoError) {
        // NOTE(review): presumably `false` marks the failed-clear outcome — confirm in `crate::metrics`.
        crate::metrics::checkpoint_clear(false);
    }

    fn on_unknown_resume_state(&self, _wf: &str, _seq: u64, _label: &str) {
        crate::metrics::observed_unknown_resume_state();
    }
}
#[cfg(test)]
mod tests {
    use crate::prelude::*;
    use std::borrow::Cow;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    /// Observer that records every event it receives as a `kind:args` string, so
    /// tests can assert both that an event happened and the relative order of events.
    #[derive(Default)]
    struct RecordingObserver {
        events: Mutex<Vec<String>>,
    }

    impl RecordingObserver {
        /// Returns a snapshot of the events recorded so far.
        fn events(&self) -> Vec<String> {
            self.events.lock().unwrap().clone()
        }

        fn record(&self, event: String) {
            self.events.lock().unwrap().push(event);
        }
    }

    impl WorkflowObserver for RecordingObserver {
        fn on_state_enter(&self, state: &str) {
            self.record(format!("state_enter:{state}"));
        }
        fn on_task_start(&self, task_id: &str) {
            self.record(format!("task_start:{task_id}"));
        }
        fn on_task_success(&self, task_id: &str) {
            self.record(format!("task_success:{task_id}"));
        }
        fn on_task_failure(&self, task_id: &str, _err: &CanoError) {
            self.record(format!("task_failure:{task_id}"));
        }
        fn on_retry(&self, task_id: &str, attempt: u32) {
            self.record(format!("retry:{task_id}:{attempt}"));
        }
        fn on_circuit_open(&self, task_id: &str) {
            self.record(format!("circuit_open:{task_id}"));
        }
    }

    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    enum S {
        Start,
        Done,
    }

    /// Always succeeds, transitioning to `S::Done`.
    struct OkTask;

    #[task]
    impl Task<S> for OkTask {
        async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
            Ok(TaskResult::Single(S::Done))
        }
        fn name(&self) -> Cow<'static, str> {
            "OkTask".into()
        }
    }

    /// Always fails. It is configured with 2 fixed retries, so the retry hook fires twice.
    struct FailTask;

    #[task]
    impl Task<S> for FailTask {
        async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
            Err(CanoError::task_execution("boom"))
        }
        fn config(&self) -> TaskConfig {
            TaskConfig::new().with_fixed_retry(2, Duration::from_millis(1))
        }
        fn name(&self) -> Cow<'static, str> {
            "FailTask".into()
        }
    }

    /// Succeeds on its own, but is gated by a shared circuit breaker.
    struct CircuitTask {
        breaker: Arc<CircuitBreaker>,
    }

    #[task]
    impl Task<S> for CircuitTask {
        async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
            Ok(TaskResult::Single(S::Done))
        }
        fn config(&self) -> TaskConfig {
            TaskConfig::minimal().with_circuit_breaker(Arc::clone(&self.breaker))
        }
        fn name(&self) -> Cow<'static, str> {
            "CircuitTask".into()
        }
    }

    /// Index of `needle` in `events`; panics with the full event list if it is missing.
    fn position(events: &[String], needle: &str) -> usize {
        events
            .iter()
            .position(|e| e == needle)
            .unwrap_or_else(|| panic!("event {needle:?} not found in {events:?}"))
    }

    #[tokio::test]
    async fn observer_fires_on_success_path() {
        let observer = Arc::new(RecordingObserver::default());
        let workflow = Workflow::bare()
            .register(S::Start, OkTask)
            .add_exit_state(S::Done)
            .with_observer(observer.clone());
        assert_eq!(workflow.orchestrate(S::Start).await.unwrap(), S::Done);
        let events = observer.events();
        assert!(
            events.contains(&"state_enter:Start".to_string()),
            "{events:?}"
        );
        assert!(
            events.contains(&"task_start:OkTask".to_string()),
            "{events:?}"
        );
        assert!(
            events.contains(&"task_success:OkTask".to_string()),
            "{events:?}"
        );
        assert!(
            events.contains(&"state_enter:Done".to_string()),
            "{events:?}"
        );
        assert!(position(&events, "state_enter:Start") < position(&events, "task_start:OkTask"));
        assert!(position(&events, "task_start:OkTask") < position(&events, "task_success:OkTask"));
        assert!(position(&events, "task_success:OkTask") < position(&events, "state_enter:Done"));
    }

    #[tokio::test]
    async fn observer_fires_on_retry_and_failure() {
        let observer = Arc::new(RecordingObserver::default());
        let workflow = Workflow::bare()
            .register(S::Start, FailTask)
            .add_exit_state(S::Done)
            .with_observer(observer.clone());
        assert!(workflow.orchestrate(S::Start).await.is_err());
        let events = observer.events();
        assert!(
            events.contains(&"task_start:FailTask".to_string()),
            "{events:?}"
        );
        assert!(
            events.contains(&"retry:FailTask:1".to_string()),
            "{events:?}"
        );
        assert!(
            events.contains(&"retry:FailTask:2".to_string()),
            "{events:?}"
        );
        assert!(
            events.contains(&"task_failure:FailTask".to_string()),
            "{events:?}"
        );
        assert!(
            !events.contains(&"task_success:FailTask".to_string()),
            "{events:?}"
        );
    }

    #[tokio::test]
    async fn observer_fires_on_circuit_open() {
        // Trip the breaker up front: one recorded failure reaches `failure_threshold: 1`.
        let breaker = Arc::new(CircuitBreaker::new(CircuitPolicy {
            failure_threshold: 1,
            reset_timeout: Duration::from_secs(60),
            ..CircuitPolicy::default()
        }));
        let permit = breaker.try_acquire().expect("closed breaker admits");
        breaker.record_failure(permit);
        let observer = Arc::new(RecordingObserver::default());
        let workflow = Workflow::bare()
            .register(
                S::Start,
                CircuitTask {
                    breaker: Arc::clone(&breaker),
                },
            )
            .add_exit_state(S::Done)
            .with_observer(observer.clone());
        let err = workflow.orchestrate(S::Start).await.unwrap_err();
        assert!(matches!(err.inner(), CanoError::CircuitOpen(_)), "{err}");
        let events = observer.events();
        assert!(
            events.contains(&"circuit_open:CircuitTask".to_string()),
            "{events:?}"
        );
        assert!(
            events.contains(&"task_failure:CircuitTask".to_string()),
            "{events:?}"
        );
    }

    // Only checks that a workflow without an observer still runs to completion;
    // overhead is not measured here.
    #[tokio::test]
    async fn no_observer_is_zero_overhead_and_still_runs() {
        let workflow = Workflow::bare()
            .register(S::Start, OkTask)
            .add_exit_state(S::Done);
        assert_eq!(workflow.orchestrate(S::Start).await.unwrap(), S::Done);
    }

    #[test]
    fn default_task_name_is_type_name() {
        struct Anon;
        #[task]
        impl Task<S> for Anon {
            async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
                Ok(TaskResult::Single(S::Done))
            }
        }
        assert!(Anon.name().contains("Anon"), "{}", Anon.name());
    }

    #[cfg(feature = "tracing")]
    #[test]
    fn tracing_observer_forwards_checkpoint_clear_failed() {
        let obs: Arc<dyn WorkflowObserver> = Arc::new(crate::observer::TracingObserver::new());
        let err = CanoError::checkpoint_store("clear failed");
        obs.on_checkpoint_clear_failed("wf-id", &err);
    }

    #[cfg(feature = "metrics")]
    #[test]
    fn metrics_observer_forwards_checkpoint_clear_failed() {
        let obs: Arc<dyn WorkflowObserver> = Arc::new(crate::observer::MetricsObserver::new());
        let err = CanoError::checkpoint_store("clear failed");
        obs.on_checkpoint_clear_failed("wf-id", &err);
    }

    #[test]
    fn on_workflow_timeout_default_is_no_op_and_callable_via_trait_object() {
        struct NoopObs;
        impl WorkflowObserver for NoopObs {}
        let obs: Arc<dyn WorkflowObserver> = Arc::new(NoopObs);
        obs.on_workflow_timeout(Duration::from_millis(150), Duration::from_millis(100));
    }

    #[test]
    fn on_workflow_timeout_can_be_overridden_to_record_event() {
        #[derive(Default)]
        struct Recorder(Mutex<Vec<(Duration, Duration)>>);
        impl WorkflowObserver for Recorder {
            fn on_workflow_timeout(&self, elapsed: Duration, limit: Duration) {
                self.0.lock().unwrap().push((elapsed, limit));
            }
        }
        let obs = Arc::new(Recorder::default());
        let dyn_obs: Arc<dyn WorkflowObserver> = obs.clone();
        dyn_obs.on_workflow_timeout(Duration::from_millis(150), Duration::from_millis(100));
        let events = obs.0.lock().unwrap().clone();
        assert_eq!(
            events,
            vec![(Duration::from_millis(150), Duration::from_millis(100))]
        );
    }
}
#[cfg(all(test, feature = "metrics"))]
mod metrics_observer_tests {
    use super::*;
    use crate::metrics::test_support::*;
    use crate::prelude::*;
    use std::sync::Arc;

    #[derive(Debug, Clone, PartialEq, Eq, Hash)]
    enum S {
        Start,
        Mid,
        Done,
    }

    /// Task that unconditionally transitions to the wrapped state.
    struct GoTo(S);

    #[crate::task]
    impl Task<S> for GoTo {
        fn name(&self) -> std::borrow::Cow<'static, str> {
            std::borrow::Cow::Borrowed("GoTo")
        }
        async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
            Ok(TaskResult::Single(self.0.clone()))
        }
    }

    #[test]
    fn metrics_observer_emits_state_enters_and_task_runs() {
        let (res, rows) = run_with_recorder(|| async {
            Workflow::bare()
                .with_observer(Arc::new(MetricsObserver::new()))
                .register(S::Start, GoTo(S::Mid))
                .register(S::Mid, GoTo(S::Done))
                .add_exit_state(S::Done)
                .orchestrate(S::Start)
                .await
        });
        assert_eq!(res.unwrap(), S::Done);
        assert_eq!(
            counter(&rows, "cano_state_enters_total", &[("state", "Start")]),
            1
        );
        assert_eq!(
            counter(&rows, "cano_state_enters_total", &[("state", "Mid")]),
            1
        );
        assert_eq!(
            counter(&rows, "cano_state_enters_total", &[("state", "Done")]),
            1
        );
        assert_eq!(
            counter(
                &rows,
                "cano_observed_task_runs_total",
                &[("task", "GoTo"), ("outcome", "completed")]
            ),
            2
        );
    }

    #[test]
    fn metrics_observer_emits_on_workflow_timeout() {
        /// Sleeps well past the 20 ms total-timeout budget configured below.
        struct SlowTask;
        #[crate::task]
        impl Task<S> for SlowTask {
            fn name(&self) -> std::borrow::Cow<'static, str> {
                std::borrow::Cow::Borrowed("SlowTask")
            }
            fn config(&self) -> TaskConfig {
                TaskConfig::minimal()
            }
            async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                Ok(TaskResult::Single(S::Done))
            }
        }
        let (res, rows) = run_with_recorder(|| async {
            Workflow::bare()
                .with_observer(Arc::new(MetricsObserver::new()))
                .with_total_timeout(std::time::Duration::from_millis(20))
                .register(S::Start, SlowTask)
                .add_exit_state(S::Done)
                .orchestrate(S::Start)
                .await
        });
        assert!(res.is_err());
        assert_eq!(
            counter(&rows, "cano_observed_workflow_timeouts_total", &[]),
            1,
            "MetricsObserver must increment its workflow_timeouts counter on a total-timeout trip"
        );
        assert_eq!(
            histogram_count(&rows, "cano_observed_workflow_timeout_limit_seconds", &[]),
            1,
            "must record the configured `limit` for at-a-glance budget analysis"
        );
        assert_eq!(
            histogram_count(&rows, "cano_observed_workflow_timeout_elapsed_seconds", &[]),
            1,
            "must record actual elapsed so dashboards can compare against `limit`"
        );
    }

    #[test]
    fn metrics_observer_emits_retries() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        // Shared attempt counter: `n` stays with the test, and `n2` moves into the task.
        let n = Arc::new(AtomicUsize::new(0));
        let n2 = Arc::clone(&n);

        /// Fails on its first two attempts and succeeds on the third.
        struct Flaky(Arc<AtomicUsize>);
        #[crate::task]
        impl Task<S> for Flaky {
            fn name(&self) -> std::borrow::Cow<'static, str> {
                std::borrow::Cow::Borrowed("Flaky")
            }
            fn config(&self) -> TaskConfig {
                TaskConfig::new().with_fixed_retry(3, std::time::Duration::from_millis(1))
            }
            async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
                if self.0.fetch_add(1, Ordering::SeqCst) < 2 {
                    Err(CanoError::task_execution("transient"))
                } else {
                    Ok(TaskResult::Single(S::Done))
                }
            }
        }
        let (res, rows) = run_with_recorder(move || async move {
            Workflow::bare()
                .with_observer(Arc::new(MetricsObserver::new()))
                .register(S::Start, Flaky(n2))
                .add_exit_state(S::Done)
                .orchestrate(S::Start)
                .await
        });
        assert_eq!(res.unwrap(), S::Done);
        // Two failed attempts plus the successful one. This pins the retry metric
        // below to real task executions.
        assert_eq!(
            n.load(Ordering::SeqCst),
            3,
            "Flaky should run exactly three times (2 failures + 1 success)"
        );
        assert_eq!(
            counter(&rows, "cano_task_retries_total", &[("task", "Flaky")]),
            2
        );
        assert_eq!(
            counter(
                &rows,
                "cano_observed_task_runs_total",
                &[("task", "Flaky"), ("outcome", "completed")]
            ),
            1
        );
    }
}