1use crate::upload::image_upload::ImageUploadParams;
2use crate::upload::GpuImageData;
3use crate::upload::{buffer_upload, image_upload};
4use crate::{BufferResource, ResourceArc};
5use crossbeam_channel::{Receiver, Sender};
6use rafx_api::{
7 extra::upload::*, RafxBuffer, RafxDeviceContext, RafxError, RafxQueue, RafxResourceType,
8 RafxResult, RafxTexture,
9};
10
11pub trait UploadOp<ResourceT>: Send + Sync {
12 fn complete(
13 self: Box<Self>,
14 resource: ResourceT,
15 );
16
17 fn error(
18 self: Box<Self>,
19 error: RafxError,
20 );
21}
22
23type ImageUploadOp = Box<dyn UploadOp<RafxTexture>>;
24type BufferUploadOp = Box<dyn UploadOp<RafxBuffer>>;
25type ExistingResourceUploadOp = Box<dyn UploadOp<()>>;
26
27struct PendingImageUpload {
32 upload_op: ImageUploadOp,
33 image_data: GpuImageData,
34 resource_type: RafxResourceType,
35 generate_mips: bool,
36}
37
38struct PendingBufferUpload {
39 upload_op: BufferUploadOp,
40 resource_type: RafxResourceType,
41 data: Vec<u8>,
42}
43
44struct PendingExistingBufferUpload {
45 upload_op: ExistingResourceUploadOp,
46 resource_type: RafxResourceType,
47 data: Vec<u8>,
48 dst_buffer: ResourceArc<BufferResource>,
49 dst_byte_offset: u64,
50}
51
52enum PendingUpload {
53 Image(PendingImageUpload),
54 Buffer(PendingBufferUpload),
55 ExistingBuffer(PendingExistingBufferUpload),
56}
57
58impl PendingUpload {
59 fn try_enqueue_image_upload(
63 device_context: &RafxDeviceContext,
64 upload: &mut RafxTransferUpload,
65 pending_image: PendingImageUpload,
66 in_flight_uploads: &mut Vec<InFlightUpload>,
67 ) -> RafxResult<Option<PendingImageUpload>> {
68 let result = image_upload::enqueue_load_image(
69 device_context,
70 upload,
71 &pending_image.image_data,
74 ImageUploadParams {
75 resource_type: pending_image.resource_type,
76 generate_mips: pending_image.generate_mips,
77 ..Default::default()
78 },
79 );
80
81 match result {
82 Ok(texture) => {
83 in_flight_uploads.push(InFlightUpload::Image(InFlightImageUpload {
84 texture,
85 upload_op: pending_image.upload_op,
86 }));
87 Ok(None)
88 }
89 Err(RafxUploadError::Other(e)) => Err(e),
90 Err(RafxUploadError::BufferFull) => Ok(Some(pending_image)),
91 }
92 }
93
94 fn try_enqueue_buffer_upload(
98 device_context: &RafxDeviceContext,
99 upload: &mut RafxTransferUpload,
100 pending_buffer: PendingBufferUpload,
101 in_flight_uploads: &mut Vec<InFlightUpload>,
102 ) -> RafxResult<Option<PendingBufferUpload>> {
103 let result = buffer_upload::enqueue_load_buffer(
104 device_context,
105 upload,
106 pending_buffer.resource_type,
107 &pending_buffer.data,
110 None,
111 0,
112 );
113
114 match result {
115 Ok(buffer) => {
116 let buffer = buffer.unwrap();
118 in_flight_uploads.push(InFlightUpload::Buffer(InFlightBufferUpload {
119 buffer,
120 upload_op: pending_buffer.upload_op,
121 }));
122 Ok(None)
123 }
124 Err(RafxUploadError::Other(e)) => Err(e),
125 Err(RafxUploadError::BufferFull) => Ok(Some(pending_buffer)),
126 }
127 }
128
129 fn try_enqueue_existing_buffer_upload(
133 device_context: &RafxDeviceContext,
134 upload: &mut RafxTransferUpload,
135 pending_buffer: PendingExistingBufferUpload,
136 in_flight_uploads: &mut Vec<InFlightUpload>,
137 ) -> RafxResult<Option<PendingExistingBufferUpload>> {
138 let result = buffer_upload::enqueue_load_buffer(
139 device_context,
140 upload,
141 pending_buffer.resource_type,
142 &pending_buffer.data,
145 Some(&*pending_buffer.dst_buffer.get_raw().buffer),
146 pending_buffer.dst_byte_offset,
147 );
148
149 match result {
150 Ok(buffer) => {
151 assert!(buffer.is_none());
153 in_flight_uploads.push(InFlightUpload::ExistingResource(
154 InFlightExistingResourceUpload {
155 upload_op: pending_buffer.upload_op,
156 },
157 ));
158 Ok(None)
159 }
160 Err(RafxUploadError::Other(e)) => Err(e),
161 Err(RafxUploadError::BufferFull) => Ok(Some(pending_buffer)),
162 }
163 }
164
165 fn try_enqueue_upload(
166 self,
167 device_context: &RafxDeviceContext,
168 upload: &mut RafxTransferUpload,
169 in_flight_uploads: &mut Vec<InFlightUpload>,
170 ) -> RafxResult<Option<Self>> {
171 Ok(match self {
172 PendingUpload::Image(pending_upload) => Self::try_enqueue_image_upload(
173 device_context,
174 upload,
175 pending_upload,
176 in_flight_uploads,
177 )?
178 .map(|x| PendingUpload::Image(x)),
179 PendingUpload::Buffer(pending_upload) => Self::try_enqueue_buffer_upload(
180 device_context,
181 upload,
182 pending_upload,
183 in_flight_uploads,
184 )?
185 .map(|x| PendingUpload::Buffer(x)),
186 PendingUpload::ExistingBuffer(pending_upload) => {
187 Self::try_enqueue_existing_buffer_upload(
188 device_context,
189 upload,
190 pending_upload,
191 in_flight_uploads,
192 )?
193 .map(|x| PendingUpload::ExistingBuffer(x))
194 }
195 })
196 }
197
198 fn required_bytes(
199 &self,
200 device_context: &RafxDeviceContext,
201 ) -> usize {
202 match self {
203 PendingUpload::Image(image) => {
204 let device_info = device_context.device_info();
205 image.image_data.total_size(
206 device_info.upload_texture_alignment,
207 device_info.upload_texture_row_alignment,
208 ) as usize
209 }
210 PendingUpload::Buffer(buffer) => buffer.data.len(),
211 PendingUpload::ExistingBuffer(buffer) => buffer.data.len(),
212 }
213 }
214}
215
216struct InFlightImageUpload {
220 upload_op: ImageUploadOp,
221 texture: RafxTexture,
222}
223
224struct InFlightBufferUpload {
225 upload_op: BufferUploadOp,
226 buffer: RafxBuffer,
227}
228
229struct InFlightExistingResourceUpload {
230 upload_op: ExistingResourceUploadOp,
231}
232
233enum InFlightUpload {
234 Image(InFlightImageUpload),
235 Buffer(InFlightBufferUpload),
236 ExistingResource(InFlightExistingResourceUpload),
237}
238
239impl InFlightUpload {
240 fn complete(self) {
241 match self {
242 InFlightUpload::Image(image) => {
243 image.upload_op.complete(image.texture);
244 }
245 InFlightUpload::Buffer(buffer) => {
246 buffer.upload_op.complete(buffer.buffer);
247 }
248 InFlightUpload::ExistingResource(existing) => {
249 existing.upload_op.complete(());
250 }
251 }
252 }
253
254 fn error(
255 self,
256 error: RafxError,
257 ) {
258 match self {
259 InFlightUpload::Image(image) => {
260 image.upload_op.error(error);
261 }
263 InFlightUpload::Buffer(buffer) => {
264 buffer.upload_op.error(error);
265 }
267 InFlightUpload::ExistingResource(existing) => {
268 existing.upload_op.error(error);
269 }
270 }
271 }
272}
273
274enum InProgressUploadBatchPollResult {
281 Pending,
282 Complete,
283 Error,
284 Destroyed,
285}
286
287struct InProgressUploadBatchInner {
290 in_flight_uploads: Vec<InFlightUpload>,
291 upload: RafxTransferUpload,
292}
293
294struct InProgressUploadBatchDebugInfo {
295 upload_id: usize,
296 start_time: rafx_base::Instant,
297 size: u64,
298 resource_count: usize,
299}
300
301struct InProgressUploadBatch {
303 inner: Option<InProgressUploadBatchInner>,
305 debug_info: InProgressUploadBatchDebugInfo,
306}
307
308impl InProgressUploadBatch {
309 pub fn new(
310 in_flight_uploads: Vec<InFlightUpload>,
311 upload: RafxTransferUpload,
312 debug_info: InProgressUploadBatchDebugInfo,
313 ) -> Self {
314 let inner = InProgressUploadBatchInner {
315 in_flight_uploads,
316 upload,
317 };
318
319 InProgressUploadBatch {
320 inner: Some(inner),
321 debug_info,
322 }
323 }
324
325 pub fn poll_load(&mut self) -> InProgressUploadBatchPollResult {
331 loop {
332 if let Some(mut inner) = self.take_inner() {
333 match inner.upload.state() {
334 Ok(state) => match state {
335 RafxTransferUploadState::Writable => {
336 inner.upload.submit_transfer().unwrap();
338 self.inner = Some(inner);
339 }
340 RafxTransferUploadState::SentToTransferQueue => {
341 self.inner = Some(inner);
343 break InProgressUploadBatchPollResult::Pending;
344 }
345 RafxTransferUploadState::PendingSubmitDstQueue => {
346 inner.upload.submit_dst().unwrap();
348 self.inner = Some(inner);
349 }
350 RafxTransferUploadState::SentToDstQueue => {
351 self.inner = Some(inner);
353 break InProgressUploadBatchPollResult::Pending;
354 }
355 RafxTransferUploadState::Complete => {
356 for in_flight_upload in inner.in_flight_uploads {
358 in_flight_upload.complete();
359 }
360
361 break InProgressUploadBatchPollResult::Complete;
362 }
363 },
364 Err(err) => {
365 for in_flight_upload in inner.in_flight_uploads {
366 in_flight_upload.error(err.clone());
367 }
368
369 break InProgressUploadBatchPollResult::Error;
370 }
371 }
372 } else {
373 break InProgressUploadBatchPollResult::Destroyed;
374 }
375 }
376 }
377
378 fn take_inner(&mut self) -> Option<InProgressUploadBatchInner> {
380 let mut inner = None;
381 std::mem::swap(&mut self.inner, &mut inner);
382 inner
383 }
384}
385
386impl Drop for InProgressUploadBatch {
387 fn drop(&mut self) {
388 if let Some(mut inner) = self.take_inner() {
389 inner.in_flight_uploads.clear();
391 }
392 }
393}
394
395pub struct UploadQueueConfig {
396 pub max_bytes_per_upload: usize,
397 pub max_concurrent_uploads: usize,
398 pub max_new_uploads_in_single_frame: usize,
399}
400
401pub struct UploadQueue {
405 device_context: RafxDeviceContext,
406 config: UploadQueueConfig,
407
408 pending_upload_tx: Sender<PendingUpload>,
409 pending_upload_rx: Receiver<PendingUpload>,
410 next_upload: Option<PendingUpload>,
412
413 uploads_in_progress: Vec<InProgressUploadBatch>,
415
416 upload_buffer_pool: RafxUploadBufferPool,
417
418 graphics_queue: RafxQueue,
419 transfer_queue: RafxQueue,
420
421 next_upload_id: usize,
422}
423
424impl UploadQueue {
425 pub fn new(
426 device_context: &RafxDeviceContext,
427 config: UploadQueueConfig,
428 graphics_queue: RafxQueue,
429 transfer_queue: RafxQueue,
430 ) -> RafxResult<Self> {
431 let (pending_upload_tx, pending_upload_rx) = crossbeam_channel::unbounded();
432 let upload_buffer_pool = RafxUploadBufferPool::new(
433 device_context,
434 config.max_concurrent_uploads as u32,
435 config.max_bytes_per_upload as u64,
436 )?;
437
438 Ok(UploadQueue {
439 device_context: device_context.clone(),
440 config,
441 upload_buffer_pool,
442 pending_upload_tx,
443 pending_upload_rx,
444 next_upload: None,
445 uploads_in_progress: Default::default(),
446 next_upload_id: 1,
447 graphics_queue,
448 transfer_queue,
449 })
450 }
451
452 pub fn upload_queue_context(&self) -> UploadQueueContext {
453 UploadQueueContext {
454 pending_upload_tx: self.pending_upload_tx.clone(),
455 }
456 }
457
458 pub fn update(&mut self) -> RafxResult<()> {
459 self.start_new_upload_batches()?;
460 self.update_existing_upload_batches();
461 Ok(())
462 }
463
464 fn gather_pending_uploads_for_single_upload_batch(
465 &mut self,
466 upload: &mut RafxTransferUpload,
467 ) -> RafxResult<Vec<InFlightUpload>> {
468 let mut in_flight_uploads = vec![];
469
470 self.next_upload = if let Some(next_upload) = self.next_upload.take() {
472 next_upload.try_enqueue_upload(&self.device_context, upload, &mut in_flight_uploads)?
473 } else {
474 None
475 };
476
477 if let Some(next_upload) = &self.next_upload {
479 log::error!(
480 "Resource of {} bytes has repeatedly exceeded the available room in the upload buffer. ({} of {} bytes free)",
481 next_upload.required_bytes(&upload.dst_queue().device_context()),
482 upload.bytes_free(),
483 upload.buffer_size()
484 );
485 return Ok(vec![]);
486 }
487
488 let rx = self.pending_upload_rx.clone();
489 for pending_upload in rx.try_iter() {
490 self.next_upload = pending_upload.try_enqueue_upload(
491 &self.device_context,
492 upload,
493 &mut in_flight_uploads,
494 )?;
495
496 if let Some(next_upload) = &self.next_upload {
497 log::debug!(
498 "Resource of {} bytes exceeds the available room in the upload buffer. ({} of {} bytes free)",
499 next_upload.required_bytes(&upload.dst_queue().device_context()),
500 upload.bytes_free(),
501 upload.buffer_size(),
502 );
503 break;
504 }
505 }
506
507 Ok(in_flight_uploads)
508 }
509
510 fn try_start_single_upload_batch(&mut self) -> RafxResult<bool> {
511 let mut upload = RafxTransferUpload::new(
512 &self.device_context,
513 &self.transfer_queue,
514 &self.graphics_queue,
515 self.config.max_bytes_per_upload as u64,
516 Some(&mut self.upload_buffer_pool),
517 )?;
518
519 let in_flight_uploads = self.gather_pending_uploads_for_single_upload_batch(&mut upload)?;
520
521 if !in_flight_uploads.is_empty() {
522 let upload_id = self.next_upload_id;
523 self.next_upload_id += 1;
524
525 log::debug!(
526 "Submitting {} byte upload with {} uploads, UploadId = {}",
527 upload.bytes_written(),
528 in_flight_uploads.len(),
529 upload_id
530 );
531
532 upload.submit_transfer()?;
533
534 let debug_info = InProgressUploadBatchDebugInfo {
535 upload_id,
536 resource_count: in_flight_uploads.len(),
537 size: upload.bytes_written(),
538 start_time: rafx_base::Instant::now(),
539 };
540
541 self.uploads_in_progress.push(InProgressUploadBatch::new(
542 in_flight_uploads,
543 upload,
544 debug_info,
545 ));
546
547 Ok(true)
548 } else {
549 Ok(false)
550 }
551 }
552
553 fn start_new_upload_batches(&mut self) -> RafxResult<()> {
554 for _ in 0..self.config.max_new_uploads_in_single_frame {
555 if self.pending_upload_rx.is_empty() && self.next_upload.is_none() {
556 return Ok(());
557 }
558
559 if self.uploads_in_progress.len() >= self.config.max_concurrent_uploads {
560 log::trace!(
561 "Max number of uploads already in progress. Waiting to start a new one"
562 );
563 return Ok(());
564 }
565
566 if !self.try_start_single_upload_batch()? {
567 return Ok(());
568 }
569 }
570
571 Ok(())
572 }
573
574 fn update_existing_upload_batches(&mut self) {
575 for i in (0..self.uploads_in_progress.len()).rev() {
577 let result = self.uploads_in_progress[i].poll_load();
578 match result {
579 InProgressUploadBatchPollResult::Pending => {
580 }
582 InProgressUploadBatchPollResult::Complete => {
583 let debug_info = &self.uploads_in_progress[i].debug_info;
586 log::debug!(
587 "Completed {} byte upload with {} resources in {} ms, UploadId = {}",
588 debug_info.size,
589 debug_info.resource_count,
590 debug_info.start_time.elapsed().as_secs_f32(),
591 debug_info.upload_id
592 );
593
594 self.uploads_in_progress.swap_remove(i);
595 }
596 InProgressUploadBatchPollResult::Error => {
597 let debug_info = &self.uploads_in_progress[i].debug_info;
600 log::error!(
601 "Failed {} byte upload with {} resources in {} ms, UploadId = {}",
602 debug_info.size,
603 debug_info.resource_count,
604 debug_info.start_time.elapsed().as_secs_f32(),
605 debug_info.upload_id
606 );
607
608 self.uploads_in_progress.swap_remove(i);
609 }
610 InProgressUploadBatchPollResult::Destroyed => {
611 unreachable!();
613 }
614 }
615 }
616 }
617}
618
619#[derive(Clone)]
620pub struct UploadQueueContext {
621 pending_upload_tx: Sender<PendingUpload>,
622}
623
624impl UploadQueueContext {
625 pub fn upload_new_image(
626 &self,
627 upload_op: ImageUploadOp,
628 image_data: GpuImageData,
629 resource_type: RafxResourceType,
630 generate_mips: bool,
631 ) -> RafxResult<()> {
632 self.pending_upload_tx
633 .send(PendingUpload::Image(PendingImageUpload {
634 upload_op,
635 image_data,
636 resource_type,
637 generate_mips,
638 }))
639 .map_err(|_err| {
640 let error = format!("Could not enqueue image upload");
641 log::error!("{}", error);
642 RafxError::StringError(error)
643 })
644 }
645
646 pub fn upload_new_buffer(
647 &self,
648 upload_op: BufferUploadOp,
649 resource_type: RafxResourceType,
650 data: Vec<u8>,
651 ) -> RafxResult<()> {
652 self.pending_upload_tx
653 .send(PendingUpload::Buffer(PendingBufferUpload {
654 upload_op,
655 resource_type,
656 data,
657 }))
658 .map_err(|_err| {
659 let error = format!("Could not enqueue buffer upload");
660 log::error!("{}", error);
661 RafxError::StringError(error)
662 })
663 }
664
665 pub fn upload_to_existing_buffer(
666 &self,
667 upload_op: ExistingResourceUploadOp,
668 resource_type: RafxResourceType,
669 data: Vec<u8>,
670 dst_buffer: ResourceArc<BufferResource>,
671 dst_byte_offset: u64,
672 ) -> RafxResult<()> {
673 self.pending_upload_tx
674 .send(PendingUpload::ExistingBuffer(PendingExistingBufferUpload {
675 upload_op,
676 resource_type,
677 data,
678 dst_buffer,
679 dst_byte_offset,
680 }))
681 .map_err(|_err| {
682 let error = format!("Could not enqueue buffer upload");
683 log::error!("{}", error);
684 RafxError::StringError(error)
685 })
686 }
687}