use std::sync::Arc;
use async_trait::async_trait;
use drasi_lib::channels::events::SourceEventWrapper;
use drasi_lib::channels::ChangeReceiver;
use drasi_plugin_sdk::ffi::{FfiChangeReceiver, FfiSourceEvent};
/// State shared between `ChangeReceiverProxy` and `change_push_callback`.
///
/// `ChangeReceiverProxy::new` wraps one of these in an `Arc`, keeps one strong
/// reference itself and hands a second one to the plugin as the raw `ctx`
/// pointer that is later passed back into the callback.
struct PushCallbackContext {
/// Sending half of the bounded channel feeding the proxy. The callback
/// replaces it with `None` when it is invoked with a null event, which drops
/// the sender and lets the receiving side observe disconnection.
tx: std::sync::Mutex<Option<std::sync::mpsc::SyncSender<Arc<SourceEventWrapper>>>>,
/// Notified by the callback after a successful send and after the sender
/// has been cleared; the proxy awaits it when the channel is empty.
notify: Arc<tokio::sync::Notify>,
}
/// C-ABI entry point handed to the plugin for pushing change events.
///
/// Delegates to `change_push_callback_inner` inside `catch_unwind`.
/// A panic raised by the inner function is caught here and reported to the
/// plugin as `false`.
extern "C" fn change_push_callback(ctx: *mut std::ffi::c_void, event: *mut FfiSourceEvent) -> bool {
    // The closure only captures two raw pointers; `AssertUnwindSafe` is needed
    // because raw pointers are not `UnwindSafe` by default.
    let guarded = std::panic::AssertUnwindSafe(|| change_push_callback_inner(ctx, event));
    match std::panic::catch_unwind(guarded) {
        Ok(accepted) => accepted,
        Err(_) => false,
    }
}
/// Handles one invocation of the change push callback.
///
/// * `ctx` — the pointer produced by `Arc::into_raw` in
///   `ChangeReceiverProxy::new`, reinterpreted as a `PushCallbackContext`.
/// * `event` — either null, or a boxed `FfiSourceEvent` whose `opaque` field
///   points at a boxed `SourceEventWrapper`; this function takes ownership of
///   both allocations.
///
/// Returns `true` only when the event was queued on the channel. On every
/// `false` return the strong reference behind `ctx` has been released:
/// * null `event`: the sender is cleared and the waiter is notified;
/// * the sender was already cleared;
/// * the receiving half of the channel is gone.
///
/// NOTE(review): because each `false` path releases the reference, the plugin
/// presumably must not call the callback again with the same `ctx` afterwards —
/// confirm against the plugin SDK contract.
///
/// # Panics
/// Panics if the context mutex is poisoned. On that path the reference behind
/// `ctx` is not released.
fn change_push_callback_inner(ctx: *mut std::ffi::c_void, event: *mut FfiSourceEvent) -> bool {
    let ctx_ptr = ctx as *const PushCallbackContext;
    // SAFETY: assumes `ctx` is the still-live pointer created by
    // `Arc::into_raw` in `ChangeReceiverProxy::new`.
    let shared = unsafe { &*ctx_ptr };

    if event.is_null() {
        let mut slot = shared.tx.lock().expect("push callback mutex poisoned");
        *slot = None;
        shared.notify.notify_one();
        // The guard borrows from the context, so it must be gone before the
        // reference count is decremented below.
        drop(slot);
        // SAFETY: reclaims the strong reference that was leaked to the plugin.
        drop(unsafe { Arc::from_raw(ctx_ptr) });
        return false;
    }

    // SAFETY: assumes `event` and its `opaque` payload were both allocated via
    // `Box` and that ownership is transferred to this callback — TODO confirm
    // against the plugin SDK.
    let payload = unsafe {
        let raw_payload = (*event).opaque as *mut SourceEventWrapper;
        *Box::from_raw(raw_payload)
    };
    // SAFETY: see above; frees the FFI envelope now that the payload is moved out.
    unsafe { drop(Box::from_raw(event)) };

    let slot = shared.tx.lock().expect("push callback mutex poisoned");
    let delivered = match slot.as_ref() {
        // `send` on the bounded channel is performed while the lock is held.
        Some(tx) => tx.send(Arc::new(payload)).is_ok(),
        None => false,
    };
    drop(slot);

    if delivered {
        shared.notify.notify_one();
    } else {
        // SAFETY: reclaims the strong reference that was leaked to the plugin;
        // the mutex guard has already been released.
        drop(unsafe { Arc::from_raw(ctx_ptr) });
    }
    delivered
}
/// Plugin-owned receiver state paired with the function that releases it.
///
/// Dropping this value calls `drop_fn(state)` (see the `Drop` impl).
struct FfiReceiverState {
/// Plugin-supplied function invoked with `state` on drop.
drop_fn: extern "C" fn(*mut std::ffi::c_void),
/// Opaque pointer supplied by the plugin; the code shown here only passes it
/// back to plugin functions and never dereferences it.
state: *mut std::ffi::c_void,
}
// NOTE(review): these impls assert that the plugin's `state` pointer may be
// moved to and shared between threads. Nothing in this file checks that —
// confirm against the plugin SDK contract.
unsafe impl Send for FfiReceiverState {}
unsafe impl Sync for FfiReceiverState {}
impl Drop for FfiReceiverState {
    /// Releases the plugin state by calling `drop_fn(state)` on a freshly
    /// spawned thread and waiting for that thread to finish.
    ///
    /// The join result is discarded, so a panic on the helper thread is not
    /// propagated.
    /// NOTE(review): presumably the extra thread keeps the plugin's teardown
    /// off the dropping thread (e.g. an async runtime worker) — confirm.
    fn drop(&mut self) {
        let release = self.drop_fn;
        // Raw pointers are not `Send`; the SDK wrapper lets the pointer cross
        // into the helper thread.
        let raw_state = drasi_plugin_sdk::ffi::SendMutPtr(self.state);
        let worker = std::thread::spawn(move || {
            release(raw_state.as_ptr());
        });
        let _ = worker.join();
    }
}
/// Host-side `ChangeReceiver` backed by a plugin that pushes events through
/// `change_push_callback`.
///
/// Fields are dropped in declaration order, so `rx` is closed before
/// `_ffi_state` invokes the plugin's `drop_fn`.
pub struct ChangeReceiverProxy {
/// Receiving half of the bounded channel filled by the push callback.
rx: std::sync::mpsc::Receiver<Arc<SourceEventWrapper>>,
/// Awaited by `recv` when `rx` is empty; notified by the push callback.
notify: Arc<tokio::sync::Notify>,
/// Host-side strong reference to the callback context (the plugin holds a
/// second one as a raw pointer).
_callback_ctx: Arc<PushCallbackContext>,
/// Keeps the plugin receiver state alive until the proxy is dropped.
_ffi_state: Arc<FfiReceiverState>,
}
// NOTE(review): `std::sync::mpsc::Receiver` is not `Sync`. In the code shown
// here `rx` is only used through `&mut self` in `recv`; the `Sync` impl relies
// on that staying true.
unsafe impl Send for ChangeReceiverProxy {}
unsafe impl Sync for ChangeReceiverProxy {}
impl ChangeReceiverProxy {
    /// Builds a proxy around a plugin-provided `FfiChangeReceiver`.
    ///
    /// Creates a bounded channel (capacity 256) plus a `Notify`, wraps the
    /// sender in a `PushCallbackContext`, and calls the plugin's
    /// `start_push_fn` with the plugin state, `change_push_callback` and a raw
    /// pointer to that context.
    ///
    /// One strong reference to the context is deliberately leaked via
    /// `Arc::into_raw`. The callback reclaims it on each path where it
    /// returns `false`.
    pub fn new(inner: FfiChangeReceiver) -> Self {
        let notify = Arc::new(tokio::sync::Notify::new());
        let (tx, rx) = std::sync::mpsc::sync_channel(256);

        let callback_ctx = Arc::new(PushCallbackContext {
            tx: std::sync::Mutex::new(Some(tx)),
            notify: Arc::clone(&notify),
        });
        let ffi_state = Arc::new(FfiReceiverState {
            drop_fn: inner.drop_fn,
            state: inner.state,
        });

        // The clone handed to the plugin is turned into a raw pointer; the
        // original stays with the proxy.
        let plugin_ref = Arc::clone(&callback_ctx);
        let ctx_ptr = Arc::into_raw(plugin_ref) as *mut std::ffi::c_void;
        (inner.start_push_fn)(inner.state, change_push_callback, ctx_ptr);

        Self {
            rx,
            notify,
            _callback_ctx: callback_ctx,
            _ffi_state: ffi_state,
        }
    }
}
#[async_trait]
impl ChangeReceiver<SourceEventWrapper> for ChangeReceiverProxy {
    /// Returns the next event pushed by the plugin.
    ///
    /// Polls the channel without blocking. When it is empty, waits on the
    /// `Notify` signalled by the push callback and polls again.
    ///
    /// # Errors
    /// Returns `"Channel closed"` once the sending side has been dropped and
    /// the channel is drained.
    async fn recv(&mut self) -> anyhow::Result<Arc<SourceEventWrapper>> {
        use std::sync::mpsc::TryRecvError;

        loop {
            // Bind the result first so no borrow of `rx` is live across the await.
            let polled = self.rx.try_recv();
            match polled {
                Ok(event) => return Ok(event),
                Err(TryRecvError::Disconnected) => {
                    return Err(anyhow::anyhow!("Channel closed"));
                }
                Err(TryRecvError::Empty) => self.notify.notified().await,
            }
        }
    }
}
/// State shared between `BootstrapReceiverProxy` and `bootstrap_push_callback`.
///
/// `BootstrapReceiverProxy::new` wraps one of these in an `Arc`, keeps one
/// strong reference itself and hands a second one to the plugin as the raw
/// `ctx` pointer that is later passed back into the callback.
struct BootstrapPushCallbackContext {
/// Sending half of the bounded bootstrap channel. The callback replaces it
/// with `None` when it is invoked with a null event, which drops the sender.
tx: std::sync::Mutex<
Option<std::sync::mpsc::SyncSender<drasi_lib::channels::events::BootstrapEvent>>,
>,
/// Notified by the callback after a successful send and after the sender
/// has been cleared.
notify: Arc<tokio::sync::Notify>,
}
/// C-ABI entry point handed to the plugin for pushing bootstrap events.
///
/// Delegates to `bootstrap_push_callback_inner` inside `catch_unwind`.
/// A panic raised by the inner function is caught here and reported to the
/// plugin as `false`.
extern "C" fn bootstrap_push_callback(
    ctx: *mut std::ffi::c_void,
    event: *mut drasi_plugin_sdk::ffi::FfiBootstrapEvent,
) -> bool {
    // The closure only captures two raw pointers; `AssertUnwindSafe` is needed
    // because raw pointers are not `UnwindSafe` by default.
    let guarded = std::panic::AssertUnwindSafe(|| bootstrap_push_callback_inner(ctx, event));
    match std::panic::catch_unwind(guarded) {
        Ok(accepted) => accepted,
        Err(_) => false,
    }
}
/// Handles one invocation of the bootstrap push callback.
///
/// * `ctx` — the pointer produced by `Arc::into_raw` in
///   `BootstrapReceiverProxy::new`, reinterpreted as a
///   `BootstrapPushCallbackContext`.
/// * `event` — either null, or a boxed `FfiBootstrapEvent` whose `opaque`
///   field points at a boxed `BootstrapEvent`; this function takes ownership
///   of both allocations.
///
/// Returns `true` only when the event was queued on the channel. On every
/// `false` return the strong reference behind `ctx` has been released:
/// * null `event`: the sender is cleared and the waiter is notified;
/// * the sender was already cleared;
/// * the receiving half of the channel is gone.
///
/// NOTE(review): because each `false` path releases the reference, the plugin
/// presumably must not call the callback again with the same `ctx` afterwards —
/// confirm against the plugin SDK contract.
///
/// # Panics
/// Panics if the context mutex is poisoned. On that path the reference behind
/// `ctx` is not released.
fn bootstrap_push_callback_inner(
    ctx: *mut std::ffi::c_void,
    event: *mut drasi_plugin_sdk::ffi::FfiBootstrapEvent,
) -> bool {
    let ctx_ptr = ctx as *const BootstrapPushCallbackContext;
    // SAFETY: assumes `ctx` is the still-live pointer created by
    // `Arc::into_raw` in `BootstrapReceiverProxy::new`.
    let shared = unsafe { &*ctx_ptr };

    if event.is_null() {
        let mut slot = shared
            .tx
            .lock()
            .expect("bootstrap push callback mutex poisoned");
        *slot = None;
        shared.notify.notify_one();
        // The guard borrows from the context, so it must be gone before the
        // reference count is decremented below.
        drop(slot);
        // SAFETY: reclaims the strong reference that was leaked to the plugin.
        drop(unsafe { Arc::from_raw(ctx_ptr) });
        return false;
    }

    // SAFETY: assumes `event` and its `opaque` payload were both allocated via
    // `Box` and that ownership is transferred to this callback — TODO confirm
    // against the plugin SDK.
    let payload = unsafe {
        let raw_payload = (*event).opaque as *mut drasi_lib::channels::events::BootstrapEvent;
        *Box::from_raw(raw_payload)
    };
    // SAFETY: see above; frees the FFI envelope now that the payload is moved out.
    unsafe { drop(Box::from_raw(event)) };

    let slot = shared
        .tx
        .lock()
        .expect("bootstrap push callback mutex poisoned");
    let delivered = match slot.as_ref() {
        // `send` on the bounded channel is performed while the lock is held.
        Some(tx) => tx.send(payload).is_ok(),
        None => false,
    };
    drop(slot);

    if delivered {
        shared.notify.notify_one();
    } else {
        // SAFETY: reclaims the strong reference that was leaked to the plugin;
        // the mutex guard has already been released.
        drop(unsafe { Arc::from_raw(ctx_ptr) });
    }
    delivered
}
/// Plugin-owned bootstrap receiver state paired with the function that
/// releases it.
///
/// Dropping this value calls `drop_fn(state)` (see the `Drop` impl).
struct FfiBootstrapReceiverState {
/// Plugin-supplied function invoked with `state` on drop.
drop_fn: extern "C" fn(*mut std::ffi::c_void),
/// Opaque pointer supplied by the plugin; the code shown here only passes it
/// back to plugin functions and never dereferences it.
state: *mut std::ffi::c_void,
}
// NOTE(review): these impls assert that the plugin's `state` pointer may be
// moved to and shared between threads. Nothing in this file checks that —
// confirm against the plugin SDK contract.
unsafe impl Send for FfiBootstrapReceiverState {}
unsafe impl Sync for FfiBootstrapReceiverState {}
impl Drop for FfiBootstrapReceiverState {
    /// Releases the plugin state by calling `drop_fn(state)` on a freshly
    /// spawned thread and waiting for that thread to finish.
    ///
    /// The join result is discarded, so a panic on the helper thread is not
    /// propagated.
    /// NOTE(review): presumably the extra thread keeps the plugin's teardown
    /// off the dropping thread (e.g. an async runtime worker) — confirm.
    fn drop(&mut self) {
        let release = self.drop_fn;
        // Raw pointers are not `Send`; the SDK wrapper lets the pointer cross
        // into the helper thread.
        let raw_state = drasi_plugin_sdk::ffi::SendMutPtr(self.state);
        let worker = std::thread::spawn(move || {
            release(raw_state.as_ptr());
        });
        let _ = worker.join();
    }
}
/// Host-side receiver for bootstrap events pushed by a plugin through
/// `bootstrap_push_callback`.
///
/// Fields are dropped in declaration order, so `rx` is closed before
/// `_ffi_state` invokes the plugin's `drop_fn`.
pub struct BootstrapReceiverProxy {
/// Receiving half of the bounded channel filled by the push callback.
rx: std::sync::mpsc::Receiver<drasi_lib::channels::events::BootstrapEvent>,
/// Notified by the push callback; awaited when `rx` is empty.
notify: Arc<tokio::sync::Notify>,
/// Host-side strong reference to the callback context (the plugin holds a
/// second one as a raw pointer).
_callback_ctx: Arc<BootstrapPushCallbackContext>,
/// Keeps the plugin receiver state alive until the proxy is dropped.
_ffi_state: Arc<FfiBootstrapReceiverState>,
}
// NOTE(review): `std::sync::mpsc::Receiver` is not `Sync`. The `Sync` impl
// relies on `rx` never being used through a shared `&BootstrapReceiverProxy`
// from more than one thread — confirm that invariant holds for all callers.
unsafe impl Send for BootstrapReceiverProxy {}
unsafe impl Sync for BootstrapReceiverProxy {}
impl BootstrapReceiverProxy {
    /// Builds a proxy around a plugin-provided `FfiBootstrapReceiver`.
    ///
    /// Creates a bounded channel (capacity 256) plus a `Notify`, wraps the
    /// sender in a `BootstrapPushCallbackContext`, and calls the plugin's
    /// `start_push_fn` with the plugin state, `bootstrap_push_callback` and a
    /// raw pointer to that context.
    ///
    /// One strong reference to the context is deliberately leaked via
    /// `Arc::into_raw`. The callback reclaims it on each path where it
    /// returns `false`.
    pub fn new(inner: drasi_plugin_sdk::ffi::FfiBootstrapReceiver) -> Self {
        let (tx, rx) = std::sync::mpsc::sync_channel(256);
        let notify = Arc::new(tokio::sync::Notify::new());
        let callback_ctx = Arc::new(BootstrapPushCallbackContext {
            tx: std::sync::Mutex::new(Some(tx)),
            notify: notify.clone(),
        });
        let ffi_state = Arc::new(FfiBootstrapReceiverState {
            drop_fn: inner.drop_fn,
            state: inner.state,
        });
        // The clone handed to the plugin is turned into a raw pointer; the
        // original stays with the proxy.
        let ctx_for_plugin = callback_ctx.clone();
        let ctx_ptr = Arc::into_raw(ctx_for_plugin) as *mut std::ffi::c_void;
        (inner.start_push_fn)(inner.state, bootstrap_push_callback, ctx_ptr);
        Self {
            rx,
            notify,
            _callback_ctx: callback_ctx,
            _ffi_state: ffi_state,
        }
    }

    /// Converts the proxy into a Tokio mpsc receiver (capacity 256).
    ///
    /// Spawns a Tokio task that owns the proxy and forwards every event from
    /// the callback-fed channel into the returned receiver. The task ends when
    /// the callback-fed channel disconnects or when a forward fails because
    /// the returned receiver has been dropped.
    ///
    /// Must be called from within a Tokio runtime, since it uses
    /// `tokio::spawn`.
    pub fn into_mpsc_receiver(self) -> drasi_lib::channels::events::BootstrapEventReceiver {
        let (tx, out_rx) = tokio::sync::mpsc::channel(256);
        tokio::spawn(async move {
            // Move the proxy into the task as one value. When the task ends it
            // is torn down in struct field order: `rx` first, the plugin state
            // (`drop_fn`) last, exactly as if the proxy had been dropped
            // directly. Closing `rx` first makes any plugin thread blocked in
            // `send` on the full bounded channel fail fast, before `drop_fn`
            // is called and joined.
            let proxy = self;
            loop {
                // Bind the result first so no borrow of the (non-`Sync`)
                // receiver is live across an await point.
                let polled = proxy.rx.try_recv();
                match polled {
                    Ok(event) => {
                        if tx.send(event).await.is_err() {
                            // Consumer dropped the returned receiver.
                            return;
                        }
                    }
                    Err(std::sync::mpsc::TryRecvError::Empty) => {
                        proxy.notify.notified().await;
                    }
                    Err(std::sync::mpsc::TryRecvError::Disconnected) => return,
                }
            }
        });
        out_rx
    }
}