rafx_framework/resources/
dyn_commands.rs1use crossbeam_channel::{Receiver, Sender};
2use fnv::FnvHashMap;
3use rafx_api::{
4 RafxCommandBuffer, RafxCommandBufferDef, RafxCommandPool, RafxCommandPoolDef, RafxQueue,
5 RafxResult,
6};
7use std::collections::BTreeMap;
8use std::ops::Deref;
9use std::sync::{Arc, Mutex};
10
11pub struct DynCommandBuffer(Arc<RafxCommandBuffer>);
12
13impl Deref for DynCommandBuffer {
14 type Target = RafxCommandBuffer;
15
16 fn deref(&self) -> &Self::Target {
17 &self.0
18 }
19}
20
21impl Clone for DynCommandBuffer {
22 fn clone(&self) -> Self {
23 DynCommandBuffer(self.0.clone())
24 }
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct CommandPoolMeta {
30 queue_id: u32,
31 command_pool_def: RafxCommandPoolDef,
32}
33
34struct DynCommandPoolInner {
36 command_pool: RafxCommandPool,
37 command_pool_meta: CommandPoolMeta,
38 allocated_command_buffers: Vec<DynCommandBuffer>,
39 submits_in_frame_index: u64,
40
41 pool_id: u64,
43}
44
45impl DynCommandPoolInner {
46 fn reset_command_pool(&mut self) -> RafxResult<()> {
47 for command_buffer in &self.allocated_command_buffers {
48 command_buffer.return_to_pool()?;
49 }
50
51 self.allocated_command_buffers.clear();
52 self.command_pool.reset_command_pool()
53 }
54}
55
56pub struct DynCommandPool {
65 inner: Option<DynCommandPoolInner>,
68 drop_tx: Sender<DynCommandPoolInner>,
69}
70
71impl DynCommandPool {
72 fn new(
73 inner: DynCommandPoolInner,
74 drop_tx: Sender<DynCommandPoolInner>,
75 ) -> Self {
76 log::trace!(
77 "Creating DynCommandPool({}) {:?}",
78 inner.pool_id,
79 inner.command_pool_meta
80 );
81
82 DynCommandPool {
83 inner: Some(inner),
84 drop_tx,
85 }
86 }
87
88 pub fn allocate_dyn_command_buffer(
90 &mut self,
91 command_buffer_def: &RafxCommandBufferDef,
92 ) -> RafxResult<DynCommandBuffer> {
93 let inner = self.inner.as_mut().unwrap();
94 log::trace!(
95 "DynCommandPool({}) allocate_command_buffer: {:?}",
96 inner.pool_id,
97 command_buffer_def
98 );
99
100 let command_buffer = inner
101 .command_pool
102 .create_command_buffer(command_buffer_def)?;
103 let command_buffer_inner = Arc::new(command_buffer);
106 let dyn_command_buffer = DynCommandBuffer(command_buffer_inner.clone());
107
108 inner
109 .allocated_command_buffers
110 .push(dyn_command_buffer.clone());
111 Ok(dyn_command_buffer)
112 }
113
114 pub fn pool(&mut self) -> &mut RafxCommandPool {
119 &mut self.inner.as_mut().unwrap().command_pool
120 }
121}
122
123impl Drop for DynCommandPool {
124 fn drop(&mut self) {
125 let inner = self.inner.take().unwrap();
126 self.drop_tx.send(inner).unwrap();
127 }
128}
129
130#[derive(Debug, Clone, PartialEq, Eq, Hash)]
131struct PendingCommandPoolMeta {
132 submits_in_frame_index: u64,
133 command_pool_meta: CommandPoolMeta,
134}
135
136struct DynCommandPoolAllocatorInner {
137 unused_pools: FnvHashMap<CommandPoolMeta, Vec<DynCommandPoolInner>>,
139
140 pending_pools: FnvHashMap<PendingCommandPoolMeta, Vec<DynCommandPoolInner>>,
142
143 submitted_pools: BTreeMap<u64, Vec<DynCommandPoolInner>>,
146
147 max_frames_in_flight: u64,
148 current_frame_index: u64,
149
150 drop_tx: Sender<DynCommandPoolInner>,
151 drop_rx: Receiver<DynCommandPoolInner>,
152
153 next_pool_id: u64,
155}
156
157#[derive(Clone)]
162pub struct DynCommandPoolAllocator {
163 inner: Arc<Mutex<DynCommandPoolAllocatorInner>>,
164}
165
166impl DynCommandPoolAllocator {
167 pub fn new(max_frames_in_flight: u32) -> Self {
169 let (drop_tx, drop_rx) = crossbeam_channel::unbounded();
170
171 let inner = DynCommandPoolAllocatorInner {
172 max_frames_in_flight: max_frames_in_flight as u64,
173 pending_pools: Default::default(),
174 submitted_pools: Default::default(),
175 unused_pools: Default::default(),
176 current_frame_index: 0,
177 drop_tx,
178 drop_rx,
179 next_pool_id: 0,
180 };
181
182 DynCommandPoolAllocator {
183 inner: Arc::new(Mutex::new(inner)),
184 }
185 }
186
187 pub fn allocate_dyn_pool(
197 &self,
198 queue: &RafxQueue,
199 command_pool_def: &RafxCommandPoolDef,
200 delay_submission_by_frame_count: u64,
201 ) -> RafxResult<DynCommandPool> {
202 let mut guard = self.inner.lock().unwrap();
203
204 let submits_in_frame_index = guard.current_frame_index + delay_submission_by_frame_count;
206
207 let meta = PendingCommandPoolMeta {
209 submits_in_frame_index,
210 command_pool_meta: CommandPoolMeta {
211 queue_id: queue.queue_id(),
212 command_pool_def: command_pool_def.clone(),
213 },
214 };
215
216 log::trace!("DynCommandPoolAllocator::allocate_dyn_pool {:?}", meta);
217
218 Self::drain_drop_rx(&mut *guard);
219
220 if let Some(pools) = guard.pending_pools.get_mut(&meta) {
222 if let Some(pool) = pools.pop() {
223 log::trace!(
224 "DynCommandPoolAllocator::allocate_dyn_pool {:?} reusing pending pool DynCommandPool({})",
225 meta,
226 pool.pool_id
227 );
228 assert_eq!(pool.submits_in_frame_index, submits_in_frame_index);
229 return Ok(DynCommandPool::new(pool, guard.drop_tx.clone()));
230 }
231 }
232
233 if let Some(pools) = guard.unused_pools.get_mut(&meta.command_pool_meta) {
235 if let Some(mut pool) = pools.pop() {
236 log::trace!(
237 "DynCommandPoolAllocator::allocate_dyn_pool {:?} reusing unused pool DynCommandPool({})",
238 meta,
239 pool.pool_id
240 );
241 pool.submits_in_frame_index = submits_in_frame_index;
242 return Ok(DynCommandPool::new(pool, guard.drop_tx.clone()));
243 }
244 }
245
246 let pool_id = guard.next_pool_id;
247 guard.next_pool_id += 1;
248
249 log::trace!(
250 "DynCommandPoolAllocator::allocate_dyn_pool {:?} creating new DynCommandPool({})",
251 meta,
252 pool_id
253 );
254
255 let command_pool_meta = CommandPoolMeta {
256 queue_id: queue.queue_id(),
257 command_pool_def: command_pool_def.clone(),
258 };
259
260 let command_pool = queue.create_command_pool(command_pool_def)?;
261
262 let inner = DynCommandPoolInner {
263 command_pool,
264 command_pool_meta,
265 allocated_command_buffers: Vec::default(),
266 submits_in_frame_index,
267 pool_id,
268 };
269
270 Ok(DynCommandPool::new(inner, guard.drop_tx.clone()))
271 }
272
273 #[profiling::function]
275 pub fn on_frame_complete(&self) -> RafxResult<()> {
276 let mut guard = self.inner.lock().unwrap();
277 log::trace!("DynCommandPoolAllocator::on_frame_complete: DynCommandPoolAllocator on_frame_complete finishing frame {}", guard.current_frame_index);
278
279 {
280 profiling::scope!("drain_drop_rx");
281 Self::drain_drop_rx(&mut *guard);
282 }
283
284 let mut pending_pool_keys = Vec::default();
286 for key in guard.pending_pools.keys() {
287 if key.submits_in_frame_index == guard.current_frame_index {
288 pending_pool_keys.push(key.clone());
289 }
290 }
291
292 for key in pending_pool_keys {
294 let mut pending_pools = guard.pending_pools.remove(&key).unwrap();
295
296 for pending_pool in &pending_pools {
297 log::trace!(
298 "DynCommandPoolAllocator::on_frame_complete: DynCommandPool({}) being moved to submitted pool list",
299 pending_pool.pool_id,
300 );
301 }
302
303 guard
304 .submitted_pools
305 .entry(key.submits_in_frame_index)
306 .or_default()
307 .append(&mut pending_pools);
308 }
309
310 let mut submitted_pool_keys = Vec::default();
312 for &submits_in_frame_index in guard.submitted_pools.keys() {
313 if guard.current_frame_index >= submits_in_frame_index + guard.max_frames_in_flight {
316 submitted_pool_keys.push(submits_in_frame_index);
317 } else {
318 break;
320 }
321 }
322
323 for key in submitted_pool_keys {
325 let submitted_pools = guard.submitted_pools.remove(&key).unwrap();
326 for mut submitted_pool in submitted_pools {
327 log::trace!(
328 "DynCommandPoolAllocator::on_frame_complete: DynCommandPool({}) being moved to unused pool map",
329 submitted_pool.pool_id,
330 );
331
332 let meta = submitted_pool.command_pool_meta.clone();
333 {
334 profiling::scope!("reset_command_pool");
335 submitted_pool.reset_command_pool()?;
336 }
337
338 guard
339 .unused_pools
340 .entry(meta)
341 .or_default()
342 .push(submitted_pool);
343 }
344 }
345
346 log::trace!("DynCommandPoolAllocator::on_frame_complete: DynCommandPoolAllocator on_frame_complete completed finishing frame {}", guard.current_frame_index);
347
348 guard.current_frame_index += 1;
350 Ok(())
351 }
352
353 fn drain_drop_rx(inner: &mut DynCommandPoolAllocatorInner) {
354 for pool in inner.drop_rx.try_iter() {
355 if pool.submits_in_frame_index >= inner.current_frame_index {
356 let meta = PendingCommandPoolMeta {
358 submits_in_frame_index: pool.submits_in_frame_index,
359 command_pool_meta: pool.command_pool_meta.clone(),
360 };
361
362 log::trace!(
363 "DynCommandPoolAllocator::drain_drop_rx: dropped DynCommandPool({}) moved in pending map {:?}",
364 pool.pool_id,
365 meta,
366 );
367
368 inner.pending_pools.entry(meta).or_default().push(pool);
369 } else {
370 log::trace!(
371 "DynCommandPoolAllocator::drain_drop_rx: dropped DynCommandPool({}) moved to submitted map {}",
372 pool.pool_id,
373 pool.submits_in_frame_index
374 );
375
376 inner
378 .submitted_pools
379 .entry(pool.submits_in_frame_index)
380 .or_default()
381 .push(pool);
382 }
383 }
384 }
385}