use std::sync::mpsc;
use crate::texture_info::Texture2DBufferInfo;
use crate::wgpu_resources::{BufferDesc, GpuBuffer, GpuBufferPool, GpuTexture};
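
/// Errors that can occur when allocating, writing to, or copying from a
/// [`CpuWriteGpuReadBuffer`].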
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum CpuWriteGpuReadError {
#[error("Attempting to allocate an empty buffer.")]
ZeroSizeBufferAllocation,
#[error(
"Buffer is full, can't append more data! Buffer has a capacity for {buffer_capacity_elements} elements.
Tried to add {num_elements_attempted_to_add} elements, but only added {num_elements_actually_added}."
)]
BufferFull {
buffer_capacity_elements: usize,
num_elements_attempted_to_add: usize,
num_elements_actually_added: usize,
},
#[error(
"Target buffer has a size of {target_buffer_size}, can't write {copy_size} bytes with an offset of {destination_offset}!"
)]
TargetBufferTooSmall {
target_buffer_size: u64,
copy_size: u64,
destination_offset: u64,
},
    #[error(
        "Written data doesn't fit the target texture: the texture copy takes at most {max_copy_size} bytes, but {written_data_size} bytes were written to this buffer."
    )]
TargetTextureBufferSizeMismatch {
max_copy_size: u64,
written_data_size: usize,
},
}
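
/// A sub-allocated staging buffer that can be written to by the CPU and read by the GPU.
///
/// Elements are written front to back; every write advances an internal cursor,
/// so the buffer behaves a bit like an iterator over its own capacity.
/// Reading back is not supported: the underlying memory is typically write-combined,
/// where reads are prohibitively slow.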
pub struct CpuWriteGpuReadBuffer<T: bytemuck::Pod + Send + Sync> {
    /// Write view into the mapped chunk memory backing this buffer.
    write_view: wgpu::BufferViewMut,

    /// Range of not-yet-written elements:
    /// `start` is the write cursor (the number of elements written so far),
    /// `end` is the total capacity in elements.
    unwritten_element_range: std::ops::Range<usize>,

    /// The belt chunk this buffer was sub-allocated from.
    chunk_buffer: GpuBuffer,

    /// Byte offset of this buffer within [`Self::chunk_buffer`].
    byte_offset_in_chunk_buffer: wgpu::BufferAddress,

    _type: std::marker::PhantomData<T>,
}
impl<T> CpuWriteGpuReadBuffer<T>
where
T: bytemuck::Pod + Send + Sync,
{
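    /// Write-only view into the next bytes of the not-yet-written portion.
    ///
    /// `range` is in bytes, relative to the current write cursor.
    /// The view is write-only because reading from mapped staging memory can be extremely slow.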
#[inline(always)]
fn write_only_slice(
&mut self,
range: impl std::ops::RangeBounds<usize>,
) -> wgpu::WriteOnly<'_, [u8]> {
let start = self.unwritten_element_range.start * std::mem::size_of::<T>();
let end = self.unwritten_element_range.end * std::mem::size_of::<T>();
self.write_view.slice(start..end).into_slice(range)
}
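
    /// Pushes a slice of elements into the buffer.
    ///
    /// If the buffer does not have enough remaining capacity, writes as many elements
    /// as fit and returns [`CpuWriteGpuReadError::BufferFull`].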
#[inline]
pub fn extend_from_slice(&mut self, elements: &[T]) -> Result<(), CpuWriteGpuReadError> {
if elements.is_empty() {
return Ok(());
}
re_tracing::profile_function_if!(10_000 < elements.len());
let remaining_capacity = self.remaining_capacity();
let (result, elements) = if elements.len() > remaining_capacity {
(
Err(CpuWriteGpuReadError::BufferFull {
buffer_capacity_elements: self.capacity(),
num_elements_attempted_to_add: elements.len(),
num_elements_actually_added: remaining_capacity,
}),
&elements[..remaining_capacity],
)
} else {
(Ok(()), elements)
};
let bytes = bytemuck::cast_slice(elements);
self.write_only_slice(..bytes.len()).copy_from_slice(bytes);
self.unwritten_element_range.start += elements.len();
result
}
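
    /// Pushes several elements into the buffer.
    ///
    /// On success, returns the number of elements written. If the buffer fills up
    /// midway, writes what fits and returns [`CpuWriteGpuReadError::BufferFull`].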
#[inline]
pub fn extend(
&mut self,
mut elements: impl ExactSizeIterator<Item = T>,
) -> Result<usize, CpuWriteGpuReadError> {
re_tracing::profile_function!();
        // Empirically, collecting into a `Vec` first and copying it over as one slice
        // is considerably faster than writing element by element, so the loop below is
        // kept only as a reference implementation.
        if true {
            let vec: Vec<T> = elements.collect();
            debug_assert_eq!(
                vec.as_ptr() as usize % std::mem::align_of::<T>(),
                0,
                "Vec allocations are always aligned for their element type."
            );
            self.extend_from_slice(vec.as_slice())?;
            Ok(vec.len())
        } else {
            // Reference path: write elements one at a time, directly into mapped memory.
let num_written_before = self.num_written();
while let Some(element) = elements.next() {
if self.unwritten_element_range.start >= self.unwritten_element_range.end {
let num_elements_actually_added = self.num_written() - num_written_before;
return Err(CpuWriteGpuReadError::BufferFull {
buffer_capacity_elements: self.capacity(),
num_elements_attempted_to_add: num_elements_actually_added
+ elements.count()
+ 1,
num_elements_actually_added,
});
}
self.write_only_slice(..std::mem::size_of::<T>())
.copy_from_slice(bytemuck::bytes_of(&element));
self.unwritten_element_range.start += 1;
}
Ok(self.num_written() - num_written_before)
}
}
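
    /// Pushes `num_elements` copies of `element` into the buffer.
    ///
    /// If the buffer does not have enough remaining capacity, writes as many copies
    /// as fit and returns [`CpuWriteGpuReadError::BufferFull`].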
pub fn add_n(&mut self, element: T, num_elements: usize) -> Result<(), CpuWriteGpuReadError> {
if num_elements == 0 {
return Ok(());
}
re_tracing::profile_function_if!(10_000 < num_elements);
let remaining_capacity = self.remaining_capacity();
let (result, num_elements) = if num_elements > remaining_capacity {
(
Err(CpuWriteGpuReadError::BufferFull {
buffer_capacity_elements: self.capacity(),
num_elements_attempted_to_add: num_elements,
num_elements_actually_added: remaining_capacity,
}),
remaining_capacity,
)
} else {
(Ok(()), num_elements)
};
let mut offset = 0;
let element_bytes = bytemuck::bytes_of(&element);
let mut write_slice = self.write_only_slice(..num_elements * std::mem::size_of::<T>());
for _ in 0..num_elements {
let end = offset + std::mem::size_of::<T>();
write_slice
.slice(offset..end)
.copy_from_slice(element_bytes);
offset = end;
}
self.unwritten_element_range.start += num_elements;
result
}
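
    /// Pushes a single element, failing with [`CpuWriteGpuReadError::BufferFull`] if the buffer is at capacity.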
#[inline]
pub fn push(&mut self, element: T) -> Result<(), CpuWriteGpuReadError> {
if self.remaining_capacity() == 0 {
return Err(CpuWriteGpuReadError::BufferFull {
buffer_capacity_elements: self.capacity(),
num_elements_attempted_to_add: 1,
num_elements_actually_added: 0,
});
}
self.write_only_slice(..std::mem::size_of::<T>())
.copy_from_slice(bytemuck::bytes_of(&element));
self.unwritten_element_range.start += 1;
Ok(())
}
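
    /// True if no elements have been written yet.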
#[inline]
pub fn is_empty(&self) -> bool {
self.unwritten_element_range.start == 0
}
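
    /// Number of elements written so far.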
#[inline]
pub fn num_written(&self) -> usize {
self.unwritten_element_range.start
}
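
    /// Number of elements that can still be written.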
#[inline]
pub fn remaining_capacity(&self) -> usize {
self.unwritten_element_range.end - self.unwritten_element_range.start
}
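
    /// Total capacity of the buffer in elements.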
pub fn capacity(&self) -> usize {
self.unwritten_element_range.end
}
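
    /// Consumes the buffer, copying its contents to the first mip level of the given texture.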
pub fn copy_to_texture2d_entire_first_layer(
self,
encoder: &mut wgpu::CommandEncoder,
destination: &GpuTexture,
) -> Result<(), CpuWriteGpuReadError> {
self.copy_to_texture2d(
encoder,
wgpu::TexelCopyTextureInfo {
texture: &destination.texture,
mip_level: 0,
origin: wgpu::Origin3d::ZERO,
aspect: wgpu::TextureAspect::All,
},
destination.texture.size(),
)
}
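
    /// Consumes the buffer, copying its contents to the given texture region.
    ///
    /// The written data must consist of rows padded according to the texture's
    /// `bytes_per_row` requirements (see [`Texture2DBufferInfo`]), and must not
    /// exceed the padded size of the copy region.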
pub fn copy_to_texture2d(
self,
encoder: &mut wgpu::CommandEncoder,
destination: wgpu::TexelCopyTextureInfo<'_>,
copy_size: wgpu::Extent3d,
) -> Result<(), CpuWriteGpuReadError> {
let buffer_info = Texture2DBufferInfo::new(destination.texture.format(), copy_size);
if (buffer_info.buffer_size_padded as usize) < self.num_written() * std::mem::size_of::<T>()
{
return Err(CpuWriteGpuReadError::TargetTextureBufferSizeMismatch {
max_copy_size: buffer_info.buffer_size_padded,
written_data_size: self.num_written() * std::mem::size_of::<T>(),
});
}
encoder.copy_buffer_to_texture(
wgpu::TexelCopyBufferInfo {
buffer: &self.chunk_buffer,
layout: wgpu::TexelCopyBufferLayout {
offset: self.byte_offset_in_chunk_buffer,
bytes_per_row: Some(buffer_info.bytes_per_row_padded),
rows_per_image: None,
},
},
destination,
copy_size,
);
Ok(())
}
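
    /// Consumes the buffer, copying the written data into `destination` at the given byte offset.
    ///
    /// Fails if the data does not fit into the destination buffer.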
pub fn copy_to_buffer(
self,
encoder: &mut wgpu::CommandEncoder,
destination: &GpuBuffer,
destination_offset: wgpu::BufferAddress,
) -> Result<(), CpuWriteGpuReadError> {
        let copy_size = (std::mem::size_of::<T>() * self.unwritten_element_range.start) as u64;
        if destination_offset + copy_size > destination.size() {
return Err(CpuWriteGpuReadError::TargetBufferTooSmall {
target_buffer_size: destination.size(),
copy_size,
destination_offset,
});
}
encoder.copy_buffer_to_buffer(
&self.chunk_buffer,
self.byte_offset_in_chunk_buffer,
destination,
destination_offset,
copy_size,
);
Ok(())
}
}
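
/// A chunk of the belt: a mapped staging buffer plus a cursor marking the first unused byte.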
struct Chunk {
buffer: GpuBuffer,
unused_offset: wgpu::BufferAddress,
}
impl Chunk {
fn remaining_capacity(&self) -> u64 {
self.buffer.size() - self.unused_offset
}
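
    /// Sub-allocates a [`CpuWriteGpuReadBuffer`] of `size_in_bytes` from this chunk.
    ///
    /// The caller must ensure that the chunk has enough remaining capacity and that
    /// `size_in_bytes` is aligned to [`CpuWriteGpuReadBelt::MIN_OFFSET_ALIGNMENT`]
    /// (checked via debug assertions).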
fn allocate<T: bytemuck::Pod + Send + Sync>(
&mut self,
num_elements: usize,
size_in_bytes: u64,
) -> CpuWriteGpuReadBuffer<T> {
debug_assert!(num_elements * std::mem::size_of::<T>() <= size_in_bytes as usize);
let byte_offset_in_chunk_buffer = self.unused_offset;
let end_offset = byte_offset_in_chunk_buffer + size_in_bytes;
debug_assert!(
byte_offset_in_chunk_buffer.is_multiple_of(CpuWriteGpuReadBelt::MIN_OFFSET_ALIGNMENT)
);
debug_assert!(end_offset <= self.buffer.size());
let buffer_slice = self.buffer.slice(byte_offset_in_chunk_buffer..end_offset);
let write_view = buffer_slice.get_mapped_range_mut();
self.unused_offset = end_offset;
CpuWriteGpuReadBuffer {
chunk_buffer: self.buffer.clone(),
byte_offset_in_chunk_buffer,
write_view,
unwritten_element_range: 0..num_elements,
_type: std::marker::PhantomData,
}
}
}
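
/// Efficiently performs many buffer writes by sharing and reusing temporary staging buffers.
///
/// Sub-allocates [`CpuWriteGpuReadBuffer`]s from large, CPU-mapped chunks. Per frame:
/// * [`Self::allocate`] hands out sub-allocations from *active* (mapped) chunks,
///   growing the pool as needed.
/// * [`Self::before_queue_submit`] unmaps all active chunks so the GPU can read them.
/// * [`Self::after_queue_submit`] asynchronously re-maps the used chunks and, once
///   mapping completes, returns them to the *free* list for reuse.
///
/// The free list is shrunk based on a smoothed peak of per-frame allocations,
/// so memory from usage spikes is eventually released again.
///
/// Typical per-frame flow (a sketch: `belt`, `device`, `queue`, `buffer_pool`,
/// `target`, and `encoder` are assumed to be provided by the surrounding renderer):
///
/// ```ignore
/// let mut staging = belt.allocate::<f32>(&device, &buffer_pool, 3)?;
/// staging.extend_from_slice(&[1.0, 2.0, 3.0])?;
/// staging.copy_to_buffer(&mut encoder, &target, 0)?;
///
/// belt.before_queue_submit();           // Unmap chunks...
/// queue.submit(Some(encoder.finish())); // ...so the GPU can read them...
/// belt.after_queue_submit();            // ...then start re-mapping them for reuse.
/// ```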
pub struct CpuWriteGpuReadBelt {
    /// Size (in bytes) of new chunk buffers, unless a single allocation needs more.
    chunk_size: u64,

    /// Chunks that are mapped and available for sub-allocation this frame.
    active_chunks: Vec<Chunk>,

    /// Chunks that have been unmapped for GPU use and await re-mapping after submission.
    closed_chunks: Vec<Chunk>,

    /// Chunks that are mapped again and ready for reuse.
    free_chunks: Vec<Chunk>,

    /// Channel through which `map_async` callbacks hand re-mapped chunks back to the belt.
    sender: mpsc::Sender<Chunk>,
    receiver: mpsc::Receiver<Chunk>,

    /// Bytes allocated via [`Self::allocate`] since the last [`Self::after_queue_submit`].
    current_frame_allocated_bytes: u64,

    /// Exponentially decayed peak of per-frame allocated bytes;
    /// used to budget how much free-chunk memory to keep around.
    smoothed_peak_bytes: f64,
}
impl CpuWriteGpuReadBelt {
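    /// All allocations are aligned to at least this many bytes.
    ///
    /// A fixed minimum alignment means less padding bookkeeping per allocation;
    /// it must cover `wgpu`'s mapping and copy alignments as well as the copy size
    /// of the largest texel block we expect to upload (16 bytes for `Rgba32Uint`),
    /// which is verified in [`Self::new`].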
pub const MIN_OFFSET_ALIGNMENT: u64 = 16;
    /// Per-submission decay factor for [`Self::smoothed_peak_bytes`].
    const PEAK_DECAY: f64 = 0.95;

    /// Headroom kept on top of the smoothed peak when budgeting free-chunk memory.
    const HEADROOM_FACTOR: f64 = 1.5;
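
    /// Creates a new belt.
    ///
    /// `chunk_size` is the granularity of internal chunk allocation (rounded up to
    /// [`Self::MIN_OFFSET_ALIGNMENT`]); individual writes are sub-allocated within chunks.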
pub fn new(chunk_size: wgpu::BufferSize) -> Self {
        // The minimum alignment must satisfy wgpu's mapping and copy alignment rules...
        static_assertions::const_assert!(
            wgpu::MAP_ALIGNMENT <= CpuWriteGpuReadBelt::MIN_OFFSET_ALIGNMENT
        );
        static_assertions::const_assert!(
            wgpu::COPY_BUFFER_ALIGNMENT <= CpuWriteGpuReadBelt::MIN_OFFSET_ALIGNMENT
        );
        // ...as well as the copy size of the largest texel block we upload
        // (`Rgba32Uint` has 16 bytes per block).
        debug_assert!(
            wgpu::TextureFormat::Rgba32Uint
                .block_copy_size(None)
                .unwrap() as u64
                <= Self::MIN_OFFSET_ALIGNMENT
        );
        // `mpsc::channel` is on the workspace's clippy disallow-list,
        // but is fine for this internal, low-traffic channel.
        #[expect(clippy::disallowed_methods)]
        let (sender, receiver) = mpsc::channel();
Self {
chunk_size: wgpu::util::align_to(chunk_size.get(), Self::MIN_OFFSET_ALIGNMENT),
active_chunks: Vec::new(),
closed_chunks: Vec::new(),
free_chunks: Vec::new(),
sender,
receiver,
current_frame_allocated_bytes: 0,
smoothed_peak_bytes: 0.0,
}
}
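
    /// Allocates a CPU-writable buffer with capacity for `num_elements` elements of type `T`.
    ///
    /// Reuses or grows the internal chunk pool as needed. Zero-sized allocations are
    /// rejected with [`CpuWriteGpuReadError::ZeroSizeBufferAllocation`].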
pub fn allocate<T: bytemuck::Pod + Send + Sync>(
&mut self,
device: &wgpu::Device,
buffer_pool: &GpuBufferPool,
num_elements: usize,
) -> Result<CpuWriteGpuReadBuffer<T>, CpuWriteGpuReadError> {
        if num_elements == 0 {
            return Err(CpuWriteGpuReadError::ZeroSizeBufferAllocation);
        }
        re_tracing::profile_function!();

        // Round up the allocation so that the *next* allocation in the chunk
        // starts at an aligned offset again.
        let size = wgpu::util::align_to(
            (std::mem::size_of::<T>() * num_elements) as wgpu::BufferAddress,
            Self::MIN_OFFSET_ALIGNMENT,
        );
        let mut chunk = if let Some(index) = self
            .active_chunks
            .iter_mut()
            .position(|chunk| chunk.remaining_capacity() >= size)
        {
            // Prefer an active chunk that still has room.
            self.active_chunks.swap_remove(index)
        } else {
            // Pick up any chunks whose re-mapping has completed in the meantime.
            self.receive_chunks();

            if let Some(index) = self
                .free_chunks
                .iter()
                .position(|chunk| chunk.remaining_capacity() >= size)
            {
                self.free_chunks.swap_remove(index)
            } else {
                // No chunk with enough room; allocate a new one, at least `chunk_size` big.
                let buffer_size =
                    wgpu::util::align_to(self.chunk_size.max(size), Self::MIN_OFFSET_ALIGNMENT);
re_log::trace!(
"Allocating new CpuWriteGpuReadBelt chunk of size {:.1} MiB",
buffer_size as f32 / (1024.0 * 1024.0)
);
let buffer = buffer_pool.alloc(
device,
&BufferDesc {
label: "CpuWriteGpuReadBelt chunk buffer".into(),
size: buffer_size,
usage: wgpu::BufferUsages::MAP_WRITE | wgpu::BufferUsages::COPY_SRC,
mapped_at_creation: true,
},
);
Chunk {
buffer,
unused_offset: 0,
}
}
};
self.current_frame_allocated_bytes += size;
let cpu_buffer_view = chunk.allocate(num_elements, size);
self.active_chunks.push(chunk);
Ok(cpu_buffer_view)
}
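
    /// Prepares all currently mapped buffers for a queue submission.
    ///
    /// Must be called *before* submitting the command encoder(s) that copy from
    /// buffers allocated this frame: it unmaps all active chunks so the GPU can
    /// read from them.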
pub fn before_queue_submit(&mut self) {
re_tracing::profile_function!();
for chunk in self.active_chunks.drain(..) {
chunk.buffer.unmap();
self.closed_chunks.push(chunk);
}
}
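
    /// Reclaims chunks and updates allocation statistics after a queue submission.
    ///
    /// Must be called *after* the frame's command encoder(s) have been submitted.
    /// Decays the allocation peak, shrinks the free list if it exceeds the budget,
    /// and kicks off asynchronous re-mapping of all closed chunks.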
pub fn after_queue_submit(&mut self) {
re_tracing::profile_function!();
        // Track the peak of per-frame allocations with exponential decay,
        // so the free-chunk budget adapts to recent usage.
        self.smoothed_peak_bytes = f64::max(
            self.smoothed_peak_bytes * Self::PEAK_DECAY,
            self.current_frame_allocated_bytes as f64,
        );
        self.current_frame_allocated_bytes = 0;

        self.receive_chunks();
        self.shrink_free_chunks();
        let sender = &self.sender;
        for chunk in self.closed_chunks.drain(..) {
            let sender = sender.clone();
            chunk
                .buffer
                .clone()
                .slice(..)
                .map_async(wgpu::MapMode::Write, move |_| {
                    // The chunk is mapped again and ready for reuse.
                    // If the belt was dropped in the meantime, the send fails, which is fine.
                    sender.send(chunk).ok();
                });
        }
}
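
    /// Drains the channel of chunks whose re-mapping has completed,
    /// resetting their cursors and moving them to the free list.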
fn receive_chunks(&mut self) {
while let Ok(mut chunk) = self.receiver.try_recv() {
chunk.unused_offset = 0;
self.free_chunks.push(chunk);
}
}
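
    /// Drops free chunks until their total size fits the memory budget.
    ///
    /// The budget is the smoothed allocation peak times [`Self::HEADROOM_FACTOR`]
    /// (at least one chunk's worth), so spikes are tolerated but the memory is
    /// eventually given back.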
fn shrink_free_chunks(&mut self) {
let budget =
((self.smoothed_peak_bytes * Self::HEADROOM_FACTOR) as u64).max(self.chunk_size);
let total_bytes_in_free_chunks: u64 =
self.free_chunks.iter().map(|c| c.buffer.size()).sum();
if total_bytes_in_free_chunks <= budget {
return;
}
re_tracing::profile_function!();
        // Sort ascending by size so the smallest chunks are dropped first,
        // keeping the large ones that are most useful for future allocations.
        self.free_chunks.sort_by_key(|c| c.buffer.size());
        let excess = total_bytes_in_free_chunks - budget;

        // Count how many of the smallest chunks have a cumulative size still within
        // `excess`, then drop one more so that at least `excess` bytes are freed.
        let num_to_drop = self
            .free_chunks
            .iter()
            .scan(0u64, |cumulative, chunk| {
                *cumulative += chunk.buffer.size();
                Some(*cumulative)
            })
            .take_while(|cumulative| *cumulative <= excess)
            .count()
            + 1;
        let num_to_drop = num_to_drop.min(self.free_chunks.len());
if num_to_drop != 0 {
re_log::trace!(
"CpuWriteGpuReadBelt: dropping {num_to_drop} free chunk(s), \
keeping {}, budget {:.1} MiB",
self.free_chunks.len() - num_to_drop,
budget as f64 / (1024.0 * 1024.0),
);
self.free_chunks.drain(..num_to_drop);
}
}
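
    /// Total size in bytes of all chunk buffers, regardless of their state.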
pub fn total_buffer_size_in_bytes(&self) -> u64 {
let sum = |chunks: &[Chunk]| -> u64 { chunks.iter().map(|c| c.buffer.size()).sum() };
sum(&self.active_chunks) + sum(&self.closed_chunks) + sum(&self.free_chunks)
}
}
impl std::fmt::Debug for CpuWriteGpuReadBelt {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CpuWriteGpuReadBelt")
.field("chunk_size", &self.chunk_size)
.field("active_chunks", &self.active_chunks.len())
.field("closed_chunks", &self.closed_chunks.len())
.field("free_chunks", &self.free_chunks.len())
.finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use crate::RenderContext;
const CHUNK_SIZE: u64 = RenderContext::CPU_WRITE_GPU_READ_BELT_DEFAULT_CHUNK_SIZE.get();
#[test]
fn belt_shrinks_after_usage_spike() {
let mut ctx = RenderContext::new_test();
        // A usage spike: allocate four chunk-sized buffers in a single frame.
        ctx.execute_test_frame(|ctx| {
let mut belt = ctx.cpu_write_gpu_read_belt.lock();
for _ in 0..4 {
let _buf = belt
.allocate::<u8>(&ctx.device, &ctx.gpu_resources.buffers, CHUNK_SIZE as usize)
.unwrap();
}
[]
});
        // An empty frame gives the belt a chance to re-map and reclaim the spike's chunks.
        ctx.execute_test_frame(|_| []);
let size_after_spike = ctx
.cpu_write_gpu_read_belt
.lock()
.total_buffer_size_in_bytes();
assert!(
size_after_spike >= 4 * CHUNK_SIZE,
"Expected at least {} bytes after spike, got {size_after_spike}",
4 * CHUNK_SIZE,
);
        // Many frames with only small allocations: the smoothed peak decays,
        // so the belt should eventually drop most of its free chunks.
        for _ in 0..100 {
ctx.execute_test_frame(|ctx| {
let _buf = ctx
.cpu_write_gpu_read_belt
.lock()
.allocate::<u8>(&ctx.device, &ctx.gpu_resources.buffers, 256)
.unwrap();
[]
});
}
let size_after_decay = ctx
.cpu_write_gpu_read_belt
.lock()
.total_buffer_size_in_bytes();
assert!(
size_after_decay < size_after_spike,
"Expected belt to shrink after many small frames: \
before={size_after_spike}, after={size_after_decay}"
);
}
}