pub(super) mod cuda;
mod memcpy;
mod nixl;
use super::strategy::select_strategy;
use super::validation::validate_block_transfer;
use super::{PhysicalLayout, TransferContext, TransferOptions, TransferPlan, TransferStrategy};
use crate::block_manager::v2::physical::transfer::{
StorageKind, context::TransferCompleteNotification,
};
use anyhow::Result;
use std::ops::Range;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Mutex;
pub use nixl::NixlTransferBuilder;
/// Validates a block transfer request, picks a transfer plan, and starts it.
///
/// The block id lists are checked with `validate_block_transfer` first. The plan
/// returned by `select_strategy` then decides whether the blocks move in a single
/// direct hop or in two hops staged through a bounce buffer.
///
/// For a direct plan only `options.layer_range` is consumed; a two-hop plan
/// receives the whole `options` value.
///
/// # Errors
///
/// Propagates any error from validation, strategy selection, or the executor
/// that the plan dispatches to.
pub fn execute_transfer(
    src: &PhysicalLayout,
    dst: &PhysicalLayout,
    src_block_ids: &[usize],
    dst_block_ids: &[usize],
    options: TransferOptions,
    ctx: &TransferContext,
) -> Result<TransferCompleteNotification> {
    validate_block_transfer(src_block_ids, dst_block_ids, None, src, dst, None)?;

    match select_strategy(src, dst, ctx)? {
        TransferPlan::TwoHop {
            first,
            bounce_location,
            second,
        } => {
            let two_hop = TwoHopTransferParams {
                src,
                dst,
                src_block_ids,
                dst_block_ids,
                first_strategy: first,
                bounce_location,
                second_strategy: second,
                options,
                ctx,
            };
            execute_two_hop_transfer(two_hop)
        }
        TransferPlan::Direct(strategy) => {
            let layers = options.layer_range;
            execute_direct_transfer(
                src,
                dst,
                src_block_ids,
                dst_block_ids,
                layers,
                strategy,
                ctx,
            )
        }
    }
}
/// Runs a single-hop transfer between `src` and `dst` using `strategy`.
///
/// The strategy selects the executor: the memcpy module, the CUDA module, or a
/// [`NixlTransferBuilder`]. `layer_range`, when present, is handed to the chosen
/// executor unchanged.
///
/// # Errors
///
/// Returns an error for `TransferStrategy::Invalid` and propagates any error
/// reported by the selected executor.
fn execute_direct_transfer(
    src: &PhysicalLayout,
    dst: &PhysicalLayout,
    src_block_ids: &[usize],
    dst_block_ids: &[usize],
    layer_range: Option<Range<usize>>,
    strategy: TransferStrategy,
    ctx: &TransferContext,
) -> Result<TransferCompleteNotification> {
    match strategy {
        // No executor exists for this pairing; report both endpoints.
        TransferStrategy::Invalid => Err(anyhow::anyhow!(
            "Invalid transfer strategy for src={:?}, dst={:?}",
            src.location(),
            dst.location()
        )),

        TransferStrategy::Memcpy => {
            memcpy::execute_memcpy_transfer(src, dst, src_block_ids, dst_block_ids, layer_range)
        }

        TransferStrategy::NixlRead
        | TransferStrategy::NixlWrite
        | TransferStrategy::NixlReadFlipped
        | TransferStrategy::NixlWriteFlipped => {
            let request = NixlTransferBuilder::new()
                .src(src)
                .dst(dst)
                .src_blocks(src_block_ids)
                .dst_blocks(dst_block_ids)
                .strategy(strategy);
            // Only restrict the layers when the caller asked for a range.
            let request = match layer_range {
                Some(range) => request.layer_range(range),
                None => request,
            };
            request.execute(ctx)
        }

        TransferStrategy::CudaAsyncH2D
        | TransferStrategy::CudaAsyncD2H
        | TransferStrategy::CudaAsyncD2D
        | TransferStrategy::CudaBlockingH2D
        | TransferStrategy::CudaBlockingD2H => {
            let notification = cuda::execute_cuda_transfer(
                src,
                dst,
                src_block_ids,
                dst_block_ids,
                layer_range,
                strategy,
                ctx,
            )?;
            Ok(notification)
        }
    }
}
#[allow(clippy::too_many_arguments)]
async fn execute_two_hop_transfer_chunk(
src: &PhysicalLayout,
bounce_layout: &PhysicalLayout,
dst: &PhysicalLayout,
src_block_ids: &[usize],
bounce_block_ids: &[usize],
dst_block_ids: &[usize],
first_strategy: TransferStrategy,
second_strategy: TransferStrategy,
layer_range: &Option<Range<usize>>,
ctx: &TransferContext,
) -> Result<()> {
let bounce_ids_to_use = &bounce_block_ids[..src_block_ids.len()];
execute_direct_transfer(
src,
bounce_layout,
src_block_ids,
bounce_ids_to_use,
layer_range.clone(),
first_strategy,
ctx,
)?
.await?;
execute_direct_transfer(
bounce_layout,
dst,
bounce_ids_to_use,
dst_block_ids,
layer_range.clone(),
second_strategy,
ctx,
)?
.await?;
Ok(())
}
/// Arguments for `execute_two_hop_transfer`, bundled into a single struct.
struct TwoHopTransferParams<'a> {
/// Layout the blocks are read from.
src: &'a PhysicalLayout,
/// Layout the blocks are written to.
dst: &'a PhysicalLayout,
/// Block ids to read from `src`; paired positionally with `dst_block_ids`.
src_block_ids: &'a [usize],
/// Block ids to write in `dst`; paired positionally with `src_block_ids`.
dst_block_ids: &'a [usize],
/// Strategy used for the first hop (`src` to the bounce layout).
first_strategy: TransferStrategy,
/// Storage kind the bounce buffer's layout must report; checked against
/// `layout().location()` before any data is moved.
bounce_location: StorageKind,
/// Strategy used for the second hop (bounce layout to `dst`).
second_strategy: TransferStrategy,
/// Transfer options; the two-hop path reads `bounce_buffer` and `layer_range`.
options: TransferOptions,
/// Transfer context; provides the Tokio handle the work is spawned on and is
/// passed through to every hop.
ctx: &'a TransferContext,
}
/// Runs a two-hop transfer using several bounce blocks, split into two groups
/// whose chunks are driven concurrently.
///
/// At most `src_block_ids.len()` bounce blocks are used. They are divided into
/// two halves, and each half is given to its own task. Both tasks pull
/// `(src, dst)` block id pairs from one shared iterator and push each chunk
/// through `execute_two_hop_transfer_chunk` until the iterator is exhausted.
///
/// # Errors
///
/// Returns the first error produced by either task (`try_join` semantics).
#[allow(clippy::too_many_arguments)]
async fn handle_buffered_transfer(
src: &PhysicalLayout,
bounce_layout: &PhysicalLayout,
dst: &PhysicalLayout,
src_block_ids: &[usize],
bounce_block_ids: &[usize],
dst_block_ids: &[usize],
first_strategy: TransferStrategy,
second_strategy: TransferStrategy,
layer_range: &Option<Range<usize>>,
ctx: &TransferContext,
) -> Result<()> {
// Never use more bounce blocks than there are source blocks to move.
let bounce_groups =
&bounce_block_ids[0..std::cmp::min(src_block_ids.len(), bounce_block_ids.len())];
// Two disjoint halves; with an odd count the second half gets the extra block.
let (bounce_group_0, bounce_group_1) = bounce_groups.split_at(bounce_groups.len() / 2);
let bounce_group_0 = bounce_group_0.to_vec();
let bounce_group_1 = bounce_group_1.to_vec();
// Single work queue of (src, dst) pairs shared by both tasks; the async mutex
// ensures each pair is handed out exactly once.
let src_dst_iter = Arc::new(Mutex::new(src_block_ids.iter().zip(dst_block_ids.iter())));
let transfer_task = async move |bounce_group: &[usize]| -> Result<()> {
loop {
let (src_ids, dst_ids): (Vec<usize>, Vec<usize>);
{
// Hold the lock only while taking the next chunk, not during the transfer.
let mut x = src_dst_iter.lock().await;
// Take up to one bounce group's worth of pairs.
(src_ids, dst_ids) = x.by_ref().take(bounce_group.len()).unzip();
// Nothing left to transfer. An empty bounce group also lands here at once,
// because `take(0)` yields no pairs.
if src_ids.is_empty() {
break;
}
}
execute_two_hop_transfer_chunk(
src,
bounce_layout,
dst,
&src_ids,
// The final chunk may be smaller than the bounce group.
&bounce_group[0..src_ids.len()],
&dst_ids,
first_strategy,
second_strategy,
layer_range,
ctx,
)
.await?;
}
Ok(())
};
let transfer_0 = transfer_task(&bounce_group_0);
let transfer_1 = transfer_task(&bounce_group_1);
// Drive both tasks together; the first error from either one is returned.
futures::future::try_join(transfer_0, transfer_1).await?;
Ok(())
}
/// Starts a two-hop transfer on the context's Tokio handle and returns a
/// notification that resolves when the transfer finishes.
///
/// The inputs are cloned so the spawned task owns everything it touches. Inside
/// the task the bounce buffer from `options` is checked (present, located at
/// `bounce_location`, at least one block). With exactly one bounce block the
/// `(src, dst)` pairs are moved one at a time through that block; otherwise the
/// work is delegated to `handle_buffered_transfer`.
///
/// # Errors
///
/// This function itself always returns `Ok`. Bounce-buffer validation failures
/// and transfer failures are delivered through the returned notification.
///
/// The task never panics when reporting its result: if the caller has already
/// dropped the notification, the result is silently discarded.
fn execute_two_hop_transfer(params: TwoHopTransferParams) -> Result<TransferCompleteNotification> {
    let TwoHopTransferParams {
        src,
        dst,
        src_block_ids,
        dst_block_ids,
        first_strategy,
        bounce_location,
        second_strategy,
        options,
        ctx,
    } = params;

    let (tx, rx) = tokio::sync::oneshot::channel();

    // The spawned task outlives this call, so it needs owned copies of the
    // borrowed inputs. `options` is already owned and is simply moved in.
    let src_owned = src.clone();
    let dst_owned = dst.clone();
    let src_block_ids = src_block_ids.to_vec();
    let dst_block_ids = dst_block_ids.to_vec();
    let handle = ctx.tokio();
    let task_ctx = ctx.clone();

    handle.spawn(async move {
        // Compute a single result so it is reported in exactly one place.
        let outcome: Result<()> = async {
            let Some(ref bounce_buffer_spec) = options.bounce_buffer else {
                return Err(anyhow::anyhow!(
                    "Two-hop transfers require a bounce buffer."
                ));
            };
            if bounce_buffer_spec.layout().location() != bounce_location {
                return Err(anyhow::anyhow!(
                    "Bounce buffer layout does not match bounce location."
                ));
            }
            if bounce_buffer_spec.block_ids().is_empty() {
                return Err(anyhow::anyhow!(
                    "Bounce buffer must have at least one block."
                ));
            }

            let num_bounce_blocks = bounce_buffer_spec.block_ids().len();
            if num_bounce_blocks == 1 {
                // Only one staging block: move the pairs strictly one by one.
                let bounce_block = bounce_buffer_spec.block_ids()[0];
                for (src_block_id, dst_block_id) in src_block_ids.iter().zip(dst_block_ids.iter())
                {
                    execute_two_hop_transfer_chunk(
                        &src_owned,
                        bounce_buffer_spec.layout(),
                        &dst_owned,
                        &[*src_block_id],
                        &[bounce_block],
                        &[*dst_block_id],
                        first_strategy,
                        second_strategy,
                        &options.layer_range,
                        &task_ctx,
                    )
                    .await?;
                }
                Ok(())
            } else {
                handle_buffered_transfer(
                    &src_owned,
                    bounce_buffer_spec.layout(),
                    &dst_owned,
                    &src_block_ids,
                    bounce_buffer_spec.block_ids(),
                    &dst_block_ids,
                    first_strategy,
                    second_strategy,
                    &options.layer_range,
                    &task_ctx,
                )
                .await
            }
        }
        .await;

        // `send` fails only when the receiver is gone, i.e. nobody is waiting
        // for the result. That is not an error worth panicking over.
        let _ = tx.send(outcome);
    });

    Ok(TransferCompleteNotification { status: rx })
}
/// Completion flag for a transfer, backed by a shared atomic boolean.
///
/// A notification created with [`TransferNotification::new`] starts out
/// incomplete; one created with [`TransferNotification::done`] is complete from
/// the start. Nothing in this type's own methods flips the flag afterwards.
pub struct TransferNotification {
    /// `true` once the transfer is complete.
    status: Arc<AtomicBool>,
}

impl Default for TransferNotification {
    /// Equivalent to [`TransferNotification::new`]: an incomplete notification.
    fn default() -> Self {
        Self::new()
    }
}

impl TransferNotification {
    /// Creates a notification whose flag is initially `false` (not complete).
    pub fn new() -> Self {
        Self {
            status: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Creates a notification whose flag is already `true` (complete).
    pub fn done() -> Self {
        Self {
            status: Arc::new(AtomicBool::new(true)),
        }
    }

    /// Returns `true` if the flag has been set.
    ///
    /// The load uses `Acquire` ordering so that, when paired with a `Release`
    /// (or stronger) store on the writer side, a caller that observes `true`
    /// also observes the writes made before the flag was set.
    pub fn is_complete(&self) -> bool {
        self.status.load(Ordering::Acquire)
    }
}