use cano::prelude::*;
use rand::RngExt;
use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum WorkflowAction {
Generate,
Count,
Complete,
Error,
}
#[derive(Resource)]
struct GeneratorConfig {
min_size: usize,
max_size: usize,
}
#[derive(FromResources)]
struct GeneratorDeps {
#[res("config")]
config: Arc<GeneratorConfig>,
}
#[derive(FromResources)]
struct StoreDeps {
#[res("store")]
store: Arc<MemoryStore>,
}
#[derive(Clone)]
struct GeneratorTask;
#[task(state = WorkflowAction)]
impl GeneratorTask {
async fn run(&self, res: &Resources) -> Result<TaskResult<WorkflowAction>, CanoError> {
let GeneratorDeps { config } = GeneratorDeps::from_resources(res)?;
let mut rng = rand::rng();
let size = rng.random_range(config.min_size..=config.max_size);
let numbers: Vec<u32> = (0..size).map(|_| rng.random_range(1..=1000)).collect();
println!("Generated {} random numbers", numbers.len());
println!("Sample: {:?}", &numbers[..std::cmp::min(10, numbers.len())]);
let evens: Vec<u32> = numbers.into_iter().filter(|&n| n % 2 == 0).collect();
println!("Filtered to {} even numbers", evens.len());
let StoreDeps { store } = StoreDeps::from_resources(res)?;
store.put("filtered_numbers", evens)?;
println!("Generator task completed");
Ok(TaskResult::Single(WorkflowAction::Count))
}
}
#[derive(Clone)]
struct CounterTask;
#[task(state = WorkflowAction)]
impl CounterTask {
async fn run(&self, res: &Resources) -> Result<TaskResult<WorkflowAction>, CanoError> {
let StoreDeps { store } = StoreDeps::from_resources(res)?;
let numbers: Vec<u32> = store.get("filtered_numbers").map_err(|e| {
CanoError::task_execution(format!("Failed to load filtered numbers: {e}"))
})?;
println!("Loaded {} numbers from memory", numbers.len());
let count = numbers.len();
println!("Counted {} even numbers", count);
store.put("number_count", count)?;
store.remove("filtered_numbers")?;
println!(
"Counter task completed — count {} stored, intermediate data cleaned",
count
);
Ok(TaskResult::Single(WorkflowAction::Complete))
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
println!("Simple Two-Task Workflow");
println!("===============================================");
let store = MemoryStore::new();
let resources = Resources::new()
.insert(
"config",
GeneratorConfig {
min_size: 25,
max_size: 150,
},
)
.insert("store", store.clone());
let workflow = Workflow::new(resources)
.register(WorkflowAction::Generate, GeneratorTask)
.register(WorkflowAction::Count, CounterTask)
.add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
match workflow.orchestrate(WorkflowAction::Generate).await {
Ok(WorkflowAction::Complete) => {
println!("\nWorkflow completed successfully!");
match store.get::<usize>("number_count") {
Ok(final_count) => {
println!("\nFINAL RESULTS");
println!("================");
println!("Total even numbers found: {final_count}");
if store.get::<Vec<u32>>("filtered_numbers").is_err() {
println!("Original data successfully cleaned up");
}
}
Err(e) => {
return Err(CanoError::task_execution(format!(
"Failed to get final count: {e}"
)));
}
}
}
Ok(other) => {
eprintln!("Workflow ended in unexpected state: {other:?}");
return Err(CanoError::workflow(format!(
"unexpected final state: {other:?}"
)));
}
Err(e) => {
eprintln!("Workflow execution failed: {e}");
return Err(e);
}
}
Ok(())
}