use core::ffi::c_void;
use std::mem::size_of;
use std::slice;
use std::sync::{Arc, Mutex};
pub(crate) type WorkerMessageSender = ringbuf::HeapProducer<u8>;
pub(crate) type WorkerMessageReceiver = ringbuf::HeapConsumer<u8>;
const MAX_MESSAGE_SIZE: usize = 8192;
const N_MESSAGES: usize = 4;
type MessageBody = [u8; MAX_MESSAGE_SIZE];
#[derive(Debug)]
struct WorkerMessage {
size: usize,
body: MessageBody,
}
impl WorkerMessage {
fn data(&mut self) -> *mut c_void {
&mut self.body as *mut MessageBody as *mut c_void
}
}
pub(crate) fn instantiate_queue() -> (WorkerMessageSender, WorkerMessageReceiver) {
let (sender, receiver) = ringbuf::HeapRb::new(MAX_MESSAGE_SIZE * N_MESSAGES).split();
(sender, receiver)
}
fn publish_message(
sender: &mut WorkerMessageSender,
size: usize,
body: *mut u8,
) -> lv2_sys::LV2_Worker_Status {
if size > MAX_MESSAGE_SIZE {
return lv2_sys::LV2_Worker_Status_LV2_WORKER_ERR_NO_SPACE;
}
let mut body = unsafe { slice::from_raw_parts(body, size) };
let total_size = size_of::<usize>() + size;
if sender.free_len() < total_size {
return lv2_sys::LV2_Worker_Status_LV2_WORKER_ERR_NO_SPACE;
}
let size_as_bytes = size.to_be_bytes();
sender.push_slice(&size_as_bytes);
let result = sender.read_from(&mut body, Some(size));
match result {
Ok(_) => lv2_sys::LV2_Worker_Status_LV2_WORKER_SUCCESS,
Err(_) => lv2_sys::LV2_Worker_Status_LV2_WORKER_ERR_UNKNOWN,
}
}
fn pop_message(receiver: &mut WorkerMessageReceiver) -> WorkerMessage {
let mut size_as_bytes = [0; size_of::<usize>()];
receiver.pop_slice(&mut size_as_bytes);
let size = usize::from_be_bytes(size_as_bytes);
let mut body: MessageBody = [0; MAX_MESSAGE_SIZE];
let mut slice = &mut body[..];
receiver.write_into(&mut slice, Some(size)).unwrap();
WorkerMessage { size, body }
}
pub extern "C" fn schedule_work(
handle: lv2_sys::LV2_Worker_Schedule_Handle,
size: u32,
body: *const c_void,
) -> lv2_sys::LV2_Worker_Status {
let sender = unsafe { &mut *(handle as *mut WorkerMessageSender) };
publish_message(sender, size as usize, body as *mut u8)
}
extern "C" fn worker_respond(
handle: lv2_sys::LV2_Worker_Respond_Handle,
size: u32,
body: *const c_void,
) -> lv2_sys::LV2_Worker_Status {
let sender = unsafe { &mut *(handle as *mut WorkerMessageSender) };
publish_message(sender, size as usize, body as *mut u8)
}
pub struct Worker {
plugin_is_alive: Arc<Mutex<bool>>,
interface: lv2_sys::LV2_Worker_Interface,
instance_handle: lv2_sys::LV2_Handle,
receiver: WorkerMessageReceiver, sender: WorkerMessageSender, }
unsafe impl Send for Worker {}
unsafe impl Sync for Worker {}
impl Worker {
pub(crate) fn new(
plugin_is_alive: Arc<Mutex<bool>>,
interface: lv2_sys::LV2_Worker_Interface,
instance_handle: lv2_sys::LV2_Handle,
receiver: WorkerMessageReceiver,
sender: WorkerMessageSender,
) -> Self {
Worker {
plugin_is_alive,
interface,
instance_handle,
receiver,
sender,
}
}
pub fn do_work(&mut self) {
let plugin_is_alive = self.plugin_is_alive.lock().unwrap();
while *plugin_is_alive && self.receiver.len() > size_of::<usize>() {
let mut message = pop_message(&mut self.receiver);
if let Some(work_function) = self.interface.work {
let sender = &mut self.sender as *mut WorkerMessageSender as *mut c_void;
unsafe {
work_function(
self.instance_handle,
Some(worker_respond),
sender,
message.size as u32,
message.data(),
)
};
}
}
}
pub fn should_keep_working(&self) -> bool {
*self.plugin_is_alive.lock().unwrap()
}
}
impl std::fmt::Debug for Worker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Worker")
.field("plugin_is_alive", &self.plugin_is_alive)
.field("interface", &self.interface)
.field("instance_handle", &self.instance_handle)
.field("receiver", &"__internal__")
.field("sender", &"__internal__")
.finish()
}
}
pub(crate) unsafe fn maybe_get_worker_interface(
plugin: &lilv::plugin::Plugin,
common_uris: &crate::CommonUris,
instance: &mut lilv::instance::ActiveInstance,
) -> Option<lv2_sys::LV2_Worker_Interface> {
if !plugin.has_feature(&common_uris.worker_schedule_feature_uri) {
return None;
}
let descriptor = instance.instance().descriptor().unwrap();
type ExtDataFn = extern "C" fn(uri: *const u8) -> *const c_void;
let extension_data: Option<ExtDataFn> = std::mem::transmute(descriptor.extension_data);
extension_data?;
Some(
*instance
.instance()
.extension_data::<lv2_sys::LV2_Worker_Interface>(
"http://lv2plug.in/ns/ext/worker#interface",
)?
.as_ref(),
)
}
pub(crate) fn handle_work_responses(
worker_interface: &mut lv2_sys::LV2_Worker_Interface,
receiver: &mut WorkerMessageReceiver,
handle: lv2_sys::LV2_Handle,
) {
while receiver.len() > size_of::<usize>() {
let mut message = pop_message(receiver);
if let Some(work_response_function) = worker_interface.work_response {
unsafe { work_response_function(handle, message.size as u32, message.data()) };
}
}
}
pub(crate) fn end_run(
worker_interface: &mut lv2_sys::LV2_Worker_Interface,
handle: lv2_sys::LV2_Handle,
) {
if let Some(end_function) = worker_interface.end_run {
unsafe { end_function(handle) };
}
}
#[derive(Default, Debug)]
pub struct WorkerManager {
new_workers: Mutex<Vec<Worker>>,
running_workers: Mutex<Vec<Worker>>,
}
impl WorkerManager {
pub fn run_workers(&self) {
let mut workers = self.running_workers.lock().unwrap();
workers.extend(self.new_workers.lock().unwrap().drain(..));
workers.iter_mut().for_each(|worker| worker.do_work());
workers.retain(|worker| worker.should_keep_working());
}
pub fn workers_count(&self) -> usize {
self.running_workers.lock().unwrap().len() + self.new_workers.lock().unwrap().len()
}
pub(crate) fn add_worker(&self, worker: Worker) {
self.new_workers.lock().unwrap().push(worker);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::str;
#[test]
fn test_send() {
let (mut sender, mut receiver) = instantiate_queue();
let sentence_to_transfer = String::from("This is a message for you");
let mut data = sentence_to_transfer.clone().into_bytes();
publish_message(&mut sender, data.len(), data.as_mut_ptr());
let message = pop_message(&mut receiver);
let body = &message.body[..message.size];
let message_body = str::from_utf8(body).unwrap();
assert_eq!(sentence_to_transfer, message_body);
}
}