use std::future::Future;
use std::sync::{Arc, RwLock};
#[cfg(feature = "main-thread")]
use std::sync::atomic::{AtomicU32, AtomicU8};
#[cfg(feature = "main-thread")]
use crossbeam_channel::unbounded as channel;
use mvutils::id_eq;
use mvutils::utils::next_id;
#[cfg(feature = "main-thread")]
use mvutils::utils::Recover;
use crate::block::Signal;
use crate::queue::Queue;
#[cfg(feature = "main-thread")]
use crate::queue::WorkerThread;
use crate::sync::{Fence, Semaphore};
use crate::task::{Task, TaskHandle, TaskState};
#[cfg(feature = "command-buffers")]
use crate::command_buffers::buffer::{CommandBuffer, CommandBufferAllocationError};
pub mod prelude;
pub(crate) mod run;
pub mod queue;
pub mod task;
pub mod sync;
pub mod block;
pub mod utils;
pub mod timer;
#[cfg(feature = "command-buffers")]
pub mod command_buffers;
pub trait MVSynced: Send + Sync + 'static {}
impl<T> MVSynced for T where T: Send + Sync + 'static {}
pub struct MVSync {
id: u64,
queue: Arc<Queue>,
signal: Arc<Signal>,
#[cfg(feature = "main-thread")]
worker: RwLock<Option<WorkerThread>>
}
impl MVSync {
pub fn new(specs: MVSyncSpecs) -> Arc<MVSync> {
next_id("MVSync");
let signal = Arc::new(Signal::new());
Arc::new(MVSync {
id: next_id("MVSync"),
queue: Arc::new(Queue::new(specs, vec![], signal.clone())),
signal,
#[cfg(feature = "main-thread")]
worker: RwLock::new(None)
})
}
pub fn labelled(specs: MVSyncSpecs, labels: Vec<&'static str>) -> Arc<MVSync> {
next_id("MVSync");
let signal = Arc::new(Signal::new());
Arc::new(MVSync {
id: next_id("MVSync"),
queue: Arc::new(Queue::new(specs, labels.into_iter().map(ToString::to_string).collect(), signal.clone())),
signal,
#[cfg(feature = "main-thread")]
worker: RwLock::new(None)
})
}
#[cfg(feature = "main-thread")]
pub fn register_main_thread<F: Future<Output = ()> + Send>(self: &Arc<MVSync>, init: impl FnOnce(Arc<MVSync>) -> F + Send + 'static, end_when_done: bool) {
let mut worker = self.worker.write().recover();
if worker.is_none() || worker.as_ref().unwrap().finished() {
let signal = Arc::new(Signal::new());
let (sender, receiver) = channel();
let free_workers = Arc::new(AtomicU32::new(4294967295));
let access = free_workers.clone();
let signal_clone = signal.clone();
let end = Arc::new(AtomicU8::new(0));
let end_clone = end.clone();
let thread = WorkerThread {
id: next_id("MVSync"),
sender,
label: None,
free_workers,
end,
signal
};
worker.replace(thread);
let this = self.clone();
let (task, _) = self.create_async_task(|| async move {
init(this).await;
});
worker.as_ref().unwrap().send(task);
drop(worker);
WorkerThread::run(receiver, access, signal_clone, end_clone, end_when_done);
}
else {
panic!("This MVSync instance already has a main thread registered!");
}
}
#[cfg(feature = "main-thread")]
pub fn submit_to_main_thread(self: &Arc<MVSync>, task: Task) {
let worker = self.worker.read().recover();
if worker.is_none() || worker.as_ref().unwrap().ended() {
panic!("This MVSync instance does not have a main thread registered!");
}
else {
worker.as_ref().unwrap().send(task);
}
}
#[cfg(feature = "main-thread")]
pub fn end_main_thread(self: &Arc<MVSync>) {
let worker = self.worker.write().recover();
if !(worker.is_none() || worker.as_ref().unwrap().ended()) {
worker.as_ref().unwrap().end();
}
}
#[cfg(feature = "main-thread")]
pub fn main_thread_ended(self: &Arc<MVSync>) -> bool {
let worker = self.worker.write().recover();
worker.is_none() || worker.as_ref().unwrap().ended()
}
#[cfg(feature = "main-thread")]
pub fn main_thread_finished(self: &Arc<MVSync>) -> bool {
let worker = self.worker.write().recover();
worker.is_none() || worker.as_ref().unwrap().finished()
}
pub fn get_queue(self: &Arc<MVSync>) -> Arc<Queue> {
self.queue.clone()
}
pub fn create_semaphore(self: &Arc<MVSync>) -> Arc<Semaphore> {
Arc::new(Semaphore::new())
}
pub fn create_fence(self: &Arc<MVSync>) -> Arc<Fence> {
Arc::new(Fence::new())
}
#[cfg(feature = "command-buffers")]
pub fn allocate_command_buffer(self: &Arc<MVSync>) -> Result<CommandBuffer, CommandBufferAllocationError> {
CommandBuffer::new(self.signal.clone())
}
pub fn create_task<T: MVSynced>(self: &Arc<MVSync>, function: impl FnOnce() -> T + Send + 'static) -> (Task, TaskHandle<T>) {
let buffer = Arc::new(RwLock::new(None));
let state = Arc::new(RwLock::new(TaskState::Pending));
let signal = Arc::new(Signal::new());
let result = TaskHandle::new(buffer.clone(), state.clone(), signal.clone());
let task = Task::from_function(function, buffer, state, [signal, self.signal.clone()]);
(task, result)
}
pub fn create_continuation<T: MVSynced, R: MVSynced>(self: &Arc<MVSync>, function: impl FnOnce(T) -> R + Send + 'static, predecessor: TaskHandle<T>) -> (Task, TaskHandle<R>) {
let buffer = Arc::new(RwLock::new(None));
let state = Arc::new(RwLock::new(TaskState::Pending));
let signal = Arc::new(Signal::new());
let result = TaskHandle::new(buffer.clone(), state.clone(), signal.clone());
let task = Task::from_continuation(function, buffer, state, [signal, self.signal.clone()], predecessor);
(task, result)
}
pub fn create_async_task<T: MVSynced, F: Future<Output = T> + Send>(self: &Arc<MVSync>, function: impl FnOnce() -> F + Send + 'static) -> (Task, TaskHandle<T>) {
let buffer = Arc::new(RwLock::new(None));
let state = Arc::new(RwLock::new(TaskState::Pending));
let signal = Arc::new(Signal::new());
let result = TaskHandle::new(buffer.clone(), state.clone(), signal.clone());
let task = Task::from_async(function, buffer, state, [signal, self.signal.clone()]);
(task, result)
}
pub fn create_future_task<T: MVSynced>(self: &Arc<MVSync>, function: impl Future<Output = T> + Send + 'static) -> (Task, TaskHandle<T>) {
let buffer = Arc::new(RwLock::new(None));
let state = Arc::new(RwLock::new(TaskState::Pending));
let signal = Arc::new(Signal::new());
let result = TaskHandle::new(buffer.clone(), state.clone(), signal.clone());
let task = Task::from_future(function, buffer, state, [signal, self.signal.clone()]);
(task, result)
}
pub fn create_async_continuation<T: MVSynced, R: MVSynced, F: Future<Output = R> + Send>(self: &Arc<MVSync>, function: impl FnOnce(T) -> F + Send + 'static, predecessor: TaskHandle<T>) -> (Task, TaskHandle<R>) {
let buffer = Arc::new(RwLock::new(None));
let state = Arc::new(RwLock::new(TaskState::Pending));
let signal = Arc::new(Signal::new());
let result = TaskHandle::new(buffer.clone(), state.clone(), signal.clone());
let task = Task::from_async_continuation(function, buffer, state, [signal, self.signal.clone()], predecessor);
(task, result)
}
}
id_eq!(MVSync);
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct MVSyncSpecs {
pub thread_count: u32,
pub workers_per_thread: u32,
}
impl MVSyncSpecs {
pub fn max_performance(workers_per_thread: u32) -> Self {
let threads = num_cpus::get();
MVSyncSpecs {
thread_count: threads as u32 - 2,
workers_per_thread
}
}
pub fn all_cores(workers_per_thread: u32) -> Self {
let threads = num_cpus::get();
MVSyncSpecs {
thread_count: threads as u32,
workers_per_thread
}
}
}
impl Default for MVSyncSpecs {
fn default() -> Self {
MVSyncSpecs {
thread_count: 1,
workers_per_thread: 1
}
}
}
#[cfg(test)]
mod tests {
use crate::{MVSync, MVSyncSpecs};
use crate::utils::async_yield;
#[test]
fn it_works() {
let sync = MVSync::new(MVSyncSpecs {
thread_count: 1,
workers_per_thread: 2
});
sync.register_main_thread(|sync| async move {
let (a, _) = sync.create_async_task(|| async move {
run("A").await;
});
let (b, _) = sync.create_async_task(|| async move {
run("B").await;
});
sync.submit_to_main_thread(a);
sync.submit_to_main_thread(b);
}, true);
}
async fn run(name: &str) {
for i in 0..10 {
println!("{}: {}", name, i);
async_yield().await;
}
}
}