use cano::prelude::*;
use std::time::Duration;
use tokio::time::sleep;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum ApiState {
Start,
#[allow(dead_code)]
FetchData,
Process,
Complete,
Failed,
}
#[derive(Clone)]
struct ApiTask {
id: usize,
latency_ms: u64,
should_fail: bool,
}
impl ApiTask {
fn new(id: usize, latency_ms: u64, should_fail: bool) -> Self {
Self {
id,
latency_ms,
should_fail,
}
}
}
#[task(state = ApiState)]
impl ApiTask {
async fn run(&self, res: &Resources) -> Result<TaskResult<ApiState>, CanoError> {
let store = res.get::<MemoryStore, str>("store")?;
println!(
"Task {}: Starting (latency: {}ms)",
self.id, self.latency_ms
);
sleep(Duration::from_millis(self.latency_ms)).await;
if self.should_fail {
println!("Task {}: Failed", self.id);
Err(CanoError::task_execution(format!(
"Task {} failed",
self.id
)))
} else {
println!("Task {}: Completed successfully", self.id);
store.put(
&format!("result_{}", self.id),
format!("data_from_{}", self.id),
)?;
Ok(TaskResult::Single(ApiState::Process))
}
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
println!("--- Partial Results Workflow Example ---\n");
let tasks = vec![
ApiTask::new(1, 100, false), ApiTask::new(2, 500, false), ApiTask::new(3, 150, true), ApiTask::new(4, 800, false), ];
let join_config = JoinConfig::new(JoinStrategy::PartialResults(2), ApiState::Complete);
let store = MemoryStore::new();
let workflow = Workflow::new(Resources::new().insert("store", store.clone()))
.register_split(ApiState::Start, tasks, join_config)
.add_exit_state(ApiState::Complete)
.add_exit_state(ApiState::Failed);
println!("Starting workflow...");
let start = std::time::Instant::now();
let result = workflow.orchestrate(ApiState::Start).await?;
let duration = start.elapsed();
println!(
"\nWorkflow finished in {:?} with state: {:?}",
duration, result
);
let task_ids = [1usize, 2, 3, 4];
let results: Vec<(usize, Option<String>)> = task_ids
.iter()
.map(|&id| (id, store.get::<String>(&format!("result_{id}")).ok()))
.collect();
println!("\nResults:");
for (id, value) in &results {
println!("- Task {}: {:?}", id, value);
}
let successes = results.iter().filter(|(_, r)| r.is_some()).count();
if successes >= 2 {
println!("\nSUCCESS: Workflow behaved as expected (waited for successes).");
} else {
println!("\nWARNING: Unexpected results.");
}
Ok(())
}