use std::collections::VecDeque;
use std::fmt;
use std::time::{Duration, Instant};
/// The phase of a batch that a [`PipelineOp`] represents.
///
/// `AsyncPipeline::enqueue_batch` queues one op of each kind per batch, in the
/// order the variants are declared here.
///
/// The enum is fieldless, so it derives `Eq` and `Hash` in addition to
/// `PartialEq`; this lets stages be used as `HashMap` / `HashSet` keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PipelineStage {
    /// Host-to-device transfer.
    HostToDevice,
    /// Compute work queued between the two transfers.
    Compute,
    /// Device-to-host transfer.
    DeviceToHost,
}
/// A single operation queued on one stream of an `AsyncPipeline`.
///
/// `AsyncPipeline::enqueue_batch` creates three of these per batch (one per
/// `PipelineStage`); `AsyncPipeline::simulate` fills in the timestamps.
#[derive(Debug, Clone)]
pub struct PipelineOp {
/// Identifier taken from the pipeline's running op counter.
pub id: usize,
/// Which phase of the batch this op represents.
pub stage: PipelineStage,
/// Payload size in bytes; `enqueue_batch` sets this to 0 for compute ops.
pub data_bytes: usize,
/// Estimated run time, computed at enqueue time from the pipeline's rates.
pub estimated_duration: Duration,
/// `None` when enqueued; set by `simulate` to the simulated start instant.
pub start_time: Option<Instant>,
/// `None` when enqueued; set by `simulate` to the simulated end instant.
pub end_time: Option<Instant>,
/// Index of the stream queue this op was placed on.
pub stream_id: usize,
}
/// A simulated multi-stream pipeline.
///
/// Batches are assigned to streams round-robin; each batch becomes three
/// queued `PipelineOp`s whose durations are estimated from the configured
/// rates.
pub struct AsyncPipeline {
/// Number of streams; also the modulus for round-robin batch assignment.
num_streams: usize,
/// Initialised to the stream count by `new`; only reported by `utilization`.
pipeline_depth: usize,
/// One FIFO queue of pending ops per stream.
stream_queues: Vec<VecDeque<PipelineOp>>,
/// Ops that `simulate` has popped from the queues and timestamped.
completed: Vec<PipelineOp>,
/// Id that will be given to the next op created.
next_id: usize,
/// Number of batches enqueued so far; selects the next stream.
batch_count: usize,
/// Host-to-device rate in bytes per second (bytes are divided by it to get seconds).
h2d_bandwidth: f64,
/// Device-to-host rate in bytes per second.
d2h_bandwidth: f64,
/// Compute rate in operations per second (`compute_flops` is divided by it).
compute_throughput: f64,
}
impl AsyncPipeline {
pub fn new(num_streams: usize) -> Self {
Self {
num_streams,
pipeline_depth: num_streams,
stream_queues: (0..num_streams).map(|_| VecDeque::new()).collect(),
completed: Vec::new(),
next_id: 0,
batch_count: 0,
h2d_bandwidth: 12e9, d2h_bandwidth: 12e9,
compute_throughput: 20e12, }
}
pub fn with_bandwidth(mut self, h2d: f64, d2h: f64, compute: f64) -> Self {
self.h2d_bandwidth = h2d;
self.d2h_bandwidth = d2h;
self.compute_throughput = compute;
self
}
pub fn enqueue_batch(&mut self, data_bytes: usize, compute_flops: u64) -> PipelineBatch {
let stream_id = self.batch_count % self.num_streams;
self.batch_count += 1;
let h2d = PipelineOp {
id: self.next_id,
stage: PipelineStage::HostToDevice,
data_bytes,
estimated_duration: Duration::from_secs_f64(data_bytes as f64 / self.h2d_bandwidth),
start_time: None,
end_time: None,
stream_id,
};
self.next_id += 1;
let compute = PipelineOp {
id: self.next_id,
stage: PipelineStage::Compute,
data_bytes: 0,
estimated_duration: Duration::from_secs_f64(compute_flops as f64 / self.compute_throughput),
start_time: None,
end_time: None,
stream_id,
};
self.next_id += 1;
let d2h = PipelineOp {
id: self.next_id,
stage: PipelineStage::DeviceToHost,
data_bytes,
estimated_duration: Duration::from_secs_f64(data_bytes as f64 / self.d2h_bandwidth),
start_time: None,
end_time: None,
stream_id,
};
self.next_id += 1;
let batch = PipelineBatch {
h2d_id: h2d.id,
compute_id: compute.id,
d2h_id: d2h.id,
stream_id,
total_estimated: h2d.estimated_duration + compute.estimated_duration + d2h.estimated_duration,
};
self.stream_queues[stream_id].push_back(h2d);
self.stream_queues[stream_id].push_back(compute);
self.stream_queues[stream_id].push_back(d2h);
batch
}
pub fn simulate(&mut self) -> PipelineTimeline {
let start = Instant::now();
let mut events = Vec::new();
let mut stream_end_times = vec![Duration::ZERO; self.num_streams];
loop {
let mut any_progress = false;
for stream_id in 0..self.num_streams {
if let Some(mut op) = self.stream_queues[stream_id].pop_front() {
any_progress = true;
let op_start = stream_end_times[stream_id];
let op_end = op_start + op.estimated_duration;
stream_end_times[stream_id] = op_end;
op.start_time = Some(start + op_start);
op.end_time = Some(start + op_end);
events.push(PipelineEvent {
op_id: op.id,
stage: op.stage,
stream_id: op.stream_id,
start_offset: op_start,
end_offset: op_end,
data_bytes: op.data_bytes,
});
self.completed.push(op);
}
}
if !any_progress {
break;
}
}
let total_time = stream_end_times.iter().cloned().max().unwrap_or(Duration::ZERO);
let sequential_time: Duration = events.iter()
.map(|e| e.end_offset - e.start_offset)
.sum();
PipelineTimeline {
events,
total_time,
sequential_time,
speedup: if total_time.as_secs_f64() > 0.0 {
sequential_time.as_secs_f64() / total_time.as_secs_f64()
} else {
1.0
},
num_streams: self.num_streams,
}
}
pub fn utilization(&self) -> PipelineUtilization {
let total_ops: usize = self.stream_queues.iter().map(|q| q.len()).sum();
let active_streams = self.stream_queues.iter().filter(|q| !q.is_empty()).count();
PipelineUtilization {
total_pending_ops: total_ops,
active_streams,
total_streams: self.num_streams,
pipeline_depth: self.pipeline_depth,
utilization: if self.num_streams > 0 {
active_streams as f64 / self.num_streams as f64
} else {
0.0
},
}
}
}
/// Handle returned by `AsyncPipeline::enqueue_batch` describing the three ops
/// that were queued for one batch.
#[derive(Debug, Clone)]
pub struct PipelineBatch {
/// Id of the batch's host-to-device op.
pub h2d_id: usize,
/// Id of the batch's compute op.
pub compute_id: usize,
/// Id of the batch's device-to-host op.
pub d2h_id: usize,
/// Index of the stream all three ops were queued on.
pub stream_id: usize,
/// Sum of the three ops' estimated durations.
pub total_estimated: Duration,
}
/// One simulated op as recorded by `AsyncPipeline::simulate`.
#[derive(Debug, Clone)]
pub struct PipelineEvent {
/// Id of the op this event was produced from.
pub op_id: usize,
/// Stage of that op.
pub stage: PipelineStage,
/// Stream the op ran on.
pub stream_id: usize,
/// Simulated start, measured from the beginning of the simulation.
pub start_offset: Duration,
/// Simulated end, measured from the beginning of the simulation.
pub end_offset: Duration,
/// Payload size copied from the op.
pub data_bytes: usize,
}
/// Result of `AsyncPipeline::simulate`.
#[derive(Debug)]
pub struct PipelineTimeline {
/// One entry per simulated op, in the order the simulation processed them.
pub events: Vec<PipelineEvent>,
/// Largest per-stream end offset; zero when nothing was queued.
pub total_time: Duration,
/// Sum of the durations of all events.
pub sequential_time: Duration,
/// `sequential_time / total_time`, or `1.0` when `total_time` is zero.
pub speedup: f64,
/// Stream count of the pipeline that produced this timeline.
pub num_streams: usize,
}
impl fmt::Display for PipelineTimeline {
    /// Writes a one-line summary: total and sequential time in milliseconds,
    /// the speedup factor and the stream count.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Durations are stored in seconds-based units; the summary is in ms.
        let total_ms = self.total_time.as_secs_f64() * 1000.0;
        let sequential_ms = self.sequential_time.as_secs_f64() * 1000.0;
        write!(
            f,
            "Pipeline: {:.2}ms total (sequential: {:.2}ms), {:.2}x speedup, {} streams",
            total_ms, sequential_ms, self.speedup, self.num_streams
        )
    }
}
/// Snapshot of queued work returned by `AsyncPipeline::utilization`.
#[derive(Debug)]
pub struct PipelineUtilization {
/// Number of ops still waiting across all stream queues.
pub total_pending_ops: usize,
/// Number of streams whose queue is not empty.
pub active_streams: usize,
/// Number of streams in the pipeline.
pub total_streams: usize,
/// The pipeline's `pipeline_depth` value.
pub pipeline_depth: usize,
/// `active_streams / total_streams`, or `0.0` when there are no streams.
pub utilization: f64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The first batch goes to stream 0 and has a non-zero estimate.
    #[test]
    fn test_pipeline_basic() {
        let mut pipe = AsyncPipeline::new(3);
        let first = pipe.enqueue_batch(1024 * 1024, 1_000_000);
        assert_eq!(first.stream_id, 0);
        assert!(first.total_estimated > Duration::ZERO);
    }

    /// Consecutive batches are spread over the streams in order.
    #[test]
    fn test_pipeline_multi_stream() {
        let mut pipe = AsyncPipeline::new(3);
        let assigned: Vec<usize> = (0..3)
            .map(|_| pipe.enqueue_batch(1_000_000, 1_000_000).stream_id)
            .collect();
        assert_eq!(assigned[0], 0);
        assert_eq!(assigned[1], 1);
        assert_eq!(assigned[2], 2);
    }

    /// Simulating queued work yields events, a positive total and speedup >= 1.
    #[test]
    fn test_pipeline_simulate() {
        let mut pipe = AsyncPipeline::new(2);
        for _ in 0..2 {
            pipe.enqueue_batch(1_000_000, 1_000_000_000);
        }
        let result = pipe.simulate();
        assert!(!result.events.is_empty());
        assert!(result.total_time > Duration::ZERO);
        assert!(result.speedup >= 1.0);
    }

    /// Several batches over three streams never run slower than sequentially.
    #[test]
    fn test_pipeline_speedup() {
        let mut pipe = AsyncPipeline::new(3).with_bandwidth(10e9, 10e9, 10e12);
        for _ in 0..6 {
            pipe.enqueue_batch(10_000_000, 100_000_000_000);
        }
        let result = pipe.simulate();
        assert!(
            result.speedup >= 1.0,
            "Expected speedup >= 1.0, got {}",
            result.speedup
        );
    }

    /// Two batches on four streams leave exactly two streams active.
    #[test]
    fn test_pipeline_utilization() {
        let mut pipe = AsyncPipeline::new(4);
        for _ in 0..2 {
            pipe.enqueue_batch(1024, 1000);
        }
        let snapshot = pipe.utilization();
        assert_eq!(snapshot.total_streams, 4);
        assert_eq!(snapshot.active_streams, 2);
        assert!(snapshot.utilization > 0.0 && snapshot.utilization <= 1.0);
    }

    /// Simulating with nothing queued produces an empty, zero-length timeline.
    #[test]
    fn test_pipeline_empty() {
        let mut pipe = AsyncPipeline::new(2);
        let result = pipe.simulate();
        assert!(result.events.is_empty());
        assert_eq!(result.total_time, Duration::ZERO);
    }

    /// With a single stream there is no overlap, so no speedup.
    #[test]
    fn test_pipeline_single_stream() {
        let mut pipe = AsyncPipeline::new(1);
        for _ in 0..2 {
            pipe.enqueue_batch(1024, 1000);
        }
        let result = pipe.simulate();
        assert!(result.speedup <= 1.01);
    }

    /// The `Display` summary mentions the pipeline and its speedup.
    #[test]
    fn test_pipeline_display() {
        let mut pipe = AsyncPipeline::new(2);
        pipe.enqueue_batch(1_000_000, 1_000_000);
        let rendered = pipe.simulate().to_string();
        assert!(rendered.contains("Pipeline:"));
        assert!(rendered.contains("speedup"));
    }
}