use std::collections::HashMap;
use std::sync::{Arc, Mutex, MutexGuard, TryLockError, TryLockResult};
use anyhow::Result;
use ash::vk;
use crate::{Allocator, CmdBuffer, DefaultAllocator, Device, Error, Fence, PhysicalDevice};
use crate::command_buffer::*;
use crate::core::queue::{DeviceQueue, Queue};
use crate::pool::{Poolable, Pooled, ResourcePool};
use crate::sync::domain::ExecutionDomain;
use crate::sync::submit_batch::SubmitBatch;
#[derive(Debug, Clone)]
pub struct ExecutionManager<A: Allocator = DefaultAllocator> {
device: Device,
queues: Arc<Vec<Mutex<Queue>>>,
pool: ResourcePool<A>,
}
fn max_queue_count(family: u32, families: &[vk::QueueFamilyProperties]) -> u32 {
families.get(family as usize).unwrap().queue_count
}
impl<A: Allocator> ExecutionManager<A> {
pub fn new(
device: Device,
physical_device: &PhysicalDevice,
pool: ResourcePool<A>,
) -> Result<Self> {
let mut counts = HashMap::new();
let mut device_queues = HashMap::new();
let queues = physical_device
.queues()
.iter()
.map(|queue| -> Result<Mutex<Queue>> {
let index = counts.entry(queue.family_index).or_insert(0);
let device_queue = if *index
>= max_queue_count(queue.family_index, physical_device.queue_families())
{
device_queues.get(&queue.family_index).cloned().unwrap()
} else {
let device_queue = Arc::new(Mutex::new(DeviceQueue {
handle: unsafe { device.get_device_queue(queue.family_index, *index) },
}));
*counts.get_mut(&queue.family_index).unwrap() += 1;
device_queues.insert(queue.family_index, device_queue.clone());
device_queue
};
Ok(Mutex::new(Queue::new(
device.clone(),
device_queue,
*queue,
*physical_device
.queue_families()
.get(queue.family_index as usize)
.unwrap(),
)?))
})
.collect::<Result<Vec<Mutex<Queue>>>>()?;
info!("Created device queues:");
for queue in &queues {
let lock = queue.lock().unwrap();
let info = lock.info();
info!(
"Queue #{:?}({}) supports {:?} (dedicated: {}, can present: {})",
info.queue_type, info.family_index, info.flags, info.dedicated, info.can_present
)
}
Ok(ExecutionManager {
device,
queues: Arc::new(queues),
pool,
})
}
pub fn try_on_domain<'q, D: ExecutionDomain>(&'q self) -> Result<D::CmdBuf<'q, A>> {
let queue = self.try_get_queue::<D>().map_err(|_| Error::QueueLocked)?;
Queue::allocate_command_buffer::<'q, A, D::CmdBuf<'q, A>>(
self.device.clone(),
queue,
self.pool.pipelines.clone(),
self.pool.descriptors.clone(),
)
}
pub fn on_domain<'q, D: ExecutionDomain>(&'q self) -> Result<D::CmdBuf<'q, A>> {
let queue = self.get_queue::<D>().ok_or_else(|| Error::NoCapableQueue)?;
Queue::allocate_command_buffer::<'q, A, D::CmdBuf<'q, A>>(
self.device.clone(),
queue,
self.pool.pipelines.clone(),
self.pool.descriptors.clone(),
)
}
pub fn start_submit_batch<D: ExecutionDomain + 'static>(&self) -> Result<SubmitBatch<D, A>> {
SubmitBatch::new(self.device.clone(), self.clone(), &self.pool)
}
pub(crate) fn submit_batch<D: ExecutionDomain>(
&self,
submits: &[vk::SubmitInfo2],
fence: &Fence,
) -> Result<()> {
let queue = self.get_queue::<D>().ok_or_else(|| Error::NoCapableQueue)?;
queue.submit2(submits, Some(fence))?;
Ok(())
}
pub(crate) fn get_present_queue(&self) -> Option<MutexGuard<Queue>> {
self.queues
.iter()
.find(|&queue| queue.lock().unwrap().info().can_present)
.map(|q| q.lock().unwrap())
}
pub fn try_get_queue<D: ExecutionDomain>(&self) -> TryLockResult<MutexGuard<Queue>> {
let q = self.queues.iter().find(|&q| {
let q = q.try_lock();
match q {
Ok(queue) => D::queue_is_compatible(&queue),
Err(_) => false,
}
});
match q {
None => Err(TryLockError::WouldBlock),
Some(q) => Ok(q.lock()?),
}
}
pub fn get_queue<D: ExecutionDomain>(&self) -> Option<MutexGuard<Queue>> {
self.queues
.iter()
.find(|&q| {
let q = q.lock().unwrap();
D::queue_is_compatible(&q)
})
.map(|q| q.lock().unwrap())
}
}
impl<A: Allocator + 'static> ExecutionManager<A> {
pub fn submit<D: ExecutionDomain + 'static>(
&self,
mut cmd: CommandBuffer<D>,
) -> Result<Pooled<Fence>> {
let mut fence = Fence::new_in_pool(&self.pool.fences, &())?;
let handle = unsafe { cmd.handle() };
let command_buffer_info = vk::CommandBufferSubmitInfo {
s_type: vk::StructureType::COMMAND_BUFFER_SUBMIT_INFO,
p_next: std::ptr::null(),
command_buffer: handle,
device_mask: 0,
};
let info = vk::SubmitInfo2 {
s_type: vk::StructureType::SUBMIT_INFO_2,
p_next: std::ptr::null(),
flags: Default::default(),
wait_semaphore_info_count: 0,
p_wait_semaphore_infos: std::ptr::null(),
command_buffer_info_count: 1,
p_command_buffer_infos: &command_buffer_info,
signal_semaphore_info_count: 0,
p_signal_semaphore_infos: std::ptr::null(),
};
let queue = self.get_queue::<D>().ok_or_else(|| Error::NoCapableQueue)?;
queue.submit2(std::slice::from_ref(&info), Some(&fence))?;
let exec = self.clone();
fence.replace(move |fence| {
fence.with_cleanup(move || unsafe {
cmd.delete(exec).unwrap();
})
});
Ok(fence)
}
}