use async_trait::async_trait;
use cano::prelude::*;
use std::time::Duration;
use tokio::time::sleep;
/// States the example workflow can be in.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum ApiState {
    /// Entry state; `main` registers the split of API tasks on it.
    Start,
    // Never constructed anywhere in this file, hence the lint suppression.
    #[allow(dead_code)]
    FetchData,
    /// State each `ApiTask` reports from `run` when it succeeds.
    Process,
    /// Target state of the join configuration; also registered as an exit state.
    Complete,
    /// Registered as an exit state in `main`.
    Failed,
}
/// A simulated API call used as one branch of the workflow split.
#[derive(Clone)]
struct ApiTask {
    /// Identifier used in log lines and in the store key written on success.
    id: usize,
    /// How long `run` sleeps before finishing, in milliseconds.
    latency_ms: u64,
    /// When `true`, `run` returns an error instead of storing a result.
    should_fail: bool,
}

impl ApiTask {
    /// Builds a task from its identifier, simulated latency (milliseconds),
    /// and whether it should fail.
    fn new(id: usize, latency_ms: u64, should_fail: bool) -> Self {
        Self { id, latency_ms, should_fail }
    }
}
/// Workflow task behaviour for [`ApiTask`]: sleep for the configured latency,
/// then either fail or record a result in the store.
#[async_trait]
impl Task<ApiState> for ApiTask {
    /// Simulates the API call.
    ///
    /// Sleeps for `latency_ms` milliseconds. If `should_fail` is set, logs the
    /// failure and returns a task-execution error. Otherwise writes
    /// `data_from_<id>` under the key `result_<id>` and reports
    /// `ApiState::Process`.
    ///
    /// # Errors
    ///
    /// Returns an error when the task is configured to fail, or when the store
    /// write fails.
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<ApiState>, CanoError> {
        let task_id = self.id;
        println!(
            "Task {}: Starting (latency: {}ms)",
            task_id, self.latency_ms
        );

        let simulated_latency = Duration::from_millis(self.latency_ms);
        sleep(simulated_latency).await;

        // Guard clause: the failure path leaves before touching the store.
        if self.should_fail {
            println!("Task {}: Failed", task_id);
            let message = format!("Task {} failed", task_id);
            return Err(CanoError::task_execution(message));
        }

        println!("Task {}: Completed successfully", task_id);
        let key = format!("result_{}", task_id);
        let payload = format!("data_from_{}", task_id);
        store.put(&key, payload)?;

        Ok(TaskResult::Single(ApiState::Process))
    }
}
/// Runs the partial-results example.
///
/// Registers four simulated API tasks (one configured to fail) as a split on
/// `ApiState::Start`, joined with `JoinStrategy::PartialResults(2)` towards
/// `ApiState::Complete`, then prints the elapsed time, the final state, and
/// three counters read back from the store.
///
/// NOTE(review): the join presumably proceeds once two tasks have succeeded,
/// and the `split_*_count` keys are presumably written by the workflow because
/// `with_store_partial_results(true)` is set — confirm against the cano docs.
#[tokio::main]
async fn main() -> Result<(), CanoError> {
    println!("--- Partial Results Workflow Example ---\n");

    // Arguments are (id, latency in ms, should_fail).
    let tasks = vec![
        ApiTask::new(1, 100, false),
        ApiTask::new(2, 500, false),
        ApiTask::new(3, 150, true),
        ApiTask::new(4, 800, false),
    ];

    let strategy = JoinStrategy::PartialResults(2);
    let join_config = JoinConfig::new(strategy, ApiState::Complete)
        .with_store_partial_results(true);

    // The workflow gets a clone; `store` itself is kept to read the counters back.
    let store = MemoryStore::new();
    let workflow = Workflow::new(store.clone())
        .register_split(ApiState::Start, tasks, join_config)
        .add_exit_state(ApiState::Complete)
        .add_exit_state(ApiState::Failed);

    println!("Starting workflow...");
    let started_at = std::time::Instant::now();
    let final_state = workflow.orchestrate(ApiState::Start).await?;
    let elapsed = started_at.elapsed();
    println!(
        "\nWorkflow finished in {:?} with state: {:?}",
        elapsed, final_state
    );

    // A counter that cannot be read is reported as zero.
    let read_count = |key: &str| -> usize { store.get(key).unwrap_or(0) };
    let successes = read_count("split_successes_count");
    let errors = read_count("split_errors_count");
    let cancelled = read_count("split_cancelled_count");

    println!("\nStatistics:");
    println!("- Successes: {}", successes);
    println!("- Errors: {}", errors);
    println!("- Cancelled: {}", cancelled);

    let verdict = if successes < 2 {
        "\nWARNING: Unexpected results."
    } else {
        "\nSUCCESS: Workflow behaved as expected (waited for successes)."
    };
    println!("{}", verdict);

    Ok(())
}