<!DOCTYPE html>
<html lang="en" style="scroll-behavior: smooth;">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Workflows - Cano</title>
<link rel="stylesheet" href="styles.css">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/themes/prism-tomorrow.min.css">
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&family=Outfit:wght@500;700&family=Fira+Code&display=swap" rel="stylesheet">
<script src="https://cdnjs.cloudflare.com/ajax/libs/mermaid/10.6.1/mermaid.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/prism.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-rust.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-toml.min.js"></script>
<script src="./script.js" defer></script>
<style>
.page-toc {
background: var(--card-bg);
border: 1px solid var(--border-color);
border-radius: 0.75rem;
padding: 1.5rem 2rem;
margin: 2rem 0 3rem;
position: relative;
}
.page-toc::before {
content: '';
position: absolute;
left: 0; top: 0; bottom: 0;
width: 3px;
background: linear-gradient(to bottom, var(--primary-color), var(--secondary-color));
border-radius: 3px 0 0 3px;
}
.page-toc summary {
font-weight: 600;
font-size: 1.1rem;
color: #fff;
cursor: pointer;
list-style: none;
display: flex;
align-items: center;
gap: 0.5rem;
user-select: none;
}
.page-toc summary::-webkit-details-marker { display: none; }
.page-toc summary::before {
content: '';
display: inline-block;
width: 6px; height: 6px;
border-right: 2px solid var(--primary-color);
border-bottom: 2px solid var(--primary-color);
transform: rotate(-45deg);
transition: transform 0.2s ease;
flex-shrink: 0;
}
.page-toc[open] summary::before { transform: rotate(45deg); }
.page-toc ol {
list-style: none;
padding: 0;
margin: 1rem 0 0;
columns: 2;
column-gap: 2rem;
}
.page-toc li {
break-inside: avoid;
margin-bottom: 0.35rem;
}
.page-toc a {
color: var(--text-color);
text-decoration: none;
font-size: 0.95rem;
display: block;
padding: 0.25rem 0.5rem;
border-radius: 0.25rem;
transition: all 0.15s ease;
}
.page-toc a:hover {
color: var(--primary-color);
background: rgba(56, 189, 248, 0.07);
}
.page-toc .toc-sub { padding-left: 1.25rem; font-size: 0.9rem; opacity: 0.8; }
.main-content h2[id],
.main-content h3[id] { scroll-margin-top: 2rem; position: relative; }
.main-content h2[id] a.anchor-link,
.main-content h3[id] a.anchor-link {
position: absolute;
left: -1.5rem;
color: var(--border-color);
font-weight: 400;
opacity: 0;
text-decoration: none;
transition: opacity 0.15s ease;
}
.main-content h2[id]:hover a.anchor-link,
.main-content h3[id]:hover a.anchor-link {
opacity: 1;
color: var(--primary-color);
}
.diagram-frame {
background: var(--card-bg);
border: 1px solid var(--border-color);
border-radius: 0.75rem;
margin: 2rem 0;
overflow: hidden;
}
.diagram-frame .diagram-label {
font-size: 0.78rem;
font-weight: 600;
letter-spacing: 0.06em;
text-transform: uppercase;
color: var(--secondary-color);
padding: 0.75rem 1.25rem 0;
margin: 0;
}
.diagram-frame .mermaid {
margin: 0;
border: none;
border-radius: 0;
background: transparent;
}
.strategy-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(280px, 1fr));
gap: 1.25rem;
margin: 2rem 0;
}
.strategy-card {
background: var(--card-bg);
border: 1px solid var(--border-color);
border-radius: 0.75rem;
padding: 1.5rem;
transition: border-color 0.2s ease, transform 0.2s ease;
}
.strategy-card:hover {
border-color: var(--primary-color);
transform: translateY(-3px);
}
.strategy-card .strategy-name {
font-size: 1.15rem;
font-weight: 600;
color: #fff;
margin: 0 0 0.5rem;
}
.strategy-card p { font-size: 0.95rem; margin-bottom: 0.75rem; }
.strategy-card pre { margin: 0.75rem 0 0 !important; font-size: 0.85rem; }
.comparison-table {
width: 100%;
border-collapse: collapse;
margin: 2rem 0;
background: var(--card-bg);
border-radius: 0.75rem;
overflow: hidden;
border: 1px solid var(--border-color);
}
.comparison-table thead tr { background: rgba(255, 255, 255, 0.05); text-align: left; }
.comparison-table th {
padding: 1rem 1.25rem;
font-weight: 600;
color: #fff;
font-size: 0.9rem;
}
.comparison-table td {
padding: 0.85rem 1.25rem;
border-top: 1px solid var(--border-color);
font-size: 0.95rem;
}
.comparison-table tbody tr { transition: background 0.15s ease; }
.comparison-table tbody tr:hover { background: rgba(56, 189, 248, 0.04); }
.callout {
border-radius: 0.75rem;
padding: 1.25rem 1.5rem;
margin: 1.5rem 0;
border: 1px solid;
}
.callout-warn {
background: rgba(248, 113, 113, 0.08);
border-color: rgba(248, 113, 113, 0.25);
}
.callout-warn .callout-title { color: #f87171; }
.callout-info {
background: rgba(56, 189, 248, 0.08);
border-color: rgba(56, 189, 248, 0.25);
}
.callout-info .callout-title { color: var(--primary-color); }
.callout-tip {
background: rgba(56, 189, 248, 0.08);
border-left: 3px solid var(--primary-color);
border-top: none; border-right: none; border-bottom: none;
}
.callout .callout-title { font-weight: 600; margin-bottom: 0.5rem; }
.callout p:last-child { margin-bottom: 0; }
@media (max-width: 768px) {
.page-toc ol { columns: 1; }
.page-toc { padding: 1.25rem 1.25rem; }
.main-content h2[id] a.anchor-link,
.main-content h3[id] a.anchor-link { display: none; }
.strategy-grid { grid-template-columns: 1fr; }
.strategy-card { padding: 1.25rem; }
.callout { padding: 1rem 1.25rem; }
.diagram-frame { overflow-x: auto; -webkit-overflow-scrolling: touch; }
.comparison-table {
display: block;
overflow-x: auto;
-webkit-overflow-scrolling: touch;
}
.comparison-table th,
.comparison-table td {
padding: 0.625rem 0.75rem;
white-space: nowrap;
font-size: 0.85rem;
}
}
@media (max-width: 480px) {
.comparison-table th,
.comparison-table td {
padding: 0.5rem 0.625rem;
font-size: 0.8rem;
}
}
</style>
</head>
<body>
<button id="menu-toggle" class="menu-toggle" aria-label="Toggle navigation" aria-expanded="false">☰</button>
<div class="sidebar-overlay"></div>
<nav class="sidebar" role="navigation" aria-label="Main navigation">
<a href="index.html" class="logo">
<img src="logo.png" alt="" style="height: 24px; vertical-align: middle; margin-right: 8px;">
Cano
</a>
<ul class="nav-links">
<li><a href="index.html">Home</a></li>
<li><a href="task.html">Tasks</a></li>
<li><a href="nodes.html">Nodes</a></li>
<li><a href="workflows.html" class="active">Workflows</a></li>
<li><a href="store.html">Store</a></li>
<li><a href="scheduler.html">Scheduler</a></li>
<li><a href="tracing.html">Tracing</a></li>
</ul>
<div class="sidebar-footer">
<span class="version-badge">v0.8.0</span>
<div class="sidebar-links">
<a href="https://github.com/nassor/cano" title="GitHub Repository" aria-label="GitHub">GitHub</a>
<a href="https://crates.io/crates/cano" title="Crates.io" aria-label="Crates.io">Crates.io</a>
<a href="https://docs.rs/cano" title="API Documentation" aria-label="API Docs">Docs.rs</a>
</div>
</div>
</nav>
<main class="main-content">
<div class="content-wrapper">
<h1>Workflows</h1>
<p class="subtitle">Orchestrate complex processes with state machine semantics.</p>
<details class="page-toc" open>
<summary>On this page</summary>
<ol>
<li><a href="#defining-states">Defining States</a></li>
<li><a href="#building-a-workflow">Building a Workflow</a></li>
<li><a href="#builder-pattern">Builder Pattern and #[must_use]</a></li>
<li><a href="#validation">Workflow Validation</a></li>
<li><a href="#error-handling">Error Handling</a></li>
<li><a href="#split-join">Split/Join Workflows</a></li>
<li class="toc-sub"><a href="#join-strategies">Join Strategies</a></li>
<li class="toc-sub"><a href="#complete-example">Complete Example</a></li>
<li><a href="#join-strategy-examples">Join Strategy Examples</a></li>
<li><a href="#comparison-table">Comparison Table</a></li>
<li><a href="#parallel-patterns">Common Parallel Processing Patterns</a></li>
<li><a href="#ad-exchange">Advanced: Ad Exchange</a></li>
</ol>
</details>
<p>
Workflows in Cano are state machines. You define a set of states (usually an enum) and register a <code>Task</code> or <code>Node</code> for each state.
The workflow engine manages the transitions between these states until an exit state is reached.
</p>
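<p>
Conceptually, the engine is a small loop: look up the handler for the current state, run it, follow the
returned state, and stop once an exit state is reached. Here is a minimal self-contained sketch of that loop,
with plain functions standing in for Cano's async tasks. This is an illustration of the idea, not Cano's
actual engine:
</p>
<pre><code class="language-rust">use std::collections::{HashMap, HashSet};

// Sketch of the orchestration loop: each state maps to a handler that
// returns the next state; the loop runs until an exit state is reached.
fn orchestrate(
    handlers: &HashMap<&str, fn() -> &'static str>,
    exit_states: &HashSet<&str>,
    mut state: &'static str,
) -> &'static str {
    while !exit_states.contains(state) {
        state = handlers[state]();
    }
    state
}

fn main() {
    let mut handlers: HashMap<&str, fn() -> &'static str> = HashMap::new();
    handlers.insert("Start", || "Validate");
    handlers.insert("Validate", || "Complete");
    let exits: HashSet<&str> = HashSet::from(["Complete"]);
    println!("{}", orchestrate(&handlers, &exits, "Start")); // prints "Complete"
}</code></pre>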
<h2 id="defining-states"><a href="#defining-states" class="anchor-link" aria-hidden="true">#</a>Defining States</h2>
<pre><code class="language-rust">#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState {
Start,
Validate,
Process,
Complete,
Failed,
}</code></pre>
<h2 id="building-a-workflow"><a href="#building-a-workflow" class="anchor-link" aria-hidden="true">#</a>Building a Workflow</h2>
<div class="diagram-frame">
<p class="diagram-label">Workflow State Transitions</p>
<div class="mermaid">
graph TD
A[Start] --> B[Validate]
B -->|Valid| C[Process]
B -->|Invalid| D[Failed]
C --> E[Complete]
</div>
</div>
<h3 id="linear-example"><a href="#linear-example" class="anchor-link" aria-hidden="true">#</a>Linear Workflow Example</h3>
<pre><code class="language-rust">use cano::prelude::*;
use async_trait::async_trait;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum OrderState {
Start,
Validate,
Process,
Complete,
Failed,
}
// Define simple tasks (omitted for brevity, see Tasks page)
struct ValidateTask;
struct ProcessTask;
#[tokio::main]
async fn main() -> Result<(), CanoError> {
let store = MemoryStore::new();
    // Build workflow using the builder pattern: each method consumes self
    // and returns a new Workflow, so you must capture the return value.
let workflow = Workflow::new(store.clone())
// 1. Register Tasks for each state
.register(OrderState::Start, |_: &MemoryStore| async {
Ok(TaskResult::Single(OrderState::Validate))
})
.register(OrderState::Validate, ValidateTask)
.register(OrderState::Process, ProcessTask)
// 2. Define Exit States (Workflow stops here)
.add_exit_states(vec![OrderState::Complete, OrderState::Failed]);
// 3. Execute
let result = workflow.orchestrate(OrderState::Start).await?;
println!("Final State: {:?}", result);
Ok(())
}</code></pre>
<h2 id="builder-pattern"><a href="#builder-pattern" class="anchor-link" aria-hidden="true">#</a>Builder Pattern and #[must_use]</h2>
<p>
Workflow uses a builder pattern where <code>register()</code>, <code>register_split()</code>, and
<code>add_exit_state()</code> all consume <code>self</code> and return a new <code>Workflow</code>.
The <code>#[must_use]</code> attribute on <code>Workflow</code> and <code>JoinConfig</code> means the compiler
warns you if you discard the return value. Ignore that warning and never capture the result, and the
registration is lost.
</p>
<div class="callout callout-warn">
<p class="callout-title">Warning: Do not discard the return value</p>
<pre style="margin: 0 !important;"><code class="language-rust">// WRONG: registration is lost!
let workflow = Workflow::new(store.clone());
workflow.register(State::Start, my_task); // returns a new Workflow, but it is discarded

// CORRECT: capture the returned workflow
let workflow = Workflow::new(store.clone());
let workflow = workflow.register(State::Start, my_task);

// BEST: chain calls in a single expression
let workflow = Workflow::new(store.clone())
    .register(State::Start, my_task)
    .add_exit_state(State::Complete);</code></pre>
</div>
<h2 id="validation"><a href="#validation" class="anchor-link" aria-hidden="true">#</a>Workflow Validation</h2>
<p>
Before orchestrating a workflow, you can validate its configuration to catch common mistakes early.
Cano provides two validation methods that check for different categories of problems.
</p>
<h3 id="validate-method"><a href="#validate-method" class="anchor-link" aria-hidden="true">#</a>validate()</h3>
<p>
Checks the overall workflow structure. Returns <code>CanoError::Configuration</code> if problems are found.
</p>
<div class="card-stack">
<div class="card">
<h3>Checks performed</h3>
<p>No handlers registered: the workflow has no states mapped to tasks.</p>
<p>No exit states defined: the workflow has no way to terminate.</p>
</div>
</div>
<h3 id="validate-initial-state"><a href="#validate-initial-state" class="anchor-link" aria-hidden="true">#</a>validate_initial_state()</h3>
<p>
Checks that a specific initial state has a handler registered. Returns <code>CanoError::Configuration</code>
if the given state has no registered task or split handler.
</p>
<pre><code class="language-rust">let workflow = Workflow::new(store.clone())
.register(State::Start, MyTask)
.register(State::Process, ProcessTask)
.add_exit_state(State::Complete);
// Validate structure: ensures handlers and exit states exist
workflow.validate()?;
// Validate that the initial state has a handler
workflow.validate_initial_state(&State::Start)?;
// Safe to orchestrate
let result = workflow.orchestrate(State::Start).await?;</code></pre>
<h2 id="error-handling"><a href="#error-handling" class="anchor-link" aria-hidden="true">#</a>Error Handling</h2>
<p>
The <code>orchestrate()</code> method can return several error variants depending on what goes wrong
during execution. Understanding these errors helps you build robust error recovery logic.
</p>
<table class="comparison-table">
<thead>
<tr>
<th>Error Variant</th>
<th>Condition</th>
<th>How to Fix</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>CanoError::Workflow</code></td>
<td>No handler registered for current state</td>
<td>Register a task for every reachable state with <code>register()</code></td>
</tr>
<tr>
<td><code>CanoError::Workflow</code></td>
<td>Single task returned <code>TaskResult::Split</code></td>
<td>Use <code>register_split()</code> instead of <code>register()</code> for parallel tasks</td>
</tr>
<tr>
<td><code>CanoError::Workflow</code></td>
<td>Workflow timeout exceeded</td>
<td>Increase <code>with_timeout()</code> or optimize task execution time</td>
</tr>
<tr>
<td><code>CanoError::Configuration</code></td>
<td><code>PartialTimeout</code> strategy used without timeout configured</td>
<td>Add <code>.with_timeout(duration)</code> to <code>JoinConfig</code></td>
</tr>
<tr>
<td><code>CanoError::RetryExhausted</code></td>
<td>All retry attempts exhausted by a Node</td>
<td>Increase retry count or fix the underlying transient failure</td>
</tr>
<tr>
<td><code>CanoError::*</code></td>
<td>Any error propagated from task execution</td>
<td>Check the specific task logic: <code>NodeExecution</code>, <code>Preparation</code>, <code>Store</code>, etc.</td>
</tr>
</tbody>
</table>
<pre><code class="language-rust">match workflow.orchestrate(State::Start).await {
Ok(final_state) => println!("Completed: {:?}", final_state),
Err(CanoError::Workflow(msg)) => eprintln!("Workflow error: {}", msg),
Err(CanoError::Configuration(msg)) => eprintln!("Config error: {}", msg),
Err(CanoError::RetryExhausted(msg)) => eprintln!("Retries exhausted: {}", msg),
Err(e) => eprintln!("Task error: {}", e),
}</code></pre>
<h2 id="split-join"><a href="#split-join" class="anchor-link" aria-hidden="true">#</a>Split/Join Workflows</h2>
<p>
Execute multiple tasks in parallel and control how they proceed using flexible join strategies.
This is essential for scatter-gather patterns, redundant API calls, and performance optimization.
</p>
<div class="diagram-frame">
<p class="diagram-label">Split / Join Pattern</p>
<div class="mermaid">
graph TD
A[Process State] -->|Split| B[Task 1]
A -->|Split| C[Task 2]
A -->|Split| D[Task 3]
B --> E{Join Strategy}
C --> E
D --> E
E -->|Satisfied| F[Aggregate State]
E -->|Failed/Timeout| G[Error State]
</div>
</div>
<h3 id="join-strategies"><a href="#join-strategies" class="anchor-link" aria-hidden="true">#</a>Join Strategies</h3>
<p>Cano provides several strategies to control how parallel tasks are aggregated.</p>
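<p>
Each strategy can be read as a predicate over the number of successful tasks. The sketch below captures only
that core decision; it is our simplification, since Cano's real implementation also handles failures,
cancellation, and timeouts:
</p>
<pre><code class="language-rust">// Simplified join predicates: has the strategy been satisfied
// given `successes` completions out of `total` tasks?
enum Strategy {
    All,
    Any,
    Quorum(usize),
    Percentage(f64),
}

fn satisfied(s: &Strategy, successes: usize, total: usize) -> bool {
    match s {
        Strategy::All => successes == total,
        Strategy::Any => successes >= 1,
        Strategy::Quorum(n) => successes >= *n,
        Strategy::Percentage(p) => successes as f64 >= p * total as f64,
    }
}

fn main() {
    assert!(!satisfied(&Strategy::All, 3, 4));          // one task still pending
    assert!(satisfied(&Strategy::Any, 1, 4));           // first success wins
    assert!(satisfied(&Strategy::Quorum(2), 2, 4));     // quorum met
    assert!(satisfied(&Strategy::Percentage(0.5), 2, 4)); // 2/4 = 50%
}</code></pre>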
<div class="strategy-grid">
<div class="strategy-card">
<p class="strategy-name">All</p>
<p>Wait for <strong>all</strong> tasks to complete successfully.</p>
<pre><code class="language-rust">JoinStrategy::All</code></pre>
</div>
<div class="strategy-card">
<p class="strategy-name">Any</p>
<p>Proceed after the <strong>first</strong> task completes successfully.</p>
<pre><code class="language-rust">JoinStrategy::Any</code></pre>
</div>
<div class="strategy-card">
<p class="strategy-name">Quorum(n)</p>
<p>Wait for <strong>n</strong> tasks to complete successfully.</p>
<pre><code class="language-rust">JoinStrategy::Quorum(2)</code></pre>
</div>
<div class="strategy-card">
<p class="strategy-name">Percentage(p)</p>
<p>Wait for <strong>p%</strong> of tasks to complete successfully.</p>
<pre><code class="language-rust">JoinStrategy::Percentage(0.5)</code></pre>
</div>
<div class="strategy-card">
<p class="strategy-name">PartialResults(n)</p>
<p>Proceed after <strong>n</strong> tasks succeed; remaining tasks are cancelled and all outcomes are tracked.</p>
<pre><code class="language-rust">JoinStrategy::PartialResults(3)</code></pre>
</div>
<div class="strategy-card">
<p class="strategy-name">PartialTimeout</p>
<p>Accept whatever completes before <strong>timeout</strong> expires. Requires <code>.with_timeout()</code>.</p>
<pre><code class="language-rust">JoinStrategy::PartialTimeout</code></pre>
</div>
</div>
<h3 id="complete-example"><a href="#complete-example" class="anchor-link" aria-hidden="true">#</a>Complete Example</h3>
<p>Here is a complete, runnable example demonstrating how to use Split/Join with different strategies.</p>
<pre><code class="language-rust">use cano::prelude::*;
use async_trait::async_trait;
use std::time::Duration;
// 1. Define Workflow State
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataState {
Start,
LoadData,
ParallelProcessing,
Aggregate,
Complete,
}
// 2. Task to load initial data
#[derive(Clone)]
struct DataLoader;
#[async_trait]
impl Task<DataState> for DataLoader {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataState>, CanoError> {
println!("Loading initial data...");
// Load some data to process
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
store.put("input_data", data)?;
println!("Data loaded: 10 numbers");
Ok(TaskResult::Single(DataState::ParallelProcessing))
}
}
// 3. Parallel processing task
#[derive(Clone)]
struct ProcessorTask {
task_id: usize,
}
impl ProcessorTask {
fn new(task_id: usize) -> Self {
Self { task_id }
}
}
#[async_trait]
impl Task<DataState> for ProcessorTask {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataState>, CanoError> {
println!("Processor {} starting...", self.task_id);
// Get input data
let data: Vec<i32> = store.get("input_data")?;
// Simulate processing time
tokio::time::sleep(Duration::from_millis(100 * self.task_id as u64)).await;
// Process data (simple example: multiply by task_id)
let result: i32 = data.iter().map(|&x| x * self.task_id as i32).sum();
// Store individual result
store.put(&format!("result_{}", self.task_id), result)?;
println!("Processor {} completed with result: {}", self.task_id, result);
Ok(TaskResult::Single(DataState::Aggregate))
}
}
// 4. Aggregator task
#[derive(Clone)]
struct Aggregator;
#[async_trait]
impl Task<DataState> for Aggregator {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataState>, CanoError> {
println!("Aggregating results...");
// Collect all results
let mut total = 0;
let mut count = 0;
for i in 1..=3 {
if let Ok(result) = store.get::<i32>(&format!("result_{}", i)) {
total += result;
count += 1;
}
}
store.put("final_result", total)?;
store.put("processor_count", count)?;
println!("Aggregation complete: {} processors, total: {}", count, total);
Ok(TaskResult::Single(DataState::Complete))
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
let store = MemoryStore::new();
// Define tasks to run in parallel
let processors = vec![
ProcessorTask::new(1),
ProcessorTask::new(2),
ProcessorTask::new(3),
];
// Configure Join Strategy: Wait for ALL tasks
let join_config = JoinConfig::new(
JoinStrategy::All,
DataState::Aggregate
).with_timeout(Duration::from_secs(5));
// Build Workflow
let workflow = Workflow::new(store.clone())
.register(DataState::Start, DataLoader)
.register_split(
DataState::ParallelProcessing,
processors,
join_config
)
.register(DataState::Aggregate, Aggregator)
.add_exit_state(DataState::Complete);
// Run Workflow
let result = workflow.orchestrate(DataState::Start).await?;
let final_result: i32 = store.get("final_result")?;
println!("Workflow completed: {:?}", result);
println!("Final result: {}", final_result);
Ok(())
}</code></pre>
<h2 id="join-strategy-examples"><a href="#join-strategy-examples" class="anchor-link" aria-hidden="true">#</a>Join Strategy Examples</h2>
<p>Each strategy handles parallel task completion differently. Here are detailed examples for each strategy.</p>
<h3 id="strategy-all"><a href="#strategy-all" class="anchor-link" aria-hidden="true">#</a>1. All Strategy - Wait for All Tasks</h3>
<p>Waits for all tasks to complete successfully. Fails if any task fails.</p>
<div class="diagram-frame">
<p class="diagram-label">All Strategy</p>
<div class="mermaid">
sequenceDiagram
participant W as Workflow
participant T1 as Task 1
participant T2 as Task 2
participant T3 as Task 3
W->>T1: Start
W->>T2: Start
W->>T3: Start
T1-->>W: Complete ✓
T2-->>W: Complete ✓
T3-->>W: Complete ✓
Note over W: All Complete → Proceed
</div>
</div>
<pre><code class="language-rust">// All Strategy: Best for workflows requiring complete data
let join_config = JoinConfig::new(
JoinStrategy::All,
DataState::Aggregate
).with_timeout(Duration::from_secs(10));
let workflow = Workflow::new(store.clone())
.register(DataState::Start, DataLoader)
.register_split(
DataState::ParallelProcessing,
vec![ProcessorTask::new(1), ProcessorTask::new(2), ProcessorTask::new(3)],
join_config
)
.register(DataState::Aggregate, Aggregator)
.add_exit_state(DataState::Complete);
</code></pre>
<h3 id="strategy-any"><a href="#strategy-any" class="anchor-link" aria-hidden="true">#</a>2. Any Strategy - First to Complete</h3>
<p>Proceeds as soon as the first task completes successfully. Other tasks are cancelled.</p>
<div class="diagram-frame">
<p class="diagram-label">Any Strategy</p>
<div class="mermaid">
sequenceDiagram
participant W as Workflow
participant T1 as Task 1 (slow)
participant T2 as Task 2 (fast)
participant T3 as Task 3 (slow)
W->>T1: Start
W->>T2: Start
W->>T3: Start
T2-->>W: Complete ✓
Note over W: First Complete → Proceed
W->>T1: Cancel
W->>T3: Cancel
</div>
</div>
<pre><code class="language-rust">// Any Strategy: Best for redundant API calls or fastest-wins scenarios
let join_config = JoinConfig::new(
JoinStrategy::Any,
DataState::Complete // Skip aggregation, proceed directly
);
// Example: Call 3 different data sources, use whoever responds first
let workflow = Workflow::new(store.clone())
.register(DataState::Start, DataLoader)
.register_split(
DataState::ParallelProcessing,
vec![
ApiCallTask::new("provider1"),
ApiCallTask::new("provider2"),
ApiCallTask::new("provider3"),
],
join_config
)
.add_exit_state(DataState::Complete);
</code></pre>
<h3 id="strategy-quorum"><a href="#strategy-quorum" class="anchor-link" aria-hidden="true">#</a>3. Quorum Strategy - Wait for N Tasks</h3>
<p>Proceeds after a specific number of tasks complete successfully. Useful for consensus systems.</p>
<div class="diagram-frame">
<p class="diagram-label">Quorum Strategy</p>
<div class="mermaid">
sequenceDiagram
participant W as Workflow
participant T1 as Task 1
participant T2 as Task 2
participant T3 as Task 3
participant T4 as Task 4
W->>T1: Start
W->>T2: Start
W->>T3: Start
W->>T4: Start
T1-->>W: Complete ✓
T3-->>W: Complete ✓
T2-->>W: Complete ✓
Note over W: Quorum (3/4) Met → Proceed
W->>T4: Cancel
</div>
</div>
<pre><code class="language-rust">// Quorum Strategy: Best for distributed consensus or majority voting
let join_config = JoinConfig::new(
JoinStrategy::Quorum(3), // Need 3 out of 5 to succeed
DataState::Aggregate
).with_timeout(Duration::from_secs(5));
// Example: Write to 5 replicas, succeed when 3 confirm
let workflow = Workflow::new(store.clone())
.register(DataState::Start, PrepareData)
.register_split(
DataState::ParallelProcessing,
vec![
WriteReplica::new(1),
WriteReplica::new(2),
WriteReplica::new(3),
WriteReplica::new(4),
WriteReplica::new(5),
],
join_config
)
.register(DataState::Aggregate, ConfirmWrite)
.add_exit_state(DataState::Complete);
</code></pre>
<h3 id="strategy-percentage"><a href="#strategy-percentage" class="anchor-link" aria-hidden="true">#</a>4. Percentage Strategy - Wait for % of Tasks</h3>
<p>Proceeds after a percentage of tasks complete successfully. Flexible for varying batch sizes.</p>
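<p>
To turn a percentage into a concrete task count, the threshold is the smallest whole number of successes at or
above <code>p * total</code>. A tiny sketch of that arithmetic (the helper name and the ceiling rounding are
our assumptions, not Cano's documented behavior):
</p>
<pre><code class="language-rust">// Hypothetical helper: successes needed for Percentage(p) over n tasks,
// assuming the threshold rounds up (ceiling).
fn required_successes(p: f64, n: usize) -> usize {
    (p * n as f64).ceil() as usize
}

fn main() {
    println!("{}", required_successes(0.75, 4));   // 3
    println!("{}", required_successes(0.75, 100)); // 75
    println!("{}", required_successes(0.5, 5));    // 3
}</code></pre>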
<div class="diagram-frame">
<p class="diagram-label">Percentage Strategy</p>
<div class="mermaid">
sequenceDiagram
participant W as Workflow
participant T1 as Task 1
participant T2 as Task 2
participant T3 as Task 3
participant T4 as Task 4
W->>T1: Start
W->>T2: Start
W->>T3: Start
W->>T4: Start
T1-->>W: Complete ✓
T2-->>W: Complete ✓
T4-->>W: Complete ✓
Note over W: 75% (3/4) Met → Proceed
W->>T3: Cancel
</div>
</div>
<pre><code class="language-rust">// Percentage Strategy: Best for batch processing with acceptable partial results
let join_config = JoinConfig::new(
JoinStrategy::Percentage(0.75), // Need 75% to succeed
DataState::Aggregate
).with_timeout(Duration::from_secs(10));
// Example: Process 100 records, proceed when 75 complete
let mut tasks = Vec::new();
for i in 0..100 {
tasks.push(RecordProcessor::new(i));
}
let workflow = Workflow::new(store.clone())
.register(DataState::Start, LoadRecords)
.register_split(
DataState::ParallelProcessing,
tasks,
join_config
)
.register(DataState::Aggregate, SummarizeResults)
.add_exit_state(DataState::Complete);
</code></pre>
<h3 id="strategy-partial-results"><a href="#strategy-partial-results" class="anchor-link" aria-hidden="true">#</a>5. PartialResults Strategy - Accept Partial Completion</h3>
<p>Proceeds after N tasks complete successfully, cancels remaining tasks. Tracks all outcomes.</p>
<div class="diagram-frame">
<p class="diagram-label">PartialResults Strategy</p>
<div class="mermaid">
sequenceDiagram
participant W as Workflow
participant T1 as Task 1
participant T2 as Task 2
participant T3 as Task 3
participant T4 as Task 4
W->>T1: Start
W->>T2: Start
W->>T3: Start
W->>T4: Start
T1-->>W: Complete ✓
T2-->>W: Failed ✗
T3-->>W: Complete ✓
Note over W: 2 Successes → Proceed
W->>T4: Cancel
Note over W: Track: 2 success, 1 error, 1 cancelled
</div>
</div>
<pre><code class="language-rust">// PartialResults Strategy: Best for fault-tolerant systems with latency optimization
let join_config = JoinConfig::new(
JoinStrategy::PartialResults(2), // Proceed after any 2 succeed
DataState::Aggregate
)
.with_timeout(Duration::from_secs(5))
.with_store_partial_results(true); // Store detailed results
// Example: Call multiple services, use fastest 3 responses
let workflow = Workflow::new(store.clone())
.register(DataState::Start, PrepareRequest)
.register_split(
DataState::ParallelProcessing,
vec![
ServiceCall::new("fast-service"),
ServiceCall::new("medium-service"),
ServiceCall::new("slow-service"),
ServiceCall::new("backup-service"),
],
join_config
)
.register(DataState::Aggregate, MergePartialResults)
.add_exit_state(DataState::Complete);
// After execution, check stored result counts
let successes: usize = store.get("split_successes_count")?;
let errors: usize = store.get("split_errors_count")?;
let cancelled: usize = store.get("split_cancelled_count")?;
println!("Successes: {}", successes);
println!("Errors: {}", errors);
println!("Cancelled: {}", cancelled);
</code></pre>
<h3 id="strategy-partial-timeout"><a href="#strategy-partial-timeout" class="anchor-link" aria-hidden="true">#</a>6. PartialTimeout Strategy - Deadline-Based Completion</h3>
<p>Accepts whatever completes before timeout expires. Proceeds with available results.</p>
<div class="diagram-frame">
<p class="diagram-label">PartialTimeout Strategy</p>
<div class="mermaid">
sequenceDiagram
participant W as Workflow
participant T1 as Task 1
participant T2 as Task 2
participant T3 as Task 3
participant T4 as Task 4
W->>T1: Start
W->>T2: Start
W->>T3: Start
W->>T4: Start
T1-->>W: Complete ✓
T3-->>W: Complete ✓
Note over W: Timeout Reached
W->>T2: Cancel
W->>T4: Cancel
Note over W: Proceed with 2 results
</div>
</div>
<pre><code class="language-rust">// PartialTimeout Strategy: Best for real-time systems with strict SLAs
let join_config = JoinConfig::new(
JoinStrategy::PartialTimeout, // Must specify timeout
DataState::Aggregate
)
.with_timeout(Duration::from_millis(500)) // 500ms deadline
.with_store_partial_results(true);
// Example: Real-time recommendation system with 500ms SLA
let workflow = Workflow::new(store.clone())
.register(DataState::Start, LoadUserContext)
.register_split(
DataState::ParallelProcessing,
vec![
RecommendationEngine::new("collaborative"),
RecommendationEngine::new("content-based"),
RecommendationEngine::new("trending"),
RecommendationEngine::new("personalized"),
],
join_config
)
.register(DataState::Aggregate, |store: &MemoryStore| async move {
// Aggregate whatever results we got within deadline
let successes: usize = store.get("split_successes_count")?;
println!("Got {} recommendations within SLA", successes);
Ok(TaskResult::Single(DataState::Complete))
})
.add_exit_state(DataState::Complete);
</code></pre>
<h2 id="comparison-table"><a href="#comparison-table" class="anchor-link" aria-hidden="true">#</a>Comparison Table</h2>
<table class="comparison-table">
<thead>
<tr>
<th>Strategy</th>
<th>Trigger Condition</th>
<th>Cancels Others</th>
<th>Best Use Case</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>All</code></td>
<td>All tasks succeed</td>
<td>No</td>
<td>Complete data required</td>
</tr>
<tr>
<td><code>Any</code></td>
<td>First success</td>
<td>Yes</td>
<td>Redundant API calls</td>
</tr>
<tr>
<td><code>Quorum(n)</code></td>
<td>N tasks succeed</td>
<td>Yes</td>
<td>Distributed consensus</td>
</tr>
<tr>
<td><code>Percentage(p)</code></td>
<td>P% succeed</td>
<td>Yes</td>
<td>Batch processing</td>
</tr>
<tr>
<td><code>PartialResults(n)</code></td>
<td>N tasks succeed</td>
<td>Yes</td>
<td>Latency optimization</td>
</tr>
<tr>
<td><code>PartialTimeout</code></td>
<td>Timeout reached</td>
<td>Yes</td>
<td>Strict SLA requirements</td>
</tr>
</tbody>
</table>
<h2 id="parallel-patterns"><a href="#parallel-patterns" class="anchor-link" aria-hidden="true">#</a>Common Parallel Processing Patterns</h2>
<p>
Split/Join handles complex parallel processing scenarios within a single workflow. The following
real-world patterns remove the need to run multiple workflow instances concurrently.
</p>
<h3 id="pattern-queue"><a href="#pattern-queue" class="anchor-link" aria-hidden="true">#</a>Pattern 1: Queue Consumer with Batch Processing</h3>
<p>
Process items from a queue in parallel batches. Instead of running multiple workflow instances concurrently,
use a single workflow that pulls batches and processes them in parallel.
</p>
<pre><code class="language-rust">use cano::prelude::*;
use async_trait::async_trait;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum QueueState {
PullBatch,
ProcessBatch,
Complete,
Idle,
}
// Simulated queue (in production, use a real queue such as Redis or SQS)
type SharedQueue = Arc<Mutex<VecDeque<String>>>;
#[derive(Clone)]
struct QueuePuller {
queue: SharedQueue,
batch_size: usize,
}
#[async_trait]
impl Task<QueueState> for QueuePuller {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<QueueState>, CanoError> {
let mut queue = self.queue.lock().await;
// Pull batch from queue
let mut batch = Vec::new();
for _ in 0..self.batch_size {
if let Some(item) = queue.pop_front() {
batch.push(item);
} else {
break;
}
}
        if batch.is_empty() {
            println!("Queue empty, stopping");
            // Signal the caller that there is no more work
            return Ok(TaskResult::Single(QueueState::Idle));
        }
println!("Pulled {} items from queue", batch.len());
store.put("current_batch", batch)?;
// Split into parallel processing
Ok(TaskResult::Single(QueueState::ProcessBatch))
}
}
#[derive(Clone)]
struct ItemProcessor {
item_id: String,
}
#[async_trait]
impl Task<QueueState> for ItemProcessor {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<QueueState>, CanoError> {
println!("Processing item: {}", self.item_id);
// Simulate processing
tokio::time::sleep(Duration::from_millis(500)).await;
// Store result
store.put(&format!("result_{}", self.item_id), "completed")?;
Ok(TaskResult::Single(QueueState::Complete))
}
}
#[derive(Clone)]
struct BatchSplitter {
queue: SharedQueue,
batch_size: usize,
}
#[async_trait]
impl Task<QueueState> for BatchSplitter {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<QueueState>, CanoError> {
let batch: Vec<String> = store.get("current_batch")?;
if batch.is_empty() {
return Ok(TaskResult::Single(QueueState::PullBatch));
}
        // Fan out dynamically: one parallel branch per item in the batch.
        println!("Splitting into {} parallel branches", batch.len());
        // Each branch resolves to the Complete exit state once its item is handled.
        Ok(TaskResult::Split(
            batch.iter().map(|_| QueueState::Complete).collect()
        ))
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
let store = MemoryStore::new();
let queue = Arc::new(Mutex::new(VecDeque::from(vec![
"order1".to_string(),
"order2".to_string(),
"order3".to_string(),
"order4".to_string(),
"order5".to_string(),
])));
let workflow = Workflow::new(store.clone())
.register(QueueState::PullBatch, QueuePuller {
queue: queue.clone(),
batch_size: 10
})
.register(QueueState::ProcessBatch, BatchSplitter {
queue: queue.clone(),
batch_size: 10,
})
.add_exit_state(QueueState::Complete);
// Process batches continuously until queue empty
loop {
let result = workflow.orchestrate(QueueState::PullBatch).await?;
if result == QueueState::Complete {
let q = queue.lock().await;
if q.is_empty() {
break;
}
}
}
    println!("✅ All items processed!");
Ok(())
}</code></pre>
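The batch-pull step in isolation, draining up to `batch_size` items from the front of a queue, can be exercised without any of the workflow types. A std-only sketch (names here are illustrative, not Cano API):

```rust
use std::collections::VecDeque;

/// Remove and return up to `batch_size` items from the front of the queue.
fn pull_batch(queue: &mut VecDeque<String>, batch_size: usize) -> Vec<String> {
    let n = batch_size.min(queue.len());
    queue.drain(..n).collect()
}

fn main() {
    let mut queue: VecDeque<String> = (1..=5).map(|i| format!("order{i}")).collect();
    let mut batches = 0;
    while !queue.is_empty() {
        let batch = pull_batch(&mut queue, 2);
        batches += 1;
        println!("batch {batches}: {batch:?}");
    }
    // 5 items in batches of 2 -> 3 batches
    assert_eq!(batches, 3);
}
```

The same drain-then-process loop is what `QueuePuller` performs under its queue lock.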
<h3 id="pattern-dynamic"><a href="#pattern-dynamic" class="anchor-link" aria-hidden="true">#</a>Pattern 2: Dynamic Task Generation</h3>
<p>
Generate parallel tasks dynamically based on runtime data. Perfect for processing variable-size datasets
or handling events that arrive over time.
</p>
<pre><code class="language-rust">use cano::prelude::*;
use async_trait::async_trait;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum DataState {
LoadRecords,
ProcessBatch,
Aggregate,
Complete,
}
// Load records and store them so the split tasks can read them
#[derive(Clone)]
struct RecordLoader;
#[async_trait]
impl Task<DataState> for RecordLoader {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataState>, CanoError> {
let records: Vec<i32> = (1..=100).collect();
store.put("records", records)?;
println!("Loaded 100 records");
Ok(TaskResult::Single(DataState::ProcessBatch))
}
}
// Each processor reads from the shared store and handles one record by index
#[derive(Clone)]
struct RecordProcessor {
index: usize,
}
#[async_trait]
impl Task<DataState> for RecordProcessor {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<DataState>, CanoError> {
let records: Vec<i32> = store.get("records")?;
let value = records[self.index];
tokio::time::sleep(Duration::from_millis(10)).await;
store.put(&format!("result_{}", self.index), value * 2)?;
Ok(TaskResult::Single(DataState::Aggregate))
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
let store = MemoryStore::new();
// Build the processor tasks before constructing the workflow
let processors: Vec<RecordProcessor> = (0..100).map(|i| RecordProcessor { index: i }).collect();
let join_config = JoinConfig::new(JoinStrategy::All, DataState::Aggregate);
let workflow = Workflow::new(store.clone())
.register(DataState::LoadRecords, RecordLoader)
.register_split(DataState::ProcessBatch, processors, join_config)
.register(DataState::Aggregate, |_: &MemoryStore| async {
println!("All records processed");
Ok(TaskResult::Single(DataState::Complete))
})
.add_exit_state(DataState::Complete);
workflow.orchestrate(DataState::LoadRecords).await?;
Ok(())
}
</code></pre>
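Stripped of the workflow machinery, the core of dynamic task generation is "spawn one unit of work per record, then join them all". A std-threads sketch of that shape (illustrative only, not the Cano API):

```rust
use std::thread;

/// Spawn one thread per record (dynamic fan-out), then join all results.
fn process_all(records: Vec<i32>) -> Vec<i32> {
    let handles: Vec<_> = records
        .into_iter()
        .map(|r| thread::spawn(move || r * 2)) // the per-record work
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let results = process_all((1..=100).collect());
    println!("processed {} records", results.len());
    assert_eq!(results[0], 2);
    assert_eq!(results[99], 200);
}
```

One thread per record is fine at this scale; for larger datasets, chunk the records or cap concurrency as shown in Pattern 3 below.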
<h3 id="pattern-resource"><a href="#pattern-resource" class="anchor-link" aria-hidden="true">#</a>Pattern 3: Resource-Limited Parallel Processing</h3>
<p>
Control parallelism when you have limited resources (API keys, connections, etc.).
Use semaphores to limit concurrent executions within a single workflow.
</p>
<pre><code class="language-rust">use cano::prelude::*;
use async_trait::async_trait;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum ApiState {
    Start,
    Complete,
}
// Stand-in for a real external API call
async fn make_api_call(api_id: usize) -> Result<String, CanoError> {
    tokio::time::sleep(Duration::from_millis(50)).await;
    Ok(format!("response_{api_id}"))
}
#[derive(Clone)]
struct RateLimitedApiTask {
    api_id: usize,
    semaphore: Arc<Semaphore>, // Limits concurrent API calls
}
#[async_trait]
impl Task<ApiState> for RateLimitedApiTask {
    async fn run(&self, store: &MemoryStore) -> Result<TaskResult<ApiState>, CanoError> {
        // Acquire a permit (waits if the limit is reached)
        let _permit = self.semaphore.acquire().await
            .map_err(|e| CanoError::task_execution(e.to_string()))?;
        println!("API call {} starting (within rate limit)", self.api_id);
        // Make the API call
        let result = make_api_call(self.api_id).await?;
        store.put(&format!("api_result_{}", self.api_id), result)?;
        println!("API call {} completed", self.api_id);
        Ok(TaskResult::Single(ApiState::Complete))
    }
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
    let store = MemoryStore::new();
    // Limit to 5 concurrent API calls
    let semaphore = Arc::new(Semaphore::new(5));
    let tasks: Vec<RateLimitedApiTask> = (0..20)
        .map(|i| RateLimitedApiTask { api_id: i, semaphore: semaphore.clone() })
        .collect();
    // All 20 tasks will run, but only 5 at a time
    let join_config = JoinConfig::new(JoinStrategy::All, ApiState::Complete);
    let workflow = Workflow::new(store.clone())
        .register_split(ApiState::Start, tasks, join_config)
        .add_exit_state(ApiState::Complete);
    workflow.orchestrate(ApiState::Start).await?;
    Ok(())
}
</code></pre>
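The semaphore idea is independent of Cano or tokio. The same bounded fan-out can be sketched with plain std threads, using a channel of unit values as a counting semaphore (all names here are illustrative):

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Run `n` jobs on plain threads, allowing at most `limit` in flight.
/// Returns (jobs completed, peak concurrency observed).
fn run_limited(n: usize, limit: usize) -> (usize, usize) {
    // A channel of unit values acts as a counting semaphore: each () is a permit.
    let (permit_tx, permit_rx) = mpsc::channel();
    for _ in 0..limit {
        permit_tx.send(()).unwrap();
    }
    let permit_rx = Arc::new(Mutex::new(permit_rx));
    let gauge = Arc::new(Mutex::new((0usize, 0usize))); // (in flight, peak)
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let permit_rx = Arc::clone(&permit_rx);
            let permit_tx = permit_tx.clone();
            let gauge = Arc::clone(&gauge);
            thread::spawn(move || {
                // Take a permit; blocks while `limit` jobs are already running.
                permit_rx.lock().unwrap().recv().unwrap();
                {
                    let mut g = gauge.lock().unwrap();
                    g.0 += 1;
                    g.1 = g.1.max(g.0);
                }
                thread::sleep(Duration::from_millis(10)); // simulated API call
                gauge.lock().unwrap().0 -= 1;
                permit_tx.send(()).unwrap(); // return the permit
            })
        })
        .collect();
    let done = handles.into_iter().map(|h| h.join().unwrap()).count();
    let peak = gauge.lock().unwrap().1;
    (done, peak)
}

fn main() {
    let (done, peak) = run_limited(20, 5);
    println!("{done} jobs done, peak concurrency {peak}");
    assert!(peak <= 5);
}
```

The tokio `Semaphore` in the pattern above plays exactly the role of the permit channel here, but without blocking OS threads.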
<h3 id="pattern-continuous"><a href="#pattern-continuous" class="anchor-link" aria-hidden="true">#</a>Pattern 4: Continuous Workflow with Split/Join</h3>
<p>
Combine scheduling with split/join for continuous parallel processing.
This replaces the need for concurrent workflow scheduling.
</p>
<pre><code class="language-rust">use cano::prelude::*;
use async_trait::async_trait;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum ProcessState {
    Start,
    ProcessBatch,
    Complete,
}
// Stand-in for fetching pending work from an external source (queue, DB, etc.)
async fn fetch_pending_work() -> Result<Vec<String>, CanoError> {
    Ok(vec!["job-1".to_string(), "job-2".to_string()])
}
// WorkProcessor handles a single item identified by index in the store
#[derive(Clone)]
struct WorkProcessor {
item_index: usize,
}
#[async_trait]
impl Task<ProcessState> for WorkProcessor {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<ProcessState>, CanoError> {
let items: Vec<String> = store.get("work_items")?;
if let Some(item) = items.get(self.item_index) {
println!("Processing item: {}", item);
// ... processing logic
}
Ok(TaskResult::Single(ProcessState::Complete))
}
}
// BatchLoaderTask fetches pending work; the parallel processors are registered via
// register_split at workflow build time, so the batch size is fixed per workflow instance.
#[derive(Clone)]
struct BatchLoaderTask;
#[async_trait]
impl Task<ProcessState> for BatchLoaderTask {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<ProcessState>, CanoError> {
let items = fetch_pending_work().await?;
if items.is_empty() {
println!("No work available");
return Ok(TaskResult::Single(ProcessState::Complete));
}
store.put("work_items", items)?;
Ok(TaskResult::Single(ProcessState::ProcessBatch))
}
}
#[tokio::main]
async fn main() -> Result<(), CanoError> {
let mut scheduler = Scheduler::new();
let store = MemoryStore::new();
// Build worker tasks for a fixed batch size; adjust batch_size as needed.
let batch_size = 10usize;
let processors: Vec<WorkProcessor> = (0..batch_size).map(|i| WorkProcessor { item_index: i }).collect();
let join_config = JoinConfig::new(JoinStrategy::All, ProcessState::Complete);
let batch_workflow = Workflow::new(store.clone())
.register(ProcessState::Start, BatchLoaderTask)
.register_split(ProcessState::ProcessBatch, processors, join_config)
.add_exit_state(ProcessState::Complete);
// Schedule to run every 10 seconds
scheduler.every_seconds("batch_processor", batch_workflow, ProcessState::Start, 10)?;
scheduler.start().await?;
Ok(())
}</code></pre>
<h3 id="when-to-use"><a href="#when-to-use" class="anchor-link" aria-hidden="true">#</a>When to Use These Patterns</h3>
<ul>
<li><strong>Queue Consumer Pattern</strong>: When processing items from external queues (SQS, Redis, Kafka)</li>
<li><strong>Dynamic Task Generation</strong>: When the number of parallel tasks depends on runtime data</li>
<li><strong>Resource-Limited Processing</strong>: When you need to limit concurrent operations (API rate limits, database connections)</li>
<li><strong>Continuous Workflow</strong>: When you need scheduled parallel processing without multiple workflow instances</li>
</ul>
<div class="callout callout-tip">
<p><strong>💡 Key Insight:</strong> These patterns leverage Split/Join within a single workflow to achieve
the same parallelism as running multiple concurrent workflow instances, but with a simpler mental model,
better resource control, and type-safe state management.</p>
</div>
<h2 id="ad-exchange"><a href="#ad-exchange" class="anchor-link" aria-hidden="true">#</a>Advanced Example: Real-Time Ad Exchange Workflow</h2>
<p>
This comprehensive example demonstrates a production-grade ad exchange system with multiple split/join points,
diverse join strategies, complex state management, and real-world constraints like timeouts and partial results.
</p>
<h3 id="ad-architecture"><a href="#ad-architecture" class="anchor-link" aria-hidden="true">#</a>System Architecture</h3>
<div class="diagram-frame">
<p class="diagram-label">Ad Exchange Architecture</p>
<div class="mermaid">
graph TB
Start[Ad Request Received] --> Validate[Validate Request]
Validate -->|Valid| Split1[SPLIT: Context Gathering]
Validate -->|Invalid| Invalid[Invalid Response]
Invalid --> Complete
subgraph "Split 1: Context Gathering - All Strategy"
Split1 --> User[Fetch User Profile]
Split1 --> Geo[Fetch Geo Data]
Split1 --> Device[Device Detection]
end
User --> Join1[JOIN: All Required 100ms timeout]
Geo --> Join1
Device --> Join1
Join1 -->|Success| Split2[SPLIT: Bid Requests]
Join1 -.->|Timeout/Error| Split5[SPLIT: Error Tracking]
subgraph "Split 2: Bid Requests - PartialTimeout Strategy"
Split2 --> DSP1[DSP-FastBidder 45ms]
Split2 --> DSP2[DSP-Premium 80ms]
Split2 --> DSP3[DSP-Global 120ms]
Split2 --> DSP4[DSP-Slow 190ms]
Split2 --> DSP5[DSP-TooSlow 250ms]
end
DSP1 --> Join2[JOIN: PartialTimeout 200ms]
DSP2 --> Join2
DSP3 --> Join2
DSP4 --> Join2
DSP5 --> Join2
Join2 --> Split3[SPLIT: Bid Scoring]
subgraph "Split 3: Bid Scoring - All Strategy"
Split3 --> Score1[Score Bid 0]
Split3 --> Score2[Score Bid 1]
Split3 --> Score3[Score Bid 2]
end
Score1 --> Join3[JOIN: All 50ms timeout]
Score2 --> Join3
Score3 --> Join3
Join3 -->|Success| Auction[Run Auction]
Join3 -.->|Timeout/Error| Split5
Auction --> Winner{Has Winner?}
Winner -->|Yes| Split4[SPLIT: Tracking]
Winner -->|No| Split5
subgraph "Split 4: Tracking - All Strategy"
Split4 --> Track1[Log Analytics]
Split4 --> Track2[Update Metrics]
Split4 --> Track3[Notify Winner]
Split4 --> Track4[Store Auction]
end
Track1 --> Join4[JOIN: All 100ms timeout]
Track2 --> Join4
Track3 --> Join4
Track4 --> Join4
Join4 -->|Success| Response[Build Response]
Join4 -.->|Timeout/Error| Split5
Response --> Complete[Complete]
subgraph "Split 5: Error Tracking - All Strategy"
Split5 --> ErrLog[Log Error]
Split5 --> ErrMetrics[Update Error Metrics]
end
ErrLog --> Join5[JOIN: All 50ms timeout]
ErrMetrics --> Join5
Join5 --> NoFill[No Fill Response]
NoFill --> Complete
style Start fill:#4CAF50
style Complete fill:#2196F3
style NoFill fill:#FF5722
style Invalid fill:#FF5722
style Split1 fill:#FF9800
style Split2 fill:#FF9800
style Split3 fill:#FF9800
style Split4 fill:#FF9800
style Split5 fill:#F59E0B
style Join1 fill:#9C27B0
style Join2 fill:#9C27B0
style Join3 fill:#9C27B0
style Join4 fill:#9C27B0
style Join5 fill:#F59E0B
</div>
</div>
<h3 id="ad-implementation"><a href="#ad-implementation" class="anchor-link" aria-hidden="true">#</a>Complete Implementation</h3>
<pre><code class="language-rust">use cano::prelude::*;
use async_trait::async_trait;
use std::time::Duration;
// ============================================================================
// State Definitions
// ============================================================================
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum AdExchangeState {
// Entry and validation
Start,
// Context gathering (Split 1)
GatherContext,
// Bid request phase (Split 2)
RequestBids,
// Auction phase (Split 3)
ScoreBids,
RunAuction,
// Tracking phase (Split 4)
TrackResults,
// Error tracking phase (Split 5)
ErrorTracking,
// Terminal states
BuildResponse,
InvalidResponse,
Complete,
Rejected,
NoFill,
}
// ============================================================================
// Data Models
// ============================================================================
#[derive(Debug, Clone)]
struct AdRequest {
request_id: String,
placement_id: String,
floor_price: f64,
}
#[derive(Debug, Clone)]
struct UserContext {}
#[derive(Debug, Clone)]
struct GeoContext {}
#[derive(Debug, Clone)]
struct DeviceContext {}
#[derive(Debug, Clone)]
struct BidResponse {
partner_id: String,
price: f64,
creative_id: String,
response_time_ms: u64,
}
#[derive(Debug, Clone)]
struct ScoredBid {
bid: BidResponse,
score: f64, // Adjusted price after quality scoring
rank: usize,
}
#[derive(Debug, Clone)]
struct AuctionResult {
winner: Option<ScoredBid>,
total_bids: usize,
auction_time_ms: u64,
}
// ============================================================================
// Phase 1: Request Validation
// ============================================================================
#[derive(Clone)]
struct ValidateRequestTask;
#[async_trait]
impl Task<AdExchangeState> for ValidateRequestTask {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
let request: AdRequest = store.get("ad_request")?;
        println!("🔍 Validating request {}", request.request_id);
        // Validation logic
        if request.placement_id.is_empty() {
            println!("❌ Invalid placement ID");
            return Ok(TaskResult::Single(AdExchangeState::InvalidResponse));
        }
        if request.floor_price < 0.01 {
            println!("❌ Floor price too low");
            return Ok(TaskResult::Single(AdExchangeState::InvalidResponse));
        }
        println!("✅ Request validated");
Ok(TaskResult::Single(AdExchangeState::GatherContext))
}
}
// ============================================================================
// Phase 2: Context Gathering (Split 1 - All Strategy)
// ============================================================================
// Wrapper enum for heterogeneous context gathering tasks
#[derive(Clone)]
enum ContextTask {
FetchUser,
FetchGeo,
DetectDevice,
}
#[async_trait]
impl Task<AdExchangeState> for ContextTask {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
match self {
ContextTask::FetchUser => {
                println!("  👤 Fetching user profile...");
                tokio::time::sleep(Duration::from_millis(50)).await;
                let user = UserContext {};
                store.put("user_context", user)?;
                println!("  ✅ User profile loaded");
                Ok(TaskResult::Single(AdExchangeState::RequestBids))
            }
            ContextTask::FetchGeo => {
                println!("  🌍 Fetching geo data...");
                tokio::time::sleep(Duration::from_millis(30)).await;
                let geo = GeoContext {};
                store.put("geo_context", geo)?;
                println!("  ✅ Geo data loaded");
                Ok(TaskResult::Single(AdExchangeState::RequestBids))
            }
            ContextTask::DetectDevice => {
                println!("  📱 Detecting device...");
                tokio::time::sleep(Duration::from_millis(20)).await;
                let device = DeviceContext {};
                store.put("device_context", device)?;
                println!("  ✅ Device detected");
Ok(TaskResult::Single(AdExchangeState::RequestBids))
}
}
}
}
// ============================================================================
// Phase 3: Bid Requests (Split 2 - PartialTimeout Strategy)
// ============================================================================
#[derive(Clone)]
struct ContactDSPTask {
partner_id: String,
response_delay_ms: u64, // Simulated network latency
}
#[async_trait]
impl Task<AdExchangeState> for ContactDSPTask {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        println!("  📡 Requesting bid from {}...", self.partner_id);
// Simulate DSP bid request with varying latency
tokio::time::sleep(Duration::from_millis(self.response_delay_ms)).await;
// Some DSPs might not respond in time or may not bid
if self.response_delay_ms > 180 {
// Will timeout
tokio::time::sleep(Duration::from_millis(100)).await;
}
let bid = BidResponse {
partner_id: self.partner_id.clone(),
price: 2.50 + (self.response_delay_ms as f64 / 100.0),
creative_id: format!("creative_{}", self.partner_id),
response_time_ms: self.response_delay_ms,
};
        // Append the bid to the shared list. This read-modify-write assumes the store
        // serializes access; with many parallel writers, prefer one key per task.
        let mut bids: Vec<BidResponse> = store.get("bids").unwrap_or_default();
        bids.push(bid.clone());
        store.put("bids", bids)?;
        println!("  ✅ {} bid: ${:.2}", self.partner_id, bid.price);
Ok(TaskResult::Single(AdExchangeState::ScoreBids))
}
}
// ============================================================================
// Phase 4: Bid Scoring (Split 3 - All Strategy)
// ============================================================================
#[derive(Clone)]
struct ScoreBidTask {
bid_index: usize,
}
#[async_trait]
impl Task<AdExchangeState> for ScoreBidTask {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
let bids: Vec<BidResponse> = store.get("bids")?;
if self.bid_index >= bids.len() {
return Err(CanoError::task_execution("Bid index out of range"));
}
let bid = &bids[self.bid_index];
        println!("  📊 Scoring bid from {}...", bid.partner_id);
// Simulate scoring computation
tokio::time::sleep(Duration::from_millis(10)).await;
// Quality score based on partner history and response time
let quality_multiplier = match bid.response_time_ms {
0..=50 => 1.1, // Fast response bonus
51..=100 => 1.0, // Normal
101..=150 => 0.95, // Slight penalty
_ => 0.9, // Slow response penalty
};
let scored_bid = ScoredBid {
bid: bid.clone(),
score: bid.price * quality_multiplier,
rank: 0, // Will be set during auction
};
let score_value = scored_bid.score;
// Store scored bid
let mut scored_bids: Vec<ScoredBid> = store.get("scored_bids").unwrap_or_default();
scored_bids.push(scored_bid);
store.put("scored_bids", scored_bids)?;
        println!("  ✅ Bid scored: ${:.2}", score_value);
Ok(TaskResult::Single(AdExchangeState::RunAuction))
}
}
// ============================================================================
// Phase 5: Auction
// ============================================================================
#[derive(Clone)]
struct RunAuctionTask;
#[async_trait]
impl Task<AdExchangeState> for RunAuctionTask {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        println!("\n  🎯 Running auction...");
let start = tokio::time::Instant::now();
let mut scored_bids: Vec<ScoredBid> = store.get("scored_bids")?;
let request: AdRequest = store.get("ad_request")?;
// Filter bids above floor price
scored_bids.retain(|b| b.score >= request.floor_price);
if scored_bids.is_empty() {
            println!("  ❌ No valid bids above floor price");
let result = AuctionResult {
winner: None,
total_bids: 0,
auction_time_ms: start.elapsed().as_millis() as u64,
};
store.put("auction_result", result)?;
return Ok(TaskResult::Single(AdExchangeState::ErrorTracking));
}
        // Sort by score (descending); total_cmp avoids the panic-prone unwrap on NaN
        scored_bids.sort_by(|a, b| b.score.total_cmp(&a.score));
// Set ranks
for (i, bid) in scored_bids.iter_mut().enumerate() {
bid.rank = i + 1;
}
let winner = scored_bids[0].clone();
        println!("  🏆 Winner: {} at ${:.2}", winner.bid.partner_id, winner.score);
let result = AuctionResult {
winner: Some(winner),
total_bids: scored_bids.len(),
auction_time_ms: start.elapsed().as_millis() as u64,
};
store.put("auction_result", result)?;
Ok(TaskResult::Single(AdExchangeState::TrackResults))
}
}
// ============================================================================
// Phase 6: Tracking (Split 4 - All Strategy)
// ============================================================================
// Wrapper enum for heterogeneous tracking tasks
#[derive(Clone)]
enum TrackingTask {
LogAnalytics,
UpdateMetrics,
NotifyWinner,
StoreAuction,
}
#[async_trait]
impl Task<AdExchangeState> for TrackingTask {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
match self {
TrackingTask::LogAnalytics => {
                println!("  📊 Logging to analytics...");
                tokio::time::sleep(Duration::from_millis(30)).await;
                let result: AuctionResult = store.get("auction_result")?;
                println!("  ✅ Analytics logged: {} bids", result.total_bids);
                Ok(TaskResult::Single(AdExchangeState::BuildResponse))
            }
            TrackingTask::UpdateMetrics => {
                println!("  📈 Updating metrics...");
                tokio::time::sleep(Duration::from_millis(25)).await;
                println!("  ✅ Metrics updated");
                Ok(TaskResult::Single(AdExchangeState::BuildResponse))
            }
            TrackingTask::NotifyWinner => {
                println!("  💬 Notifying winner...");
                let result: AuctionResult = store.get("auction_result")?;
                if let Some(winner) = result.winner {
                    tokio::time::sleep(Duration::from_millis(40)).await;
                    println!("  ✅ Winner {} notified", winner.bid.partner_id);
                }
                Ok(TaskResult::Single(AdExchangeState::BuildResponse))
            }
            TrackingTask::StoreAuction => {
                println!("  💾 Storing auction data...");
                tokio::time::sleep(Duration::from_millis(35)).await;
                println!("  ✅ Auction data stored");
Ok(TaskResult::Single(AdExchangeState::BuildResponse))
}
}
}
}
// ============================================================================
// Phase 7: Response Building
// ============================================================================
#[derive(Clone)]
struct BuildResponseTask;
#[async_trait]
impl Task<AdExchangeState> for BuildResponseTask {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        println!("\n  📦 Building response...");
        let request: AdRequest = store.get("ad_request")?;
        let result: AuctionResult = store.get("auction_result")?;
        println!("\n🎯 Ad Exchange Response Summary:");
println!(" Request ID: {}", request.request_id);
println!(" Total Bids: {}", result.total_bids);
println!(" Auction Time: {}ms", result.auction_time_ms);
if let Some(winner) = result.winner {
println!(" Winner: {}", winner.bid.partner_id);
println!(" Winning Price: ${:.2}", winner.score);
println!(" Creative: {}", winner.bid.creative_id);
} else {
println!(" Result: No Fill");
}
Ok(TaskResult::Single(AdExchangeState::Complete))
}
}
// ============================================================================
// NoFill Handler
// ============================================================================
#[derive(Clone)]
struct NoFillTask;
#[async_trait]
impl Task<AdExchangeState> for NoFillTask {
async fn run(&self, _store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        println!("\n⚠️ No Fill Response");
println!("Unable to complete ad request due to timeout or insufficient data.\n");
Ok(TaskResult::Single(AdExchangeState::Complete))
}
}
// ============================================================================
// Invalid Response Handler
// ============================================================================
#[derive(Clone)]
struct InvalidResponseTask;
#[async_trait]
impl Task<AdExchangeState> for InvalidResponseTask {
async fn run(&self, _store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
        println!("\n⚠️ Invalid Request");
println!("Request validation failed.\n");
Ok(TaskResult::Single(AdExchangeState::Complete))
}
}
// ============================================================================
// Phase 8: Error Tracking (Split 5 - All Strategy)
// ============================================================================
// Wrapper enum for error tracking tasks
#[derive(Clone)]
enum ErrorTrackingTask {
LogError,
UpdateErrorMetrics,
}
#[async_trait]
impl Task<AdExchangeState> for ErrorTrackingTask {
async fn run(&self, store: &MemoryStore) -> Result<TaskResult<AdExchangeState>, CanoError> {
match self {
ErrorTrackingTask::LogError => {
                println!("  📝 Logging error...");
                tokio::time::sleep(Duration::from_millis(20)).await;
                // Determine error type from store or state
                let error_type = if store.get::<AuctionResult>("auction_result").is_ok() {
                    "NoFill"
                } else {
                    "Rejected"
                };
                println!("  ✅ Error logged: {}", error_type);
                Ok(TaskResult::Single(AdExchangeState::NoFill))
            }
            ErrorTrackingTask::UpdateErrorMetrics => {
                println!("  📈 Updating error metrics...");
                tokio::time::sleep(Duration::from_millis(25)).await;
                println!("  ✅ Error metrics updated");
Ok(TaskResult::Single(AdExchangeState::NoFill))
}
}
}
}
// ============================================================================
// Main Workflow Construction
// ============================================================================
fn create_ad_exchange_workflow(store: MemoryStore) -> Workflow<AdExchangeState> {
Workflow::new(store.clone())
// Phase 1: Validation
.register(AdExchangeState::Start, ValidateRequestTask)
// Invalid Response Handler
.register(AdExchangeState::InvalidResponse, InvalidResponseTask)
// Phase 2: Context Gathering - SPLIT 1 (All Strategy)
// All three must succeed to proceed within 100ms timeout
// If any task fails or timeout is exceeded, workflow will error and transition to NoFill
.register_split(
AdExchangeState::GatherContext,
vec![
ContextTask::FetchUser,
ContextTask::FetchGeo,
ContextTask::DetectDevice,
],
JoinConfig::new(
JoinStrategy::All,
AdExchangeState::RequestBids,
)
.with_timeout(Duration::from_millis(100)),
)
// Phase 3: Bid Requests - SPLIT 2 (PartialTimeout Strategy)
// Accept whatever bids come back within 200ms
.register_split(
AdExchangeState::RequestBids,
vec![
ContactDSPTask { partner_id: "DSP-FastBidder".to_string(), response_delay_ms: 45 },
ContactDSPTask { partner_id: "DSP-Premium".to_string(), response_delay_ms: 80 },
ContactDSPTask { partner_id: "DSP-Global".to_string(), response_delay_ms: 120 },
ContactDSPTask { partner_id: "DSP-Slow".to_string(), response_delay_ms: 190 },
ContactDSPTask { partner_id: "DSP-TooSlow".to_string(), response_delay_ms: 250 },
],
JoinConfig::new(
JoinStrategy::PartialTimeout,
AdExchangeState::ScoreBids,
)
.with_timeout(Duration::from_millis(200))
.with_store_partial_results(true),
)
// Phase 4: Bid Scoring - SPLIT 3 (All Strategy)
// Score all received bids within 50ms timeout
// If timeout or any scoring fails, workflow will error and transition to NoFill
.register_split(
AdExchangeState::ScoreBids,
vec![
ScoreBidTask { bid_index: 0 },
ScoreBidTask { bid_index: 1 },
ScoreBidTask { bid_index: 2 },
],
JoinConfig::new(
JoinStrategy::All,
AdExchangeState::RunAuction,
)
.with_timeout(Duration::from_millis(50)),
)
// Phase 5: Auction
.register(AdExchangeState::RunAuction, RunAuctionTask)
// Phase 6: Tracking - SPLIT 4 (All Strategy)
// All tracking tasks must complete within 100ms timeout
// If timeout or any task fails, workflow will error and transition to NoFill
.register_split(
AdExchangeState::TrackResults,
vec![
TrackingTask::LogAnalytics,
TrackingTask::UpdateMetrics,
TrackingTask::NotifyWinner,
TrackingTask::StoreAuction,
],
JoinConfig::new(
JoinStrategy::All,
AdExchangeState::BuildResponse,
)
.with_timeout(Duration::from_millis(100)),
)
// Phase 7: Response
.register(AdExchangeState::BuildResponse, BuildResponseTask)
// NoFill handler (used when splits timeout or fail)
.register(AdExchangeState::NoFill, NoFillTask)
// Phase 8: Error Tracking - SPLIT 5 (All Strategy)
// Both error logging and metrics must complete within 50ms timeout
.register_split(
AdExchangeState::ErrorTracking,
vec![
ErrorTrackingTask::LogError,
ErrorTrackingTask::UpdateErrorMetrics,
],
JoinConfig::new(
JoinStrategy::All,
AdExchangeState::NoFill,
)
.with_timeout(Duration::from_millis(50)),
)
// Terminal states
.add_exit_states(vec![
AdExchangeState::Complete,
AdExchangeState::Rejected,
])
}
// ============================================================================
// Example Usage
// ============================================================================
#[tokio::main]
async fn main() -> Result<(), CanoError> {
    println!("🚀 Real-Time Ad Exchange Workflow\n");
println!("{}", "=".repeat(60));
let store = MemoryStore::new();
// Create ad request
let request = AdRequest {
request_id: "req_abc123".to_string(),
placement_id: "placement_728x90_top".to_string(),
floor_price: 1.50,
};
store.put("ad_request", request)?;
// Build and execute workflow
let workflow = create_ad_exchange_workflow(store.clone());
    println!("\n🎬 Starting ad exchange workflow...\n");
let start = tokio::time::Instant::now();
// Execute workflow - if splits timeout or fail, transition to NoFill
let result = match workflow.orchestrate(AdExchangeState::Start).await {
Ok(state) => state,
Err(e) => {
// If workflow fails due to split timeout/error, handle as NoFill
            eprintln!("❌ Workflow error: {}", e);
            println!("\n⚠️ Handling as No Fill due to error\n");
// Execute ErrorTracking state explicitly
workflow.orchestrate(AdExchangeState::ErrorTracking).await?
}
};
let total_time = start.elapsed();
println!("\n{}", "=".repeat(60));
    println!("✅ Workflow completed in {:?}", total_time);
println!(" Final State: {:?}", result);
Ok(())
}</code></pre>
<h3 id="ad-features"><a href="#ad-features" class="anchor-link" aria-hidden="true">#</a>Key Features Demonstrated</h3>
<div class="card-stack">
<div class="card">
<h3>Multiple Split Points</h3>
<p><strong>Split 1:</strong> Context gathering with <code>All</code> strategy - all 3 data sources required (100ms timeout)</p>
<p><strong>Split 2:</strong> Bid requests with <code>PartialTimeout</code> - accept responses within 200ms (3-4 of 5 DSPs)</p>
<p><strong>Split 3:</strong> Bid scoring with <code>All</code> strategy - score all received bids (50ms timeout)</p>
<p><strong>Split 4:</strong> Tracking with <code>All</code> strategy - all 4 tracking tasks must complete (100ms timeout)</p>
<p><strong>Split 5:</strong> Error tracking with <code>All</code> strategy - log errors and update metrics, then transition to NoFill (50ms timeout)</p>
<p><strong>Error Handling:</strong> If any <code>All</code> strategy split times out or fails, workflow gracefully transitions to ErrorTracking, which then calls NoFill before completing</p>
</div>
<div class="card">
<h3>Real-World Constraints</h3>
<p>• 200ms timeout for bid requests (industry standard)</p>
<p>• Floor price enforcement</p>
<p>• Quality scoring with latency penalties</p>
<p>• Strict tracking requirements (all systems must update)</p>
<p>• Graceful NoFill handling with error tracking when timeouts occur</p>
<p>• Comprehensive error logging and metrics on failures</p>
</div>
<div class="card">
<h3>Production Patterns</h3>
<p>• Parallel external service calls</p>
<p>• Graceful degradation (partial results)</p>
<p>• Time-based optimization (fastest wins)</p>
<p>• Complex state management across phases</p>
</div>
</div>
<h3 id="ad-output"><a href="#ad-output" class="anchor-link" aria-hidden="true">#</a>Example Output</h3>
<pre><code>🚀 Real-Time Ad Exchange Workflow
============================================================
🎬 Starting ad exchange workflow...
🔍 Validating request req_abc123
✅ Request validated
  👤 Fetching user profile...
  📱 Detecting device...
  🌍 Fetching geo data...
  ✅ Device detected
  ✅ Geo data loaded
  ✅ User profile loaded
  📡 Requesting bid from DSP-FastBidder...
  📡 Requesting bid from DSP-Premium...
  📡 Requesting bid from DSP-Global...
  📡 Requesting bid from DSP-TooSlow...
  📡 Requesting bid from DSP-Slow...
  ✅ DSP-FastBidder bid: $2.95
  ✅ DSP-Premium bid: $3.30
  ✅ DSP-Global bid: $3.70
  📊 Scoring bid from DSP-FastBidder...
  📊 Scoring bid from DSP-Premium...
  📊 Scoring bid from DSP-Global...
  ✅ Bid scored: $3.52
  ✅ Bid scored: $3.25
  ✅ Bid scored: $3.30
  🎯 Running auction...
  🏆 Winner: DSP-Global at $3.52
  📊 Logging to analytics...
  📈 Updating metrics...
  💬 Notifying winner...
  💾 Storing auction data...
  ✅ Metrics updated
  ✅ Analytics logged: 3 bids
  ✅ Auction data stored
  ✅ Winner DSP-Global notified
  📦 Building response...
🎯 Ad Exchange Response Summary:
  Request ID: req_abc123
  Total Bids: 3
  Auction Time: 0ms
  Winner: DSP-Global
  Winning Price: $3.52
  Creative: creative_DSP-Global
============================================================
✅ Workflow completed in 307.716975ms
  Final State: Complete</code></pre>
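The winning price in this output follows directly from the scoring rule: DSP-Global's raw bid of $3.70 arrived at 120ms, landing in the 101-150ms band, so its score is 3.70 × 0.95 ≈ $3.52, which beats DSP-Premium (3.30 × 1.0) and DSP-FastBidder (2.95 × 1.1 ≈ 3.25). A std-only check of that arithmetic (helper names are illustrative):

```rust
/// Quality multiplier, mirroring ScoreBidTask in the example above.
fn quality_multiplier(response_time_ms: u64) -> f64 {
    match response_time_ms {
        0..=50 => 1.1,     // fast response bonus
        51..=100 => 1.0,   // normal
        101..=150 => 0.95, // slight penalty
        _ => 0.9,          // slow response penalty
    }
}

/// Score each (partner, price, response_time_ms) bid and return the best.
fn pick_winner(bids: &[(&str, f64, u64)]) -> (String, f64) {
    bids.iter()
        .map(|(partner, price, rt)| (partner.to_string(), price * quality_multiplier(*rt)))
        .max_by(|a, b| a.1.total_cmp(&b.1))
        .unwrap()
}

fn main() {
    let bids = [
        ("DSP-FastBidder", 2.95, 45u64),
        ("DSP-Premium", 3.30, 80),
        ("DSP-Global", 3.70, 120),
    ];
    let (winner, score) = pick_winner(&bids);
    println!("winner: {winner} at ${score:.2}");
    assert_eq!(winner, "DSP-Global");
}
```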
<div class="callout callout-tip">
<p><strong>💡 Production Insight:</strong> This ad exchange workflow demonstrates how to build real-time
bidding systems that handle thousands of requests per second. The combination of <code>All</code> and
<code>PartialTimeout</code> strategies ensures optimal performance while maintaining quality and reliability.
The ~308ms total time includes four split/join phases, showing how split/join can meet the strict
latency requirements of programmatic advertising (typically &lt;300ms). Note how slow DSPs (DSP-TooSlow and
DSP-Slow) are automatically cancelled when they exceed the 200ms timeout, preventing them from delaying the auction.</p>
</div>
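The measured ~308ms is consistent with summing the slowest branch of each sequential phase: branches within a split run in parallel, but the phases run one after another. A quick sanity check, using the simulated delays from the example code:

```rust
/// Sum of the slowest branch per phase, using the simulated delays from the example.
fn expected_lower_bound_ms() -> u64 {
    let context = *[50u64, 30, 20].iter().max().unwrap();      // Split 1 (All): slowest of 3 branches
    let bidding = 200;                                         // Split 2 (PartialTimeout): full budget
    let scoring = 10;                                          // Split 3 (All): each scorer sleeps 10ms
    let tracking = *[30u64, 25, 40, 35].iter().max().unwrap(); // Split 4 (All): slowest of 4 branches
    context + bidding + scoring + tracking
}

fn main() {
    // prints 300; the example run measured 307.7ms, the difference being overhead
    println!("lower bound: ~{}ms", expected_lower_bound_ms());
}
```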
</div>
</main>
</body>
</html>