This module exposes pipelining utilities for multi-stage asynchronous data copies with latency hiding. Threads that call producer_acquire and producer_commit are called producers; threads that call consumer_wait and consumer_release are called consumers.
§Example
In this example, each thread acts as both producer and consumer.
#[cube(launch)]
/// Calculate the sum of an array, using pipelining
fn pipelined_sum<F: Float>(
    input: &Array<Line<F>>,
    output: &mut Array<Line<F>>,
    #[comptime] batch_len: u32,
) {
    let smem_size = 2 * batch_len;
    let num_batches = input.len() / batch_len;
    let mut shared_memory = SharedMemory::<F>::new_lined(smem_size, input.line_size());
    let pipeline = Pipeline::new();

    let mut sum = Line::<F>::empty(input.line_size()).fill(F::new(0.));

    // Copy the first batch to shared memory
    pipeline.producer_acquire();
    pipeline.memcpy_async(
        input.slice(0, batch_len),
        shared_memory.slice_mut(0, batch_len),
    );
    pipeline.producer_commit();

    for input_batch in 1..num_batches {
        // Copy and compute index always alternate
        let copy_index = input_batch % 2;
        let compute_index = (input_batch + 1) % 2;

        // Copy the next batch to shared memory
        pipeline.producer_acquire();
        pipeline.memcpy_async(
            input.slice(batch_len * input_batch, batch_len * (input_batch + 1)),
            shared_memory.slice_mut(batch_len * copy_index, batch_len * (copy_index + 1)),
        );
        pipeline.producer_commit();

        // Compute the batch that is ready
        pipeline.consumer_wait();
        let compute_slice =
            shared_memory.slice(batch_len * compute_index, batch_len * (compute_index + 1));
        for i in 0..batch_len {
            sum += compute_slice[i];
        }
        pipeline.consumer_release();
    }

    // Compute the last batch
    pipeline.consumer_wait();
    let compute_slice = shared_memory.slice(
        batch_len * ((num_batches + 1) % 2),
        batch_len * ((num_batches + 1) % 2 + 1),
    );
    for i in 0..batch_len {
        sum += compute_slice[i];
    }
    pipeline.consumer_release();

    output[0] = sum;
}
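The slot arithmetic in the example (copy_index, compute_index, and the `(num_batches + 1) % 2` index used for the last batch) can be checked on the host without a GPU. The following is a minimal sketch in plain Rust, not CubeCL code: `double_buffered_sum` is a hypothetical helper, the copies are synchronous `copy_from_slice` calls standing in for memcpy_async, and a single scalar sum replaces the per-lane `Line` accumulation. It assumes the input length is a non-zero multiple of `batch_len`.

```rust
/// Host-side model of the two-slot schedule used by `pipelined_sum`.
/// Hypothetical helper for illustration only; it uses no CubeCL API and
/// performs the copies synchronously.
fn double_buffered_sum(input: &[f32], batch_len: usize) -> f32 {
    // Assumption of this sketch: whole batches only, at least one batch.
    assert!(batch_len > 0 && !input.is_empty() && input.len() % batch_len == 0);

    let num_batches = input.len() / batch_len;
    // Two slots of `batch_len` elements stand in for shared memory.
    let mut staging = vec![0.0f32; 2 * batch_len];
    let mut sum = 0.0f32;

    // Prologue: stage batch 0 in slot 0.
    staging[..batch_len].copy_from_slice(&input[..batch_len]);

    for batch in 1..num_batches {
        // The slot being filled and the slot being read always alternate.
        let copy_slot = batch % 2;
        let compute_slot = (batch + 1) % 2;

        // "Producer" step: stage the next batch in the free slot.
        staging[batch_len * copy_slot..batch_len * (copy_slot + 1)]
            .copy_from_slice(&input[batch_len * batch..batch_len * (batch + 1)]);

        // "Consumer" step: reduce the batch staged in the previous iteration.
        sum += staging[batch_len * compute_slot..batch_len * (compute_slot + 1)]
            .iter()
            .sum::<f32>();
    }

    // Epilogue: batch `num_batches - 1` sits in slot `(num_batches - 1) % 2`,
    // which equals `(num_batches + 1) % 2`.
    let last_slot = (num_batches + 1) % 2;
    sum += staging[batch_len * last_slot..batch_len * (last_slot + 1)]
        .iter()
        .sum::<f32>();

    sum
}
```

Calling `double_buffered_sum(&[1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 2)` walks through three batches and touches every element exactly once. The sketch covers only the indexing; the latency hiding in the kernel comes from the asynchronous copy overlapping with the reduction, which this host model does not reproduce.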
Structs§
- Pipeline: A mechanism for managing a sequence of memcpy_async. For now, it only works at the Cube scope.
- PipelineExpand: Expand type of Pipeline