taskflowrs 0.1.0

A Rust implementation of TaskFlow — task-parallel programming with heterogeneous GPU support
Documentation
# Testing Pipeline Support

This guide covers testing strategies for pipeline functionality in TaskFlow-RS.

## Unit Tests

Run built-in pipeline tests:

```bash
cargo test pipeline
```

### Basic Functionality Tests

```rust
use taskflow_rs::{ConcurrentPipeline, Token};

#[test]
fn test_push_pop() {
    let pipeline = ConcurrentPipeline::new(10, 100);
    
    pipeline.push(42).unwrap();
    assert_eq!(pipeline.tokens_in_flight(), 1);
    
    let token = pipeline.try_pop().unwrap();
    assert_eq!(token.data, 42);
    assert_eq!(pipeline.tokens_in_flight(), 0);
}

#[test]
fn test_token_indexing() {
    let pipeline = ConcurrentPipeline::new(10, 100);
    
    pipeline.push(1).unwrap();
    pipeline.push(2).unwrap();
    
    let t1 = pipeline.try_pop().unwrap();
    let t2 = pipeline.try_pop().unwrap();
    
    assert_eq!(t1.index, 0);
    assert_eq!(t2.index, 1);
}

#[test]
fn test_backpressure() {
    let pipeline = ConcurrentPipeline::new(2, 10);
    
    assert!(pipeline.push(1).is_ok());
    assert!(pipeline.push(2).is_ok());
    
    // Buffer full
    assert!(pipeline.push(3).is_err());
    
    // Make space
    pipeline.try_pop();
    
    // Should work now
    assert!(pipeline.push(3).is_ok());
}

#[test]
fn test_stop_signal() {
    let pipeline = ConcurrentPipeline::new(10, 100);
    
    assert!(!pipeline.is_stopped());
    pipeline.stop();
    assert!(pipeline.is_stopped());
}
```

## Integration Tests

### Producer-Consumer Test

```rust
use std::thread;
use std::time::Duration;

#[test]
fn test_producer_consumer() {
    let pipeline = ConcurrentPipeline::new(10, 100);
    let num_items = 100;
    
    let p = pipeline.clone();
    let producer = thread::spawn(move || {
        for i in 0..num_items {
            p.push(i).unwrap();
        }
        p.stop();
    });
    
    let c = pipeline.clone();
    let consumer = thread::spawn(move || {
        let mut received = 0;
        while !c.is_stopped() || c.tokens_in_flight() > 0 {
            if let Some(_) = c.try_pop() {
                received += 1;
            }
        }
        received
    });
    
    producer.join().unwrap();
    let received = consumer.join().unwrap();
    
    assert_eq!(received, num_items);
}
```

### Multi-Stage Pipeline Test

```rust
#[test]
fn test_multi_stage() {
    let stage1_out = ConcurrentPipeline::new(10, 100);
    let stage2_out = ConcurrentPipeline::new(10, 100);
    
    // Stage 1: Double the value
    let s1_in = stage1_out.clone();
    let s1_out = stage1_out.clone();
    let stage1 = thread::spawn(move || {
        for i in 0..10 {
            s1_in.push(i).unwrap();
        }
        s1_in.stop();
    });
    
    // Stage 2: Square the value
    let s2_in = stage1_out.clone();
    let s2_out = stage2_out.clone();
    let stage2 = thread::spawn(move || {
        while !s2_in.is_stopped() || s2_in.tokens_in_flight() > 0 {
            if let Some(token) = s2_in.try_pop() {
                let doubled = token.data * 2;
                s2_out.push(doubled * doubled).ok();
            }
        }
        s2_out.stop();
    });
    
    // Verify
    let mut results = vec![];
    while !stage2_out.is_stopped() || stage2_out.tokens_in_flight() > 0 {
        if let Some(token) = stage2_out.try_pop() {
            results.push(token.data);
        }
    }
    
    stage1.join().unwrap();
    stage2.join().unwrap();
    
    // 0→0, 1→4, 2→16, 3→36, etc.
    assert_eq!(results[0], 0);
    assert_eq!(results[1], 4);
    assert_eq!(results[2], 16);
}
```

## Running Examples

### Basic Pipeline

```bash
cargo run --example pipeline_basic
```

Expected output:
```
=== Pipeline Stream Processing Demo ===

1. Basic Pipeline
   Simple data flow with tokens

   Pushed: 0, Tokens in flight: 1
   Pushed: 10, Tokens in flight: 2
   ...
   Popped token #0: data = 0
   Popped token #1: data = 10
   ✓ Pipeline processed all tokens
```

### Advanced Pipeline

```bash
cargo run --example pipeline_advanced
```

Expected output shows multi-stage processing with parallel workers.

### Benchmarks

```bash
cargo run --release --example pipeline_benchmark
```

Expected output:
```
=== Pipeline Benchmarks ===

1. Throughput Benchmark
   Processed: 10000 items
   Time: 123ms
   Throughput: 81300.81 items/sec

2. Parallel Scaling Benchmark
   1 worker(s): 9615.38 items/sec
   2 worker(s): 18518.52 items/sec
   4 worker(s): 35714.29 items/sec
   8 worker(s): 50000.00 items/sec
```

## Performance Testing

### Throughput Test

Measure items/second:

```rust
let num_items = 100_000;
let start = Instant::now();

// ... run pipeline ...

let elapsed = start.elapsed();
let throughput = num_items as f64 / elapsed.as_secs_f64();
println!("Throughput: {:.2} items/sec", throughput);
```

### Latency Test

Measure end-to-end latency:

```rust
use std::time::Instant;

let pipeline = ConcurrentPipeline::new(10, 100);

let start = Instant::now();
pipeline.push(42).unwrap();
let token = pipeline.try_pop().unwrap();
let latency = start.elapsed();

println!("Latency: {:?}", latency);
```

### Memory Test

Monitor memory usage:

```bash
# Linux
/usr/bin/time -v cargo run --release --example pipeline_benchmark

# macOS
/usr/bin/time -l cargo run --release --example pipeline_benchmark
```

## Stress Testing

### High-Load Test

```rust
#[test]
#[ignore] // Run with: cargo test --ignored
fn stress_test_high_load() {
    let pipeline = ConcurrentPipeline::new(1000, 1_000_000);
    
    let num_producers = 8;
    let items_per_producer = 100_000;
    
    let mut producers = vec![];
    for _ in 0..num_producers {
        let p = pipeline.clone();
        producers.push(thread::spawn(move || {
            for i in 0..items_per_producer {
                p.push(i).unwrap();
            }
        }));
    }
    
    // ... verify all items processed ...
}
```

### Backpressure Stress

```rust
#[test]
fn stress_test_backpressure() {
    let pipeline = ConcurrentPipeline::new(5, 100);
    
    // Fast producer, slow consumer
    let p = pipeline.clone();
    let producer = thread::spawn(move || {
        let mut failures = 0;
        for i in 0..1000 {
            if p.push(i).is_err() {
                failures += 1;
            }
        }
        failures
    });
    
    thread::sleep(Duration::from_millis(100));
    
    let c = pipeline.clone();
    let consumer = thread::spawn(move || {
        while c.tokens_in_flight() > 0 {
            c.try_pop();
            thread::sleep(Duration::from_millis(1));
        }
    });
    
    let failures = producer.join().unwrap();
    consumer.join().unwrap();
    
    assert!(failures > 0, "Expected backpressure events");
}
```

## Debugging Tips

### Enable Logging

Add logging to track pipeline behavior:

```rust
if let Some(token) = pipeline.try_pop() {
    println!("[{}] Processed token #{}: {}", 
             thread::current().name().unwrap_or("?"),
             token.index, 
             token.data);
}
```

### Monitor Tokens

Track tokens in flight:

```rust
loop {
    let in_flight = pipeline.tokens_in_flight();
    if in_flight > threshold {
        println!("Warning: {} tokens in flight", in_flight);
    }
    thread::sleep(Duration::from_millis(100));
}
```

### Deadlock Detection

If pipeline hangs:

1. Check if all threads are joined
2. Verify stop() is called
3. Ensure no circular dependencies between stages
4. Check for missing try_pop() calls

## Common Issues

### Issue 1: Pipeline Hangs

**Symptom**: Consumer never finishes  
**Cause**: Forgot to call `stop()`  
**Fix**: Always call `pipeline.stop()` after last push

### Issue 2: Backpressure Errors

**Symptom**: `push()` returns `Err`  
**Cause**: Buffer full, consumer too slow  
**Fix**: Increase buffer size or add retry logic

### Issue 3: Token Loss

**Symptom**: Fewer items out than in  
**Cause**: Not waiting for tokens in flight  
**Fix**: Check `tokens_in_flight()` before exiting

```rust
// Wrong
while !pipeline.is_stopped() {
    pipeline.try_pop();
}

// Right
while !pipeline.is_stopped() || pipeline.tokens_in_flight() > 0 {
    pipeline.try_pop();
}
```

## Next Steps

After verifying basic pipeline functionality:

1. Test with real workloads
2. Benchmark against requirements
3. Tune buffer sizes
4. Add monitoring/metrics
5. Consider type-safe pipeline builder for complex flows