use cano::prelude::*;
use rand::RngExt;
use std::sync::Arc;
/// States the workflow state machine can be in.
///
/// `main` registers a node for `Generate` and `Count`, and declares
/// `Complete` and `Error` as exit states.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum WorkflowAction {
    /// Entry state passed to `orchestrate`; handled by `GeneratorNode`.
    Generate,
    /// State returned by `GeneratorNode::post`; handled by `CounterNode`.
    Count,
    /// Exit state returned by `CounterNode::post`.
    Complete,
    /// Exit state declared in `main`; no node in this file returns it.
    Error,
}
/// Size bounds for the batch of random numbers built in `GeneratorNode::prep`.
///
/// `main` inserts an instance into the workflow resources under the
/// `"config"` key.
#[derive(Resource)]
struct GeneratorConfig {
/// Inclusive lower bound on how many numbers are generated.
min_size: usize,
/// Inclusive upper bound on how many numbers are generated.
max_size: usize,
}
/// Dependencies that `GeneratorNode::prep` obtains through
/// `GeneratorDeps::from_resources`.
#[derive(FromResources)]
struct GeneratorDeps {
/// Shared generator configuration; the `res` attribute names the
/// `"config"` key, the same key `main` inserts a `GeneratorConfig` under.
#[res("config")]
config: Arc<GeneratorConfig>,
}
/// Dependencies obtained through `StoreDeps::from_resources` by the node
/// phases that read from or write to the shared store.
#[derive(FromResources)]
struct StoreDeps {
/// Shared store; the `res` attribute names the `"store"` key, the same
/// key `main` inserts a `MemoryStore` under.
#[res("store")]
store: Arc<MemoryStore>,
}
/// Field-less node type that `main` registers for `WorkflowAction::Generate`.
///
/// Its `prep`/`exec`/`post` phases live in the `#[node]` impl block.
#[derive(Clone)]
struct GeneratorNode;
#[node(state = WorkflowAction)]
impl GeneratorNode {
async fn prep(&self, res: &Resources) -> Result<Vec<u32>, CanoError> {
let GeneratorDeps { config } = GeneratorDeps::from_resources(res)?;
let mut rng = rand::rng();
let size = rng.random_range(config.min_size..=config.max_size);
let numbers: Vec<u32> = (0..size).map(|_| rng.random_range(1..=1000)).collect();
println!("Generated {} random numbers", numbers.len());
println!("Sample: {:?}", &numbers[..std::cmp::min(10, numbers.len())]);
Ok(numbers)
}
async fn exec(&self, prep_res: Vec<u32>) -> Vec<u32> {
let evens: Vec<u32> = prep_res.into_iter().filter(|&n| n % 2 == 0).collect();
println!("Filtered to {} even numbers", evens.len());
evens
}
async fn post(&self, res: &Resources, exec_res: Vec<u32>) -> Result<WorkflowAction, CanoError> {
let StoreDeps { store } = StoreDeps::from_resources(res)?;
store.put("filtered_numbers", exec_res)?;
println!("✓ Generator node completed");
Ok(WorkflowAction::Count)
}
}
/// Field-less node type that `main` registers for `WorkflowAction::Count`.
///
/// Its `prep`/`exec`/`post` phases live in the `#[node]` impl block.
#[derive(Clone)]
struct CounterNode;
// Node phases for the `Count` state: read the filtered numbers back from the
// shared store, count them, persist the count, and drop the intermediate data.
#[node(state = WorkflowAction)]
impl CounterNode {
    /// Loads the `"filtered_numbers"` entry from the store.
    ///
    /// # Errors
    ///
    /// Propagates the error from `StoreDeps::from_resources`; a failed store
    /// lookup is wrapped in a preparation error.
    async fn prep(&self, res: &Resources) -> Result<Vec<u32>, CanoError> {
        let deps = StoreDeps::from_resources(res)?;
        let loaded = match deps.store.get::<Vec<u32>>("filtered_numbers") {
            Ok(values) => values,
            Err(e) => {
                return Err(CanoError::preparation(format!(
                    "Failed to load filtered numbers: {e}"
                )));
            }
        };
        println!("Loaded {} numbers from memory", loaded.len());
        Ok(loaded)
    }

    /// Returns how many numbers were loaded.
    async fn exec(&self, prep_res: Vec<u32>) -> usize {
        let total = prep_res.len();
        println!("Counted {} even numbers", total);
        total
    }

    /// Stores the count under `"number_count"`, removes the
    /// `"filtered_numbers"` entry, and finishes with
    /// `WorkflowAction::Complete`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `StoreDeps::from_resources` and from the
    /// store's `put` and `remove`.
    async fn post(&self, res: &Resources, exec_res: usize) -> Result<WorkflowAction, CanoError> {
        let deps = StoreDeps::from_resources(res)?;
        let shared = &deps.store;

        // Write the result first, then drop the intermediate entry.
        shared.put("number_count", exec_res)?;
        shared.remove("filtered_numbers")?;

        println!(
            "✓ Counter node completed — count {} stored, intermediate data cleaned",
            exec_res
        );
        Ok(WorkflowAction::Complete)
    }
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
println!("🚀 Simple Two-Node Workflow");
println!("===============================================");
let store = MemoryStore::new();
let resources = Resources::new()
.insert(
"config",
GeneratorConfig {
min_size: 25,
max_size: 150,
},
)
.insert("store", store.clone());
let workflow = Workflow::new(resources)
.register(WorkflowAction::Generate, GeneratorNode)
.register(WorkflowAction::Count, CounterNode)
.add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
match workflow.orchestrate(WorkflowAction::Generate).await {
Ok(WorkflowAction::Complete) => {
println!("\n✅ Workflow completed successfully!");
match store.get::<usize>("number_count") {
Ok(final_count) => {
println!("\n📈 FINAL RESULTS");
println!("================");
println!("Total even numbers found: {final_count}");
if store.get::<Vec<u32>>("filtered_numbers").is_err() {
println!("✓ Original data successfully cleaned up");
}
}
Err(e) => {
return Err(CanoError::node_execution(format!(
"Failed to get final count: {e}"
)));
}
}
}
Ok(other) => {
eprintln!("⚠️ Workflow ended in unexpected state: {other:?}");
return Err(CanoError::workflow(format!(
"unexpected final state: {other:?}"
)));
}
Err(e) => {
eprintln!("❌ Workflow execution failed: {e}");
return Err(e);
}
}
Ok(())
}