use crate::error::CanoError;
pub trait WorkflowObserver: Send + Sync + 'static {
fn on_state_enter(&self, _state: &str) {}
fn on_task_start(&self, _task_id: &str) {}
fn on_task_success(&self, _task_id: &str) {}
fn on_task_failure(&self, _task_id: &str, _err: &CanoError) {}
fn on_retry(&self, _task_id: &str, _attempt: u32) {}
fn on_circuit_open(&self, _task_id: &str) {}
fn on_checkpoint(&self, _workflow_id: &str, _sequence: u64) {}
fn on_resume(&self, _workflow_id: &str, _sequence: u64) {}
}
#[cfg(feature = "tracing")]
#[derive(Debug, Clone, Copy, Default)]
pub struct TracingObserver;
#[cfg(feature = "tracing")]
impl TracingObserver {
#[must_use]
pub fn new() -> Self {
Self
}
}
#[cfg(feature = "tracing")]
impl WorkflowObserver for TracingObserver {
fn on_state_enter(&self, state: &str) {
tracing::debug!(state, "workflow entered state");
}
fn on_task_start(&self, task_id: &str) {
tracing::debug!(task_id, "task started");
}
fn on_task_success(&self, task_id: &str) {
tracing::info!(task_id, "task succeeded");
}
fn on_task_failure(&self, task_id: &str, err: &CanoError) {
tracing::error!(task_id, error = %err, "task failed");
}
fn on_retry(&self, task_id: &str, attempt: u32) {
tracing::warn!(task_id, attempt, "task retry");
}
fn on_circuit_open(&self, task_id: &str) {
tracing::warn!(task_id, "circuit breaker rejected task");
}
fn on_checkpoint(&self, workflow_id: &str, sequence: u64) {
tracing::debug!(workflow_id, sequence, "checkpoint appended");
}
fn on_resume(&self, workflow_id: &str, sequence: u64) {
tracing::info!(workflow_id, sequence, "workflow resumed from checkpoint");
}
}
#[cfg(test)]
mod tests {
use crate::prelude::*;
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[derive(Default)]
struct RecordingObserver {
events: Mutex<Vec<String>>,
}
impl RecordingObserver {
fn events(&self) -> Vec<String> {
self.events.lock().unwrap().clone()
}
fn record(&self, event: String) {
self.events.lock().unwrap().push(event);
}
}
impl WorkflowObserver for RecordingObserver {
fn on_state_enter(&self, state: &str) {
self.record(format!("state_enter:{state}"));
}
fn on_task_start(&self, task_id: &str) {
self.record(format!("task_start:{task_id}"));
}
fn on_task_success(&self, task_id: &str) {
self.record(format!("task_success:{task_id}"));
}
fn on_task_failure(&self, task_id: &str, _err: &CanoError) {
self.record(format!("task_failure:{task_id}"));
}
fn on_retry(&self, task_id: &str, attempt: u32) {
self.record(format!("retry:{task_id}:{attempt}"));
}
fn on_circuit_open(&self, task_id: &str) {
self.record(format!("circuit_open:{task_id}"));
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum S {
Start,
Done,
}
struct OkTask;
#[task]
impl Task<S> for OkTask {
async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
Ok(TaskResult::Single(S::Done))
}
fn name(&self) -> Cow<'static, str> {
"OkTask".into()
}
}
struct FailTask;
#[task]
impl Task<S> for FailTask {
async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
Err(CanoError::task_execution("boom"))
}
fn config(&self) -> TaskConfig {
TaskConfig::new().with_fixed_retry(2, Duration::from_millis(1))
}
fn name(&self) -> Cow<'static, str> {
"FailTask".into()
}
}
struct CircuitTask {
breaker: Arc<CircuitBreaker>,
}
#[task]
impl Task<S> for CircuitTask {
async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
Ok(TaskResult::Single(S::Done))
}
fn config(&self) -> TaskConfig {
TaskConfig::minimal().with_circuit_breaker(Arc::clone(&self.breaker))
}
fn name(&self) -> Cow<'static, str> {
"CircuitTask".into()
}
}
fn position(events: &[String], needle: &str) -> usize {
events
.iter()
.position(|e| e == needle)
.unwrap_or_else(|| panic!("event {needle:?} not found in {events:?}"))
}
#[tokio::test]
async fn observer_fires_on_success_path() {
let observer = Arc::new(RecordingObserver::default());
let workflow = Workflow::bare()
.register(S::Start, OkTask)
.add_exit_state(S::Done)
.with_observer(observer.clone());
assert_eq!(workflow.orchestrate(S::Start).await.unwrap(), S::Done);
let events = observer.events();
assert!(
events.contains(&"state_enter:Start".to_string()),
"{events:?}"
);
assert!(
events.contains(&"task_start:OkTask".to_string()),
"{events:?}"
);
assert!(
events.contains(&"task_success:OkTask".to_string()),
"{events:?}"
);
assert!(
events.contains(&"state_enter:Done".to_string()),
"{events:?}"
);
assert!(position(&events, "state_enter:Start") < position(&events, "task_start:OkTask"));
assert!(position(&events, "task_start:OkTask") < position(&events, "task_success:OkTask"));
assert!(position(&events, "task_success:OkTask") < position(&events, "state_enter:Done"));
}
#[tokio::test]
async fn observer_fires_on_retry_and_failure() {
let observer = Arc::new(RecordingObserver::default());
let workflow = Workflow::bare()
.register(S::Start, FailTask)
.add_exit_state(S::Done)
.with_observer(observer.clone());
assert!(workflow.orchestrate(S::Start).await.is_err());
let events = observer.events();
assert!(
events.contains(&"task_start:FailTask".to_string()),
"{events:?}"
);
assert!(
events.contains(&"retry:FailTask:1".to_string()),
"{events:?}"
);
assert!(
events.contains(&"retry:FailTask:2".to_string()),
"{events:?}"
);
assert!(
events.contains(&"task_failure:FailTask".to_string()),
"{events:?}"
);
assert!(
!events.contains(&"task_success:FailTask".to_string()),
"{events:?}"
);
}
#[tokio::test]
async fn observer_fires_on_circuit_open() {
let breaker = Arc::new(CircuitBreaker::new(CircuitPolicy {
failure_threshold: 1,
reset_timeout: Duration::from_secs(60),
..CircuitPolicy::default()
}));
let permit = breaker.try_acquire().expect("closed breaker admits");
breaker.record_failure(permit);
let observer = Arc::new(RecordingObserver::default());
let workflow = Workflow::bare()
.register(
S::Start,
CircuitTask {
breaker: Arc::clone(&breaker),
},
)
.add_exit_state(S::Done)
.with_observer(observer.clone());
let err = workflow.orchestrate(S::Start).await.unwrap_err();
assert!(matches!(err, CanoError::CircuitOpen(_)), "{err}");
let events = observer.events();
assert!(
events.contains(&"circuit_open:CircuitTask".to_string()),
"{events:?}"
);
assert!(
events.contains(&"task_failure:CircuitTask".to_string()),
"{events:?}"
);
}
#[tokio::test]
async fn no_observer_is_zero_overhead_and_still_runs() {
let workflow = Workflow::bare()
.register(S::Start, OkTask)
.add_exit_state(S::Done);
assert_eq!(workflow.orchestrate(S::Start).await.unwrap(), S::Done);
}
#[test]
fn default_task_name_is_type_name() {
struct Anon;
#[task]
impl Task<S> for Anon {
async fn run_bare(&self) -> Result<TaskResult<S>, CanoError> {
Ok(TaskResult::Single(S::Done))
}
}
assert!(Anon.name().contains("Anon"), "{}", Anon.name());
}
}