rafx_framework/upload/
upload_queue.rs

1use crate::upload::image_upload::ImageUploadParams;
2use crate::upload::GpuImageData;
3use crate::upload::{buffer_upload, image_upload};
4use crate::{BufferResource, ResourceArc};
5use crossbeam_channel::{Receiver, Sender};
6use rafx_api::{
7    extra::upload::*, RafxBuffer, RafxDeviceContext, RafxError, RafxQueue, RafxResourceType,
8    RafxResult, RafxTexture,
9};
10
11pub trait UploadOp<ResourceT>: Send + Sync {
12    fn complete(
13        self: Box<Self>,
14        resource: ResourceT,
15    );
16
17    fn error(
18        self: Box<Self>,
19        error: RafxError,
20    );
21}
22
23type ImageUploadOp = Box<dyn UploadOp<RafxTexture>>;
24type BufferUploadOp = Box<dyn UploadOp<RafxBuffer>>;
25type ExistingResourceUploadOp = Box<dyn UploadOp<()>>;
26
27//
28// Represents a single request inserted into the upload queue that hasn't started yet
29//
30//TODO: Make a helper object that carries an Arc<Receiver> that can be called
31struct PendingImageUpload {
32    upload_op: ImageUploadOp,
33    image_data: GpuImageData,
34    resource_type: RafxResourceType,
35    generate_mips: bool,
36}
37
38struct PendingBufferUpload {
39    upload_op: BufferUploadOp,
40    resource_type: RafxResourceType,
41    data: Vec<u8>,
42}
43
44struct PendingExistingBufferUpload {
45    upload_op: ExistingResourceUploadOp,
46    resource_type: RafxResourceType,
47    data: Vec<u8>,
48    dst_buffer: ResourceArc<BufferResource>,
49    dst_byte_offset: u64,
50}
51
52enum PendingUpload {
53    Image(PendingImageUpload),
54    Buffer(PendingBufferUpload),
55    ExistingBuffer(PendingExistingBufferUpload),
56}
57
58impl PendingUpload {
59    // Ok(None) = upload enqueue
60    // Ok(Some) = upload not enqueued because there was not enough room
61    // Err = Vulkan error
62    fn try_enqueue_image_upload(
63        device_context: &RafxDeviceContext,
64        upload: &mut RafxTransferUpload,
65        pending_image: PendingImageUpload,
66        in_flight_uploads: &mut Vec<InFlightUpload>,
67    ) -> RafxResult<Option<PendingImageUpload>> {
68        let result = image_upload::enqueue_load_image(
69            device_context,
70            upload,
71            // self.transfer_queue.queue_family_index(),
72            // self.graphics_queue.queue_family_index(),
73            &pending_image.image_data,
74            ImageUploadParams {
75                resource_type: pending_image.resource_type,
76                generate_mips: pending_image.generate_mips,
77                ..Default::default()
78            },
79        );
80
81        match result {
82            Ok(texture) => {
83                in_flight_uploads.push(InFlightUpload::Image(InFlightImageUpload {
84                    texture,
85                    upload_op: pending_image.upload_op,
86                }));
87                Ok(None)
88            }
89            Err(RafxUploadError::Other(e)) => Err(e),
90            Err(RafxUploadError::BufferFull) => Ok(Some(pending_image)),
91        }
92    }
93
94    // Ok(None) = upload enqueue
95    // Ok(Some) = upload not enqueued because there was not enough room
96    // Err = Vulkan error
97    fn try_enqueue_buffer_upload(
98        device_context: &RafxDeviceContext,
99        upload: &mut RafxTransferUpload,
100        pending_buffer: PendingBufferUpload,
101        in_flight_uploads: &mut Vec<InFlightUpload>,
102    ) -> RafxResult<Option<PendingBufferUpload>> {
103        let result = buffer_upload::enqueue_load_buffer(
104            device_context,
105            upload,
106            pending_buffer.resource_type,
107            // self.transfer_queue.queue_family_index(),
108            // self.graphics_queue.queue_family_index(),
109            &pending_buffer.data,
110            None,
111            0,
112        );
113
114        match result {
115            Ok(buffer) => {
116                // We created a buffer, so this should always be Some
117                let buffer = buffer.unwrap();
118                in_flight_uploads.push(InFlightUpload::Buffer(InFlightBufferUpload {
119                    buffer,
120                    upload_op: pending_buffer.upload_op,
121                }));
122                Ok(None)
123            }
124            Err(RafxUploadError::Other(e)) => Err(e),
125            Err(RafxUploadError::BufferFull) => Ok(Some(pending_buffer)),
126        }
127    }
128
129    // Ok(None) = upload enqueue
130    // Ok(Some) = upload not enqueued because there was not enough room
131    // Err = Vulkan error
132    fn try_enqueue_existing_buffer_upload(
133        device_context: &RafxDeviceContext,
134        upload: &mut RafxTransferUpload,
135        pending_buffer: PendingExistingBufferUpload,
136        in_flight_uploads: &mut Vec<InFlightUpload>,
137    ) -> RafxResult<Option<PendingExistingBufferUpload>> {
138        let result = buffer_upload::enqueue_load_buffer(
139            device_context,
140            upload,
141            pending_buffer.resource_type,
142            // self.transfer_queue.queue_family_index(),
143            // self.graphics_queue.queue_family_index(),
144            &pending_buffer.data,
145            Some(&*pending_buffer.dst_buffer.get_raw().buffer),
146            pending_buffer.dst_byte_offset,
147        );
148
149        match result {
150            Ok(buffer) => {
151                //It should be none, we used an existing buffer
152                assert!(buffer.is_none());
153                in_flight_uploads.push(InFlightUpload::ExistingResource(
154                    InFlightExistingResourceUpload {
155                        upload_op: pending_buffer.upload_op,
156                    },
157                ));
158                Ok(None)
159            }
160            Err(RafxUploadError::Other(e)) => Err(e),
161            Err(RafxUploadError::BufferFull) => Ok(Some(pending_buffer)),
162        }
163    }
164
165    fn try_enqueue_upload(
166        self,
167        device_context: &RafxDeviceContext,
168        upload: &mut RafxTransferUpload,
169        in_flight_uploads: &mut Vec<InFlightUpload>,
170    ) -> RafxResult<Option<Self>> {
171        Ok(match self {
172            PendingUpload::Image(pending_upload) => Self::try_enqueue_image_upload(
173                device_context,
174                upload,
175                pending_upload,
176                in_flight_uploads,
177            )?
178            .map(|x| PendingUpload::Image(x)),
179            PendingUpload::Buffer(pending_upload) => Self::try_enqueue_buffer_upload(
180                device_context,
181                upload,
182                pending_upload,
183                in_flight_uploads,
184            )?
185            .map(|x| PendingUpload::Buffer(x)),
186            PendingUpload::ExistingBuffer(pending_upload) => {
187                Self::try_enqueue_existing_buffer_upload(
188                    device_context,
189                    upload,
190                    pending_upload,
191                    in_flight_uploads,
192                )?
193                .map(|x| PendingUpload::ExistingBuffer(x))
194            }
195        })
196    }
197
198    fn required_bytes(
199        &self,
200        device_context: &RafxDeviceContext,
201    ) -> usize {
202        match self {
203            PendingUpload::Image(image) => {
204                let device_info = device_context.device_info();
205                image.image_data.total_size(
206                    device_info.upload_texture_alignment,
207                    device_info.upload_texture_row_alignment,
208                ) as usize
209            }
210            PendingUpload::Buffer(buffer) => buffer.data.len(),
211            PendingUpload::ExistingBuffer(buffer) => buffer.data.len(),
212        }
213    }
214}
215
216//
217// Represents a single request that the upload queue has started
218//
219struct InFlightImageUpload {
220    upload_op: ImageUploadOp,
221    texture: RafxTexture,
222}
223
224struct InFlightBufferUpload {
225    upload_op: BufferUploadOp,
226    buffer: RafxBuffer,
227}
228
229struct InFlightExistingResourceUpload {
230    upload_op: ExistingResourceUploadOp,
231}
232
233enum InFlightUpload {
234    Image(InFlightImageUpload),
235    Buffer(InFlightBufferUpload),
236    ExistingResource(InFlightExistingResourceUpload),
237}
238
239impl InFlightUpload {
240    fn complete(self) {
241        match self {
242            InFlightUpload::Image(image) => {
243                image.upload_op.complete(image.texture);
244            }
245            InFlightUpload::Buffer(buffer) => {
246                buffer.upload_op.complete(buffer.buffer);
247            }
248            InFlightUpload::ExistingResource(existing) => {
249                existing.upload_op.complete(());
250            }
251        }
252    }
253
254    fn error(
255        self,
256        error: RafxError,
257    ) {
258        match self {
259            InFlightUpload::Image(image) => {
260                image.upload_op.error(error);
261                // image.texture is dropped here
262            }
263            InFlightUpload::Buffer(buffer) => {
264                buffer.upload_op.error(error);
265                // image.buffer is dropped here
266            }
267            InFlightUpload::ExistingResource(existing) => {
268                existing.upload_op.error(error);
269            }
270        }
271    }
272}
273
274//
275// Represents a batch of requests that has been started, contains multiple InFlightImageUpload and
276// InFlightBufferUploads
277//
278
279// The result from polling a single upload (which may contain multiple images in it)
280enum InProgressUploadBatchPollResult {
281    Pending,
282    Complete,
283    Error,
284    Destroyed,
285}
286
287// This is an inner of InProgressImageUpload - it is wrapped in a Option to avoid borrowing issues
288// when polling by allowing us to temporarily take ownership of it and then put it back
289struct InProgressUploadBatchInner {
290    in_flight_uploads: Vec<InFlightUpload>,
291    upload: RafxTransferUpload,
292}
293
294struct InProgressUploadBatchDebugInfo {
295    upload_id: usize,
296    start_time: rafx_base::Instant,
297    size: u64,
298    resource_count: usize,
299}
300
301// A single upload which may contain multiple images
302struct InProgressUploadBatch {
303    // Only valid if the upload is actually in progress
304    inner: Option<InProgressUploadBatchInner>,
305    debug_info: InProgressUploadBatchDebugInfo,
306}
307
308impl InProgressUploadBatch {
309    pub fn new(
310        in_flight_uploads: Vec<InFlightUpload>,
311        upload: RafxTransferUpload,
312        debug_info: InProgressUploadBatchDebugInfo,
313    ) -> Self {
314        let inner = InProgressUploadBatchInner {
315            in_flight_uploads,
316            upload,
317        };
318
319        InProgressUploadBatch {
320            inner: Some(inner),
321            debug_info,
322        }
323    }
324
325    // The main state machine for an upload:
326    // - Submits on the transfer queue and waits
327    // - Submits on the graphics queue and waits
328    //
329    // Calls upload_op.complete() or upload_op.error() as appropriate
330    pub fn poll_load(&mut self) -> InProgressUploadBatchPollResult {
331        loop {
332            if let Some(mut inner) = self.take_inner() {
333                match inner.upload.state() {
334                    Ok(state) => match state {
335                        RafxTransferUploadState::Writable => {
336                            //log::trace!("RafxTransferUploadState::Writable");
337                            inner.upload.submit_transfer().unwrap();
338                            self.inner = Some(inner);
339                        }
340                        RafxTransferUploadState::SentToTransferQueue => {
341                            //log::trace!("RafxTransferUploadState::SentToTransferQueue");
342                            self.inner = Some(inner);
343                            break InProgressUploadBatchPollResult::Pending;
344                        }
345                        RafxTransferUploadState::PendingSubmitDstQueue => {
346                            //log::trace!("RafxTransferUploadState::PendingSubmitDstQueue");
347                            inner.upload.submit_dst().unwrap();
348                            self.inner = Some(inner);
349                        }
350                        RafxTransferUploadState::SentToDstQueue => {
351                            //log::trace!("RafxTransferUploadState::SentToDstQueue");
352                            self.inner = Some(inner);
353                            break InProgressUploadBatchPollResult::Pending;
354                        }
355                        RafxTransferUploadState::Complete => {
356                            //log::trace!("RafxTransferUploadState::Complete");
357                            for in_flight_upload in inner.in_flight_uploads {
358                                in_flight_upload.complete();
359                            }
360
361                            break InProgressUploadBatchPollResult::Complete;
362                        }
363                    },
364                    Err(err) => {
365                        for in_flight_upload in inner.in_flight_uploads {
366                            in_flight_upload.error(err.clone());
367                        }
368
369                        break InProgressUploadBatchPollResult::Error;
370                    }
371                }
372            } else {
373                break InProgressUploadBatchPollResult::Destroyed;
374            }
375        }
376    }
377
378    // Allows taking ownership of the inner object
379    fn take_inner(&mut self) -> Option<InProgressUploadBatchInner> {
380        let mut inner = None;
381        std::mem::swap(&mut self.inner, &mut inner);
382        inner
383    }
384}
385
386impl Drop for InProgressUploadBatch {
387    fn drop(&mut self) {
388        if let Some(mut inner) = self.take_inner() {
389            // I don't think order of destruction matters but just in case
390            inner.in_flight_uploads.clear();
391        }
392    }
393}
394
395pub struct UploadQueueConfig {
396    pub max_bytes_per_upload: usize,
397    pub max_concurrent_uploads: usize,
398    pub max_new_uploads_in_single_frame: usize,
399}
400
401//
402// Receives sets of images/buffers that need to be uploaded and kicks off the uploads in batches
403//
404pub struct UploadQueue {
405    device_context: RafxDeviceContext,
406    config: UploadQueueConfig,
407
408    pending_upload_tx: Sender<PendingUpload>,
409    pending_upload_rx: Receiver<PendingUpload>,
410    // If we fail to upload due to size limitation, keep the failed upload here to retry later
411    next_upload: Option<PendingUpload>,
412
413    // These are uploads that are currently in progress
414    uploads_in_progress: Vec<InProgressUploadBatch>,
415
416    upload_buffer_pool: RafxUploadBufferPool,
417
418    graphics_queue: RafxQueue,
419    transfer_queue: RafxQueue,
420
421    next_upload_id: usize,
422}
423
424impl UploadQueue {
425    pub fn new(
426        device_context: &RafxDeviceContext,
427        config: UploadQueueConfig,
428        graphics_queue: RafxQueue,
429        transfer_queue: RafxQueue,
430    ) -> RafxResult<Self> {
431        let (pending_upload_tx, pending_upload_rx) = crossbeam_channel::unbounded();
432        let upload_buffer_pool = RafxUploadBufferPool::new(
433            device_context,
434            config.max_concurrent_uploads as u32,
435            config.max_bytes_per_upload as u64,
436        )?;
437
438        Ok(UploadQueue {
439            device_context: device_context.clone(),
440            config,
441            upload_buffer_pool,
442            pending_upload_tx,
443            pending_upload_rx,
444            next_upload: None,
445            uploads_in_progress: Default::default(),
446            next_upload_id: 1,
447            graphics_queue,
448            transfer_queue,
449        })
450    }
451
452    pub fn upload_queue_context(&self) -> UploadQueueContext {
453        UploadQueueContext {
454            pending_upload_tx: self.pending_upload_tx.clone(),
455        }
456    }
457
458    pub fn update(&mut self) -> RafxResult<()> {
459        self.start_new_upload_batches()?;
460        self.update_existing_upload_batches();
461        Ok(())
462    }
463
464    fn gather_pending_uploads_for_single_upload_batch(
465        &mut self,
466        upload: &mut RafxTransferUpload,
467    ) -> RafxResult<Vec<InFlightUpload>> {
468        let mut in_flight_uploads = vec![];
469
470        // If we had a pending image upload from before, try to upload it now
471        self.next_upload = if let Some(next_upload) = self.next_upload.take() {
472            next_upload.try_enqueue_upload(&self.device_context, upload, &mut in_flight_uploads)?
473        } else {
474            None
475        };
476
477        // The first image we tried to upload failed. Log an error since we aren't making forward progress
478        if let Some(next_upload) = &self.next_upload {
479            log::error!(
480                "Resource of {} bytes has repeatedly exceeded the available room in the upload buffer. ({} of {} bytes free)",
481                next_upload.required_bytes(&upload.dst_queue().device_context()),
482                upload.bytes_free(),
483                upload.buffer_size()
484            );
485            return Ok(vec![]);
486        }
487
488        let rx = self.pending_upload_rx.clone();
489        for pending_upload in rx.try_iter() {
490            self.next_upload = pending_upload.try_enqueue_upload(
491                &self.device_context,
492                upload,
493                &mut in_flight_uploads,
494            )?;
495
496            if let Some(next_upload) = &self.next_upload {
497                log::debug!(
498                    "Resource of {} bytes exceeds the available room in the upload buffer. ({} of {} bytes free)",
499                    next_upload.required_bytes(&upload.dst_queue().device_context()),
500                    upload.bytes_free(),
501                    upload.buffer_size(),
502                );
503                break;
504            }
505        }
506
507        Ok(in_flight_uploads)
508    }
509
510    fn try_start_single_upload_batch(&mut self) -> RafxResult<bool> {
511        let mut upload = RafxTransferUpload::new(
512            &self.device_context,
513            &self.transfer_queue,
514            &self.graphics_queue,
515            self.config.max_bytes_per_upload as u64,
516            Some(&mut self.upload_buffer_pool),
517        )?;
518
519        let in_flight_uploads = self.gather_pending_uploads_for_single_upload_batch(&mut upload)?;
520
521        if !in_flight_uploads.is_empty() {
522            let upload_id = self.next_upload_id;
523            self.next_upload_id += 1;
524
525            log::debug!(
526                "Submitting {} byte upload with {} uploads, UploadId = {}",
527                upload.bytes_written(),
528                in_flight_uploads.len(),
529                upload_id
530            );
531
532            upload.submit_transfer()?;
533
534            let debug_info = InProgressUploadBatchDebugInfo {
535                upload_id,
536                resource_count: in_flight_uploads.len(),
537                size: upload.bytes_written(),
538                start_time: rafx_base::Instant::now(),
539            };
540
541            self.uploads_in_progress.push(InProgressUploadBatch::new(
542                in_flight_uploads,
543                upload,
544                debug_info,
545            ));
546
547            Ok(true)
548        } else {
549            Ok(false)
550        }
551    }
552
553    fn start_new_upload_batches(&mut self) -> RafxResult<()> {
554        for _ in 0..self.config.max_new_uploads_in_single_frame {
555            if self.pending_upload_rx.is_empty() && self.next_upload.is_none() {
556                return Ok(());
557            }
558
559            if self.uploads_in_progress.len() >= self.config.max_concurrent_uploads {
560                log::trace!(
561                    "Max number of uploads already in progress. Waiting to start a new one"
562                );
563                return Ok(());
564            }
565
566            if !self.try_start_single_upload_batch()? {
567                return Ok(());
568            }
569        }
570
571        Ok(())
572    }
573
574    fn update_existing_upload_batches(&mut self) {
575        // iterate backwards so we can use swap_remove
576        for i in (0..self.uploads_in_progress.len()).rev() {
577            let result = self.uploads_in_progress[i].poll_load();
578            match result {
579                InProgressUploadBatchPollResult::Pending => {
580                    // do nothing
581                }
582                InProgressUploadBatchPollResult::Complete => {
583                    //load_op.complete() is called by poll_load
584
585                    let debug_info = &self.uploads_in_progress[i].debug_info;
586                    log::debug!(
587                        "Completed {} byte upload with {} resources in {} ms, UploadId = {}",
588                        debug_info.size,
589                        debug_info.resource_count,
590                        debug_info.start_time.elapsed().as_secs_f32(),
591                        debug_info.upload_id
592                    );
593
594                    self.uploads_in_progress.swap_remove(i);
595                }
596                InProgressUploadBatchPollResult::Error => {
597                    //load_op.error() is called by poll_load
598
599                    let debug_info = &self.uploads_in_progress[i].debug_info;
600                    log::error!(
601                        "Failed {} byte upload with {} resources in {} ms, UploadId = {}",
602                        debug_info.size,
603                        debug_info.resource_count,
604                        debug_info.start_time.elapsed().as_secs_f32(),
605                        debug_info.upload_id
606                    );
607
608                    self.uploads_in_progress.swap_remove(i);
609                }
610                InProgressUploadBatchPollResult::Destroyed => {
611                    // not expected - this only occurs if polling the upload when it is already in a complete or error state
612                    unreachable!();
613                }
614            }
615        }
616    }
617}
618
619#[derive(Clone)]
620pub struct UploadQueueContext {
621    pending_upload_tx: Sender<PendingUpload>,
622}
623
624impl UploadQueueContext {
625    pub fn upload_new_image(
626        &self,
627        upload_op: ImageUploadOp,
628        image_data: GpuImageData,
629        resource_type: RafxResourceType,
630        generate_mips: bool,
631    ) -> RafxResult<()> {
632        self.pending_upload_tx
633            .send(PendingUpload::Image(PendingImageUpload {
634                upload_op,
635                image_data,
636                resource_type,
637                generate_mips,
638            }))
639            .map_err(|_err| {
640                let error = format!("Could not enqueue image upload");
641                log::error!("{}", error);
642                RafxError::StringError(error)
643            })
644    }
645
646    pub fn upload_new_buffer(
647        &self,
648        upload_op: BufferUploadOp,
649        resource_type: RafxResourceType,
650        data: Vec<u8>,
651    ) -> RafxResult<()> {
652        self.pending_upload_tx
653            .send(PendingUpload::Buffer(PendingBufferUpload {
654                upload_op,
655                resource_type,
656                data,
657            }))
658            .map_err(|_err| {
659                let error = format!("Could not enqueue buffer upload");
660                log::error!("{}", error);
661                RafxError::StringError(error)
662            })
663    }
664
665    pub fn upload_to_existing_buffer(
666        &self,
667        upload_op: ExistingResourceUploadOp,
668        resource_type: RafxResourceType,
669        data: Vec<u8>,
670        dst_buffer: ResourceArc<BufferResource>,
671        dst_byte_offset: u64,
672    ) -> RafxResult<()> {
673        self.pending_upload_tx
674            .send(PendingUpload::ExistingBuffer(PendingExistingBufferUpload {
675                upload_op,
676                resource_type,
677                data,
678                dst_buffer,
679                dst_byte_offset,
680            }))
681            .map_err(|_err| {
682                let error = format!("Could not enqueue buffer upload");
683                log::error!("{}", error);
684                RafxError::StringError(error)
685            })
686    }
687}