rafx_api/extra/upload.rs

use crate::{
    RafxBuffer, RafxBufferDef, RafxCommandBuffer, RafxCommandBufferDef, RafxCommandPool,
    RafxCommandPoolDef, RafxDeviceContext, RafxError, RafxFence, RafxFenceStatus, RafxMemoryUsage,
    RafxQueue, RafxQueueType, RafxResourceType, RafxResult,
};
use crossbeam_channel::{Receiver, Sender};
use std::ops::{Deref, DerefMut};

// Based on UploadHeap in cauldron
// (https://github.com/GPUOpen-LibrariesAndSDKs/Cauldron/blob/5acc12602c55e469cc1f9181967dbcb122f8e6c7/src/VK/base/UploadHeap.h)

pub struct RafxUploadBuffer {
    buffer: Option<RafxBuffer>,
    upload_buffer_released_tx: Sender<RafxBuffer>,
}

impl Drop for RafxUploadBuffer {
    fn drop(&mut self) {
        // self.buffer is only allowed to be None while dropping
        let buffer = self.buffer.take().unwrap();
        // we assume the RafxUploadBufferPool outlives every RafxUploadBuffer taken from it
        self.upload_buffer_released_tx.send(buffer).unwrap();
    }
}

impl RafxUploadBuffer {
    fn buffer(&self) -> &RafxBuffer {
        self.buffer.as_ref().unwrap()
    }

    fn buffer_mut(&mut self) -> &mut RafxBuffer {
        self.buffer.as_mut().unwrap()
    }
}

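/// A fixed-size pool of reusable staging buffers. A buffer taken from the pool is returned
/// automatically when its `RafxUploadBuffer` handle is dropped.
///
/// A minimal usage sketch (`device_context` and `transfer_queue` are assumed to exist;
/// error handling elided):
///
/// ```ignore
/// // Create a pool of four 16 MiB staging buffers
/// let mut pool = RafxUploadBufferPool::new(&device_context, 4, 16 * 1024 * 1024)?;
/// // Pass `Some(&mut pool)` to RafxUpload::new()/RafxTransferUpload::new() to reuse buffers
/// let upload =
///     RafxUpload::new(&device_context, &transfer_queue, 16 * 1024 * 1024, Some(&mut pool))?;
/// ```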
pub struct RafxUploadBufferPool {
    _device_context: RafxDeviceContext,
    buffer_count: u32,
    buffer_size: u64,
    unused_buffers: Vec<RafxBuffer>,
    upload_buffer_released_tx: Sender<RafxBuffer>,
    upload_buffer_released_rx: Receiver<RafxBuffer>,
}

impl Drop for RafxUploadBufferPool {
    fn drop(&mut self) {
        self.handle_dropped_buffers();
        // If this assert trips, a buffer was still in use when the pool was dropped
        assert_eq!(self.unused_buffers.len(), self.buffer_count as usize);
    }
}

impl RafxUploadBufferPool {
    pub fn new(
        device_context: &RafxDeviceContext,
        buffer_count: u32,
        buffer_size: u64,
    ) -> RafxResult<Self> {
        let (upload_buffer_released_tx, upload_buffer_released_rx) = crossbeam_channel::unbounded();
        let mut unused_buffers = Vec::with_capacity(buffer_count as usize);

        for i in 0..buffer_count {
            let buffer = device_context.create_buffer(&RafxBufferDef {
                size: buffer_size,
                memory_usage: RafxMemoryUsage::CpuToGpu,
                queue_type: RafxQueueType::Transfer,
                resource_type: RafxResourceType::BUFFER,
                ..Default::default()
            })?;
            if device_context.device_info().debug_names_enabled {
                buffer.set_debug_name(format!("RafxUploadBufferPool {}", i));
            }

            unused_buffers.push(buffer);
        }

        Ok(RafxUploadBufferPool {
            _device_context: device_context.clone(),
            buffer_count,
            buffer_size,
            unused_buffers,
            upload_buffer_released_tx,
            upload_buffer_released_rx,
        })
    }

    fn take(
        &mut self,
        required_size_bytes: u64,
    ) -> RafxResult<RafxUploadBuffer> {
        if self.buffer_size < required_size_bytes {
            return Err(format!(
                "Buffer of size {} requested but the pool's buffers are only {} in size",
                required_size_bytes, self.buffer_size
            ))?;
        }

        // Move any released buffers back into the unused_buffers list
        self.handle_dropped_buffers();

        // Take a buffer if one is available; otherwise return an error
        if let Some(buffer) = self.unused_buffers.pop() {
            Ok(RafxUploadBuffer {
                buffer: Some(buffer),
                upload_buffer_released_tx: self.upload_buffer_released_tx.clone(),
            })
        } else {
            Err("RafxUploadBufferPool has no more available buffers")?
        }
    }

    // Move any released buffers back into the unused_buffers list
    fn handle_dropped_buffers(&mut self) {
        for buffer in self.upload_buffer_released_rx.try_iter() {
            self.unused_buffers.push(buffer);
        }
    }
}

#[derive(Debug)]
pub enum RafxUploadError {
    BufferFull,
    Other(RafxError),
}

impl RafxUploadError {
    // Helpful for when types are not being inferred as expected
    pub fn into_rafx_error(self) -> RafxError {
        self.into()
    }
}

impl core::fmt::Display for RafxUploadError {
    fn fmt(
        &self,
        fmt: &mut core::fmt::Formatter,
    ) -> core::fmt::Result {
        match *self {
            RafxUploadError::BufferFull => write!(fmt, "UploadBufferFull"),
            RafxUploadError::Other(ref e) => e.fmt(fmt),
        }
    }
}

impl From<RafxError> for RafxUploadError {
    fn from(error: RafxError) -> Self {
        RafxUploadError::Other(error)
    }
}

impl From<RafxUploadError> for RafxError {
    fn from(error: RafxUploadError) -> Self {
        match error {
            RafxUploadError::BufferFull => {
                RafxError::StringError("Upload buffer is full".to_string())
            }
            RafxUploadError::Other(e) => e,
        }
    }
}

impl From<&str> for RafxUploadError {
    fn from(s: &str) -> Self {
        RafxError::StringError(s.to_string()).into()
    }
}

impl From<String> for RafxUploadError {
    fn from(string: String) -> Self {
        RafxError::StringError(string).into()
    }
}

#[derive(PartialEq)]
pub enum RafxUploadState {
    /// The upload is not submitted yet and data may be appended to it
    Writable,

    /// The buffer has been sent to the GPU and is no longer writable
    SentToGpu,

    /// The upload is finished and the resources may be used
    Complete,
}

enum UploadBuffer {
    Pooled(RafxUploadBuffer),
    NonPooled(RafxBuffer),
}

impl Deref for UploadBuffer {
    type Target = RafxBuffer;

    fn deref(&self) -> &Self::Target {
        self.buffer()
    }
}

impl DerefMut for UploadBuffer {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.buffer_mut()
    }
}

impl UploadBuffer {
    fn buffer(&self) -> &RafxBuffer {
        match self {
            UploadBuffer::Pooled(buffer) => buffer.buffer(),
            UploadBuffer::NonPooled(buffer) => buffer,
        }
    }

    fn buffer_mut(&mut self) -> &mut RafxBuffer {
        match self {
            UploadBuffer::Pooled(buffer) => buffer.buffer_mut(),
            UploadBuffer::NonPooled(buffer) => buffer,
        }
    }
}

/// Convenience struct that accumulates writes into a staging buffer, along with commands to
/// execute against it. This allows resource uploads to be batched.
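///
/// A minimal sketch of the intended flow (identifiers like `device_context`, `transfer_queue`,
/// and `data` are assumed; error handling elided):
///
/// ```ignore
/// let mut upload = RafxUpload::new(&device_context, &transfer_queue, 16 * 1024 * 1024, None)?;
///
/// // Copy bytes into the staging buffer; the returned value is the byte offset of this write
/// // within the staging buffer
/// let offset = upload.push(&data, 256)?;
///
/// // Record copy commands against upload.command_buffer() here, then submit and poll
/// upload.submit()?;
/// while upload.state()? != RafxUploadState::Complete {}
/// ```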
pub struct RafxUpload {
    queue: RafxQueue,
    command_pool: RafxCommandPool,
    command_buffer: RafxCommandBuffer,

    buffer: UploadBuffer,

    writable: bool,
    fence: RafxFence,

    buffer_begin: *mut u8,
    buffer_end: *mut u8,
    buffer_write_pointer: *mut u8,
}

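// SAFETY: the raw pointers above point into the persistently mapped staging buffer, which this
// struct owns and keeps mapped for its entire lifetime, so they never dangle; the write pointer
// is only advanced through `&mut self`, so shared references never observe a mutation.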
unsafe impl Send for RafxUpload {}
unsafe impl Sync for RafxUpload {}

impl RafxUpload {
    pub fn new(
        device_context: &RafxDeviceContext,
        queue: &RafxQueue,
        buffer_size: u64,
        buffer_pool: Option<&mut RafxUploadBufferPool>,
    ) -> RafxResult<Self> {
        //
        // Command Buffers
        //
        let mut command_pool =
            queue.create_command_pool(&RafxCommandPoolDef { transient: true })?;
        let command_buffer = command_pool.create_command_buffer(&RafxCommandBufferDef {
            is_secondary: false,
        })?;
        command_buffer.begin()?;

        let buffer = if let Some(buffer_pool) = buffer_pool {
            UploadBuffer::Pooled(buffer_pool.take(buffer_size)?)
        } else {
            let buffer = device_context.create_buffer(&RafxBufferDef {
                size: buffer_size,
                memory_usage: RafxMemoryUsage::CpuToGpu,
                queue_type: RafxQueueType::Transfer,
                resource_type: RafxResourceType::BUFFER,
                ..Default::default()
            })?;
            buffer.set_debug_name("RafxUpload Non-Pooled Buffer");
            UploadBuffer::NonPooled(buffer)
        };

        let (buffer_begin, buffer_end, buffer_write_pointer) = unsafe {
            let buffer_begin = buffer.map_buffer()?;

            let buffer_end = buffer_begin.add(buffer_size as usize);
            let buffer_write_pointer = buffer_begin;

            (buffer_begin, buffer_end, buffer_write_pointer)
        };

        let fence = device_context.create_fence()?;

        let upload = RafxUpload {
            queue: queue.clone(),
            command_pool,
            command_buffer,
            buffer,
            fence,
            writable: true,
            buffer_begin,
            buffer_end,
            buffer_write_pointer,
        };

        Ok(upload)
    }

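    /// Returns true if `number_of_writes` writes of `bytes_to_write` bytes each would fit in
    /// the remaining space, with each write first aligned up to `required_alignment`.
    ///
    /// For example, with the write pointer at offset 5, an alignment of 4, and a single
    /// 8-byte write, the write would span offsets 8..16, so 16 bytes of capacity are needed
    /// rather than 13.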
    pub fn has_space_available(
        &self,
        bytes_to_write: usize,
        required_alignment: usize,
        number_of_writes: usize,
    ) -> bool {
        let mut write_end_ptr = self.buffer_write_pointer as usize;

        for _ in 0..number_of_writes {
            // Align the current write pointer
            let write_begin_ptr = ((write_end_ptr + required_alignment - 1) / required_alignment)
                * required_alignment;
            write_end_ptr = write_begin_ptr + bytes_to_write;
        }

        // See if we would walk past the end of the buffer
        write_end_ptr <= self.buffer_end as usize
    }

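    /// Copies `data` into the staging buffer at the next offset aligned to
    /// `required_alignment` and returns that offset, relative to the start of the buffer.
    /// Fails with `RafxUploadError::BufferFull` if the aligned write does not fit.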
    pub fn push(
        &mut self,
        data: &[u8],
        required_alignment: usize,
    ) -> Result<u64, RafxUploadError> {
        log::trace!("Pushing {} bytes into upload", data.len());

        if self.writable {
            unsafe {
                // Figure out the span of memory we will write over, rounding the write
                // pointer up to the required alignment
                let write_begin_ptr = (((self.buffer_write_pointer as usize
                    + required_alignment
                    - 1)
                    / required_alignment)
                    * required_alignment) as *mut u8;
                let write_end_ptr = write_begin_ptr.add(data.len());

                // If the span walks past the end of the buffer, fail
                if write_end_ptr > self.buffer_end {
                    Err(RafxUploadError::BufferFull)?;
                }

                std::ptr::copy_nonoverlapping(data.as_ptr(), write_begin_ptr, data.len());
                self.buffer_write_pointer = write_end_ptr;

                Ok(write_begin_ptr as u64 - self.buffer_begin as u64)
            }
        } else {
            Err("Upload buffer is not writable")?
        }
    }

    pub fn buffer_size(&self) -> u64 {
        self.buffer_end as u64 - self.buffer_begin as u64
    }

    pub fn bytes_written(&self) -> u64 {
        self.buffer_write_pointer as u64 - self.buffer_begin as u64
    }

    pub fn bytes_free(&self) -> u64 {
        self.buffer_end as u64 - self.buffer_write_pointer as u64
    }

    pub fn command_pool(&self) -> &RafxCommandPool {
        &self.command_pool
    }

    pub fn command_buffer(&self) -> &RafxCommandBuffer {
        &self.command_buffer
    }

    pub fn staging_buffer(&self) -> &RafxBuffer {
        &self.buffer
    }

    pub fn queue(&self) -> &RafxQueue {
        &self.queue
    }

    pub fn submit(&mut self) -> RafxResult<()> {
        if self.writable {
            self.command_buffer.end()?;
            self.queue
                .submit(&[&self.command_buffer], &[], &[], Some(&self.fence))?;
            self.writable = false;
        }

        Ok(())
    }

    pub fn state(&self) -> RafxResult<RafxUploadState> {
        let state = if self.writable {
            RafxUploadState::Writable
        } else if self.fence.get_fence_status()? != RafxFenceStatus::Incomplete {
            RafxUploadState::Complete
        } else {
            RafxUploadState::SentToGpu
        };

        Ok(state)
    }

    fn wait_for_idle(&self) -> RafxResult<()> {
        self.fence.wait()
    }
}

impl Drop for RafxUpload {
    fn drop(&mut self) {
        log::trace!("destroying RafxUpload");

        // If the transfer is in flight, wait for it to complete
        self.wait_for_idle().unwrap();

        self.buffer.unmap_buffer().unwrap();

        // The buffer, command pool, and fence are destroyed by dropping them

        log::trace!("destroyed RafxUpload");
    }
}

#[derive(PartialEq)]
pub enum RafxTransferUploadState {
    /// The upload is not submitted yet and data may be appended to it
    Writable,

    /// The buffer has been sent to the GPU's transfer queue and is no longer writable
    SentToTransferQueue,

    /// The submit to the transfer queue finished. We are ready to submit to the graphics queue,
    /// but we wait here until called explicitly because submitting to a queue is not thread-safe.
    /// Additionally, it's likely we will want to batch this submit with other command buffers
    /// going to the same queue
    PendingSubmitDstQueue,

    /// The buffer has been sent to the GPU's graphics queue but has not finished
    SentToDstQueue,

    /// The submit has finished on both queues and the uploaded resources are ready for use
    Complete,
}

/// A state machine and associated buffers/synchronization primitives that simplify uploading
/// resources to the GPU via a transfer queue and then submitting a memory barrier to the
/// graphics queue.
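///
/// A minimal sketch of the two-queue flow (identifiers like `device_context`, the queues, and
/// `data` are assumed; error handling elided):
///
/// ```ignore
/// let mut upload = RafxTransferUpload::new(
///     &device_context,
///     &transfer_queue,
///     &graphics_queue,
///     16 * 1024 * 1024,
///     None,
/// )?;
///
/// let offset = upload.push(&data, 256)?;
/// // Record copy commands on upload.transfer_command_buffer() and queue-transfer barriers on
/// // upload.dst_command_buffer(), then drive both submits to completion
/// upload.block_until_upload_complete()?;
/// ```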
pub struct RafxTransferUpload {
    upload: RafxUpload,

    dst_queue: RafxQueue,
    dst_command_pool: RafxCommandPool,
    dst_command_buffer: RafxCommandBuffer,

    dst_fence: RafxFence,
    sent_to_dst_queue: bool,
}

impl RafxTransferUpload {
    pub fn new(
        device_context: &RafxDeviceContext,
        transfer_queue: &RafxQueue,
        dst_queue: &RafxQueue,
        size: u64,
        buffer_pool: Option<&mut RafxUploadBufferPool>,
    ) -> RafxResult<Self> {
        //
        // Command Buffers
        //
        let mut dst_command_pool =
            dst_queue.create_command_pool(&RafxCommandPoolDef { transient: true })?;
        let dst_command_buffer = dst_command_pool.create_command_buffer(&RafxCommandBufferDef {
            is_secondary: false,
        })?;

        dst_command_buffer.begin()?;

        let upload = RafxUpload::new(device_context, transfer_queue, size, buffer_pool)?;

        let dst_fence = device_context.create_fence()?;

        Ok(RafxTransferUpload {
            upload,
            dst_queue: dst_queue.clone(),
            dst_command_pool,
            dst_command_buffer,
            dst_fence,
            sent_to_dst_queue: false,
        })
    }

    pub fn has_space_available(
        &self,
        bytes_to_write: usize,
        required_alignment: usize,
        number_of_writes: usize,
    ) -> bool {
        self.upload
            .has_space_available(bytes_to_write, required_alignment, number_of_writes)
    }

    pub fn push(
        &mut self,
        data: &[u8],
        required_alignment: usize,
    ) -> Result<u64, RafxUploadError> {
        self.upload.push(data, required_alignment)
    }

    pub fn buffer_size(&self) -> u64 {
        self.upload.buffer_size()
    }

    pub fn bytes_written(&self) -> u64 {
        self.upload.bytes_written()
    }

    pub fn bytes_free(&self) -> u64 {
        self.upload.bytes_free()
    }

    pub fn staging_buffer(&self) -> &RafxBuffer {
        self.upload.staging_buffer()
    }

    pub fn transfer_command_pool(&self) -> &RafxCommandPool {
        self.upload.command_pool()
    }

    pub fn transfer_command_buffer(&self) -> &RafxCommandBuffer {
        self.upload.command_buffer()
    }

    pub fn dst_command_pool(&self) -> &RafxCommandPool {
        &self.dst_command_pool
    }

    pub fn dst_command_buffer(&self) -> &RafxCommandBuffer {
        &self.dst_command_buffer
    }

    pub fn transfer_queue(&self) -> &RafxQueue {
        self.upload.queue()
    }

    pub fn dst_queue(&self) -> &RafxQueue {
        &self.dst_queue
    }

    pub fn submit_transfer(&mut self) -> RafxResult<()> {
        self.upload.submit()
    }

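    /// Ends and submits the dst-queue command buffer. This is a no-op unless the transfer-queue
    /// submit has already completed (i.e. the state is `PendingSubmitDstQueue`).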
    pub fn submit_dst(&mut self) -> RafxResult<()> {
        if self.state()? == RafxTransferUploadState::PendingSubmitDstQueue {
            self.dst_command_buffer.end()?;
            self.dst_queue
                .submit(&[&self.dst_command_buffer], &[], &[], Some(&self.dst_fence))?;
            self.sent_to_dst_queue = true;
        }

        Ok(())
    }

    pub fn state(&self) -> RafxResult<RafxTransferUploadState> {
        let state = if self.sent_to_dst_queue {
            if self.dst_fence.get_fence_status()? != RafxFenceStatus::Incomplete {
                RafxTransferUploadState::Complete
            } else {
                RafxTransferUploadState::SentToDstQueue
            }
        } else {
            match self.upload.state()? {
                RafxUploadState::Writable => RafxTransferUploadState::Writable,
                RafxUploadState::SentToGpu => RafxTransferUploadState::SentToTransferQueue,
                RafxUploadState::Complete => RafxTransferUploadState::PendingSubmitDstQueue,
            }
        };

        Ok(state)
    }

    fn wait_for_idle(&self) -> RafxResult<()> {
        if self.sent_to_dst_queue {
            self.dst_fence.wait()
        } else {
            Ok(())
        }
    }

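    /// Submits to the transfer queue and then to the dst queue, blocking until both submits have
    /// completed. Note that this spin-waits on the fence status rather than sleeping between polls.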
    pub fn block_until_upload_complete(&mut self) -> RafxResult<()> {
        log::trace!("blocking on transfer queue {:?}", self.upload.queue);
        self.submit_transfer()?;
        loop {
            if self.state()? == RafxTransferUploadState::PendingSubmitDstQueue {
                break;
            }
        }

        log::trace!("blocking on dst queue {:?}", self.dst_queue);
        self.submit_dst()?;
        loop {
            if self.state()? == RafxTransferUploadState::Complete {
                break;
            }
        }

        Ok(())
    }
}

impl Drop for RafxTransferUpload {
    fn drop(&mut self) {
        log::trace!("destroying RafxTransferUpload");

        // If the transfer is in flight, wait for it to complete
        self.upload.wait_for_idle().unwrap();
        self.wait_for_idle().unwrap();

        log::trace!("destroyed RafxTransferUpload");
    }
}