use async_trait::async_trait;
use cano::prelude::*;
use rand::RngExt;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum WorkflowAction {
Generate,
Count,
Complete,
Error,
}
#[derive(Clone)]
struct GeneratorNode;
impl GeneratorNode {
fn new() -> Self {
Self
}
}
#[async_trait]
impl Node<WorkflowAction> for GeneratorNode {
type PrepResult = Vec<u32>;
type ExecResult = Vec<u32>;
fn config(&self) -> TaskConfig {
TaskConfig::minimal() }
async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
let mut rng = rand::rng();
let size = rng.random_range(25..=150);
let numbers: Vec<u32> = (0..size).map(|_| rng.random_range(1..=1000)).collect();
println!("Generated {} random numbers", numbers.len());
println!("Sample: {:?}", &numbers[..std::cmp::min(10, numbers.len())]);
Ok(numbers)
}
async fn exec(&self, prep_res: Self::PrepResult) -> Self::ExecResult {
let even_numbers: Vec<u32> = prep_res.into_iter().filter(|&n| n % 2 == 0).collect();
println!("Filtered to {} even numbers", even_numbers.len());
println!(
"Sample even numbers: {:?}",
&even_numbers[..std::cmp::min(10, even_numbers.len())]
);
even_numbers
}
async fn post(
&self,
store: &MemoryStore,
exec_res: Self::ExecResult,
) -> Result<WorkflowAction, CanoError> {
store.put("filtered_numbers", exec_res)?;
println!("✓ Generator node completed - filtered numbers stored in memory");
Ok(WorkflowAction::Count)
}
}
#[derive(Clone)]
struct CounterNode;
impl CounterNode {
fn new() -> Self {
Self
}
}
#[async_trait]
impl Node<WorkflowAction> for CounterNode {
type PrepResult = Vec<u32>;
type ExecResult = usize;
fn config(&self) -> TaskConfig {
TaskConfig::minimal()
}
async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
let numbers: Vec<u32> = store
.get("filtered_numbers")
.map_err(|e| CanoError::preparation(format!("Failed to load filtered numbers: {e}")))?;
println!("Loaded {} numbers from memory", numbers.len());
Ok(numbers)
}
async fn exec(&self, prep_res: Self::PrepResult) -> Self::ExecResult {
let count = prep_res.len();
println!("Counted {} even numbers", count);
count
}
async fn post(
&self,
store: &MemoryStore,
exec_res: Self::ExecResult,
) -> Result<WorkflowAction, CanoError> {
store.put("number_count", exec_res)?;
store.remove("filtered_numbers")?;
println!(
"✓ Counter node completed - count stored ({}) and original data cleaned up",
exec_res
);
Ok(WorkflowAction::Complete)
}
}
async fn run_simple_workflow_with_flow() -> Result<(), CanoError> {
println!("🚀 Starting Simple Two-Node Workflow with Workflow");
println!("===============================================");
let store = MemoryStore::new();
let workflow = Workflow::new(store.clone())
.register(WorkflowAction::Generate, GeneratorNode::new())
.register(WorkflowAction::Count, CounterNode::new())
.add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
match workflow.orchestrate(WorkflowAction::Generate).await {
Ok(final_state) => {
match final_state {
WorkflowAction::Complete => {
println!("✅ Workflow completed successfully!");
match store.get::<usize>("number_count") {
Ok(final_count) => {
println!("\n📈 FINAL RESULTS");
println!("================");
println!("Total even numbers found: {final_count}");
match store.get::<Vec<u32>>("filtered_numbers") {
Ok(_) => {
println!("⚠️ Warning: Original data still exists in memory")
}
Err(_) => println!("✓ Original data successfully cleaned up"),
}
}
Err(e) => {
return Err(CanoError::node_execution(format!(
"Failed to get final count: {e}"
)));
}
}
}
WorkflowAction::Error => {
eprintln!("❌ Workflow terminated with error state");
return Err(CanoError::workflow("Workflow terminated with error state"));
}
other => {
eprintln!("⚠️ Workflow ended in unexpected state: {other:?}");
return Err(CanoError::workflow(format!(
"Workflow ended in unexpected state: {other:?}"
)));
}
}
}
Err(e) => {
eprintln!("❌ Workflow failed: {e}");
return Err(e);
}
}
Ok(())
}
#[tokio::main]
async fn main() {
println!("Simple Two-Node Workflow Example");
println!("=================================");
println!("\nRunning Workflow-Based Workflow:");
match run_simple_workflow_with_flow().await {
Ok(()) => {
println!("Workflow completed successfully!");
}
Err(e) => {
eprintln!("Workflow failed: {e}");
std::process::exit(1);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_generator_node() {
let generator = GeneratorNode::new();
let store = MemoryStore::new();
let result = generator.run(&store).await.unwrap();
assert_eq!(result, WorkflowAction::Count);
let stored_numbers: Vec<u32> = store.get("filtered_numbers").unwrap();
for &num in &stored_numbers {
assert_eq!(num % 2, 0, "Number {} should be even", num);
}
println!(
"Generator test passed - {} even numbers stored",
stored_numbers.len()
);
}
#[tokio::test]
async fn test_counter_node() {
let store = MemoryStore::new();
let test_numbers: Vec<u32> = vec![2, 4, 6, 8, 10];
store.put("filtered_numbers", test_numbers.clone()).unwrap();
let counter = CounterNode::new();
let result = counter.run(&store).await.unwrap();
assert_eq!(result, WorkflowAction::Complete);
let count: usize = store.get("number_count").unwrap();
assert_eq!(count, test_numbers.len());
assert!(store.get::<Vec<u32>>("filtered_numbers").is_err());
println!("Counter test passed - count: {}", count);
}
#[tokio::test]
async fn test_full_workflow_with_flow_different_node_types() {
let result = run_simple_workflow_with_flow().await;
assert!(result.is_ok());
println!("Full Workflow workflow test with different node types passed");
}
#[tokio::test]
async fn test_generator_number_range() {
let generator = GeneratorNode::new();
let store = MemoryStore::new();
for _ in 0..5 {
let prep_result = generator.prep(&store).await.unwrap();
assert!(
prep_result.len() >= 25,
"Generated vector too small: {}",
prep_result.len()
);
assert!(
prep_result.len() <= 150,
"Generated vector too large: {}",
prep_result.len()
);
for &num in &prep_result {
assert!(
num >= 1 && num <= 1000,
"Number {} out of expected range",
num
);
}
}
}
#[tokio::test]
async fn test_odd_number_filtering() {
let generator = GeneratorNode::new();
let test_input = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let result = generator.exec(test_input).await;
let expected = vec![2, 4, 6, 8, 10];
assert_eq!(result, expected);
}
#[tokio::test]
async fn test_counter_node_error_handling() {
let counter = CounterNode::new();
let store = MemoryStore::new();
let result = counter.run(&store).await;
assert!(result.is_err());
let error = result.unwrap_err();
assert!(
error
.to_string()
.contains("Failed to load filtered numbers")
);
}
#[tokio::test]
async fn test_workflow_error_state() {
let store = MemoryStore::new();
let counter = CounterNode::new();
let result = counter.run(&store).await;
assert!(result.is_err());
println!("Error handling test passed");
}
}