use cano::prelude::*;
use std::time::Duration;
/// States of the split/join demo workflow.
///
/// In this file `main` starts every workflow at `Start`, registers the
/// parallel processors on `ParallelProcessing`, joins into `Aggregate`,
/// and treats `Complete` as the exit state.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataProcessingState {
/// Initial state; `main` registers `DataLoader` on it.
Start,
/// Never constructed anywhere in this file.
// The lint is silenced because the variant is intentionally left unused.
#[allow(dead_code)]
LoadData,
/// Split state on which the `ProcessorTask` instances are registered.
ParallelProcessing,
/// Join target; `main` registers `Aggregator` on it.
Aggregate,
/// Exit state of every workflow built in `main`.
Complete,
}
/// Task that seeds the store with the numbers the processors read.
#[derive(Clone)]
struct DataLoader;

#[async_trait]
impl Task<DataProcessingState> for DataLoader {
    /// Stores the integers 1 through 10 under the `"input_data"` key and then
    /// moves the workflow to `DataProcessingState::ParallelProcessing`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `store.put`.
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataProcessingState>, CanoError> {
        println!("Loading initial data...");

        // Same ten values as a literal `vec![1, ..., 10]`, typed as `i32`
        // because `ProcessorTask` reads the key back as `Vec<i32>`.
        let numbers: Vec<i32> = (1..=10).collect();
        store.put("input_data", numbers)?;
        println!("Data loaded: 10 numbers");

        let next = DataProcessingState::ParallelProcessing;
        Ok(TaskResult::Single(next))
    }
}
/// One branch of the parallel split, distinguished by its `task_id`.
#[derive(Clone)]
struct ProcessorTask {
    /// Caller-chosen identifier; the `Task` implementation uses it as the
    /// multiplier, to scale the simulated delay, and as the result-key suffix.
    task_id: usize,
}

impl ProcessorTask {
    /// Creates a processor carrying the given identifier.
    fn new(task_id: usize) -> Self {
        ProcessorTask { task_id }
    }
}
#[async_trait]
impl Task<DataProcessingState> for ProcessorTask {
    /// Multiplies every input number by this processor's id, sums the
    /// products, and stores the sum under `"result_<task_id>"`.
    ///
    /// Sleeps for `100 * task_id` milliseconds before computing, so
    /// processors with larger ids finish later. Always transitions to
    /// `DataProcessingState::Aggregate` on success.
    ///
    /// # Errors
    ///
    /// Propagates any error from reading `"input_data"` or writing the result.
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataProcessingState>, CanoError> {
        let id = self.task_id;
        println!("Processor {} starting...", id);

        // The input is read before the delay, so a failed read returns
        // immediately instead of after the sleep.
        let input: Vec<i32> = store.get("input_data")?;

        let delay = Duration::from_millis(100 * id as u64);
        tokio::time::sleep(delay).await;

        let factor = id as i32;
        let mut result: i32 = 0;
        for value in &input {
            result += value * factor;
        }

        let key = format!("result_{}", id);
        store.put(&key, result)?;
        println!("Processor {} completed with result: {}", id, result);

        Ok(TaskResult::Single(DataProcessingState::Aggregate))
    }
}
/// Task that combines whatever processor results are present in the store.
#[derive(Clone)]
struct Aggregator;

#[async_trait]
impl Task<DataProcessingState> for Aggregator {
    /// Sums the values stored under `"result_1"` through `"result_3"`, then
    /// writes the sum to `"final_result"` and the number of results found to
    /// `"processor_count"`.
    ///
    /// A key that cannot be read as `i32` is skipped rather than treated as
    /// an error, so the task still succeeds when only some results exist.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the two `store.put` calls.
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataProcessingState>, CanoError> {
        println!("Aggregating results...");

        // Keep only the results that can actually be read; lookup failures
        // are dropped on purpose.
        let found: Vec<i32> = (1..=3)
            .filter_map(|i| store.get::<i32>(&format!("result_{}", i)).ok())
            .collect();
        let total: i32 = found.iter().sum();
        let count: usize = found.len();

        store.put("final_result", total)?;
        store.put("processor_count", count)?;
        println!(
            "Aggregation complete: {} processors completed, total: {}",
            count, total
        );

        Ok(TaskResult::Single(DataProcessingState::Complete))
    }
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
println!("=== Cano Workflow Split/Join Demo ===\n");
println!("--- Example 1: All Strategy ---");
{
let store = MemoryStore::new();
let processors = vec![
ProcessorTask::new(1),
ProcessorTask::new(2),
ProcessorTask::new(3),
];
let join_config = JoinConfig::new(JoinStrategy::All, DataProcessingState::Aggregate);
let workflow = Workflow::new(store.clone())
.register(DataProcessingState::Start, DataLoader)
.register_split(
DataProcessingState::ParallelProcessing,
processors,
join_config,
)
.register(DataProcessingState::Aggregate, Aggregator)
.add_exit_state(DataProcessingState::Complete);
let result = workflow.orchestrate(DataProcessingState::Start).await?;
let final_result: i32 = store.get("final_result")?;
println!("Final result: {}", final_result);
println!("Workflow completed: {:?}\n", result);
}
println!("--- Example 2: Quorum Strategy (2/3) ---");
{
let store = MemoryStore::new();
let processors = vec![
ProcessorTask::new(1),
ProcessorTask::new(2),
ProcessorTask::new(3),
];
let join_config = JoinConfig::new(JoinStrategy::Quorum(2), DataProcessingState::Aggregate);
let workflow = Workflow::new(store.clone())
.register(DataProcessingState::Start, DataLoader)
.register_split(
DataProcessingState::ParallelProcessing,
processors,
join_config,
)
.register(DataProcessingState::Aggregate, Aggregator)
.add_exit_state(DataProcessingState::Complete);
let result = workflow.orchestrate(DataProcessingState::Start).await?;
let processor_count: usize = store.get("processor_count")?;
println!(
"Processors completed: {} (quorum satisfied)",
processor_count
);
println!("Workflow completed: {:?}\n", result);
}
println!("--- Example 3: Any Strategy ---");
{
let store = MemoryStore::new();
let processors = vec![
ProcessorTask::new(1),
ProcessorTask::new(2),
ProcessorTask::new(3),
];
let join_config = JoinConfig::new(JoinStrategy::Any, DataProcessingState::Aggregate);
let workflow = Workflow::new(store.clone())
.register(DataProcessingState::Start, DataLoader)
.register_split(
DataProcessingState::ParallelProcessing,
processors,
join_config,
)
.register(DataProcessingState::Aggregate, Aggregator)
.add_exit_state(DataProcessingState::Complete);
let result = workflow.orchestrate(DataProcessingState::Start).await?;
println!("Workflow completed with Any strategy: {:?}\n", result);
}
println!("--- Example 4: Percentage Strategy (66%) ---");
{
let store = MemoryStore::new();
let processors = vec![
ProcessorTask::new(1),
ProcessorTask::new(2),
ProcessorTask::new(3),
];
let join_config = JoinConfig::new(
JoinStrategy::Percentage(0.66),
DataProcessingState::Aggregate,
);
let workflow = Workflow::new(store.clone())
.register(DataProcessingState::Start, DataLoader)
.register_split(
DataProcessingState::ParallelProcessing,
processors,
join_config,
)
.register(DataProcessingState::Aggregate, Aggregator)
.add_exit_state(DataProcessingState::Complete);
let result = workflow.orchestrate(DataProcessingState::Start).await?;
let processor_count: usize = store.get("processor_count")?;
println!("Processors completed: {} (66% threshold)", processor_count);
println!("Workflow completed: {:?}\n", result);
}
println!("--- Example 5: Split with Timeout ---");
{
let store = MemoryStore::new();
let processors = vec![
ProcessorTask::new(1),
ProcessorTask::new(2),
ProcessorTask::new(3),
];
let join_config = JoinConfig::new(JoinStrategy::All, DataProcessingState::Aggregate)
.with_timeout(Duration::from_millis(250));
let workflow = Workflow::new(store.clone())
.register(DataProcessingState::Start, DataLoader)
.register_split(
DataProcessingState::ParallelProcessing,
processors,
join_config,
)
.register(DataProcessingState::Aggregate, Aggregator)
.add_exit_state(DataProcessingState::Complete);
match workflow.orchestrate(DataProcessingState::Start).await {
Ok(result) => println!("Workflow completed: {:?}", result),
Err(e) => println!("Workflow failed (expected timeout): {}", e),
}
}
println!("\n=== Demo Complete ===");
Ok(())
}