1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
use crossbeam_channel::{Receiver, Sender};
use fnv::FnvHashMap;
use rafx_api::{
    RafxCommandBuffer, RafxCommandBufferDef, RafxCommandPool, RafxCommandPoolDef, RafxQueue,
    RafxResult,
};
use std::collections::BTreeMap;
use std::ops::Deref;
use std::sync::{Arc, Mutex};

/// A cheap, clonable handle to a command buffer allocated from a `DynCommandPool`.
/// Cloning only bumps the `Arc` refcount; all clones refer to the same underlying
/// `RafxCommandBuffer`. Dereferences to `RafxCommandBuffer` for direct use.
//
// Clone is derived rather than hand-written: `Arc<T>` is `Clone` for any `T`,
// and the derive is equivalent to the previous manual impl.
#[derive(Clone)]
pub struct DynCommandBuffer(Arc<RafxCommandBuffer>);

impl Deref for DynCommandBuffer {
    type Target = RafxCommandBuffer;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

/// Info we hash across to identify equivalent command pools, allowing us to share them
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CommandPoolMeta {
    // ID of the queue the pool was created against; pools are queue-specific
    queue_id: u32,
    // Creation parameters for the underlying RafxCommandPool
    command_pool_def: RafxCommandPoolDef,
}

// Internally represents a VkCommandPool with automatic lifetime/reuse management
struct DynCommandPoolInner {
    // The wrapped command pool that command buffers are allocated from
    command_pool: RafxCommandPool,
    // Key used to find equivalent pools for reuse after reset
    command_pool_meta: CommandPoolMeta,
    // Every buffer handed out since the last reset; returned to the pool and
    // cleared by reset_command_pool()
    allocated_command_buffers: Vec<DynCommandBuffer>,
    // Frame index in which this pool's command buffers are expected to be submitted
    submits_in_frame_index: u64,

    // Just a debugging aid
    pool_id: u64,
}

impl DynCommandPoolInner {
    /// Returns every allocated command buffer to the pool, then resets the
    /// underlying command pool so it can be reused for a later frame.
    fn reset_command_pool(&mut self) -> RafxResult<()> {
        // Return buffers one by one, stopping at the first failure
        self.allocated_command_buffers
            .iter()
            .try_for_each(|command_buffer| command_buffer.return_to_pool())?;

        self.allocated_command_buffers.clear();
        self.command_pool.reset_command_pool()
    }
}

/// A helper that can be allocated as needed to create very short-lived command buffers. The object
/// may not be persisted across frames. Instead, allocated a new one every frame. They are pooled,
/// allocation is cheap and thread-safe.
///
/// This is designed for fire-and-forget command buffers. A DynCommandPool borrows a command pool
/// that is not in use and not in flight, allocates out of it, resets itself after the appropriate
/// number of frames pass, and returns itself to the pool for future reuse. See allocate_dyn_pool
/// for more details
pub struct DynCommandPool {
    // This should never be None. We always allocate this to a non-none value and we don't clear
    // it until the drop handler
    inner: Option<DynCommandPoolInner>,
    // Channel back to the owning DynCommandPoolAllocator; the inner pool is sent
    // through it on drop so the allocator can recycle it
    drop_tx: Sender<DynCommandPoolInner>,
}

impl DynCommandPool {
    // Wraps an inner pool together with the channel used to return it to the
    // allocator when this DynCommandPool is dropped
    fn new(
        inner: DynCommandPoolInner,
        drop_tx: Sender<DynCommandPoolInner>,
    ) -> Self {
        log::trace!(
            "Creating DynCommandPool({}) {:?}",
            inner.pool_id,
            inner.command_pool_meta
        );

        DynCommandPool {
            inner: Some(inner),
            drop_tx,
        }
    }

    /// Allocate a command buffer from this pool. Note that begin() is NOT called
    /// on the returned buffer; the caller is responsible for begin()/end().
    ///
    /// The buffer is tracked by the pool and automatically returned to it when
    /// the pool is reset by the allocator.
    pub fn allocate_dyn_command_buffer(
        &mut self,
        command_buffer_def: &RafxCommandBufferDef,
    ) -> RafxResult<DynCommandBuffer> {
        // Invariant: `inner` is only None after the drop handler runs
        let inner = self.inner.as_mut().unwrap();
        log::trace!(
            "DynCommandPool({}) allocate_command_buffer: {:?}",
            inner.pool_id,
            command_buffer_def
        );

        let command_buffer = inner
            .command_pool
            .create_command_buffer(command_buffer_def)?;

        let dyn_command_buffer = DynCommandBuffer(Arc::new(command_buffer));

        // Track the buffer so it is returned to the pool on reset
        inner
            .allocated_command_buffers
            .push(dyn_command_buffer.clone());
        Ok(dyn_command_buffer)
    }

    /// Get the underlying pool within the allocator. The pool will be destroyed after
    /// MAX_FRAMES_IN_FLIGHT pass, and all command buffers created with it must follow the same
    /// restrictions as a command buffer created via begin_command_buffer/end_command_buffer. It's
    /// recommended to use begin_writing/end_writing as it is less error prone.
    pub fn pool(&mut self) -> &mut RafxCommandPool {
        &mut self.inner.as_mut().unwrap().command_pool
    }
}

impl Drop for DynCommandPool {
    fn drop(&mut self) {
        // Invariant: `inner` is Some from construction until this point, so
        // take().unwrap() cannot fail unless there is a bug
        let inner = self.inner.take().unwrap();
        // Ignore a send failure instead of unwrapping: if the allocator (and thus
        // the receiving end of the channel) was dropped first, the pool cannot be
        // recycled and is simply destroyed here. Panicking inside drop is unsafe -
        // if it happens during another panic's unwind the process aborts.
        let _ = self.drop_tx.send(inner);
    }
}

/// Key identifying in-use pools: the pool configuration plus the frame index in
/// which the pool's command buffers are expected to be submitted
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct PendingCommandPoolMeta {
    // Frame index the pool's command buffers will be submitted in
    submits_in_frame_index: u64,
    // Pool configuration; same key type used for unused_pools lookups
    command_pool_meta: CommandPoolMeta,
}

struct DynCommandPoolAllocatorInner {
    // Command pools that are ready to use but have no recorded commands
    unused_pools: FnvHashMap<CommandPoolMeta, Vec<DynCommandPoolInner>>,

    // Command pools that are in use and have a frame that we know they will be submitted in
    pending_pools: FnvHashMap<PendingCommandPoolMeta, Vec<DynCommandPoolInner>>,

    // submitted pools, keyed by the frame index they were submitted in
    // TODO: Would be less allocations if this was a static array of vecs
    submitted_pools: BTreeMap<u64, Vec<DynCommandPoolInner>>,

    // How many frames a submitted pool must age before it can be reset and reused
    max_frames_in_flight: u64,
    // Monotonically increasing frame counter, bumped at the end of on_frame_complete()
    current_frame_index: u64,

    // Channel DynCommandPool drop handlers use to return pools for recycling
    drop_tx: Sender<DynCommandPoolInner>,
    drop_rx: Receiver<DynCommandPoolInner>,

    // Just a debugging aid
    next_pool_id: u64,
}

/// An allocator for DynCommandPools, objects that are short-lived and NOT persisted across
/// frames. Meant for allocating command buffers that are usually single use and only for the
/// current frame. The allocator is multi-thread friendly, but the pools themselves are not. So
/// if writing command buffers from multiple threads, allocate a pool per thread.
#[derive(Clone)]
pub struct DynCommandPoolAllocator {
    // Shared, mutex-guarded state; Clone just bumps the Arc so all clones
    // allocate from the same set of pools
    inner: Arc<Mutex<DynCommandPoolAllocatorInner>>,
}

impl DynCommandPoolAllocator {
    /// Create an allocator for DynCommandPools.
    pub fn new(max_frames_in_flight: u32) -> Self {
        // Unbounded channel: DynCommandPool's drop handler sends the inner pool
        // back through here from any thread without blocking
        let (drop_tx, drop_rx) = crossbeam_channel::unbounded();

        let inner = DynCommandPoolAllocatorInner {
            max_frames_in_flight: max_frames_in_flight as u64,
            pending_pools: Default::default(),
            submitted_pools: Default::default(),
            unused_pools: Default::default(),
            current_frame_index: 0,
            drop_tx,
            drop_rx,
            next_pool_id: 0,
        };

        DynCommandPoolAllocator {
            inner: Arc::new(Mutex::new(inner)),
        }
    }

    /// Allocates a pool. DynPools wrap CommandPools. The parameters match inputs for
    /// CommandPool::new. `delay_submission_by_frame_count` indicates how many frames will pass
    /// before the commands will be submitted (which affects how long-lived they will be). DO NOT
    /// submit command buffers earlier than this as the commands pools themselves are pooled and
    /// may be available to writing in future frames.
    ///
    /// The common case for delay_submission_by_frame_count is to pass 0. You might pass 1 if for
    /// example, you are building a command buffer for frame N + 1 while frame N is not yet
    /// submitted.
    pub fn allocate_dyn_pool(
        &self,
        queue: &RafxQueue,
        command_pool_def: &RafxCommandPoolDef,
        delay_submission_by_frame_count: u64,
    ) -> RafxResult<DynCommandPool> {
        let mut guard = self.inner.lock().unwrap();

        // Determine what frame this will be committed in
        let submits_in_frame_index = guard.current_frame_index + delay_submission_by_frame_count;

        // Build a key to search for an existing pool to reuse
        let meta = PendingCommandPoolMeta {
            submits_in_frame_index,
            command_pool_meta: CommandPoolMeta {
                queue_id: queue.queue_id(),
                command_pool_def: command_pool_def.clone(),
            },
        };

        log::trace!("DynCommandPoolAllocator::allocate_dyn_pool {:?}", meta);

        // Pull in any pools dropped since the last call so they become reuse candidates
        Self::drain_drop_rx(&mut *guard);

        // Try to find something in the pending collection and reuse it
        if let Some(pools) = guard.pending_pools.get_mut(&meta) {
            if let Some(pool) = pools.pop() {
                log::trace!(
                    "DynCommandPoolAllocator::allocate_dyn_pool {:?} reusing pending pool DynCommandPool({})",
                    meta,
                    pool.pool_id
                );
                // The submit frame is part of the lookup key, so it must match
                assert_eq!(pool.submits_in_frame_index, submits_in_frame_index);
                return Ok(DynCommandPool::new(pool, guard.drop_tx.clone()));
            }
        }

        // If we don't have a "dirty" pool for this frame yet, try to reuse an existing unused one
        if let Some(pools) = guard.unused_pools.get_mut(&meta.command_pool_meta) {
            if let Some(mut pool) = pools.pop() {
                log::trace!(
                    "DynCommandPoolAllocator::allocate_dyn_pool {:?} reusing unused pool DynCommandPool({})",
                    meta,
                    pool.pool_id
                );
                // Unused pools carry a stale submit frame; retarget it to this request
                pool.submits_in_frame_index = submits_in_frame_index;
                return Ok(DynCommandPool::new(pool, guard.drop_tx.clone()));
            }
        }

        // Nothing to reuse - create a brand-new pool
        let pool_id = guard.next_pool_id;
        guard.next_pool_id += 1;

        log::trace!(
            "DynCommandPoolAllocator::allocate_dyn_pool {:?} creating new DynCommandPool({})",
            meta,
            pool_id
        );

        let command_pool_meta = CommandPoolMeta {
            queue_id: queue.queue_id(),
            command_pool_def: command_pool_def.clone(),
        };

        let command_pool = queue.create_command_pool(command_pool_def)?;

        let inner = DynCommandPoolInner {
            command_pool,
            command_pool_meta,
            allocated_command_buffers: Vec::default(),
            submits_in_frame_index,
            pool_id,
        };

        Ok(DynCommandPool::new(inner, guard.drop_tx.clone()))
    }

    /// Call every frame to recycle command pools that are no-longer in flight
    #[profiling::function]
    pub fn on_frame_complete(&self) -> RafxResult<()> {
        let mut guard = self.inner.lock().unwrap();
        log::trace!("DynCommandPoolAllocator::on_frame_complete: DynCommandPoolAllocator on_frame_complete finishing frame {}", guard.current_frame_index);

        {
            profiling::scope!("drain_drop_rx");
            Self::drain_drop_rx(&mut *guard);
        }

        // Find any pending pools that should submit during this frame
        // (keys are collected first because the map can't be mutated while iterating)
        let mut pending_pool_keys = Vec::default();
        for key in guard.pending_pools.keys() {
            if key.submits_in_frame_index == guard.current_frame_index {
                pending_pool_keys.push(key.clone());
            }
        }

        // Move them to the submitted pools collection
        for key in pending_pool_keys {
            let mut pending_pools = guard.pending_pools.remove(&key).unwrap();

            for pending_pool in &pending_pools {
                log::trace!(
                    "DynCommandPoolAllocator::on_frame_complete: DynCommandPool({}) being moved to submitted pool list",
                    pending_pool.pool_id,
                );
            }

            guard
                .submitted_pools
                .entry(key.submits_in_frame_index)
                .or_default()
                .append(&mut pending_pools);
        }

        // Find all the submitted pools that are old enough to no longer be in flight
        let mut submitted_pool_keys = Vec::default();
        for &submits_in_frame_index in guard.submitted_pools.keys() {
            // We can use >= here because we're bumping current_frame_index at the end of this
            // function
            if guard.current_frame_index >= submits_in_frame_index + guard.max_frames_in_flight {
                submitted_pool_keys.push(submits_in_frame_index);
            } else {
                // The map is sorted by frame count, so all later keys are newer; stop early
                break;
            }
        }

        // Move them to the unused collection
        for key in submitted_pool_keys {
            let submitted_pools = guard.submitted_pools.remove(&key).unwrap();
            for mut submitted_pool in submitted_pools {
                log::trace!(
                    "DynCommandPoolAllocator::on_frame_complete: DynCommandPool({}) being moved to unused pool map",
                    submitted_pool.pool_id,
                );

                let meta = submitted_pool.command_pool_meta.clone();
                {
                    profiling::scope!("reset_command_pool");
                    // Reset now that max_frames_in_flight frames have passed since submission
                    submitted_pool.reset_command_pool()?;
                }

                guard
                    .unused_pools
                    .entry(meta)
                    .or_default()
                    .push(submitted_pool);
            }
        }

        log::trace!("DynCommandPoolAllocator::on_frame_complete: DynCommandPoolAllocator on_frame_complete completed finishing frame {}", guard.current_frame_index);

        // Bump current frame index
        guard.current_frame_index += 1;
        Ok(())
    }

    // Receive all pools returned by DynCommandPool drop handlers and file each into
    // the pending map (its submit frame hasn't passed yet) or the submitted map
    // (its submit frame is already behind current_frame_index)
    fn drain_drop_rx(inner: &mut DynCommandPoolAllocatorInner) {
        for pool in inner.drop_rx.try_iter() {
            if pool.submits_in_frame_index >= inner.current_frame_index {
                // insert in pending
                let meta = PendingCommandPoolMeta {
                    submits_in_frame_index: pool.submits_in_frame_index,
                    command_pool_meta: pool.command_pool_meta.clone(),
                };

                log::trace!(
                    "DynCommandPoolAllocator::drain_drop_rx: dropped DynCommandPool({}) moved in pending map {:?}",
                    pool.pool_id,
                    meta,
                );

                inner.pending_pools.entry(meta).or_default().push(pool);
            } else {
                log::trace!(
                    "DynCommandPoolAllocator::drain_drop_rx: dropped DynCommandPool({}) moved to submitted map {}",
                    pool.pool_id,
                    pool.submits_in_frame_index
                );

                // insert in submitted
                inner
                    .submitted_pools
                    .entry(pool.submits_in_frame_index)
                    .or_default()
                    .push(pool);
            }
        }
    }
}