Module pipeline

Source
Expand description

This module exposes pipelining utilities for multi-stage asynchronous data copies with latency hiding. Threads that call producer_acquire and producer_commit are called producers; threads that call consumer_wait and consumer_release are called consumers.

§Example

In this example, every thread plays both the producer and the consumer role.

#[cube(launch)]
/// Sums `input` into `output[0]`, loading each batch asynchronously while the
/// previously loaded batch is being accumulated.
///
/// * `input` - data to reduce; only the `input.len() / batch_len` full batches
///   are read.
/// * `output` - receives the accumulated line at index 0.
/// * `batch_len` - comptime number of lines copied per batch; shared memory is
///   sized to hold two such batches.
fn pipelined_sum<F: Float>(
    input: &Array<Line<F>>,
    output: &mut Array<Line<F>>,
    #[comptime] batch_len: u32,
) {
    // Room for two batches: one half is filled while the other half is read.
    let buffer_len = 2 * batch_len;
    let num_batches = input.len() / batch_len;
    let mut staging = SharedMemory::<F>::new_lined(buffer_len, input.line_size());
    let pipeline = Pipeline::new();

    let mut acc = Line::<F>::empty(input.line_size()).fill(F::new(0.0));

    // Prologue: start loading batch 0 into the first half of the buffer.
    pipeline.producer_acquire();
    pipeline.memcpy_async(
        input.slice(0, batch_len),
        staging.slice_mut(0, batch_len),
    );
    pipeline.producer_commit();

    for batch in 1..num_batches {
        // The half being filled and the half being read swap on every iteration.
        let fill_half = batch % 2;
        let ready_half = (batch + 1) % 2;

        let src_start = batch_len * batch;
        let src_end = batch_len * (batch + 1);
        let fill_start = batch_len * fill_half;
        let fill_end = batch_len * (fill_half + 1);

        // Producer side: queue the copy of the current batch.
        pipeline.producer_acquire();
        pipeline.memcpy_async(
            input.slice(src_start, src_end),
            staging.slice_mut(fill_start, fill_end),
        );
        pipeline.producer_commit();

        // Consumer side: accumulate the batch queued on the previous iteration.
        pipeline.consumer_wait();
        let ready_start = batch_len * ready_half;
        let ready_end = batch_len * (ready_half + 1);
        let ready = staging.slice(ready_start, ready_end);
        for k in 0..batch_len {
            acc += ready[k];
        }
        pipeline.consumer_release();
    }

    // Epilogue: the final batch was queued but has not been accumulated yet.
    // It sits in the half selected by the parity of `num_batches + 1`.
    let last_half = (num_batches + 1) % 2;
    pipeline.consumer_wait();
    let ready = staging.slice(batch_len * last_half, batch_len * (last_half + 1));
    for k in 0..batch_len {
        acc += ready[k];
    }
    pipeline.consumer_release();

    output[0] = acc;
}

Structs§

Pipeline
A mechanism for managing a sequence of memcpy_async operations. For now, it only works at the Cube scope.
PipelineExpand
Expand type of Pipeline