1use crate::{
2 RafxBuffer, RafxBufferDef, RafxCommandBuffer, RafxCommandBufferDef, RafxCommandPool,
3 RafxCommandPoolDef, RafxDeviceContext, RafxError, RafxFence, RafxFenceStatus, RafxMemoryUsage,
4 RafxQueue, RafxQueueType, RafxResourceType, RafxResult,
5};
6use crossbeam_channel::{Receiver, Sender};
7use std::ops::{Deref, DerefMut};
8
9pub struct RafxUploadBuffer {
13 buffer: Option<RafxBuffer>,
14 upload_buffer_released_tx: Sender<RafxBuffer>,
15}
16
17impl Drop for RafxUploadBuffer {
18 fn drop(&mut self) {
19 let buffer = self.buffer.take().unwrap();
21 self.upload_buffer_released_tx.send(buffer).unwrap();
23 }
24}
25
26impl RafxUploadBuffer {
27 fn buffer(&self) -> &RafxBuffer {
28 self.buffer.as_ref().unwrap()
29 }
30
31 fn buffer_mut(&mut self) -> &mut RafxBuffer {
32 self.buffer.as_mut().unwrap()
33 }
34}
35
36pub struct RafxUploadBufferPool {
37 _device_context: RafxDeviceContext,
38 buffer_count: u32,
39 buffer_size: u64,
40 unused_buffers: Vec<RafxBuffer>,
41 upload_buffer_released_tx: Sender<RafxBuffer>,
42 upload_buffer_released_rx: Receiver<RafxBuffer>,
43}
44
45impl Drop for RafxUploadBufferPool {
46 fn drop(&mut self) {
47 self.handle_dropped_buffers();
48 assert_eq!(self.unused_buffers.len(), self.buffer_count as usize);
50 }
51}
52
53impl RafxUploadBufferPool {
54 pub fn new(
55 device_context: &RafxDeviceContext,
56 buffer_count: u32,
57 buffer_size: u64,
58 ) -> RafxResult<Self> {
59 let (upload_buffer_released_tx, upload_buffer_released_rx) = crossbeam_channel::unbounded();
60 let mut unused_buffers = Vec::with_capacity(buffer_count as usize);
61
62 for i in 0..buffer_count {
63 let buffer = device_context.create_buffer(&RafxBufferDef {
64 size: buffer_size,
65 memory_usage: RafxMemoryUsage::CpuToGpu,
66 queue_type: RafxQueueType::Transfer,
67 resource_type: RafxResourceType::BUFFER,
68 ..Default::default()
69 })?;
70 if device_context.device_info().debug_names_enabled {
71 buffer.set_debug_name(format!("RafxUploadBufferPool {}", i));
72 }
73
74 unused_buffers.push(buffer);
75 }
76
77 Ok(RafxUploadBufferPool {
78 _device_context: device_context.clone(),
79 buffer_count,
80 buffer_size,
81 unused_buffers,
82 upload_buffer_released_tx,
83 upload_buffer_released_rx,
84 })
85 }
86
87 fn take(
88 &mut self,
89 required_size_bytes: u64,
90 ) -> RafxResult<RafxUploadBuffer> {
91 if self.buffer_size < required_size_bytes {
92 return Err(format!(
93 "Buffer of size {} requested but the pool's buffers are only {} in size",
94 required_size_bytes, self.buffer_size
95 ))?;
96 }
97
98 self.handle_dropped_buffers();
100
101 if let Some(buffer) = self.unused_buffers.pop() {
103 Ok(RafxUploadBuffer {
104 buffer: Some(buffer),
105 upload_buffer_released_tx: self.upload_buffer_released_tx.clone(),
106 })
107 } else {
108 Err("RafxUploadBufferPool has no more available buffers")?
109 }
110 }
111
112 fn handle_dropped_buffers(&mut self) {
114 for buffer in self.upload_buffer_released_rx.try_iter() {
115 self.unused_buffers.push(buffer);
116 }
117 }
118}
119
120#[derive(Debug)]
121pub enum RafxUploadError {
122 BufferFull,
123 Other(RafxError),
124}
125
126impl RafxUploadError {
127 pub fn into_rafx_error(self) -> RafxError {
129 self.into()
130 }
131}
132
133impl core::fmt::Display for RafxUploadError {
134 fn fmt(
135 &self,
136 fmt: &mut core::fmt::Formatter,
137 ) -> core::fmt::Result {
138 match *self {
139 RafxUploadError::BufferFull => write!(fmt, "UploadBufferFull"),
140 RafxUploadError::Other(ref e) => e.fmt(fmt),
141 }
142 }
143}
144
145impl From<RafxError> for RafxUploadError {
146 fn from(error: RafxError) -> Self {
147 RafxUploadError::Other(error)
148 }
149}
150
151impl Into<RafxError> for RafxUploadError {
152 fn into(self) -> RafxError {
153 match self {
154 RafxUploadError::BufferFull => {
155 RafxError::StringError("Upload buffer is full".to_string())
156 }
157 RafxUploadError::Other(e) => e,
158 }
159 }
160}
161
162impl From<&str> for RafxUploadError {
163 fn from(str: &str) -> Self {
164 RafxError::StringError(str.to_string()).into()
165 }
166}
167
168impl From<String> for RafxUploadError {
169 fn from(string: String) -> Self {
170 RafxError::StringError(string).into()
171 }
172}
173
/// Lifecycle of a [`RafxUpload`], as reported by `RafxUpload::state`.
///
/// Derives `Debug`, `Clone`, `Copy` and `Eq` in addition to `PartialEq` so the state can
/// be logged, compared with `assert_eq!`, and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RafxUploadState {
    /// The upload has not been submitted yet and still accepts writes.
    Writable,

    /// The upload has been submitted and its fence still reports `Incomplete`.
    SentToGpu,

    /// The upload has been submitted and its fence no longer reports `Incomplete`.
    Complete,
}
185
186enum UploadBuffer {
187 Pooled(RafxUploadBuffer),
188 NonPooled(RafxBuffer),
189}
190
191impl Deref for UploadBuffer {
192 type Target = RafxBuffer;
193
194 fn deref(&self) -> &Self::Target {
195 self.buffer()
196 }
197}
198
199impl DerefMut for UploadBuffer {
200 fn deref_mut(&mut self) -> &mut Self::Target {
201 self.buffer_mut()
202 }
203}
204
205impl UploadBuffer {
206 fn buffer(&self) -> &RafxBuffer {
207 match self {
208 UploadBuffer::Pooled(buffer) => buffer.buffer(),
209 UploadBuffer::NonPooled(buffer) => buffer,
210 }
211 }
212
213 fn buffer_mut(&mut self) -> &mut RafxBuffer {
214 match self {
215 UploadBuffer::Pooled(buffer) => buffer.buffer_mut(),
216 UploadBuffer::NonPooled(buffer) => buffer,
217 }
218 }
219}
220
221pub struct RafxUpload {
224 queue: RafxQueue,
225 command_pool: RafxCommandPool,
226 command_buffer: RafxCommandBuffer,
227
228 buffer: UploadBuffer,
229
230 writable: bool,
231 fence: RafxFence,
232
233 buffer_begin: *mut u8,
234 buffer_end: *mut u8,
235 buffer_write_pointer: *mut u8,
236}
237
238unsafe impl Send for RafxUpload {}
239unsafe impl Sync for RafxUpload {}
240
241impl RafxUpload {
242 pub fn new(
243 device_context: &RafxDeviceContext,
244 queue: &RafxQueue,
245 buffer_size: u64,
246 buffer_pool: Option<&mut RafxUploadBufferPool>,
247 ) -> RafxResult<Self> {
248 let mut command_pool =
252 queue.create_command_pool(&RafxCommandPoolDef { transient: true })?;
253 let command_buffer = command_pool.create_command_buffer(&RafxCommandBufferDef {
254 is_secondary: false,
255 })?;
256 command_buffer.begin()?;
257
258 let buffer = if let Some(buffer_pool) = buffer_pool {
259 UploadBuffer::Pooled(buffer_pool.take(buffer_size)?)
260 } else {
261 let buffer = device_context.create_buffer(&RafxBufferDef {
262 size: buffer_size,
263 memory_usage: RafxMemoryUsage::CpuToGpu,
264 queue_type: RafxQueueType::Transfer,
265 resource_type: RafxResourceType::BUFFER,
266 ..Default::default()
267 })?;
268 buffer.set_debug_name("RafxUpload Non-Pooled Buffer");
269 UploadBuffer::NonPooled(buffer)
270 };
271
272 let (buffer_begin, buffer_end, buffer_write_pointer) = unsafe {
273 let buffer_begin = buffer.map_buffer()?;
274
275 let buffer_end = buffer_begin.add(buffer_size as usize);
276 let buffer_write_pointer = buffer_begin;
277
278 (buffer_begin, buffer_end, buffer_write_pointer)
279 };
280
281 let fence = device_context.create_fence()?;
282
283 let upload = RafxUpload {
284 queue: queue.clone(),
285 command_pool,
286 command_buffer,
287 buffer,
288 fence,
289 writable: true,
290 buffer_begin,
291 buffer_end,
292 buffer_write_pointer,
293 };
294
295 Ok(upload)
296 }
297
298 pub fn has_space_available(
299 &self,
300 bytes_to_write: usize,
301 required_alignment: usize,
302 number_of_writes: usize,
303 ) -> bool {
304 let mut write_end_ptr = self.buffer_write_pointer as usize;
305
306 for _ in 0..number_of_writes {
307 let write_begin_ptr = ((write_end_ptr + required_alignment - 1) / required_alignment)
309 * required_alignment;
310 write_end_ptr = write_begin_ptr + bytes_to_write;
311 }
312
313 write_end_ptr <= self.buffer_end as usize
315 }
316
317 pub fn push(
318 &mut self,
319 data: &[u8],
320 required_alignment: usize,
321 ) -> Result<u64, RafxUploadError> {
322 log::trace!("Pushing {} bytes into upload", data.len());
323
324 if self.writable {
325 unsafe {
326 let write_begin_ptr = (((self.buffer_write_pointer as usize + required_alignment
328 - 1)
329 / required_alignment)
330 * required_alignment) as *mut u8; let write_end_ptr = write_begin_ptr.add(data.len());
332
333 if write_end_ptr > self.buffer_end {
335 Err(RafxUploadError::BufferFull)?;
336 }
337
338 std::ptr::copy_nonoverlapping(data.as_ptr(), write_begin_ptr, data.len());
339 self.buffer_write_pointer = write_end_ptr;
340
341 Ok(write_begin_ptr as u64 - self.buffer_begin as u64)
342 }
343 } else {
344 Err("Upload buffer is not writable")?
345 }
346 }
347
348 pub fn buffer_size(&self) -> u64 {
349 self.buffer_end as u64 - self.buffer_begin as u64
350 }
351
352 pub fn bytes_written(&self) -> u64 {
353 self.buffer_write_pointer as u64 - self.buffer_begin as u64
354 }
355
356 pub fn bytes_free(&self) -> u64 {
357 self.buffer_end as u64 - self.buffer_write_pointer as u64
358 }
359
360 pub fn command_pool(&self) -> &RafxCommandPool {
361 &self.command_pool
362 }
363
364 pub fn command_buffer(&self) -> &RafxCommandBuffer {
365 &self.command_buffer
366 }
367
368 pub fn staging_buffer(&self) -> &RafxBuffer {
369 &self.buffer
370 }
371
372 pub fn queue(&self) -> &RafxQueue {
373 &self.queue
374 }
375
376 pub fn submit(&mut self) -> RafxResult<()> {
377 if self.writable {
378 self.command_buffer.end()?;
379 self.queue
380 .submit(&[&self.command_buffer], &[], &[], Some(&self.fence))?;
381 self.writable = false;
382 }
383
384 Ok(())
385 }
386
387 pub fn state(&self) -> RafxResult<RafxUploadState> {
388 let state = if self.writable {
389 RafxUploadState::Writable
390 } else {
391 if self.fence.get_fence_status()? != RafxFenceStatus::Incomplete {
392 RafxUploadState::Complete
393 } else {
394 RafxUploadState::SentToGpu
395 }
396 };
397
398 Ok(state)
399 }
400
401 fn wait_for_idle(&self) -> RafxResult<()> {
402 self.fence.wait()
403 }
404}
405
406impl Drop for RafxUpload {
407 fn drop(&mut self) {
408 log::trace!("destroying RafxUpload");
409
410 self.wait_for_idle().unwrap();
412
413 self.buffer.unmap_buffer().unwrap();
414
415 log::trace!("destroyed RafxUpload");
418 }
419}
420
/// Lifecycle of a [`RafxTransferUpload`], as reported by `RafxTransferUpload::state`.
///
/// Derives `Debug`, `Clone`, `Copy` and `Eq` in addition to `PartialEq` so the state can
/// be logged, compared with `assert_eq!`, and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RafxTransferUploadState {
    /// Nothing has been submitted yet; the upload still accepts writes.
    Writable,

    /// The transfer work has been submitted and has not finished yet.
    SentToTransferQueue,

    /// The transfer work has finished and the destination-queue work has not been
    /// submitted yet; `submit_dst` only submits in this state.
    PendingSubmitDstQueue,

    /// The destination-queue work has been submitted and has not finished yet.
    SentToDstQueue,

    /// The destination-queue work has finished.
    Complete,
}
441
442pub struct RafxTransferUpload {
445 upload: RafxUpload,
446
447 dst_queue: RafxQueue,
448 dst_command_pool: RafxCommandPool,
449 dst_command_buffer: RafxCommandBuffer,
450
451 dst_fence: RafxFence,
452 sent_to_dst_queue: bool,
453}
454
455impl RafxTransferUpload {
456 pub fn new(
457 device_context: &RafxDeviceContext,
458 transfer_queue: &RafxQueue,
459 dst_queue: &RafxQueue,
460 size: u64,
461 buffer_pool: Option<&mut RafxUploadBufferPool>,
462 ) -> RafxResult<Self> {
463 let mut dst_command_pool =
467 dst_queue.create_command_pool(&RafxCommandPoolDef { transient: true })?;
468 let dst_command_buffer = dst_command_pool.create_command_buffer(&RafxCommandBufferDef {
469 is_secondary: false,
470 })?;
471
472 dst_command_buffer.begin()?;
473
474 let upload = RafxUpload::new(device_context, transfer_queue, size, buffer_pool)?;
475
476 let dst_fence = device_context.create_fence()?;
477
478 Ok(RafxTransferUpload {
479 upload,
480 dst_queue: dst_queue.clone(),
481 dst_command_pool,
482 dst_command_buffer,
483 dst_fence,
484 sent_to_dst_queue: false,
485 })
486 }
487
488 pub fn has_space_available(
489 &self,
490 bytes_to_write: usize,
491 required_alignment: usize,
492 number_of_writes: usize,
493 ) -> bool {
494 self.upload
495 .has_space_available(bytes_to_write, required_alignment, number_of_writes)
496 }
497
498 pub fn push(
499 &mut self,
500 data: &[u8],
501 required_alignment: usize,
502 ) -> Result<u64, RafxUploadError> {
503 self.upload.push(data, required_alignment)
504 }
505
506 pub fn buffer_size(&self) -> u64 {
507 self.upload.buffer_size()
508 }
509
510 pub fn bytes_written(&self) -> u64 {
511 self.upload.bytes_written()
512 }
513
514 pub fn bytes_free(&self) -> u64 {
515 self.upload.bytes_free()
516 }
517
518 pub fn staging_buffer(&self) -> &RafxBuffer {
519 &self.upload.staging_buffer()
520 }
521
522 pub fn transfer_command_pool(&self) -> &RafxCommandPool {
523 self.upload.command_pool()
524 }
525
526 pub fn transfer_command_buffer(&self) -> &RafxCommandBuffer {
527 self.upload.command_buffer()
528 }
529
530 pub fn dst_command_pool(&self) -> &RafxCommandPool {
531 &self.dst_command_pool
532 }
533
534 pub fn dst_command_buffer(&self) -> &RafxCommandBuffer {
535 &self.dst_command_buffer
536 }
537
538 pub fn transfer_queue(&self) -> &RafxQueue {
539 self.upload.queue()
540 }
541
542 pub fn dst_queue(&self) -> &RafxQueue {
543 &self.dst_queue
544 }
545
546 pub fn submit_transfer(&mut self) -> RafxResult<()> {
547 self.upload.submit()
548 }
549
550 pub fn submit_dst(&mut self) -> RafxResult<()> {
551 if self.state()? == RafxTransferUploadState::PendingSubmitDstQueue {
552 self.dst_command_buffer.end()?;
553 self.dst_queue
554 .submit(&[&self.dst_command_buffer], &[], &[], Some(&self.dst_fence))?;
555 self.sent_to_dst_queue = true;
556 }
557
558 Ok(())
559 }
560
561 pub fn state(&self) -> RafxResult<RafxTransferUploadState> {
562 let state = if self.sent_to_dst_queue {
563 if self.dst_fence.get_fence_status()? != RafxFenceStatus::Incomplete {
564 RafxTransferUploadState::Complete
565 } else {
566 RafxTransferUploadState::SentToDstQueue
567 }
568 } else {
569 match self.upload.state()? {
570 RafxUploadState::Writable => RafxTransferUploadState::Writable,
571 RafxUploadState::SentToGpu => RafxTransferUploadState::SentToTransferQueue,
572 RafxUploadState::Complete => RafxTransferUploadState::PendingSubmitDstQueue,
573 }
574 };
575
576 Ok(state)
577 }
578
579 fn wait_for_idle(&self) -> RafxResult<()> {
580 if self.sent_to_dst_queue {
581 self.dst_fence.wait()
582 } else {
583 Ok(())
584 }
585 }
586
587 pub fn block_until_upload_complete(&mut self) -> RafxResult<()> {
588 log::trace!("wait on transfer queue {:?}", self.upload.queue);
589 self.submit_transfer()?;
590 loop {
591 if self.state()? == RafxTransferUploadState::PendingSubmitDstQueue {
592 break;
593 }
594 }
595
596 log::trace!("blocking on dst queue {:?}", self.dst_queue);
597 self.submit_dst()?;
598 loop {
599 if self.state()? == RafxTransferUploadState::Complete {
600 break;
601 }
602 }
603
604 Ok(())
605 }
606}
607
608impl Drop for RafxTransferUpload {
609 fn drop(&mut self) {
610 log::trace!("destroying RafxUpload");
611
612 self.upload.wait_for_idle().unwrap();
614 self.wait_for_idle().unwrap();
615
616 log::trace!("destroyed RafxUpload");
617 }
618}