use std::ffi::c_void;
use std::sync::Mutex;
use super::types::{now_us, FfiStr};
use super::vtables::{BootstrapProviderVtable, FfiBootstrapEvent, FfiBootstrapSender};
use drasi_lib::bootstrap::BootstrapProvider;
pub struct FfiBootstrapProviderProxy {
pub(crate) vtable: Mutex<BootstrapProviderVtable>,
}
unsafe impl Send for FfiBootstrapProviderProxy {}
unsafe impl Sync for FfiBootstrapProviderProxy {}
#[async_trait::async_trait]
impl BootstrapProvider for FfiBootstrapProviderProxy {
async fn bootstrap(
&self,
request: drasi_lib::bootstrap::BootstrapRequest,
context: &drasi_lib::bootstrap::BootstrapContext,
event_tx: drasi_lib::channels::events::BootstrapEventSender,
settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
) -> anyhow::Result<usize> {
let (vtable_state, vtable_bootstrap_fn) = {
let vtable = self.vtable.lock().expect("vtable mutex poisoned");
(vtable.state, vtable.bootstrap_fn)
};
let query_id = request.query_id.clone();
let node_labels: Vec<String> = request.node_labels.clone();
let relation_labels: Vec<String> = request.relation_labels.clone();
let request_id = request.request_id.clone();
let server_id = context.server_id.clone();
let source_id = context.source_id.clone();
struct SenderState {
tx: std::sync::mpsc::Sender<drasi_lib::channels::events::BootstrapEvent>,
}
extern "C" fn send_fn(state: *mut c_void, event: *mut FfiBootstrapEvent) -> i32 {
let sender_state = unsafe { &*(state as *const SenderState) };
if event.is_null() {
return -1;
}
let ffi_event = unsafe { &*event };
let bootstrap_event = unsafe {
let opaque = ffi_event.opaque as *mut drasi_lib::channels::events::BootstrapEvent;
std::ptr::read(opaque)
};
match sender_state.tx.send(bootstrap_event) {
Ok(()) => 0,
Err(_) => -1,
}
}
extern "C" fn drop_sender(state: *mut c_void) {
unsafe { drop(Box::from_raw(state as *mut SenderState)) };
}
let (std_tx, std_rx) =
std::sync::mpsc::channel::<drasi_lib::channels::events::BootstrapEvent>();
let sender_state = Box::new(SenderState { tx: std_tx });
let ffi_sender = Box::new(FfiBootstrapSender {
state: Box::into_raw(sender_state) as *mut c_void,
send_fn,
drop_fn: drop_sender,
});
let vtable_state_usize = vtable_state as usize;
let bootstrap_fn = vtable_bootstrap_fn;
std::thread::spawn(move || {
let ffi_query_id = FfiStr::from_str(&query_id);
let ffi_node_labels: Vec<FfiStr> =
node_labels.iter().map(|s| FfiStr::from_str(s)).collect();
let ffi_rel_labels: Vec<FfiStr> = relation_labels
.iter()
.map(|s| FfiStr::from_str(s))
.collect();
let ffi_request_id = FfiStr::from_str(&request_id);
let ffi_server_id = FfiStr::from_str(&server_id);
let ffi_source_id = FfiStr::from_str(&source_id);
let ffi_sender_ptr = Box::into_raw(ffi_sender);
(bootstrap_fn)(
vtable_state_usize as *mut c_void,
ffi_query_id,
ffi_node_labels.as_ptr(),
ffi_node_labels.len(),
ffi_rel_labels.as_ptr(),
ffi_rel_labels.len(),
ffi_request_id,
ffi_server_id,
ffi_source_id,
ffi_sender_ptr,
);
let ffi_sender = unsafe { Box::from_raw(ffi_sender_ptr) };
(ffi_sender.drop_fn)(ffi_sender.state);
});
let count = tokio::task::spawn_blocking(move || {
let mut count: usize = 0;
while let Ok(record) = std_rx.recv() {
if event_tx.blocking_send(record).is_err() {
break;
}
count += 1;
}
count
})
.await
.expect("forwarding task panicked");
Ok(count)
}
}