use std::collections::HashSet;
use std::ffi::c_void;
use std::sync::mpsc;
use std::sync::Arc;
use drasi_core::models::{ElementMetadata, SourceChange};
use drasi_lib::bootstrap::BootstrapProvider;
use drasi_lib::channels::events::{
BootstrapEvent, BootstrapEventSender, SourceEvent, SourceEventWrapper,
};
use drasi_lib::channels::ChangeReceiver;
use drasi_lib::component_graph::ComponentUpdateReceiver;
use drasi_lib::config::SourceSubscriptionSettings;
use drasi_lib::reactions::Reaction;
use drasi_lib::sources::Source;
use drasi_lib::{ComponentStatus, DispatchMode, SourceRuntimeContext, StateStoreProvider};
use super::bootstrap_proxy::FfiBootstrapProviderProxy;
use super::callbacks::FfiLifecycleEvent;
use super::callbacks::FfiLifecycleEventType;
use super::state_store_proxy::FfiStateStoreProxy;
use super::types::*;
use super::vtables::*;
use crate::descriptor::{
BootstrapPluginDescriptor, ReactionPluginDescriptor, SourcePluginDescriptor,
};
/// Fallback lifecycle emitter signature: `(component_id, event_type, message)`.
///
/// Used by the vtable builders when no per-instance lifecycle callback is registered.
type LifecycleEmitterFn = fn(&str, FfiLifecycleEventType, &str);

// The builders store callback function pointers in `AtomicPtr<()>` fields and
// `transmute` them back on use, so both callback types must be exactly pointer-sized.
const _: () = assert!(
    std::mem::size_of::<super::callbacks::LifecycleCallbackFn>() == std::mem::size_of::<*mut ()>(),
    "LifecycleCallbackFn size must match pointer size"
);
const _: () = assert!(
    std::mem::size_of::<super::callbacks::LogCallbackFn>() == std::mem::size_of::<*mut ()>(),
    "LogCallbackFn size must match pointer size"
);
/// Runs `future` on the given Tokio runtime and blocks the calling thread until it completes.
///
/// The future is spawned as a task on `handle`; its output is handed back over a
/// one-slot channel so the task can deposit the value and finish without waiting
/// for the caller to reach `recv`. (A zero-capacity rendezvous channel would make
/// the `send` inside the async task block a runtime worker thread.)
///
/// # Panics
///
/// Panics if the spawned task ends without sending a result (for example because
/// the future itself panicked).
fn dispatch_to_runtime<R: Send + 'static>(
    handle: &tokio::runtime::Handle,
    future: impl std::future::Future<Output = R> + Send + 'static,
) -> R {
    // Capacity 1: exactly one value is ever sent, so `send` never blocks.
    let (tx, rx) = mpsc::sync_channel::<R>(1);
    handle.spawn(async move {
        let result = future.await;
        // The receiver can only be gone if the caller thread died; nothing to do then.
        let _ = tx.send(result);
    });
    rx.recv().expect("plugin runtime task dropped unexpectedly")
}
/// Returns the element metadata carried by a change, if it has any.
///
/// `Insert` and `Update` expose the metadata of their element, `Delete` carries
/// the metadata directly, and `Future` yields `None`.
fn source_change_metadata(change: &SourceChange) -> Option<&ElementMetadata> {
    let metadata: &ElementMetadata = match change {
        SourceChange::Future { .. } => return None,
        SourceChange::Delete { metadata } => metadata,
        SourceChange::Insert { element } => element.get_metadata(),
        SourceChange::Update { element } => element.get_metadata(),
    };
    Some(metadata)
}
fn component_status_to_ffi(s: ComponentStatus) -> FfiComponentStatus {
match s {
ComponentStatus::Running => FfiComponentStatus::Running,
ComponentStatus::Stopped => FfiComponentStatus::Stopped,
ComponentStatus::Starting => FfiComponentStatus::Starting,
ComponentStatus::Stopping => FfiComponentStatus::Stopping,
ComponentStatus::Reconfiguring => FfiComponentStatus::Reconfiguring,
ComponentStatus::Error => FfiComponentStatus::Error,
}
}
/// Maps a core [`DispatchMode`] onto the FFI enum variant of the same name.
fn dispatch_mode_to_ffi(m: DispatchMode) -> FfiDispatchMode {
    type Ffi = FfiDispatchMode;
    match m {
        DispatchMode::Channel => Ffi::Channel,
        DispatchMode::Broadcast => Ffi::Broadcast,
    }
}
/// Logging context describing one plugin component instance.
///
/// The vtable builders in this file assemble it from the log callback and
/// instance id registered during `initialize_fn`, and install it into
/// `INSTANCE_LOG_CTX` around `start`/`stop` calls.
#[derive(Clone)]
pub struct InstanceLogContext {
/// Instance id copied from the FFI runtime context at initialization.
pub instance_id: String,
/// Id of the component (the wrapper's cached id).
pub component_id: String,
/// Component kind; the builders in this file use `"source"` or `"reaction"`.
pub component_type: String,
/// Per-instance log callback, if one was registered.
pub log_cb: Option<super::callbacks::LogCallbackFn>,
/// Opaque context pointer stored alongside `log_cb`
/// (presumably passed back to the callback on each call — confirm at the call site).
pub log_ctx: *mut c_void,
}
// SAFETY / NOTE(review): the raw `log_ctx` pointer is what prevents an automatic
// `Send`. This impl assumes the host-provided context may be used from any
// thread — confirm against the host's callback contract.
unsafe impl Send for InstanceLogContext {}
// Per-thread slot holding the log context of the component call currently
// running on this thread. It starts out empty, is filled by
// `set_instance_log_ctx`, emptied by `clear_instance_log_ctx`, and read through
// `current_instance_log_ctx`.
thread_local! {
pub static INSTANCE_LOG_CTX: std::cell::RefCell<Option<InstanceLogContext>> =
const { std::cell::RefCell::new(None) };
}
/// Returns a clone of the log context installed on the current thread, or `None` if the slot is empty.
pub fn current_instance_log_ctx() -> Option<InstanceLogContext> {
    INSTANCE_LOG_CTX.with(|slot| {
        let snapshot: Option<InstanceLogContext> = slot.borrow().as_ref().cloned();
        snapshot
    })
}
/// Installs `ctx` as the current thread's log context, discarding whatever was stored before.
fn set_instance_log_ctx(ctx: Option<InstanceLogContext>) {
    INSTANCE_LOG_CTX.with(|slot| {
        // Swap the new value in and drop the previous one while the borrow is held.
        drop(std::mem::replace(&mut *slot.borrow_mut(), ctx));
    });
}
/// Empties the current thread's log-context slot.
fn clear_instance_log_ctx() {
    INSTANCE_LOG_CTX.with(|slot| {
        // `take` leaves `None` behind; the previous value (if any) is dropped here.
        drop(slot.borrow_mut().take());
    });
}
/// Heap state behind a `SourceVtable` created by `build_source_vtable`.
///
/// The vtable's `state` pointer refers to a boxed instance of this struct; every
/// vtable entry point casts `state` back to `SourceWrapper<T>`.
pub(crate) struct SourceWrapper<T: Source + 'static> {
/// The wrapped source implementation.
pub inner: T,
/// `inner.id()` captured once at construction.
pub cached_id: String,
/// `inner.type_name()` captured once at construction.
pub cached_type_name: String,
/// Fallback lifecycle emitter, used while no per-instance lifecycle callback is registered.
pub lifecycle_emitter: LifecycleEmitterFn,
/// Returns the Tokio runtime on which the source's async methods are executed.
pub runtime_handle: fn() -> &'static tokio::runtime::Runtime,
/// Executor forwarded to `wrap_subscription_response` when a subscription is created.
pub vtable_executor: AsyncExecutorFn,
/// Per-instance log callback stored as a type-erased pointer; null until `initialize_fn` registers one.
pub instance_log_cb: std::sync::atomic::AtomicPtr<()>,
/// Context pointer stored together with `instance_log_cb`.
pub instance_log_ctx: std::sync::atomic::AtomicPtr<c_void>,
/// Per-instance lifecycle callback stored as a type-erased pointer; null until `initialize_fn` registers one.
pub instance_lifecycle_cb: std::sync::atomic::AtomicPtr<()>,
/// Context pointer passed to the lifecycle callback.
pub instance_lifecycle_ctx: std::sync::atomic::AtomicPtr<c_void>,
/// Instance id copied from the FFI runtime context in `initialize_fn`; empty before that.
pub instance_id: std::sync::RwLock<String>,
/// Receiver returned by `build_source_runtime_context`, stored by `initialize_fn`.
/// It is not read in the visible code (presumably held only to keep the channel open — confirm).
pub _status_rx: std::sync::Mutex<Option<ComponentUpdateReceiver>>,
}
pub fn build_source_vtable<T: Source + 'static>(
source: T,
executor: AsyncExecutorFn,
lifecycle_emitter: LifecycleEmitterFn,
runtime: fn() -> &'static tokio::runtime::Runtime,
) -> SourceVtable {
extern "C" fn id_fn<T: Source + 'static>(state: *const c_void) -> FfiStr {
let w = unsafe { &*(state as *const SourceWrapper<T>) };
FfiStr::from_str(&w.cached_id)
}
extern "C" fn type_name_fn<T: Source + 'static>(state: *const c_void) -> FfiStr {
let w = unsafe { &*(state as *const SourceWrapper<T>) };
FfiStr::from_str(&w.cached_type_name)
}
extern "C" fn auto_start_fn<T: Source + 'static>(state: *const c_void) -> bool {
let w = unsafe { &*(state as *const SourceWrapper<T>) };
w.inner.auto_start()
}
extern "C" fn dispatch_mode_fn<T: Source + 'static>(state: *const c_void) -> FfiDispatchMode {
let w = unsafe { &*(state as *const SourceWrapper<T>) };
dispatch_mode_to_ffi(w.inner.dispatch_mode())
}
extern "C" fn properties_fn<T: Source + 'static>(state: *const c_void) -> FfiOwnedStr {
let w = unsafe { &*(state as *const SourceWrapper<T>) };
let props = w.inner.properties();
let json = serde_json::to_string(&props).unwrap_or_else(|_| "{}".to_string());
FfiOwnedStr::from_string(json)
}
fn emit_lifecycle_for<T: Source + 'static>(
w: &SourceWrapper<T>,
event_type: FfiLifecycleEventType,
message: &str,
) {
let cb_ptr = w
.instance_lifecycle_cb
.load(std::sync::atomic::Ordering::Acquire);
if !cb_ptr.is_null() {
let cb: super::callbacks::LifecycleCallbackFn = unsafe { std::mem::transmute(cb_ptr) };
let ctx = w
.instance_lifecycle_ctx
.load(std::sync::atomic::Ordering::Acquire);
let instance_id = w.instance_id.read().map(|s| s.clone()).unwrap_or_default();
let event = FfiLifecycleEvent {
component_id: FfiStr::from_str(&w.cached_id),
component_type: FfiStr::from_str("source"),
event_type,
message: FfiStr::from_str(message),
timestamp_us: super::now_us(),
};
cb(ctx, &event);
} else {
(w.lifecycle_emitter)(&w.cached_id, event_type, message);
}
}
fn build_instance_log_ctx<T: Source + 'static>(
w: &SourceWrapper<T>,
) -> Option<InstanceLogContext> {
let cb_ptr = w.instance_log_cb.load(std::sync::atomic::Ordering::Acquire);
if cb_ptr.is_null() {
return None;
}
let cb: super::callbacks::LogCallbackFn = unsafe { std::mem::transmute(cb_ptr) };
let ctx = w
.instance_log_ctx
.load(std::sync::atomic::Ordering::Acquire);
let instance_id = w.instance_id.read().map(|s| s.clone()).unwrap_or_default();
Some(InstanceLogContext {
instance_id,
component_id: w.cached_id.clone(),
component_type: "source".to_string(),
log_cb: Some(cb),
log_ctx: ctx,
})
}
extern "C" fn start_fn<T: Source + 'static>(state: *mut c_void) -> FfiResult {
catch_panic_ffi(|| {
let w = unsafe { &*(state as *const SourceWrapper<T>) };
emit_lifecycle_for(w, FfiLifecycleEventType::Starting, "");
let log_ctx = build_instance_log_ctx(w);
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const SourceWrapper<T>);
let result = dispatch_to_runtime(&handle, async move {
set_instance_log_ctx(log_ctx);
let inner = unsafe { ptr.as_ref() };
let r = inner.inner.start().await;
clear_instance_log_ctx();
r
});
match result {
Ok(()) => {
emit_lifecycle_for(w, FfiLifecycleEventType::Started, "");
FfiResult::ok()
}
Err(e) => {
let msg = e.to_string();
emit_lifecycle_for(w, FfiLifecycleEventType::Error, &msg);
FfiResult::err(msg)
}
}
})
}
extern "C" fn stop_fn<T: Source + 'static>(state: *mut c_void) -> FfiResult {
catch_panic_ffi(|| {
let w = unsafe { &*(state as *const SourceWrapper<T>) };
emit_lifecycle_for(w, FfiLifecycleEventType::Stopping, "");
let log_ctx = build_instance_log_ctx(w);
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const SourceWrapper<T>);
let result = dispatch_to_runtime(&handle, async move {
set_instance_log_ctx(log_ctx);
let inner = unsafe { ptr.as_ref() };
let r = inner.inner.stop().await;
clear_instance_log_ctx();
r
});
match result {
Ok(()) => {
emit_lifecycle_for(w, FfiLifecycleEventType::Stopped, "");
FfiResult::ok()
}
Err(e) => {
let msg = e.to_string();
emit_lifecycle_for(w, FfiLifecycleEventType::Error, &msg);
FfiResult::err(msg)
}
}
})
}
extern "C" fn status_fn<T: Source + 'static>(state: *const c_void) -> FfiComponentStatus {
let w = unsafe { &*(state as *const SourceWrapper<T>) };
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const SourceWrapper<T>);
let status = dispatch_to_runtime(&handle, async move {
let inner = unsafe { ptr.as_ref() };
inner.inner.status().await
});
component_status_to_ffi(status)
}
extern "C" fn deprovision_fn<T: Source + 'static>(state: *mut c_void) -> FfiResult {
catch_panic_ffi(|| {
let w = unsafe { &*(state as *const SourceWrapper<T>) };
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const SourceWrapper<T>);
let result = dispatch_to_runtime(&handle, async move {
let inner = unsafe { ptr.as_ref() };
inner.inner.deprovision().await
});
match result {
Ok(()) => FfiResult::ok(),
Err(e) => FfiResult::err(e.to_string()),
}
})
}
extern "C" fn subscribe_fn<T: Source + 'static>(
state: *mut c_void,
source_id: FfiStr,
enable_bootstrap: bool,
query_id: FfiStr,
nodes_json: FfiStr,
relations_json: FfiStr,
) -> *mut FfiSubscriptionResponse {
let w = unsafe { &*(state as *const SourceWrapper<T>) };
let source_id_str = unsafe { source_id.to_string() };
let qid = unsafe { query_id.to_string() };
let nodes_str = unsafe { nodes_json.to_string() };
let rels_str = unsafe { relations_json.to_string() };
let nodes: HashSet<String> = serde_json::from_str(&nodes_str).unwrap_or_default();
let relations: HashSet<String> = serde_json::from_str(&rels_str).unwrap_or_default();
let settings = SourceSubscriptionSettings {
source_id: source_id_str,
enable_bootstrap,
query_id: qid,
nodes,
relations,
};
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const SourceWrapper<T>);
let result = dispatch_to_runtime(&handle, async move {
let inner = unsafe { ptr.as_ref() };
inner.inner.subscribe(settings).await
});
match result {
Ok(sub) => wrap_subscription_response(sub, w.vtable_executor, handle),
Err(e) => {
log::error!("Subscribe failed: {e}");
std::ptr::null_mut()
}
}
}
extern "C" fn initialize_fn<T: Source + 'static>(
state: *mut c_void,
ctx: *const FfiRuntimeContext,
) {
let w = unsafe { &*(state as *const SourceWrapper<T>) };
let ffi_ctx = unsafe { &*ctx };
if let Some(log_cb) = ffi_ctx.log_callback {
w.instance_log_cb
.store(log_cb as *mut (), std::sync::atomic::Ordering::Release);
w.instance_log_ctx
.store(ffi_ctx.log_ctx, std::sync::atomic::Ordering::Release);
}
if let Some(lifecycle_cb) = ffi_ctx.lifecycle_callback {
w.instance_lifecycle_cb.store(
lifecycle_cb as *mut (),
std::sync::atomic::Ordering::Release,
);
w.instance_lifecycle_ctx
.store(ffi_ctx.lifecycle_ctx, std::sync::atomic::Ordering::Release);
}
if let Ok(mut iid) = w.instance_id.write() {
*iid = unsafe { ffi_ctx.instance_id.to_string() };
}
let (runtime_ctx, status_rx) = build_source_runtime_context(ffi_ctx);
if let Ok(mut guard) = w._status_rx.lock() {
*guard = Some(status_rx);
}
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const SourceWrapper<T>);
dispatch_to_runtime(&handle, async move {
let inner = unsafe { ptr.as_ref() };
inner.inner.initialize(runtime_ctx).await
});
}
extern "C" fn set_bootstrap_provider_fn<T: Source + 'static>(
state: *mut c_void,
provider: *mut BootstrapProviderVtable,
) {
if provider.is_null() {
return;
}
let vtable = unsafe { Box::from_raw(provider) };
let proxy = Box::new(FfiBootstrapProviderProxy {
vtable: std::sync::Mutex::new(*vtable),
});
let w = unsafe { &*(state as *const SourceWrapper<T>) };
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const SourceWrapper<T>);
dispatch_to_runtime(&handle, async move {
let inner = unsafe { ptr.as_ref() };
inner.inner.set_bootstrap_provider(proxy).await
});
}
extern "C" fn drop_fn<T: Source + 'static>(state: *mut c_void) {
unsafe { drop(Box::from_raw(state as *mut SourceWrapper<T>)) };
}
let cached_id = source.id().to_string();
let cached_type_name = source.type_name().to_string();
let wrapper = Box::new(SourceWrapper {
inner: source,
cached_id,
cached_type_name,
lifecycle_emitter,
runtime_handle: runtime,
vtable_executor: executor,
instance_log_cb: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_log_ctx: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_lifecycle_cb: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_lifecycle_ctx: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_id: std::sync::RwLock::new(String::new()),
_status_rx: std::sync::Mutex::new(None),
});
SourceVtable {
state: Box::into_raw(wrapper) as *mut c_void,
executor,
id_fn: id_fn::<T>,
type_name_fn: type_name_fn::<T>,
auto_start_fn: auto_start_fn::<T>,
dispatch_mode_fn: dispatch_mode_fn::<T>,
properties_fn: properties_fn::<T>,
start_fn: start_fn::<T>,
stop_fn: stop_fn::<T>,
status_fn: status_fn::<T>,
deprovision_fn: deprovision_fn::<T>,
initialize_fn: initialize_fn::<T>,
subscribe_fn: subscribe_fn::<T>,
set_bootstrap_provider_fn: set_bootstrap_provider_fn::<T>,
drop_fn: drop_fn::<T>,
}
}
pub fn build_source_vtable_from_boxed(
source: Box<dyn Source>,
executor: AsyncExecutorFn,
lifecycle_emitter: LifecycleEmitterFn,
runtime: fn() -> &'static tokio::runtime::Runtime,
) -> SourceVtable {
struct DynSourceWrapper {
inner: Box<dyn Source>,
cached_id: String,
cached_type_name: String,
lifecycle_emitter: LifecycleEmitterFn,
runtime_handle: fn() -> &'static tokio::runtime::Runtime,
vtable_executor: AsyncExecutorFn,
instance_log_cb: std::sync::atomic::AtomicPtr<()>,
instance_log_ctx: std::sync::atomic::AtomicPtr<c_void>,
instance_lifecycle_cb: std::sync::atomic::AtomicPtr<()>,
instance_lifecycle_ctx: std::sync::atomic::AtomicPtr<c_void>,
instance_id: std::sync::RwLock<String>,
_status_rx: std::sync::Mutex<Option<ComponentUpdateReceiver>>,
}
extern "C" fn id_fn(state: *const c_void) -> FfiStr {
let w = unsafe { &*(state as *const DynSourceWrapper) };
FfiStr::from_str(&w.cached_id)
}
extern "C" fn type_name_fn(state: *const c_void) -> FfiStr {
let w = unsafe { &*(state as *const DynSourceWrapper) };
FfiStr::from_str(&w.cached_type_name)
}
extern "C" fn auto_start_fn(state: *const c_void) -> bool {
let w = unsafe { &*(state as *const DynSourceWrapper) };
w.inner.auto_start()
}
extern "C" fn dispatch_mode_fn(state: *const c_void) -> FfiDispatchMode {
let w = unsafe { &*(state as *const DynSourceWrapper) };
dispatch_mode_to_ffi(w.inner.dispatch_mode())
}
extern "C" fn properties_fn(state: *const c_void) -> FfiOwnedStr {
let w = unsafe { &*(state as *const DynSourceWrapper) };
let props = w.inner.properties();
let json = serde_json::to_string(&props).unwrap_or_else(|_| "{}".to_string());
FfiOwnedStr::from_string(json)
}
fn emit_dyn_source_lifecycle(
w: &DynSourceWrapper,
event_type: FfiLifecycleEventType,
message: &str,
) {
let cb_ptr = w
.instance_lifecycle_cb
.load(std::sync::atomic::Ordering::Acquire);
if !cb_ptr.is_null() {
let cb: super::callbacks::LifecycleCallbackFn = unsafe { std::mem::transmute(cb_ptr) };
let ctx = w
.instance_lifecycle_ctx
.load(std::sync::atomic::Ordering::Acquire);
let event = FfiLifecycleEvent {
component_id: FfiStr::from_str(&w.cached_id),
component_type: FfiStr::from_str("source"),
event_type,
message: FfiStr::from_str(message),
timestamp_us: super::now_us(),
};
cb(ctx, &event);
} else {
(w.lifecycle_emitter)(&w.cached_id, event_type, message);
}
}
fn build_dyn_source_log_ctx(w: &DynSourceWrapper) -> Option<InstanceLogContext> {
let cb_ptr = w.instance_log_cb.load(std::sync::atomic::Ordering::Acquire);
if cb_ptr.is_null() {
return None;
}
let cb: super::callbacks::LogCallbackFn = unsafe { std::mem::transmute(cb_ptr) };
let ctx = w
.instance_log_ctx
.load(std::sync::atomic::Ordering::Acquire);
let instance_id = w.instance_id.read().map(|s| s.clone()).unwrap_or_default();
Some(InstanceLogContext {
instance_id,
component_id: w.cached_id.clone(),
component_type: "source".to_string(),
log_cb: Some(cb),
log_ctx: ctx,
})
}
extern "C" fn start_fn(state: *mut c_void) -> FfiResult {
catch_panic_ffi(|| {
let w = unsafe { &*(state as *const DynSourceWrapper) };
emit_dyn_source_lifecycle(w, FfiLifecycleEventType::Starting, "");
let log_ctx = build_dyn_source_log_ctx(w);
let handle = (w.runtime_handle)().handle().clone();
let inner_ptr = SendPtr(state as *const DynSourceWrapper);
let result = dispatch_to_runtime(&handle, async move {
set_instance_log_ctx(log_ctx);
let inner = unsafe { inner_ptr.as_ref() };
let r = inner.inner.start().await;
clear_instance_log_ctx();
r
});
match result {
Ok(()) => {
emit_dyn_source_lifecycle(w, FfiLifecycleEventType::Started, "");
FfiResult::ok()
}
Err(e) => {
let msg = e.to_string();
emit_dyn_source_lifecycle(w, FfiLifecycleEventType::Error, &msg);
FfiResult::err(msg)
}
}
})
}
extern "C" fn stop_fn(state: *mut c_void) -> FfiResult {
catch_panic_ffi(|| {
let w = unsafe { &*(state as *const DynSourceWrapper) };
emit_dyn_source_lifecycle(w, FfiLifecycleEventType::Stopping, "");
let log_ctx = build_dyn_source_log_ctx(w);
let handle = (w.runtime_handle)().handle().clone();
let inner_ptr = SendPtr(state as *const DynSourceWrapper);
let result = dispatch_to_runtime(&handle, async move {
set_instance_log_ctx(log_ctx);
let inner = unsafe { inner_ptr.as_ref() };
let r = inner.inner.stop().await;
clear_instance_log_ctx();
r
});
match result {
Ok(()) => {
emit_dyn_source_lifecycle(w, FfiLifecycleEventType::Stopped, "");
FfiResult::ok()
}
Err(e) => {
let msg = e.to_string();
emit_dyn_source_lifecycle(w, FfiLifecycleEventType::Error, &msg);
FfiResult::err(msg)
}
}
})
}
extern "C" fn status_fn(state: *const c_void) -> FfiComponentStatus {
let w = unsafe { &*(state as *const DynSourceWrapper) };
let handle = (w.runtime_handle)().handle().clone();
let inner_ptr = SendPtr(state as *const DynSourceWrapper);
let status = dispatch_to_runtime(&handle, async move {
let inner = unsafe { inner_ptr.as_ref() };
inner.inner.status().await
});
component_status_to_ffi(status)
}
extern "C" fn deprovision_fn(state: *mut c_void) -> FfiResult {
catch_panic_ffi(|| {
let w = unsafe { &*(state as *const DynSourceWrapper) };
let handle = (w.runtime_handle)().handle().clone();
let inner_ptr = SendPtr(state as *const DynSourceWrapper);
let result = dispatch_to_runtime(&handle, async move {
let inner = unsafe { inner_ptr.as_ref() };
inner.inner.deprovision().await
});
match result {
Ok(()) => FfiResult::ok(),
Err(e) => FfiResult::err(e.to_string()),
}
})
}
extern "C" fn subscribe_fn(
state: *mut c_void,
source_id: FfiStr,
enable_bootstrap: bool,
query_id: FfiStr,
nodes_json: FfiStr,
relations_json: FfiStr,
) -> *mut FfiSubscriptionResponse {
let w = unsafe { &*(state as *const DynSourceWrapper) };
let source_id_str = unsafe { source_id.to_string() };
let qid = unsafe { query_id.to_string() };
let nodes_str = unsafe { nodes_json.to_string() };
let rels_str = unsafe { relations_json.to_string() };
let nodes: HashSet<String> = serde_json::from_str(&nodes_str).unwrap_or_default();
let relations: HashSet<String> = serde_json::from_str(&rels_str).unwrap_or_default();
let settings = SourceSubscriptionSettings {
source_id: source_id_str,
enable_bootstrap,
query_id: qid,
nodes,
relations,
};
let handle = (w.runtime_handle)().handle().clone();
let inner_ptr = SendPtr(state as *const DynSourceWrapper);
let result = dispatch_to_runtime(&handle, async move {
let inner = unsafe { inner_ptr.as_ref() };
inner.inner.subscribe(settings).await
});
match result {
Ok(sub) => {
let executor = unsafe { &*(state as *const DynSourceWrapper) }.vtable_executor;
wrap_subscription_response(sub, executor, handle)
}
Err(e) => {
log::error!("Subscribe failed: {e}");
std::ptr::null_mut()
}
}
}
extern "C" fn initialize_fn(state: *mut c_void, ctx: *const FfiRuntimeContext) {
let w = unsafe { &*(state as *const DynSourceWrapper) };
let ffi_ctx = unsafe { &*ctx };
if let Some(log_cb) = ffi_ctx.log_callback {
w.instance_log_cb
.store(log_cb as *mut (), std::sync::atomic::Ordering::Release);
w.instance_log_ctx
.store(ffi_ctx.log_ctx, std::sync::atomic::Ordering::Release);
}
if let Some(lifecycle_cb) = ffi_ctx.lifecycle_callback {
w.instance_lifecycle_cb.store(
lifecycle_cb as *mut (),
std::sync::atomic::Ordering::Release,
);
w.instance_lifecycle_ctx
.store(ffi_ctx.lifecycle_ctx, std::sync::atomic::Ordering::Release);
}
if let Ok(mut iid) = w.instance_id.write() {
*iid = unsafe { ffi_ctx.instance_id.to_string() };
}
let (runtime_ctx, status_rx) = build_source_runtime_context(ffi_ctx);
if let Ok(mut guard) = w._status_rx.lock() {
*guard = Some(status_rx);
}
let handle = (w.runtime_handle)().handle().clone();
let inner_ptr = SendPtr(state as *const DynSourceWrapper);
dispatch_to_runtime(&handle, async move {
let inner = unsafe { inner_ptr.as_ref() };
inner.inner.initialize(runtime_ctx).await
});
}
extern "C" fn set_bootstrap_provider_fn(
state: *mut c_void,
provider: *mut BootstrapProviderVtable,
) {
if provider.is_null() {
return;
}
let vtable = unsafe { Box::from_raw(provider) };
let proxy = Box::new(FfiBootstrapProviderProxy {
vtable: std::sync::Mutex::new(*vtable),
});
let w = unsafe { &*(state as *const DynSourceWrapper) };
let handle = (w.runtime_handle)().handle().clone();
let inner_ptr = SendPtr(state as *const DynSourceWrapper);
dispatch_to_runtime(&handle, async move {
let inner = unsafe { inner_ptr.as_ref() };
inner.inner.set_bootstrap_provider(proxy).await
});
}
extern "C" fn drop_fn(state: *mut c_void) {
unsafe { drop(Box::from_raw(state as *mut DynSourceWrapper)) };
}
let cached_id = source.id().to_string();
let cached_type_name = source.type_name().to_string();
let wrapper = Box::new(DynSourceWrapper {
inner: source,
cached_id,
cached_type_name,
lifecycle_emitter,
runtime_handle: runtime,
vtable_executor: executor,
instance_log_cb: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_log_ctx: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_lifecycle_cb: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_lifecycle_ctx: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_id: std::sync::RwLock::new(String::new()),
_status_rx: std::sync::Mutex::new(None),
});
SourceVtable {
state: Box::into_raw(wrapper) as *mut c_void,
executor,
id_fn,
type_name_fn,
auto_start_fn,
dispatch_mode_fn,
properties_fn,
start_fn,
stop_fn,
status_fn,
deprovision_fn,
initialize_fn,
subscribe_fn,
set_bootstrap_provider_fn,
drop_fn,
}
}
/// Heap state behind a `ReactionVtable` created by `build_reaction_vtable`.
///
/// The vtable's `state` pointer refers to a boxed instance of this struct; every
/// vtable entry point casts `state` back to `ReactionWrapper<T>`.
pub(crate) struct ReactionWrapper<T: Reaction + 'static> {
/// The wrapped reaction implementation.
pub inner: T,
/// `inner.id()` captured once at construction.
pub cached_id: String,
/// `inner.type_name()` captured once at construction.
pub cached_type_name: String,
/// Fallback lifecycle emitter, used while no per-instance lifecycle callback is registered.
pub lifecycle_emitter: LifecycleEmitterFn,
/// Returns the Tokio runtime on which the reaction's async methods are executed.
pub runtime_handle: fn() -> &'static tokio::runtime::Runtime,
/// Per-instance log callback stored as a type-erased pointer; null until `initialize_fn` registers one.
pub instance_log_cb: std::sync::atomic::AtomicPtr<()>,
/// Context pointer stored together with `instance_log_cb`.
pub instance_log_ctx: std::sync::atomic::AtomicPtr<c_void>,
/// Per-instance lifecycle callback stored as a type-erased pointer; null until `initialize_fn` registers one.
pub instance_lifecycle_cb: std::sync::atomic::AtomicPtr<()>,
/// Context pointer passed to the lifecycle callback.
pub instance_lifecycle_ctx: std::sync::atomic::AtomicPtr<c_void>,
/// Instance id copied from the FFI runtime context in `initialize_fn`; empty before that.
pub instance_id: std::sync::RwLock<String>,
/// Receiver returned by `build_reaction_runtime_context`, stored by `initialize_fn`.
/// It is not read in the visible code (presumably held only to keep the channel open — confirm).
pub _status_rx: std::sync::Mutex<Option<ComponentUpdateReceiver>>,
}
pub fn build_reaction_vtable<T: Reaction + 'static>(
reaction: T,
executor: AsyncExecutorFn,
lifecycle_emitter: LifecycleEmitterFn,
runtime: fn() -> &'static tokio::runtime::Runtime,
) -> ReactionVtable {
extern "C" fn id_fn<T: Reaction + 'static>(state: *const c_void) -> FfiStr {
let w = unsafe { &*(state as *const ReactionWrapper<T>) };
FfiStr::from_str(&w.cached_id)
}
extern "C" fn type_name_fn<T: Reaction + 'static>(state: *const c_void) -> FfiStr {
let w = unsafe { &*(state as *const ReactionWrapper<T>) };
FfiStr::from_str(&w.cached_type_name)
}
extern "C" fn auto_start_fn<T: Reaction + 'static>(state: *const c_void) -> bool {
let w = unsafe { &*(state as *const ReactionWrapper<T>) };
w.inner.auto_start()
}
extern "C" fn query_ids_fn<T: Reaction + 'static>(state: *const c_void) -> FfiStringArray {
let w = unsafe { &*(state as *const ReactionWrapper<T>) };
FfiStringArray::from_vec(w.inner.query_ids())
}
extern "C" fn properties_fn<T: Reaction + 'static>(state: *const c_void) -> FfiOwnedStr {
let w = unsafe { &*(state as *const ReactionWrapper<T>) };
let props = w.inner.properties();
let json = serde_json::to_string(&props).unwrap_or_else(|_| "{}".to_string());
FfiOwnedStr::from_string(json)
}
fn emit_reaction_lifecycle_for<T: Reaction + 'static>(
w: &ReactionWrapper<T>,
event_type: FfiLifecycleEventType,
message: &str,
) {
let cb_ptr = w
.instance_lifecycle_cb
.load(std::sync::atomic::Ordering::Acquire);
if !cb_ptr.is_null() {
let cb: super::callbacks::LifecycleCallbackFn = unsafe { std::mem::transmute(cb_ptr) };
let ctx = w
.instance_lifecycle_ctx
.load(std::sync::atomic::Ordering::Acquire);
let event = FfiLifecycleEvent {
component_id: FfiStr::from_str(&w.cached_id),
component_type: FfiStr::from_str("reaction"),
event_type,
message: FfiStr::from_str(message),
timestamp_us: super::now_us(),
};
cb(ctx, &event);
} else {
(w.lifecycle_emitter)(&w.cached_id, event_type, message);
}
}
fn build_reaction_log_ctx<T: Reaction + 'static>(
w: &ReactionWrapper<T>,
) -> Option<InstanceLogContext> {
let cb_ptr = w.instance_log_cb.load(std::sync::atomic::Ordering::Acquire);
if cb_ptr.is_null() {
return None;
}
let cb: super::callbacks::LogCallbackFn = unsafe { std::mem::transmute(cb_ptr) };
let ctx = w
.instance_log_ctx
.load(std::sync::atomic::Ordering::Acquire);
let instance_id = w.instance_id.read().map(|s| s.clone()).unwrap_or_default();
Some(InstanceLogContext {
instance_id,
component_id: w.cached_id.clone(),
component_type: "reaction".to_string(),
log_cb: Some(cb),
log_ctx: ctx,
})
}
extern "C" fn start_fn<T: Reaction + 'static>(state: *mut c_void) -> FfiResult {
catch_panic_ffi(|| {
let w = unsafe { &*(state as *const ReactionWrapper<T>) };
emit_reaction_lifecycle_for(w, FfiLifecycleEventType::Starting, "");
let log_ctx = build_reaction_log_ctx(w);
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const ReactionWrapper<T>);
let result = dispatch_to_runtime(&handle, async move {
set_instance_log_ctx(log_ctx);
let inner = unsafe { ptr.as_ref() };
let r = inner.inner.start().await;
clear_instance_log_ctx();
r
});
match result {
Ok(()) => {
emit_reaction_lifecycle_for(w, FfiLifecycleEventType::Started, "");
FfiResult::ok()
}
Err(e) => {
let msg = e.to_string();
emit_reaction_lifecycle_for(w, FfiLifecycleEventType::Error, &msg);
FfiResult::err(msg)
}
}
})
}
extern "C" fn stop_fn<T: Reaction + 'static>(state: *mut c_void) -> FfiResult {
catch_panic_ffi(|| {
let w = unsafe { &*(state as *const ReactionWrapper<T>) };
emit_reaction_lifecycle_for(w, FfiLifecycleEventType::Stopping, "");
let log_ctx = build_reaction_log_ctx(w);
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const ReactionWrapper<T>);
let result = dispatch_to_runtime(&handle, async move {
set_instance_log_ctx(log_ctx);
let inner = unsafe { ptr.as_ref() };
let r = inner.inner.stop().await;
clear_instance_log_ctx();
r
});
match result {
Ok(()) => {
emit_reaction_lifecycle_for(w, FfiLifecycleEventType::Stopped, "");
FfiResult::ok()
}
Err(e) => {
let msg = e.to_string();
emit_reaction_lifecycle_for(w, FfiLifecycleEventType::Error, &msg);
FfiResult::err(msg)
}
}
})
}
extern "C" fn status_fn<T: Reaction + 'static>(state: *const c_void) -> FfiComponentStatus {
let w = unsafe { &*(state as *const ReactionWrapper<T>) };
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const ReactionWrapper<T>);
let status = dispatch_to_runtime(&handle, async move {
let inner = unsafe { ptr.as_ref() };
inner.inner.status().await
});
component_status_to_ffi(status)
}
extern "C" fn deprovision_fn<T: Reaction + 'static>(state: *mut c_void) -> FfiResult {
catch_panic_ffi(|| {
let w = unsafe { &*(state as *const ReactionWrapper<T>) };
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const ReactionWrapper<T>);
let result = dispatch_to_runtime(&handle, async move {
let inner = unsafe { ptr.as_ref() };
inner.inner.deprovision().await
});
match result {
Ok(()) => FfiResult::ok(),
Err(e) => FfiResult::err(e.to_string()),
}
})
}
extern "C" fn initialize_fn<T: Reaction + 'static>(
state: *mut c_void,
ctx: *const FfiRuntimeContext,
) {
let w = unsafe { &*(state as *const ReactionWrapper<T>) };
let ffi_ctx = unsafe { &*ctx };
if let Some(log_cb) = ffi_ctx.log_callback {
w.instance_log_cb
.store(log_cb as *mut (), std::sync::atomic::Ordering::Release);
w.instance_log_ctx
.store(ffi_ctx.log_ctx, std::sync::atomic::Ordering::Release);
}
if let Some(lifecycle_cb) = ffi_ctx.lifecycle_callback {
w.instance_lifecycle_cb.store(
lifecycle_cb as *mut (),
std::sync::atomic::Ordering::Release,
);
w.instance_lifecycle_ctx
.store(ffi_ctx.lifecycle_ctx, std::sync::atomic::Ordering::Release);
}
if let Ok(mut iid) = w.instance_id.write() {
*iid = unsafe { ffi_ctx.instance_id.to_string() };
}
let (runtime_ctx, status_rx) = build_reaction_runtime_context(ffi_ctx);
if let Ok(mut guard) = w._status_rx.lock() {
*guard = Some(status_rx);
}
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const ReactionWrapper<T>);
dispatch_to_runtime(&handle, async move {
let inner = unsafe { ptr.as_ref() };
inner.inner.initialize(runtime_ctx).await
});
}
extern "C" fn start_result_push_fn<T: Reaction + 'static>(
state: *mut c_void,
callback: FfiResultPushCallbackFn,
callback_ctx: *mut c_void,
) {
let w = unsafe { &*(state as *const ReactionWrapper<T>) };
let handle = (w.runtime_handle)().handle().clone();
let ctx_raw = callback_ctx as usize;
let ptr = SendPtr(state as *const ReactionWrapper<T>);
handle.spawn(async move {
loop {
let ctx_val = ctx_raw;
let result_ptr = tokio::task::spawn_blocking(move || {
SendMutPtr(callback(ctx_val as *mut c_void, std::ptr::null_mut()))
})
.await;
let result_ptr = match result_ptr {
Ok(p) => p.as_ptr(),
Err(_) => break,
};
if result_ptr.is_null() {
break;
}
let query_result =
unsafe { *Box::from_raw(result_ptr as *mut drasi_lib::channels::QueryResult) };
let inner = unsafe { ptr.as_ref() };
if let Err(e) = inner.inner.enqueue_query_result(query_result).await {
log::error!("Failed to enqueue query result: {e}");
}
}
});
}
extern "C" fn drop_fn<T: Reaction + 'static>(state: *mut c_void) {
unsafe { drop(Box::from_raw(state as *mut ReactionWrapper<T>)) };
}
let cached_id = reaction.id().to_string();
let cached_type_name = reaction.type_name().to_string();
let wrapper = Box::new(ReactionWrapper {
inner: reaction,
cached_id,
cached_type_name,
lifecycle_emitter,
runtime_handle: runtime,
instance_log_cb: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_log_ctx: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_lifecycle_cb: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_lifecycle_ctx: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_id: std::sync::RwLock::new(String::new()),
_status_rx: std::sync::Mutex::new(None),
});
ReactionVtable {
state: Box::into_raw(wrapper) as *mut c_void,
executor,
id_fn: id_fn::<T>,
type_name_fn: type_name_fn::<T>,
auto_start_fn: auto_start_fn::<T>,
query_ids_fn: query_ids_fn::<T>,
properties_fn: properties_fn::<T>,
start_fn: start_fn::<T>,
stop_fn: stop_fn::<T>,
status_fn: status_fn::<T>,
deprovision_fn: deprovision_fn::<T>,
initialize_fn: initialize_fn::<T>,
start_result_push_fn: start_result_push_fn::<T>,
drop_fn: drop_fn::<T>,
}
}
/// Builds a `ReactionVtable` around an already type-erased `Box<dyn Reaction>`.
///
/// The reaction is moved into a heap-allocated `DynReactionWrapper`; the raw
/// pointer to that allocation becomes the vtable's `state`, and every
/// `extern "C"` entry point below casts `state` back to the wrapper. The
/// allocation is released by `drop_fn`.
///
/// * `reaction` - the reaction instance exposed through the vtable.
/// * `executor` - copied verbatim into the returned vtable's `executor` field.
/// * `lifecycle_emitter` - fallback used for lifecycle events while no
///   per-instance lifecycle callback has been stored by `initialize_fn`.
/// * `runtime` - accessor for the tokio runtime on which the reaction's async
///   methods are run.
pub fn build_reaction_vtable_from_boxed(
reaction: Box<dyn Reaction>,
executor: AsyncExecutorFn,
lifecycle_emitter: LifecycleEmitterFn,
runtime: fn() -> &'static tokio::runtime::Runtime,
) -> ReactionVtable {
// Per-instance state behind the vtable's opaque `state` pointer.
struct DynReactionWrapper {
// The wrapped reaction.
inner: Box<dyn Reaction>,
// `reaction.id()` / `reaction.type_name()` captured once at construction so
// `id_fn` / `type_name_fn` can hand out borrowed views without calling the reaction.
cached_id: String,
cached_type_name: String,
// Fallback lifecycle sink, used while `instance_lifecycle_cb` is null.
lifecycle_emitter: LifecycleEmitterFn,
// Accessor for the runtime that executes the reaction's futures.
runtime_handle: fn() -> &'static tokio::runtime::Runtime,
// Host callbacks and their contexts, stored by `initialize_fn`. The callbacks
// are function pointers kept as `*mut ()` (null = not set) and transmuted
// back on use.
instance_log_cb: std::sync::atomic::AtomicPtr<()>,
instance_log_ctx: std::sync::atomic::AtomicPtr<c_void>,
instance_lifecycle_cb: std::sync::atomic::AtomicPtr<()>,
instance_lifecycle_ctx: std::sync::atomic::AtomicPtr<c_void>,
// Instance id copied from the runtime context in `initialize_fn`.
instance_id: std::sync::RwLock<String>,
// Receiver half of the status-update channel created in `initialize_fn`.
// It is only stored here, never read in this function.
// NOTE(review): presumably held so the channel stays open - confirm.
_status_rx: std::sync::Mutex<Option<ComponentUpdateReceiver>>,
}
/// Returns a borrowed view of the cached reaction id.
extern "C" fn id_fn(state: *const c_void) -> FfiStr {
let w = unsafe { &*(state as *const DynReactionWrapper) };
FfiStr::from_str(&w.cached_id)
}
/// Returns a borrowed view of the cached reaction type name.
extern "C" fn type_name_fn(state: *const c_void) -> FfiStr {
let w = unsafe { &*(state as *const DynReactionWrapper) };
FfiStr::from_str(&w.cached_type_name)
}
/// Forwards to `Reaction::auto_start`.
extern "C" fn auto_start_fn(state: *const c_void) -> bool {
let w = unsafe { &*(state as *const DynReactionWrapper) };
w.inner.auto_start()
}
/// Returns the reaction's query ids converted with `FfiStringArray::from_vec`.
extern "C" fn query_ids_fn(state: *const c_void) -> FfiStringArray {
let w = unsafe { &*(state as *const DynReactionWrapper) };
FfiStringArray::from_vec(w.inner.query_ids())
}
/// Serializes the reaction's properties to a JSON string.
extern "C" fn properties_fn(state: *const c_void) -> FfiOwnedStr {
let w = unsafe { &*(state as *const DynReactionWrapper) };
let props = w.inner.properties();
// A serialization failure is reported to the host as an empty JSON object.
let json = serde_json::to_string(&props).unwrap_or_else(|_| "{}".to_string());
FfiOwnedStr::from_string(json)
}
/// Emits a lifecycle event for this reaction.
///
/// Uses the per-instance callback when one has been stored, otherwise the
/// `lifecycle_emitter` supplied to the builder.
fn emit_dyn_reaction_lifecycle(
w: &DynReactionWrapper,
event_type: FfiLifecycleEventType,
message: &str,
) {
let cb_ptr = w
.instance_lifecycle_cb
.load(std::sync::atomic::Ordering::Acquire);
if !cb_ptr.is_null() {
// The pointer was stored from a `LifecycleCallbackFn` in `initialize_fn`; a
// const assertion at the top of the file checks that the two have the same size.
let cb: super::callbacks::LifecycleCallbackFn = unsafe { std::mem::transmute(cb_ptr) };
let ctx = w
.instance_lifecycle_ctx
.load(std::sync::atomic::Ordering::Acquire);
// The event only borrows `cached_id` and `message`; it is passed by reference
// for the duration of the callback call.
let event = FfiLifecycleEvent {
component_id: FfiStr::from_str(&w.cached_id),
component_type: FfiStr::from_str("reaction"),
event_type,
message: FfiStr::from_str(message),
timestamp_us: super::now_us(),
};
cb(ctx, &event);
} else {
(w.lifecycle_emitter)(&w.cached_id, event_type, message);
}
}
/// Builds the logging context for this instance, or `None` when no log
/// callback has been stored yet.
fn build_dyn_reaction_log_ctx(w: &DynReactionWrapper) -> Option<InstanceLogContext> {
let cb_ptr = w.instance_log_cb.load(std::sync::atomic::Ordering::Acquire);
if cb_ptr.is_null() {
return None;
}
// Same function-pointer round trip as for the lifecycle callback.
let cb: super::callbacks::LogCallbackFn = unsafe { std::mem::transmute(cb_ptr) };
let ctx = w
.instance_log_ctx
.load(std::sync::atomic::Ordering::Acquire);
// A poisoned lock yields an empty instance id instead of a panic.
let instance_id = w.instance_id.read().map(|s| s.clone()).unwrap_or_default();
Some(InstanceLogContext {
instance_id,
component_id: w.cached_id.clone(),
component_type: "reaction".to_string(),
log_cb: Some(cb),
log_ctx: ctx,
})
}
/// Starts the reaction and blocks the calling thread until `start()` finishes.
///
/// Emits `Starting` before the call and then `Started` or `Error` (with the
/// error text) afterwards; the same error text is returned in the `FfiResult`.
extern "C" fn start_fn(state: *mut c_void) -> FfiResult {
catch_panic_ffi(|| {
let w = unsafe { &*(state as *const DynReactionWrapper) };
emit_dyn_reaction_lifecycle(w, FfiLifecycleEventType::Starting, "");
let log_ctx = build_dyn_reaction_log_ctx(w);
let handle = (w.runtime_handle)().handle().clone();
// The wrapper pointer is wrapped in `SendPtr` so it can be moved into the spawned future.
let inner_ptr = SendPtr(state as *const DynReactionWrapper);
// `dispatch_to_runtime` spawns the future on the runtime and blocks on a channel
// for its result, so the wrapper is still borrowed by this call while the future runs.
let result = dispatch_to_runtime(&handle, async move {
// The log context is set only for the duration of the `start()` call.
set_instance_log_ctx(log_ctx);
let inner = unsafe { inner_ptr.as_ref() };
let r = inner.inner.start().await;
clear_instance_log_ctx();
r
});
match result {
Ok(()) => {
emit_dyn_reaction_lifecycle(w, FfiLifecycleEventType::Started, "");
FfiResult::ok()
}
Err(e) => {
let msg = e.to_string();
emit_dyn_reaction_lifecycle(w, FfiLifecycleEventType::Error, &msg);
FfiResult::err(msg)
}
}
})
}
/// Stops the reaction and blocks the calling thread until `stop()` finishes.
///
/// Mirrors `start_fn`: emits `Stopping`, then `Stopped` or `Error`.
extern "C" fn stop_fn(state: *mut c_void) -> FfiResult {
catch_panic_ffi(|| {
let w = unsafe { &*(state as *const DynReactionWrapper) };
emit_dyn_reaction_lifecycle(w, FfiLifecycleEventType::Stopping, "");
let log_ctx = build_dyn_reaction_log_ctx(w);
let handle = (w.runtime_handle)().handle().clone();
let inner_ptr = SendPtr(state as *const DynReactionWrapper);
let result = dispatch_to_runtime(&handle, async move {
set_instance_log_ctx(log_ctx);
let inner = unsafe { inner_ptr.as_ref() };
let r = inner.inner.stop().await;
clear_instance_log_ctx();
r
});
match result {
Ok(()) => {
emit_dyn_reaction_lifecycle(w, FfiLifecycleEventType::Stopped, "");
FfiResult::ok()
}
Err(e) => {
let msg = e.to_string();
emit_dyn_reaction_lifecycle(w, FfiLifecycleEventType::Error, &msg);
FfiResult::err(msg)
}
}
})
}
/// Queries the reaction's status on the runtime and converts it to its FFI form.
///
/// NOTE(review): unlike `start_fn`/`stop_fn` this is not wrapped in
/// `catch_panic_ffi`; `dispatch_to_runtime` panics if the spawned task is dropped.
extern "C" fn status_fn(state: *const c_void) -> FfiComponentStatus {
let w = unsafe { &*(state as *const DynReactionWrapper) };
let handle = (w.runtime_handle)().handle().clone();
let inner_ptr = SendPtr(state as *const DynReactionWrapper);
let status = dispatch_to_runtime(&handle, async move {
let inner = unsafe { inner_ptr.as_ref() };
inner.inner.status().await
});
component_status_to_ffi(status)
}
/// Runs the reaction's `deprovision()` on the runtime and reports the outcome.
/// No lifecycle events are emitted here.
extern "C" fn deprovision_fn(state: *mut c_void) -> FfiResult {
catch_panic_ffi(|| {
let w = unsafe { &*(state as *const DynReactionWrapper) };
let handle = (w.runtime_handle)().handle().clone();
let inner_ptr = SendPtr(state as *const DynReactionWrapper);
let result = dispatch_to_runtime(&handle, async move {
let inner = unsafe { inner_ptr.as_ref() };
inner.inner.deprovision().await
});
match result {
Ok(()) => FfiResult::ok(),
Err(e) => FfiResult::err(e.to_string()),
}
})
}
/// Stores the host-provided callbacks and instance id, then calls the
/// reaction's `initialize` with a runtime context built from `ctx`.
///
/// NOTE(review): `ctx` is dereferenced without a null check and this function
/// has no panic guard - confirm the host always passes a valid context.
extern "C" fn initialize_fn(state: *mut c_void, ctx: *const FfiRuntimeContext) {
let w = unsafe { &*(state as *const DynReactionWrapper) };
let ffi_ctx = unsafe { &*ctx };
// A missing callback leaves the previously stored value (initially null) untouched.
if let Some(log_cb) = ffi_ctx.log_callback {
w.instance_log_cb
.store(log_cb as *mut (), std::sync::atomic::Ordering::Release);
w.instance_log_ctx
.store(ffi_ctx.log_ctx, std::sync::atomic::Ordering::Release);
}
if let Some(lifecycle_cb) = ffi_ctx.lifecycle_callback {
w.instance_lifecycle_cb.store(
lifecycle_cb as *mut (),
std::sync::atomic::Ordering::Release,
);
w.instance_lifecycle_ctx
.store(ffi_ctx.lifecycle_ctx, std::sync::atomic::Ordering::Release);
}
// A poisoned lock is skipped silently; the instance id then keeps its old value.
if let Ok(mut iid) = w.instance_id.write() {
*iid = unsafe { ffi_ctx.instance_id.to_string() };
}
let (runtime_ctx, status_rx) = build_reaction_runtime_context(ffi_ctx);
// Store the receiver half in the wrapper (skipped if the mutex is poisoned).
if let Ok(mut guard) = w._status_rx.lock() {
*guard = Some(status_rx);
}
let handle = (w.runtime_handle)().handle().clone();
let inner_ptr = SendPtr(state as *const DynReactionWrapper);
// Blocks until `initialize` has completed on the runtime.
dispatch_to_runtime(&handle, async move {
let inner = unsafe { inner_ptr.as_ref() };
inner.inner.initialize(runtime_ctx).await
});
}
/// Spawns a background task that repeatedly asks the host for the next query
/// result via `callback` and enqueues it on the reaction.
///
/// The loop ends when the callback returns null or the blocking task fails.
///
/// NOTE(review): the task is detached and keeps a raw pointer to the wrapper;
/// nothing visible here stops it before `drop_fn` frees the wrapper - confirm
/// the host ends the stream (null result) first.
extern "C" fn start_result_push_fn(
state: *mut c_void,
callback: FfiResultPushCallbackFn,
callback_ctx: *mut c_void,
) {
let w = unsafe { &*(state as *const DynReactionWrapper) };
let handle = (w.runtime_handle)().handle().clone();
// The context pointer is carried as an integer so it can be moved into the task.
let ctx_raw = callback_ctx as usize;
let ptr = SendPtr(state as *const DynReactionWrapper);
handle.spawn(async move {
loop {
let ctx_val = ctx_raw;
// The host callback is invoked via `spawn_blocking` rather than on the async task itself.
// NOTE(review): presumably because it blocks until a result is available - confirm.
let result_ptr = tokio::task::spawn_blocking(move || {
SendMutPtr(callback(ctx_val as *mut c_void, std::ptr::null_mut()))
})
.await;
let result_ptr = match result_ptr {
Ok(p) => p.as_ptr(),
// The blocking task panicked or was cancelled: stop pulling results.
Err(_) => break,
};
if result_ptr.is_null() {
break;
}
// A non-null pointer is taken over as a `Box<QueryResult>` and unboxed.
let query_result =
unsafe { *Box::from_raw(result_ptr as *mut drasi_lib::channels::QueryResult) };
let inner = unsafe { ptr.as_ref() };
// Enqueue failures are logged and the loop continues.
if let Err(e) = inner.inner.enqueue_query_result(query_result).await {
log::error!("Failed to enqueue query result: {e}");
}
}
});
}
/// Reclaims and drops the wrapper allocated by this builder.
extern "C" fn drop_fn(state: *mut c_void) {
unsafe { drop(Box::from_raw(state as *mut DynReactionWrapper)) };
}
// Capture the id and type name before the reaction is moved into the wrapper.
let cached_id = reaction.id().to_string();
let cached_type_name = reaction.type_name().to_string();
let wrapper = Box::new(DynReactionWrapper {
inner: reaction,
cached_id,
cached_type_name,
lifecycle_emitter,
runtime_handle: runtime,
instance_log_cb: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_log_ctx: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_lifecycle_cb: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_lifecycle_ctx: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
instance_id: std::sync::RwLock::new(String::new()),
_status_rx: std::sync::Mutex::new(None),
});
ReactionVtable {
// Ownership of the wrapper passes to the vtable; `drop_fn` frees it.
state: Box::into_raw(wrapper) as *mut c_void,
executor,
id_fn,
type_name_fn,
auto_start_fn,
query_ids_fn,
properties_fn,
start_fn,
stop_fn,
status_fn,
deprovision_fn,
initialize_fn,
start_result_push_fn,
drop_fn,
}
}
pub fn build_bootstrap_provider_vtable(
provider: Box<dyn BootstrapProvider>,
executor: AsyncExecutorFn,
) -> BootstrapProviderVtable {
struct BootstrapProviderWrapper {
inner: Box<dyn BootstrapProvider>,
}
extern "C" fn bootstrap_fn(
state: *mut c_void,
query_id: FfiStr,
node_labels: *const FfiStr,
node_labels_count: usize,
relation_labels: *const FfiStr,
relation_labels_count: usize,
request_id: FfiStr,
server_id: FfiStr,
source_id: FfiStr,
sender: *mut FfiBootstrapSender,
) -> i64 {
use drasi_lib::bootstrap::{BootstrapContext, BootstrapRequest};
let query_id_str = unsafe { query_id.to_string() };
let node_label_strs: Vec<String> = (0..node_labels_count)
.map(|i| unsafe { (*node_labels.add(i)).to_string() })
.collect();
let rel_label_strs: Vec<String> = (0..relation_labels_count)
.map(|i| unsafe { (*relation_labels.add(i)).to_string() })
.collect();
let request_id_str = unsafe { request_id.to_string() };
let server_id_str = unsafe { server_id.to_string() };
let source_id_str = unsafe { source_id.to_string() };
let request = BootstrapRequest {
query_id: query_id_str,
node_labels: node_label_strs,
relation_labels: rel_label_strs,
request_id: request_id_str,
};
let context = BootstrapContext::new_minimal(server_id_str, source_id_str.clone());
let ffi_sender = unsafe { &*sender };
let send_fn = ffi_sender.send_fn;
let sender_state = ffi_sender.state;
let (std_tx, std_rx) = std::sync::mpsc::channel::<BootstrapEvent>();
let (tokio_tx, mut tokio_rx) = tokio::sync::mpsc::channel::<BootstrapEvent>(100);
let provider_ptr = SendPtr(state as *const BootstrapProviderWrapper);
let _bootstrap_handle = std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build bootstrap runtime");
let inner = unsafe { provider_ptr.as_ref() };
rt.block_on(async {
let forward_handle = tokio::spawn(async move {
while let Some(record) = tokio_rx.recv().await {
if std_tx.send(record).is_err() {
break;
}
}
});
let _ = inner
.inner
.bootstrap(request, &context, tokio_tx, None)
.await;
let _ = forward_handle.await;
})
});
let mut count: usize = 0;
while let Ok(record) = std_rx.recv() {
let source_id_owned = record.source_id.clone();
let timestamp_us = record
.timestamp
.timestamp_nanos_opt()
.map(|n| n / 1000)
.unwrap_or(0);
let sequence = record.sequence;
let entity_id_str = source_change_metadata(&record.change)
.map(|m| m.reference.element_id.to_string())
.unwrap_or_default();
let label_str = source_change_metadata(&record.change)
.and_then(|m| m.labels.first().map(|l| l.to_string()))
.unwrap_or_default();
let boxed = Box::new(record);
let ptr = Box::into_raw(boxed);
let event = Box::new(FfiBootstrapEvent {
opaque: ptr as *mut c_void,
source_id: FfiStr::from_str(&source_id_owned),
timestamp_us,
sequence,
label: FfiStr::from_str(&label_str),
entity_id: FfiStr::from_str(&entity_id_str),
drop_fn: bootstrap_event_drop,
});
let event_ptr = Box::into_raw(event);
let result = (send_fn)(sender_state, event_ptr);
if result != 0 {
break;
}
count += 1;
}
count as i64
}
extern "C" fn bootstrap_event_drop(opaque: *mut c_void) {
if !opaque.is_null() {
unsafe { drop(Box::from_raw(opaque as *mut BootstrapEvent)) };
}
}
extern "C" fn drop_fn(state: *mut c_void) {
unsafe { drop(Box::from_raw(state as *mut BootstrapProviderWrapper)) };
}
let wrapper = Box::new(BootstrapProviderWrapper { inner: provider });
BootstrapProviderVtable {
state: Box::into_raw(wrapper) as *mut c_void,
executor,
bootstrap_fn,
drop_fn,
}
}
/// State behind the opaque `state` pointer of a `SourcePluginVtable`.
struct SourcePluginWrapper<T: SourcePluginDescriptor + 'static> {
// The plugin descriptor that creates sources.
inner: T,
// Descriptor strings captured once at construction so the vtable getters can
// return borrowed views of them.
cached_kind: String,
cached_config_version: String,
cached_schema_name: String,
// Passed on to the vtable of every source this plugin creates.
executor: AsyncExecutorFn,
lifecycle_emitter: LifecycleEmitterFn,
// Accessor for the tokio runtime used to run `create_source` and handed to created sources.
runtime_handle: fn() -> &'static tokio::runtime::Runtime,
}
pub fn build_source_plugin_vtable<T: SourcePluginDescriptor + 'static>(
descriptor: T,
executor: AsyncExecutorFn,
lifecycle_emitter: LifecycleEmitterFn,
runtime: fn() -> &'static tokio::runtime::Runtime,
) -> SourcePluginVtable {
extern "C" fn kind_fn<T: SourcePluginDescriptor + 'static>(state: *const c_void) -> FfiStr {
let w = unsafe { &*(state as *const SourcePluginWrapper<T>) };
FfiStr::from_str(&w.cached_kind)
}
extern "C" fn config_version_fn<T: SourcePluginDescriptor + 'static>(
state: *const c_void,
) -> FfiStr {
let w = unsafe { &*(state as *const SourcePluginWrapper<T>) };
FfiStr::from_str(&w.cached_config_version)
}
extern "C" fn config_schema_json_fn<T: SourcePluginDescriptor + 'static>(
state: *const c_void,
) -> FfiOwnedStr {
let w = unsafe { &*(state as *const SourcePluginWrapper<T>) };
FfiOwnedStr::from_string(w.inner.config_schema_json())
}
extern "C" fn config_schema_name_fn<T: SourcePluginDescriptor + 'static>(
state: *const c_void,
) -> FfiStr {
let w = unsafe { &*(state as *const SourcePluginWrapper<T>) };
FfiStr::from_str(&w.cached_schema_name)
}
extern "C" fn create_source_fn<T: SourcePluginDescriptor + 'static>(
state: *mut c_void,
id: FfiStr,
config_json: FfiStr,
auto_start: bool,
) -> FfiCreateResult {
let w = unsafe { &*(state as *const SourcePluginWrapper<T>) };
let id_str = unsafe { id.to_string() };
let config_str = unsafe { config_json.to_string() };
let config_value: serde_json::Value = match serde_json::from_str(&config_str) {
Ok(v) => v,
Err(e) => {
let msg = format!("Failed to parse config JSON for source '{id_str}': {e}");
log::error!("{msg}");
return FfiCreateResult::err(msg);
}
};
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const SourcePluginWrapper<T>);
let id_owned = id_str.clone();
let result = dispatch_to_runtime(&handle, async move {
let inner = unsafe { ptr.as_ref() };
inner
.inner
.create_source(&id_owned, &config_value, auto_start)
.await
});
match result {
Ok(source) => {
let vtable = build_source_vtable_from_boxed(
source,
w.executor,
w.lifecycle_emitter,
w.runtime_handle,
);
FfiCreateResult::ok(Box::into_raw(Box::new(vtable)))
}
Err(e) => {
let msg = format!("Failed to create source '{id_str}': {e}");
log::error!("{msg}");
FfiCreateResult::err(msg)
}
}
}
extern "C" fn drop_fn<T: SourcePluginDescriptor + 'static>(state: *mut c_void) {
unsafe { drop(Box::from_raw(state as *mut SourcePluginWrapper<T>)) };
}
let cached_kind = descriptor.kind().to_string();
let cached_config_version = descriptor.config_version().to_string();
let cached_schema_name = descriptor.config_schema_name().to_string();
let wrapper = Box::new(SourcePluginWrapper {
inner: descriptor,
cached_kind,
cached_config_version,
cached_schema_name,
executor,
lifecycle_emitter,
runtime_handle: runtime,
});
SourcePluginVtable {
state: Box::into_raw(wrapper) as *mut c_void,
executor,
kind_fn: kind_fn::<T>,
config_version_fn: config_version_fn::<T>,
config_schema_json_fn: config_schema_json_fn::<T>,
config_schema_name_fn: config_schema_name_fn::<T>,
create_source_fn: create_source_fn::<T>,
drop_fn: drop_fn::<T>,
}
}
/// State behind the opaque `state` pointer of a `ReactionPluginVtable`.
struct ReactionPluginWrapper<T: ReactionPluginDescriptor + 'static> {
// The plugin descriptor that creates reactions.
inner: T,
// Descriptor strings captured once at construction so the vtable getters can
// return borrowed views of them.
cached_kind: String,
cached_config_version: String,
cached_schema_name: String,
// Passed on to the vtable of every reaction this plugin creates.
executor: AsyncExecutorFn,
lifecycle_emitter: LifecycleEmitterFn,
// Accessor for the tokio runtime used to run `create_reaction` and handed to created reactions.
runtime_handle: fn() -> &'static tokio::runtime::Runtime,
}
pub fn build_reaction_plugin_vtable<T: ReactionPluginDescriptor + 'static>(
descriptor: T,
executor: AsyncExecutorFn,
lifecycle_emitter: LifecycleEmitterFn,
runtime: fn() -> &'static tokio::runtime::Runtime,
) -> ReactionPluginVtable {
extern "C" fn kind_fn<T: ReactionPluginDescriptor + 'static>(state: *const c_void) -> FfiStr {
let w = unsafe { &*(state as *const ReactionPluginWrapper<T>) };
FfiStr::from_str(&w.cached_kind)
}
extern "C" fn config_version_fn<T: ReactionPluginDescriptor + 'static>(
state: *const c_void,
) -> FfiStr {
let w = unsafe { &*(state as *const ReactionPluginWrapper<T>) };
FfiStr::from_str(&w.cached_config_version)
}
extern "C" fn config_schema_json_fn<T: ReactionPluginDescriptor + 'static>(
state: *const c_void,
) -> FfiOwnedStr {
let w = unsafe { &*(state as *const ReactionPluginWrapper<T>) };
FfiOwnedStr::from_string(w.inner.config_schema_json())
}
extern "C" fn config_schema_name_fn<T: ReactionPluginDescriptor + 'static>(
state: *const c_void,
) -> FfiStr {
let w = unsafe { &*(state as *const ReactionPluginWrapper<T>) };
FfiStr::from_str(&w.cached_schema_name)
}
extern "C" fn create_reaction_fn<T: ReactionPluginDescriptor + 'static>(
state: *mut c_void,
id: FfiStr,
query_ids_json: FfiStr,
config_json: FfiStr,
auto_start: bool,
) -> FfiCreateResult {
let w = unsafe { &*(state as *const ReactionPluginWrapper<T>) };
let id_str = unsafe { id.to_string() };
let query_ids_str = unsafe { query_ids_json.to_string() };
let config_str = unsafe { config_json.to_string() };
let query_ids: Vec<String> = match serde_json::from_str(&query_ids_str) {
Ok(v) => v,
Err(e) => {
let msg = format!("Failed to parse query_ids JSON for reaction '{id_str}': {e}");
log::error!("{msg}");
return FfiCreateResult::err(msg);
}
};
let config_value: serde_json::Value = match serde_json::from_str(&config_str) {
Ok(v) => v,
Err(e) => {
let msg = format!("Failed to parse config JSON for reaction '{id_str}': {e}");
log::error!("{msg}");
return FfiCreateResult::err(msg);
}
};
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const ReactionPluginWrapper<T>);
let id_owned = id_str.clone();
let result = dispatch_to_runtime(&handle, async move {
let inner = unsafe { ptr.as_ref() };
inner
.inner
.create_reaction(&id_owned, query_ids, &config_value, auto_start)
.await
});
match result {
Ok(reaction) => {
let vtable = build_reaction_vtable_from_boxed(
reaction,
w.executor,
w.lifecycle_emitter,
w.runtime_handle,
);
FfiCreateResult::ok(Box::into_raw(Box::new(vtable)))
}
Err(e) => {
let msg = format!("Failed to create reaction '{id_str}': {e}");
log::error!("{msg}");
FfiCreateResult::err(msg)
}
}
}
extern "C" fn drop_fn<T: ReactionPluginDescriptor + 'static>(state: *mut c_void) {
unsafe { drop(Box::from_raw(state as *mut ReactionPluginWrapper<T>)) };
}
let cached_kind = descriptor.kind().to_string();
let cached_config_version = descriptor.config_version().to_string();
let cached_schema_name = descriptor.config_schema_name().to_string();
let wrapper = Box::new(ReactionPluginWrapper {
inner: descriptor,
cached_kind,
cached_config_version,
cached_schema_name,
executor,
lifecycle_emitter,
runtime_handle: runtime,
});
ReactionPluginVtable {
state: Box::into_raw(wrapper) as *mut c_void,
executor,
kind_fn: kind_fn::<T>,
config_version_fn: config_version_fn::<T>,
config_schema_json_fn: config_schema_json_fn::<T>,
config_schema_name_fn: config_schema_name_fn::<T>,
create_reaction_fn: create_reaction_fn::<T>,
drop_fn: drop_fn::<T>,
}
}
/// State behind the opaque `state` pointer of a `BootstrapPluginVtable`.
struct BootstrapPluginWrapper<T: BootstrapPluginDescriptor + 'static> {
// The plugin descriptor that creates bootstrap providers.
inner: T,
// Descriptor strings captured once at construction so the vtable getters can
// return borrowed views of them.
cached_kind: String,
cached_config_version: String,
cached_schema_name: String,
// Passed on to the vtable of every bootstrap provider this plugin creates.
executor: AsyncExecutorFn,
// Stored but not read by `build_bootstrap_plugin_vtable`: bootstrap provider
// vtables are built from the provider and executor only, hence the allow.
#[allow(dead_code)]
lifecycle_emitter: LifecycleEmitterFn,
// Accessor for the tokio runtime used to run `create_bootstrap_provider`.
runtime_handle: fn() -> &'static tokio::runtime::Runtime,
}
pub fn build_bootstrap_plugin_vtable<T: BootstrapPluginDescriptor + 'static>(
descriptor: T,
executor: AsyncExecutorFn,
lifecycle_emitter: LifecycleEmitterFn,
runtime: fn() -> &'static tokio::runtime::Runtime,
) -> BootstrapPluginVtable {
extern "C" fn kind_fn<T: BootstrapPluginDescriptor + 'static>(state: *const c_void) -> FfiStr {
let w = unsafe { &*(state as *const BootstrapPluginWrapper<T>) };
FfiStr::from_str(&w.cached_kind)
}
extern "C" fn config_version_fn<T: BootstrapPluginDescriptor + 'static>(
state: *const c_void,
) -> FfiStr {
let w = unsafe { &*(state as *const BootstrapPluginWrapper<T>) };
FfiStr::from_str(&w.cached_config_version)
}
extern "C" fn config_schema_json_fn<T: BootstrapPluginDescriptor + 'static>(
state: *const c_void,
) -> FfiOwnedStr {
let w = unsafe { &*(state as *const BootstrapPluginWrapper<T>) };
FfiOwnedStr::from_string(w.inner.config_schema_json())
}
extern "C" fn config_schema_name_fn<T: BootstrapPluginDescriptor + 'static>(
state: *const c_void,
) -> FfiStr {
let w = unsafe { &*(state as *const BootstrapPluginWrapper<T>) };
FfiStr::from_str(&w.cached_schema_name)
}
extern "C" fn create_bootstrap_provider_fn<T: BootstrapPluginDescriptor + 'static>(
state: *mut c_void,
config_json: FfiStr,
source_config_json: FfiStr,
) -> FfiCreateResult {
let w = unsafe { &*(state as *const BootstrapPluginWrapper<T>) };
let config_str = unsafe { config_json.to_string() };
let source_config_str = unsafe { source_config_json.to_string() };
let config_value: serde_json::Value = match serde_json::from_str(&config_str) {
Ok(v) => v,
Err(e) => {
let msg = format!("Failed to parse bootstrap config JSON: {e}");
log::error!("{msg}");
return FfiCreateResult::err(msg);
}
};
let source_config_value: serde_json::Value = match serde_json::from_str(&source_config_str)
{
Ok(v) => v,
Err(e) => {
let msg = format!("Failed to parse source config JSON: {e}");
log::error!("{msg}");
return FfiCreateResult::err(msg);
}
};
let handle = (w.runtime_handle)().handle().clone();
let ptr = SendPtr(state as *const BootstrapPluginWrapper<T>);
let result = dispatch_to_runtime(&handle, async move {
let inner = unsafe { ptr.as_ref() };
inner
.inner
.create_bootstrap_provider(&config_value, &source_config_value)
.await
});
match result {
Ok(provider) => {
let vtable = build_bootstrap_provider_vtable(provider, w.executor);
FfiCreateResult::ok(Box::into_raw(Box::new(vtable)))
}
Err(e) => {
let msg = format!("Failed to create bootstrap provider: {e}");
log::error!("{msg}");
FfiCreateResult::err(msg)
}
}
}
extern "C" fn drop_fn<T: BootstrapPluginDescriptor + 'static>(state: *mut c_void) {
unsafe { drop(Box::from_raw(state as *mut BootstrapPluginWrapper<T>)) };
}
let cached_kind = descriptor.kind().to_string();
let cached_config_version = descriptor.config_version().to_string();
let cached_schema_name = descriptor.config_schema_name().to_string();
let wrapper = Box::new(BootstrapPluginWrapper {
inner: descriptor,
cached_kind,
cached_config_version,
cached_schema_name,
executor,
lifecycle_emitter,
runtime_handle: runtime,
});
BootstrapPluginVtable {
state: Box::into_raw(wrapper) as *mut c_void,
executor,
kind_fn: kind_fn::<T>,
config_version_fn: config_version_fn::<T>,
config_schema_json_fn: config_schema_json_fn::<T>,
config_schema_name_fn: config_schema_name_fn::<T>,
create_bootstrap_provider_fn: create_bootstrap_provider_fn::<T>,
drop_fn: drop_fn::<T>,
}
}
/// Converts the host's `FfiRuntimeContext` into a [`SourceRuntimeContext`].
///
/// Returns the context together with the receiving half of a freshly created
/// update channel (capacity 16) whose sending half is stored in the context.
/// A null state-store or identity-provider pointer becomes `None`; otherwise
/// the pointer is wrapped in the matching FFI proxy.
fn build_source_runtime_context(
    ffi_ctx: &FfiRuntimeContext,
) -> (SourceRuntimeContext, ComponentUpdateReceiver) {
    let instance_id = unsafe { ffi_ctx.instance_id.to_string() };
    let source_id = unsafe { ffi_ctx.component_id.to_string() };

    let state_store =
        (!ffi_ctx.state_store.is_null()).then(|| -> Arc<dyn StateStoreProvider> {
            Arc::new(FfiStateStoreProxy {
                vtable: ffi_ctx.state_store,
            })
        });

    let identity_provider = (!ffi_ctx.identity_provider.is_null()).then(
        || -> Arc<dyn drasi_lib::identity::IdentityProvider> {
            Arc::new(unsafe {
                super::identity_proxy::FfiIdentityProviderProxy::new(ffi_ctx.identity_provider)
            })
        },
    );

    let (update_tx, status_rx) = tokio::sync::mpsc::channel(16);
    (
        SourceRuntimeContext {
            instance_id,
            source_id,
            update_tx,
            state_store,
            identity_provider,
        },
        status_rx,
    )
}
/// Converts the host's `FfiRuntimeContext` into a `ReactionRuntimeContext`.
///
/// Returns the context together with the receiving half of a freshly created
/// update channel (capacity 16) whose sending half is stored in the context.
/// A null state-store or identity-provider pointer becomes `None`; otherwise
/// the pointer is wrapped in the matching FFI proxy.
fn build_reaction_runtime_context(
    ffi_ctx: &FfiRuntimeContext,
) -> (drasi_lib::ReactionRuntimeContext, ComponentUpdateReceiver) {
    let instance_id = unsafe { ffi_ctx.instance_id.to_string() };
    let reaction_id = unsafe { ffi_ctx.component_id.to_string() };

    let state_store =
        (!ffi_ctx.state_store.is_null()).then(|| -> Arc<dyn StateStoreProvider> {
            Arc::new(FfiStateStoreProxy {
                vtable: ffi_ctx.state_store,
            })
        });

    let identity_provider = (!ffi_ctx.identity_provider.is_null()).then(
        || -> Arc<dyn drasi_lib::identity::IdentityProvider> {
            Arc::new(unsafe {
                super::identity_proxy::FfiIdentityProviderProxy::new(ffi_ctx.identity_provider)
            })
        },
    );

    let (update_tx, status_rx) = tokio::sync::mpsc::channel(16);
    (
        drasi_lib::ReactionRuntimeContext {
            instance_id,
            reaction_id,
            update_tx,
            state_store,
            identity_provider,
        },
        status_rx,
    )
}
/// Converts a `drasi_lib::SubscriptionResponse` into a heap-allocated
/// `FfiSubscriptionResponse` the host can consume.
///
/// The change receiver, and the bootstrap receiver when present, are each
/// wrapped in an FFI receiver. `start_push_fn` spawns a task on `runtime_handle`
/// that pushes events to a host callback until the stream ends, the host
/// rejects an event, or the receiver is dropped.
///
/// * `sub` - the subscription to expose; consumed.
/// * `executor` - copied verbatim into the change receiver's `executor` field.
/// * `runtime_handle` - runtime on which the push tasks are spawned.
///
/// Returns a raw pointer produced by `Box::into_raw`; nothing in this function
/// frees it.
fn wrap_subscription_response(
    sub: drasi_lib::SubscriptionResponse,
    executor: AsyncExecutorFn,
    runtime_handle: tokio::runtime::Handle,
) -> *mut FfiSubscriptionResponse {
    use super::vtables::FfiChangePushCallbackFn;
    use drasi_lib::channels::events::SourceEventWrapper;

    // Shared state of the change stream: the receiver (locked by the push task),
    // the runtime to spawn on, and the shutdown signal raised on drop.
    struct DrasiLibChangeReceiver {
        inner: tokio::sync::Mutex<Box<dyn ChangeReceiver<SourceEventWrapper>>>,
        runtime_handle: tokio::runtime::Handle,
        shutdown: Arc<tokio::sync::Notify>,
    }
    // What the FFI `state` pointer refers to; the `Arc` lets the push task outlive it.
    struct DrasiLibChangeReceiverHandle {
        receiver: Arc<DrasiLibChangeReceiver>,
    }

    /// Boxes a source event for the host. `label` and `entity_id` are always
    /// empty for change events; `source_id` borrows from the boxed wrapper.
    fn wrap_source_event(wrapper: Arc<SourceEventWrapper>) -> *mut FfiSourceEvent {
        // Anything that is not an insert or delete is reported as an update.
        let op = match &wrapper.event {
            drasi_lib::channels::events::SourceEvent::Change(change) => match change {
                SourceChange::Insert { .. } => FfiChangeOp::Insert,
                SourceChange::Update { .. } => FfiChangeOp::Update,
                SourceChange::Delete { .. } => FfiChangeOp::Delete,
                SourceChange::Future { .. } => FfiChangeOp::Update,
            },
            _ => FfiChangeOp::Update,
        };
        let timestamp_us = wrapper
            .timestamp
            .timestamp_nanos_opt()
            .map(|n| n / 1000)
            .unwrap_or(0);
        // Take the wrapper out of the `Arc` when we hold the only reference, clone otherwise.
        let boxed = Box::new(Arc::try_unwrap(wrapper).unwrap_or_else(|arc| (*arc).clone()));
        let source_id = FfiStr::from_str(&boxed.source_id);
        let opaque = Box::into_raw(boxed) as *mut c_void;

        extern "C" fn drop_wrapper(ptr: *mut c_void) {
            if !ptr.is_null() {
                unsafe { drop(Box::from_raw(ptr as *mut SourceEventWrapper)) };
            }
        }

        Box::into_raw(Box::new(FfiSourceEvent {
            opaque,
            source_id,
            timestamp_us,
            op,
            label: FfiStr::from_str(""),
            entity_id: FfiStr::from_str(""),
            drop_fn: drop_wrapper,
        }))
    }

    /// Spawns the task that pushes change events to `callback`.
    ///
    /// A null event tells the host the stream has ended (shutdown or receive error).
    extern "C" fn start_push_fn(
        state: *mut c_void,
        callback: FfiChangePushCallbackFn,
        callback_ctx: *mut c_void,
    ) {
        let handle = unsafe { &*(state as *const DrasiLibChangeReceiverHandle) };
        let receiver = handle.receiver.clone();
        let ctx = SendMutPtr(callback_ctx);
        let shutdown = receiver.shutdown.clone();
        let rt_handle = receiver.runtime_handle.clone();
        rt_handle.spawn(async move {
            let mut rx = receiver.inner.lock().await;
            loop {
                tokio::select! {
                    _ = shutdown.notified() => {
                        callback(ctx.as_ptr(), std::ptr::null_mut());
                        break;
                    }
                    result = rx.recv() => {
                        match result {
                            Ok(wrapper) => {
                                let ffi_event = wrap_source_event(wrapper);
                                let accepted = callback(ctx.as_ptr(), ffi_event);
                                if !accepted {
                                    break;
                                }
                            }
                            Err(_) => {
                                callback(ctx.as_ptr(), std::ptr::null_mut());
                                break;
                            }
                        }
                    }
                }
            }
        });
    }

    /// Signals the push task to stop, then frees the receiver handle.
    extern "C" fn change_receiver_drop(state: *mut c_void) {
        let handle = unsafe { Box::from_raw(state as *mut DrasiLibChangeReceiverHandle) };
        handle.receiver.shutdown.notify_one();
        drop(handle);
    }

    let ffi_receiver = Box::new(DrasiLibChangeReceiverHandle {
        receiver: Arc::new(DrasiLibChangeReceiver {
            inner: tokio::sync::Mutex::new(sub.receiver),
            runtime_handle: runtime_handle.clone(),
            shutdown: Arc::new(tokio::sync::Notify::new()),
        }),
    });
    let ffi_rx = Box::new(FfiChangeReceiver {
        state: Box::into_raw(ffi_receiver) as *mut c_void,
        executor,
        start_push_fn,
        drop_fn: change_receiver_drop,
    });

    // The bootstrap stream is optional; a missing one is reported as a null pointer.
    let bootstrap_receiver = if let Some(brx) = sub.bootstrap_receiver {
        use super::vtables::FfiBootstrapPushCallbackFn;

        // Bootstrap counterpart of `DrasiLibChangeReceiver`.
        struct DrasiLibBootstrapReceiver {
            inner: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<BootstrapEvent>>,
            runtime_handle: tokio::runtime::Handle,
            shutdown: Arc<tokio::sync::Notify>,
        }
        struct DrasiLibBootstrapReceiverHandle {
            receiver: Arc<DrasiLibBootstrapReceiver>,
        }

        /// Boxes a bootstrap event for the host.
        ///
        /// `source_id`, `label` (the first label, if any) and `entity_id` are
        /// borrowed views into the boxed record, so they stay valid until the
        /// host releases the record through `drop_fn`.
        fn wrap_bootstrap_event(record: BootstrapEvent) -> *mut FfiBootstrapEvent {
            let timestamp_us = record
                .timestamp
                .timestamp_nanos_opt()
                .map(|n| n / 1000)
                .unwrap_or(0);
            let sequence = record.sequence;

            // Box first: views taken from temporaries local to this function would
            // dangle as soon as it returns, i.e. before the callback sees the event.
            let boxed = Box::new(record);
            let source_id = FfiStr::from_str(&boxed.source_id);
            let metadata = source_change_metadata(&boxed.change);
            let entity_id = match metadata {
                Some(m) => FfiStr::from_str(&m.reference.element_id),
                None => FfiStr::from_str(""),
            };
            let label = match metadata.and_then(|m| m.labels.first()) {
                Some(l) => FfiStr::from_str(l),
                None => FfiStr::from_str(""),
            };
            let opaque = Box::into_raw(boxed) as *mut c_void;

            extern "C" fn drop_bootstrap(ptr: *mut c_void) {
                if !ptr.is_null() {
                    unsafe { drop(Box::from_raw(ptr as *mut BootstrapEvent)) };
                }
            }

            Box::into_raw(Box::new(FfiBootstrapEvent {
                opaque,
                source_id,
                timestamp_us,
                sequence,
                label,
                entity_id,
                drop_fn: drop_bootstrap,
            }))
        }

        /// Spawns the task that pushes bootstrap events to `callback`.
        ///
        /// A null event tells the host the stream has ended (shutdown or channel closed).
        extern "C" fn bootstrap_start_push(
            state: *mut c_void,
            callback: FfiBootstrapPushCallbackFn,
            callback_ctx: *mut c_void,
        ) {
            let handle = unsafe { &*(state as *const DrasiLibBootstrapReceiverHandle) };
            let receiver = handle.receiver.clone();
            let ctx = SendMutPtr(callback_ctx);
            let shutdown = receiver.shutdown.clone();
            let rt_handle = receiver.runtime_handle.clone();
            rt_handle.spawn(async move {
                let mut rx = receiver.inner.lock().await;
                loop {
                    tokio::select! {
                        _ = shutdown.notified() => {
                            callback(ctx.as_ptr(), std::ptr::null_mut());
                            break;
                        }
                        result = rx.recv() => {
                            match result {
                                Some(record) => {
                                    let ffi_event = wrap_bootstrap_event(record);
                                    let accepted = callback(ctx.as_ptr(), ffi_event);
                                    if !accepted {
                                        break;
                                    }
                                }
                                None => {
                                    callback(ctx.as_ptr(), std::ptr::null_mut());
                                    break;
                                }
                            }
                        }
                    }
                }
            });
        }

        /// Signals the push task to stop, then frees the receiver handle.
        extern "C" fn bootstrap_drop(state: *mut c_void) {
            let handle = unsafe { Box::from_raw(state as *mut DrasiLibBootstrapReceiverHandle) };
            handle.receiver.shutdown.notify_one();
            drop(handle);
        }

        let ffi_brx = Box::new(DrasiLibBootstrapReceiverHandle {
            receiver: Arc::new(DrasiLibBootstrapReceiver {
                inner: tokio::sync::Mutex::new(brx),
                runtime_handle: runtime_handle.clone(),
                shutdown: Arc::new(tokio::sync::Notify::new()),
            }),
        });
        let ffi_brx_wrapper = Box::new(FfiBootstrapReceiver {
            state: Box::into_raw(ffi_brx) as *mut c_void,
            start_push_fn: bootstrap_start_push,
            drop_fn: bootstrap_drop,
        });
        Box::into_raw(ffi_brx_wrapper)
    } else {
        std::ptr::null_mut()
    };

    Box::into_raw(Box::new(FfiSubscriptionResponse {
        query_id: FfiOwnedStr::from_string(sub.query_id),
        source_id: FfiOwnedStr::from_string(sub.source_id),
        receiver: Box::into_raw(ffi_rx),
        bootstrap_receiver,
    }))
}