use taskflow_rs::pipeline::{ConcurrentPipeline, Token};
use std::thread;
use std::time::Duration;
use std::sync::Arc;
fn main() {
println!("=== Advanced Pipeline Processing ===\n");
demo_data_processing_pipeline();
println!();
demo_parallel_stages();
}
fn demo_data_processing_pipeline() {
println!("1. Multi-Stage Data Processing");
println!(" Raw data → Parse → Transform → Validate → Output\n");
let stage1_in = ConcurrentPipeline::new(10, 100); let stage1_out = ConcurrentPipeline::new(10, 100); let stage2_out = ConcurrentPipeline::new(10, 100); let stage3_out = ConcurrentPipeline::new(10, 100);
let s1_in = stage1_in.clone();
let s1_out = stage1_out.clone();
let s2_in = stage1_out.clone();
let s2_out = stage2_out.clone();
let s3_in = stage2_out.clone();
let s3_out = stage3_out.clone();
let stage1 = thread::spawn(move || {
println!(" [Stage 1: Parse] Started");
while !s1_in.is_stopped() {
if let Some(token) = s1_in.try_pop() {
let parsed = token.data; println!(" [Stage 1] Parsed token #{}: {}", token.index, parsed);
s1_out.push(parsed).ok();
} else {
thread::sleep(Duration::from_millis(5));
}
}
s1_out.stop();
println!(" [Stage 1: Parse] Stopped");
});
let stage2 = thread::spawn(move || {
println!(" [Stage 2: Transform] Started");
while !s2_in.is_stopped() {
if let Some(token) = s2_in.try_pop() {
let transformed = token.data * token.data;
println!(" [Stage 2] Transformed token #{}: {} → {}",
token.index, token.data, transformed);
s2_out.push(transformed).ok();
thread::sleep(Duration::from_millis(20)); } else {
thread::sleep(Duration::from_millis(5));
}
}
s2_out.stop();
println!(" [Stage 2: Transform] Stopped");
});
let stage3 = thread::spawn(move || {
println!(" [Stage 3: Validate] Started");
while !s3_in.is_stopped() {
if let Some(token) = s3_in.try_pop() {
let valid = token.data < 1000;
println!(" [Stage 3] Validated token #{}: {} (valid: {})",
token.index, token.data, valid);
if valid {
s3_out.push(token.data).ok();
}
} else {
thread::sleep(Duration::from_millis(5));
}
}
s3_out.stop();
println!(" [Stage 3: Validate] Stopped");
});
let producer = thread::spawn(move || {
println!(" [Producer] Started\n");
for i in 1..=10 {
stage1_in.push(i).unwrap();
println!(" [Producer] Sent: {}", i);
thread::sleep(Duration::from_millis(30));
}
stage1_in.stop();
println!("\n [Producer] Stopped");
});
let consumer = thread::spawn(move || {
println!(" [Consumer] Started");
let mut results = Vec::new();
while !stage3_out.is_stopped() || stage3_out.tokens_in_flight() > 0 {
if let Some(token) = stage3_out.try_pop() {
println!(" [Consumer] Received final result: {}", token.data);
results.push(token.data);
} else {
thread::sleep(Duration::from_millis(10));
}
}
println!(" [Consumer] Final results: {:?}", results);
println!(" [Consumer] Stopped");
});
producer.join().unwrap();
stage1.join().unwrap();
stage2.join().unwrap();
stage3.join().unwrap();
consumer.join().unwrap();
println!("\n ✓ Multi-stage pipeline complete");
}
fn demo_parallel_stages() {
println!("2. Parallel Processing Stages");
println!(" Multiple workers processing in parallel\n");
let input = ConcurrentPipeline::new(20, 100);
let output = ConcurrentPipeline::new(20, 100);
let num_workers = 3;
let mut workers = Vec::new();
for worker_id in 0..num_workers {
let input_clone = input.clone();
let output_clone = output.clone();
let worker = thread::spawn(move || {
println!(" [Worker {}] Started", worker_id);
let mut processed = 0;
while !input_clone.is_stopped() {
if let Some(token) = input_clone.try_pop() {
thread::sleep(Duration::from_millis(50));
let result = token.data * 2;
println!(" [Worker {}] Processed token #{}: {} → {}",
worker_id, token.index, token.data, result);
output_clone.push(result).ok();
processed += 1;
} else {
thread::sleep(Duration::from_millis(5));
}
}
println!(" [Worker {}] Stopped (processed {} tokens)", worker_id, processed);
});
workers.push(worker);
}
let input_clone = input.clone();
let producer = thread::spawn(move || {
for i in 1..=15 {
input_clone.push(i * 10).unwrap();
thread::sleep(Duration::from_millis(20));
}
input_clone.stop();
});
let output_clone = output.clone();
let consumer = thread::spawn(move || {
let mut results = Vec::new();
while !output_clone.is_stopped() || output_clone.tokens_in_flight() > 0 {
if let Some(token) = output_clone.try_pop() {
results.push(token.data);
} else {
thread::sleep(Duration::from_millis(10));
}
}
println!("\n Results: {:?}", results);
println!(" Total processed: {}", results.len());
});
producer.join().unwrap();
for worker in workers {
worker.join().unwrap();
}
output.stop();
consumer.join().unwrap();
println!("\n ✓ Parallel pipeline complete");
}