#![cfg(feature = "tracing")]
use async_trait::async_trait;
use cano::prelude::*;
use std::time::Duration;
use tracing::{Instrument, info, info_span, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum WorkflowState {
Start,
Processing,
Complete,
Error,
}
#[derive(Clone)]
struct TracedDataProcessor {
processor_id: String,
simulate_delay_ms: u64,
}
impl TracedDataProcessor {
fn new(processor_id: &str, simulate_delay_ms: u64) -> Self {
Self {
processor_id: processor_id.to_string(),
simulate_delay_ms,
}
}
}
#[async_trait]
impl Node<WorkflowState> for TracedDataProcessor {
type PrepResult = Vec<String>;
type ExecResult = Vec<String>;
fn config(&self) -> TaskConfig {
TaskConfig::default()
}
async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
info!(processor_id = %self.processor_id, "Starting data preparation");
tokio::time::sleep(Duration::from_millis(50)).await;
let data = vec![
"record_1".to_string(),
"record_2".to_string(),
"record_3".to_string(),
];
store.put("prep_timestamp", chrono::Utc::now().to_rfc3339())?;
info!(processor_id = %self.processor_id, record_count = data.len(), "Data preparation completed");
Ok(data)
}
async fn exec(&self, prep_result: Self::PrepResult) -> Self::ExecResult {
info!(processor_id = %self.processor_id, input_records = prep_result.len(), "Starting data processing");
let processing_span = info_span!("data_processing", processor_id = %self.processor_id);
let processed_data = async {
tokio::time::sleep(Duration::from_millis(self.simulate_delay_ms)).await;
prep_result
.into_iter()
.map(|item| format!("processed_{item}"))
.collect::<Vec<_>>()
}
.instrument(processing_span)
.await;
info!(processor_id = %self.processor_id, output_records = processed_data.len(), "Data processing completed");
processed_data
}
async fn post(
&self,
store: &MemoryStore,
exec_result: Self::ExecResult,
) -> Result<WorkflowState, CanoError> {
info!(processor_id = %self.processor_id, "Starting post-processing");
let result_summary = format!("Processed {} records", exec_result.len());
store.put("processing_result", result_summary)?;
store.put("completion_timestamp", chrono::Utc::now().to_rfc3339())?;
tokio::time::sleep(Duration::from_millis(30)).await;
info!(processor_id = %self.processor_id, processed_count = exec_result.len(), "Post-processing completed");
Ok(WorkflowState::Complete)
}
}
#[derive(Clone)]
struct ValidationNode {
should_pass: bool,
}
impl ValidationNode {
fn new(should_pass: bool) -> Self {
Self { should_pass }
}
}
#[async_trait]
impl Node<WorkflowState> for ValidationNode {
type PrepResult = String;
type ExecResult = bool;
async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
info!("Preparing validation");
Ok("validation_data".to_string())
}
async fn exec(&self, _prep_result: Self::PrepResult) -> Self::ExecResult {
info!(should_pass = self.should_pass, "Running validation");
tokio::time::sleep(Duration::from_millis(100)).await;
self.should_pass
}
async fn post(
&self,
_store: &MemoryStore,
exec_result: Self::ExecResult,
) -> Result<WorkflowState, CanoError> {
if exec_result {
info!("Validation passed");
Ok(WorkflowState::Processing)
} else {
warn!("Validation failed");
Ok(WorkflowState::Error)
}
}
}
#[derive(Clone)]
struct SimpleMathTask {
task_id: String,
operation: String,
}
impl SimpleMathTask {
fn new(task_id: &str, operation: &str) -> Self {
Self {
task_id: task_id.to_string(),
operation: operation.to_string(),
}
}
}
#[async_trait]
impl Task<WorkflowState> for SimpleMathTask {
fn config(&self) -> TaskConfig {
TaskConfig::default().with_fixed_retry(2, Duration::from_millis(100))
}
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<WorkflowState>, CanoError> {
info!(task_id = %self.task_id, operation = %self.operation, "Starting math task");
tokio::time::sleep(Duration::from_millis(150)).await;
let a: i32 = store.get("operand_a").unwrap_or(10);
let b: i32 = store.get("operand_b").unwrap_or(5);
let result = match self.operation.as_str() {
"add" => a + b,
"multiply" => a * b,
"subtract" => a - b,
_ => {
warn!(operation = %self.operation, "Unknown operation, defaulting to addition");
a + b
}
};
store.put("math_result", result)?;
store.put("task_completed_by", self.task_id.clone())?;
info!(
task_id = %self.task_id,
operation = %self.operation,
operand_a = a,
operand_b = b,
result,
"Math task completed"
);
Ok(TaskResult::Single(WorkflowState::Complete))
}
}
async fn setup_tracing() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
)
.with(tracing_subscriber::fmt::layer().with_target(false))
.init();
}
#[tokio::main]
async fn main() -> CanoResult<()> {
setup_tracing().await;
info!("🔍 Starting Cano Tracing Demo");
info!("=============================");
{
info!("📋 Example 1: Basic Workflow with Tracing");
let store = MemoryStore::new();
let workflow = Workflow::new(store.clone())
.register(WorkflowState::Start, ValidationNode::new(true))
.register(
WorkflowState::Processing,
TracedDataProcessor::new("basic_processor", 200),
)
.add_exit_states(vec![WorkflowState::Complete, WorkflowState::Error]);
info!("Starting workflow execution...");
let result = workflow.orchestrate(WorkflowState::Start).await?;
info!(final_state = ?result, "Workflow completed");
println!("✅ Basic workflow completed with state: {result:?}\n");
}
{
info!("📋 Example 2: Task-based Workflow with Tracing and Retry");
let store = MemoryStore::new();
let task_workflow = Workflow::new(store.clone())
.register(
WorkflowState::Start,
SimpleMathTask::new("math_task_1", "multiply"),
)
.add_exit_state(WorkflowState::Complete);
store.put("operand_a", 7)?;
store.put("operand_b", 6)?;
info!("Starting task-based workflow execution...");
let result = task_workflow.orchestrate(WorkflowState::Start).await?;
let math_result: i32 = store.get("math_result").unwrap_or(0);
let completed_by: String = store.get("task_completed_by").unwrap_or_default();
info!(
final_state = ?result,
math_result,
completed_by = %completed_by,
"Task-based workflow completed"
);
println!("✅ Task-based workflow completed:");
println!(" - Final state: {result:?}");
println!(" - Math result: {math_result}");
println!(" - Completed by: {completed_by}\n");
}
#[cfg(feature = "scheduler")]
{
info!("📋 Example 3: Scheduler with Workflow Tracing");
let mut scheduler = Scheduler::new();
let store = MemoryStore::new();
let scheduled_workflow = Workflow::new(store.clone())
.register(WorkflowState::Start, ValidationNode::new(true))
.register(
WorkflowState::Processing,
TracedDataProcessor::new("scheduled_processor", 150),
)
.add_exit_state(WorkflowState::Complete);
scheduler.every_seconds(
"traced_workflow",
scheduled_workflow,
WorkflowState::Start,
2,
)?;
info!("Starting scheduler...");
let scheduler_clone = scheduler.clone();
tokio::spawn(async move {
scheduler.start().await.expect("Scheduler failed");
});
info!("Letting scheduler run for 6 seconds...");
tokio::time::sleep(Duration::from_secs(6)).await;
if let Some(flow_info) = scheduler_clone.status("traced_workflow").await {
info!(
workflow_id = %flow_info.id,
run_count = flow_info.run_count,
status = ?flow_info.status,
"Workflow status"
);
println!(
"✅ Scheduled workflow executed {} times",
flow_info.run_count
);
}
info!("Stopping scheduler...");
scheduler_clone.stop().await?;
}
{
info!("📋 Example 4: Error Handling with Tracing");
let store = MemoryStore::new();
let error_workflow = Workflow::new(store)
.register(WorkflowState::Start, ValidationNode::new(false)) .register(
WorkflowState::Processing,
TracedDataProcessor::new("error_processor", 100),
)
.add_exit_states(vec![WorkflowState::Complete, WorkflowState::Error]);
info!("Starting workflow that will encounter validation failure...");
let result = error_workflow.orchestrate(WorkflowState::Start).await?;
println!("✅ Error workflow completed with state: {result:?}");
match result {
WorkflowState::Error => {
println!(
" As expected, the workflow ended in error state due to validation failure"
);
}
_ => {
println!(" Unexpected result");
}
}
}
info!("🎉 Tracing demo completed!");
println!("\n🔍 Tracing Demo Summary:");
println!("• Workflows are automatically instrumented with tracing");
println!("• Node execution phases (prep, exec, post) are traced");
println!("• Tasks can add custom tracing spans");
println!("• Schedulers trace workflow executions with context");
println!("• Error paths are traced with appropriate log levels");
println!("\nTo see more detailed tracing output, run with RUST_LOG=debug");
Ok(())
}