rafx_framework/resources/
dyn_commands.rs

1use crossbeam_channel::{Receiver, Sender};
2use fnv::FnvHashMap;
3use rafx_api::{
4    RafxCommandBuffer, RafxCommandBufferDef, RafxCommandPool, RafxCommandPoolDef, RafxQueue,
5    RafxResult,
6};
7use std::collections::BTreeMap;
8use std::ops::Deref;
9use std::sync::{Arc, Mutex};
10
11pub struct DynCommandBuffer(Arc<RafxCommandBuffer>);
12
13impl Deref for DynCommandBuffer {
14    type Target = RafxCommandBuffer;
15
16    fn deref(&self) -> &Self::Target {
17        &self.0
18    }
19}
20
21impl Clone for DynCommandBuffer {
22    fn clone(&self) -> Self {
23        DynCommandBuffer(self.0.clone())
24    }
25}
26
27/// Info we hash across to identify equivalent command pools, allowing us to share them
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct CommandPoolMeta {
30    queue_id: u32,
31    command_pool_def: RafxCommandPoolDef,
32}
33
34// Internally represents a VkCommandPool with automatic lifetime/reuse management
35struct DynCommandPoolInner {
36    command_pool: RafxCommandPool,
37    command_pool_meta: CommandPoolMeta,
38    allocated_command_buffers: Vec<DynCommandBuffer>,
39    submits_in_frame_index: u64,
40
41    // Just a debugging aid
42    pool_id: u64,
43}
44
45impl DynCommandPoolInner {
46    fn reset_command_pool(&mut self) -> RafxResult<()> {
47        for command_buffer in &self.allocated_command_buffers {
48            command_buffer.return_to_pool()?;
49        }
50
51        self.allocated_command_buffers.clear();
52        self.command_pool.reset_command_pool()
53    }
54}
55
56/// A helper that can be allocated as needed to create very short-lived command buffers. The object
57/// may not be persisted across frames. Instead, allocated a new one every frame. They are pooled,
58/// allocation is cheap and thread-safe.
59///
60/// This is designed for fire-and-forget command buffers. A DynCommandPool borrows a command pool
61/// that is not in use and not in flight, allocates out of it, resets itself after the appropriate
62/// number of frames pass, and returns itself to the pool for future reuse. See allocate_dyn_pool
63/// for more details
64pub struct DynCommandPool {
65    // This should never be None. We always allocate this to a non-none value and we don't clear
66    // it until the drop handler
67    inner: Option<DynCommandPoolInner>,
68    drop_tx: Sender<DynCommandPoolInner>,
69}
70
71impl DynCommandPool {
72    fn new(
73        inner: DynCommandPoolInner,
74        drop_tx: Sender<DynCommandPoolInner>,
75    ) -> Self {
76        log::trace!(
77            "Creating DynCommandPool({}) {:?}",
78            inner.pool_id,
79            inner.command_pool_meta
80        );
81
82        DynCommandPool {
83            inner: Some(inner),
84            drop_tx,
85        }
86    }
87
88    /// Allocate a command buffer and call begin() on it
89    pub fn allocate_dyn_command_buffer(
90        &mut self,
91        command_buffer_def: &RafxCommandBufferDef,
92    ) -> RafxResult<DynCommandBuffer> {
93        let inner = self.inner.as_mut().unwrap();
94        log::trace!(
95            "DynCommandPool({}) allocate_command_buffer: {:?}",
96            inner.pool_id,
97            command_buffer_def
98        );
99
100        let command_buffer = inner
101            .command_pool
102            .create_command_buffer(command_buffer_def)?;
103        //command_buffer.begin()?;
104
105        let command_buffer_inner = Arc::new(command_buffer);
106        let dyn_command_buffer = DynCommandBuffer(command_buffer_inner.clone());
107
108        inner
109            .allocated_command_buffers
110            .push(dyn_command_buffer.clone());
111        Ok(dyn_command_buffer)
112    }
113
114    /// Get the underlying pool within the allocator. The pool will be destroyed after
115    /// MAX_FRAMES_IN_FLIGHT pass, and all command buffers created with it must follow the same
116    /// restrictions as a command buffer created via begin_command_buffer/end_command_buffer. It's
117    /// recommended to use begin_writing/end_writing as it is less error prone.
118    pub fn pool(&mut self) -> &mut RafxCommandPool {
119        &mut self.inner.as_mut().unwrap().command_pool
120    }
121}
122
123impl Drop for DynCommandPool {
124    fn drop(&mut self) {
125        let inner = self.inner.take().unwrap();
126        self.drop_tx.send(inner).unwrap();
127    }
128}
129
130#[derive(Debug, Clone, PartialEq, Eq, Hash)]
131struct PendingCommandPoolMeta {
132    submits_in_frame_index: u64,
133    command_pool_meta: CommandPoolMeta,
134}
135
136struct DynCommandPoolAllocatorInner {
137    // Command pools that are ready to use but have no recorded commands
138    unused_pools: FnvHashMap<CommandPoolMeta, Vec<DynCommandPoolInner>>,
139
140    // Command pools that are in use and have a frame that we know they will be submitted in
141    pending_pools: FnvHashMap<PendingCommandPoolMeta, Vec<DynCommandPoolInner>>,
142
143    // submitted pools
144    // TODO: Would be less allocations if this was a static array of vecs
145    submitted_pools: BTreeMap<u64, Vec<DynCommandPoolInner>>,
146
147    max_frames_in_flight: u64,
148    current_frame_index: u64,
149
150    drop_tx: Sender<DynCommandPoolInner>,
151    drop_rx: Receiver<DynCommandPoolInner>,
152
153    // Just a debugging aid
154    next_pool_id: u64,
155}
156
157/// An allocator for DynCommandPools, objects that are short-lived and NOT persisted across
158/// frames. Meant for allocating command buffers that are usually single use and only for the
159/// current frame. The allocator is multi-thread friendly, but the pools themselves are not. So
160/// if writing command buffers from multiple threads, allocate a pool per thread.
161#[derive(Clone)]
162pub struct DynCommandPoolAllocator {
163    inner: Arc<Mutex<DynCommandPoolAllocatorInner>>,
164}
165
166impl DynCommandPoolAllocator {
167    /// Create an allocator for DynCommandPools.
168    pub fn new(max_frames_in_flight: u32) -> Self {
169        let (drop_tx, drop_rx) = crossbeam_channel::unbounded();
170
171        let inner = DynCommandPoolAllocatorInner {
172            max_frames_in_flight: max_frames_in_flight as u64,
173            pending_pools: Default::default(),
174            submitted_pools: Default::default(),
175            unused_pools: Default::default(),
176            current_frame_index: 0,
177            drop_tx,
178            drop_rx,
179            next_pool_id: 0,
180        };
181
182        DynCommandPoolAllocator {
183            inner: Arc::new(Mutex::new(inner)),
184        }
185    }
186
187    /// Allocates a pool. DynPools wrap CommandPools. The parameters match inputs for
188    /// CommandPool::new. `delay_submission_by_frame_count` indicates how many frames will pass
189    /// before the commands will be submitted (which affects how long-lived they will be). DO NOT
190    /// submit command buffers earlier than this as the commands pools themselves are pooled and
191    /// may be available to writing in future frames.
192    ///
193    /// The common case for delay_submission_by_frame_count is to pass 0. You might pass 1 if for
194    /// example, you are building a command buffer for frame N + 1 while frame N is not yet
195    /// submitted.
196    pub fn allocate_dyn_pool(
197        &self,
198        queue: &RafxQueue,
199        command_pool_def: &RafxCommandPoolDef,
200        delay_submission_by_frame_count: u64,
201    ) -> RafxResult<DynCommandPool> {
202        let mut guard = self.inner.lock().unwrap();
203
204        // Determine what frame this will be committed in
205        let submits_in_frame_index = guard.current_frame_index + delay_submission_by_frame_count;
206
207        // Build a key to search for an existing pool to reuse
208        let meta = PendingCommandPoolMeta {
209            submits_in_frame_index,
210            command_pool_meta: CommandPoolMeta {
211                queue_id: queue.queue_id(),
212                command_pool_def: command_pool_def.clone(),
213            },
214        };
215
216        log::trace!("DynCommandPoolAllocator::allocate_dyn_pool {:?}", meta);
217
218        Self::drain_drop_rx(&mut *guard);
219
220        // Try to find something in the pending collection and reuse it
221        if let Some(pools) = guard.pending_pools.get_mut(&meta) {
222            if let Some(pool) = pools.pop() {
223                log::trace!(
224                    "DynCommandPoolAllocator::allocate_dyn_pool {:?} reusing pending pool DynCommandPool({})",
225                    meta,
226                    pool.pool_id
227                );
228                assert_eq!(pool.submits_in_frame_index, submits_in_frame_index);
229                return Ok(DynCommandPool::new(pool, guard.drop_tx.clone()));
230            }
231        }
232
233        // If we don't have a "dirty" pool for this frame yet, try to reuse an existing unused one
234        if let Some(pools) = guard.unused_pools.get_mut(&meta.command_pool_meta) {
235            if let Some(mut pool) = pools.pop() {
236                log::trace!(
237                    "DynCommandPoolAllocator::allocate_dyn_pool {:?} reusing unused pool DynCommandPool({})",
238                    meta,
239                    pool.pool_id
240                );
241                pool.submits_in_frame_index = submits_in_frame_index;
242                return Ok(DynCommandPool::new(pool, guard.drop_tx.clone()));
243            }
244        }
245
246        let pool_id = guard.next_pool_id;
247        guard.next_pool_id += 1;
248
249        log::trace!(
250            "DynCommandPoolAllocator::allocate_dyn_pool {:?} creating new DynCommandPool({})",
251            meta,
252            pool_id
253        );
254
255        let command_pool_meta = CommandPoolMeta {
256            queue_id: queue.queue_id(),
257            command_pool_def: command_pool_def.clone(),
258        };
259
260        let command_pool = queue.create_command_pool(command_pool_def)?;
261
262        let inner = DynCommandPoolInner {
263            command_pool,
264            command_pool_meta,
265            allocated_command_buffers: Vec::default(),
266            submits_in_frame_index,
267            pool_id,
268        };
269
270        Ok(DynCommandPool::new(inner, guard.drop_tx.clone()))
271    }
272
273    /// Call every frame to recycle command pools that are no-longer in flight
274    #[profiling::function]
275    pub fn on_frame_complete(&self) -> RafxResult<()> {
276        let mut guard = self.inner.lock().unwrap();
277        log::trace!("DynCommandPoolAllocator::on_frame_complete: DynCommandPoolAllocator on_frame_complete finishing frame {}", guard.current_frame_index);
278
279        {
280            profiling::scope!("drain_drop_rx");
281            Self::drain_drop_rx(&mut *guard);
282        }
283
284        // Find any pending pools that should submit during this frame
285        let mut pending_pool_keys = Vec::default();
286        for key in guard.pending_pools.keys() {
287            if key.submits_in_frame_index == guard.current_frame_index {
288                pending_pool_keys.push(key.clone());
289            }
290        }
291
292        // Move them to the submitted pools collection
293        for key in pending_pool_keys {
294            let mut pending_pools = guard.pending_pools.remove(&key).unwrap();
295
296            for pending_pool in &pending_pools {
297                log::trace!(
298                    "DynCommandPoolAllocator::on_frame_complete: DynCommandPool({}) being moved to submitted pool list",
299                    pending_pool.pool_id,
300                );
301            }
302
303            guard
304                .submitted_pools
305                .entry(key.submits_in_frame_index)
306                .or_default()
307                .append(&mut pending_pools);
308        }
309
310        // Find all the submitted pools that are old enough to no longer be in flight
311        let mut submitted_pool_keys = Vec::default();
312        for &submits_in_frame_index in guard.submitted_pools.keys() {
313            // We can use >= here because we're bumping current_frame_index at the end of this
314            // function
315            if guard.current_frame_index >= submits_in_frame_index + guard.max_frames_in_flight {
316                submitted_pool_keys.push(submits_in_frame_index);
317            } else {
318                // The map is sorted by frame count
319                break;
320            }
321        }
322
323        // Move them to the unused collection
324        for key in submitted_pool_keys {
325            let submitted_pools = guard.submitted_pools.remove(&key).unwrap();
326            for mut submitted_pool in submitted_pools {
327                log::trace!(
328                    "DynCommandPoolAllocator::on_frame_complete: DynCommandPool({}) being moved to unused pool map",
329                    submitted_pool.pool_id,
330                );
331
332                let meta = submitted_pool.command_pool_meta.clone();
333                {
334                    profiling::scope!("reset_command_pool");
335                    submitted_pool.reset_command_pool()?;
336                }
337
338                guard
339                    .unused_pools
340                    .entry(meta)
341                    .or_default()
342                    .push(submitted_pool);
343            }
344        }
345
346        log::trace!("DynCommandPoolAllocator::on_frame_complete: DynCommandPoolAllocator on_frame_complete completed finishing frame {}", guard.current_frame_index);
347
348        // Bump current frame index
349        guard.current_frame_index += 1;
350        Ok(())
351    }
352
353    fn drain_drop_rx(inner: &mut DynCommandPoolAllocatorInner) {
354        for pool in inner.drop_rx.try_iter() {
355            if pool.submits_in_frame_index >= inner.current_frame_index {
356                // insert in pending
357                let meta = PendingCommandPoolMeta {
358                    submits_in_frame_index: pool.submits_in_frame_index,
359                    command_pool_meta: pool.command_pool_meta.clone(),
360                };
361
362                log::trace!(
363                    "DynCommandPoolAllocator::drain_drop_rx: dropped DynCommandPool({}) moved in pending map {:?}",
364                    pool.pool_id,
365                    meta,
366                );
367
368                inner.pending_pools.entry(meta).or_default().push(pool);
369            } else {
370                log::trace!(
371                    "DynCommandPoolAllocator::drain_drop_rx: dropped DynCommandPool({}) moved to submitted map {}",
372                    pool.pool_id,
373                    pool.submits_in_frame_index
374                );
375
376                // insert in submitted
377                inner
378                    .submitted_pools
379                    .entry(pool.submits_in_frame_index)
380                    .or_default()
381                    .push(pool);
382            }
383        }
384    }
385}