use std::ffi::c_void;
use std::sync::{Arc, Mutex, OnceLock};
use drasi_lib::channels::events::{ComponentEvent, ComponentStatus, ComponentType};
use drasi_lib::component_graph::{ComponentUpdate, ComponentUpdateSender};
use drasi_lib::managers::{ComponentEventHistory, ComponentLogRegistry, LogLevel, LogMessage};
use drasi_plugin_sdk::ffi::{
FfiLifecycleEvent, FfiLifecycleEventType, FfiLogEntry, FfiLogLevel, LifecycleCallbackFn,
LogCallbackFn,
};
use tokio::sync::RwLock;
/// Host-side state passed to plugins as the opaque `ctx` pointer of the default callbacks.
///
/// `default_log_callback` and `default_lifecycle_callback` turn the raw pointer produced by
/// `CallbackContext::into_raw` back into a reference to this struct.
pub struct CallbackContext {
/// Instance identifier. Not read by the callbacks visible in this part of the file.
pub instance_id: String,
/// Tokio runtime handle. Not referenced by the callbacks visible in this part of the file.
pub runtime_handle: tokio::runtime::Handle,
/// Registry that `default_log_callback` forwards plugin log messages to via `try_log`.
pub log_registry: Arc<ComponentLogRegistry>,
/// History that receives lifecycle events for every component type other than `Reaction`.
pub source_event_history: Arc<RwLock<ComponentEventHistory>>,
/// History that receives lifecycle events for `Reaction` components.
pub reaction_event_history: Arc<RwLock<ComponentEventHistory>>,
}
// These impls assert that a `CallbackContext` may be moved to and shared between threads,
// which is what happens once its pointer is handed to plugin code.
// NOTE(review): every field (`String`, `tokio::runtime::Handle`, `Arc<...>`) looks like it is
// already `Send + Sync`, in which case these manual impls are redundant and would hide a
// future non-thread-safe field from the compiler — confirm and consider removing them.
unsafe impl Send for CallbackContext {}
unsafe impl Sync for CallbackContext {}
impl CallbackContext {
    /// Converts a shared context into an opaque pointer usable as a callback `ctx` argument.
    ///
    /// The strong reference owned by `self` is transferred into the returned pointer
    /// (`Arc::into_raw`), so the context stays alive until that reference is reclaimed.
    /// Nothing in this impl reclaims it.
    pub fn into_raw(self: Arc<Self>) -> *mut c_void {
        let shared: *const Self = Arc::into_raw(self);
        shared.cast_mut().cast::<c_void>()
    }

    /// Reborrows the context behind a pointer previously returned by [`Self::into_raw`].
    ///
    /// # Safety
    ///
    /// `ptr` must be non-null, must point to a live `CallbackContext` (as produced by
    /// [`Self::into_raw`]), and that context must remain alive for the whole of `'a`.
    unsafe fn from_raw_ref<'a>(ptr: *mut c_void) -> &'a Self {
        let typed: *const Self = ptr.cast::<Self>().cast_const();
        // SAFETY: the caller guarantees `ptr` addresses a live `CallbackContext` for `'a`.
        unsafe { &*typed }
    }
}
/// Host-side state passed to plugins as the opaque `ctx` pointer of the per-instance callbacks.
///
/// `instance_log_callback` and `instance_lifecycle_callback` turn the raw pointer produced by
/// `InstanceCallbackContext::into_raw` back into a reference to this struct.
pub struct InstanceCallbackContext {
/// Instance identifier used by `instance_log_callback` when a log entry's own
/// `instance_id` is empty.
pub instance_id: String,
/// Tokio runtime handle. Not referenced by the callbacks visible in this part of the file.
pub runtime_handle: tokio::runtime::Handle,
/// Registry that `instance_log_callback` forwards plugin log messages to via `try_log`.
pub log_registry: Arc<ComponentLogRegistry>,
/// Sender on which `instance_lifecycle_callback` publishes `ComponentUpdate::Status`
/// messages with `try_send`.
pub update_tx: ComponentUpdateSender,
}
// These impls assert that an `InstanceCallbackContext` may be moved to and shared between
// threads, which is what happens once its pointer is handed to plugin code.
// NOTE(review): if `ComponentUpdateSender` and `ComponentLogRegistry` are already
// `Send + Sync`, the compiler derives both traits automatically and these manual impls are
// redundant (and would mask a future non-thread-safe field) — confirm and consider removing.
unsafe impl Send for InstanceCallbackContext {}
unsafe impl Sync for InstanceCallbackContext {}
impl InstanceCallbackContext {
    /// Converts a shared context into an opaque pointer usable as a callback `ctx` argument.
    ///
    /// The strong reference owned by `self` is transferred into the returned pointer
    /// (`Arc::into_raw`), so the context stays alive until that reference is reclaimed.
    /// Nothing in this impl reclaims it.
    pub fn into_raw(self: Arc<Self>) -> *mut c_void {
        let shared: *const Self = Arc::into_raw(self);
        shared.cast_mut().cast::<c_void>()
    }

    /// Reborrows the context behind a pointer previously returned by [`Self::into_raw`].
    ///
    /// # Safety
    ///
    /// `ptr` must be non-null, must point to a live `InstanceCallbackContext` (as produced
    /// by [`Self::into_raw`]), and that context must remain alive for the whole of `'a`.
    unsafe fn from_raw_ref<'a>(ptr: *mut c_void) -> &'a Self {
        let typed: *const Self = ptr.cast::<Self>().cast_const();
        // SAFETY: the caller guarantees `ptr` addresses a live `InstanceCallbackContext`
        // for `'a`.
        unsafe { &*typed }
    }
}
/// Copy of one plugin log entry, as pushed onto `captured_logs()` by the log callbacks.
#[derive(Debug, Clone)]
pub struct CapturedLog {
/// Severity taken from the entry's `level` field.
pub level: FfiLogLevel,
/// Text of the entry's `plugin_id` field.
pub plugin_id: String,
/// Text of the entry's `message` field.
pub message: String,
}
/// Copy of one plugin lifecycle event, as pushed onto `captured_lifecycles()` by the
/// lifecycle callbacks.
#[derive(Debug, Clone)]
pub struct CapturedLifecycle {
/// Text of the event's `component_id` field.
pub component_id: String,
/// Lifecycle transition taken from the event's `event_type` field.
pub event_type: FfiLifecycleEventType,
/// Text of the event's `message` field (may be empty).
pub message: String,
}
/// Returns the process-wide list of plugin log entries captured by the log callbacks.
///
/// The list is created empty on first access. The callbacks in this file only ever append
/// to it; callers that care about its size must drain or clear it themselves.
pub fn captured_logs() -> &'static Mutex<Vec<CapturedLog>> {
    static CAPTURED: OnceLock<Mutex<Vec<CapturedLog>>> = OnceLock::new();
    CAPTURED.get_or_init(Mutex::default)
}
/// Returns the process-wide list of plugin lifecycle events captured by the lifecycle
/// callbacks.
///
/// The list is created empty on first access. The callbacks in this file only ever append
/// to it; callers that care about its size must drain or clear it themselves.
pub fn captured_lifecycles() -> &'static Mutex<Vec<CapturedLifecycle>> {
    static CAPTURED: OnceLock<Mutex<Vec<CapturedLifecycle>>> = OnceLock::new();
    CAPTURED.get_or_init(Mutex::default)
}
fn ffi_log_level_to_log_level(level: FfiLogLevel) -> LogLevel {
match level {
FfiLogLevel::Error => LogLevel::Error,
FfiLogLevel::Warn => LogLevel::Warn,
FfiLogLevel::Info => LogLevel::Info,
FfiLogLevel::Debug => LogLevel::Debug,
FfiLogLevel::Trace => LogLevel::Trace,
}
}
fn ffi_log_level_to_std_level(level: FfiLogLevel) -> log::Level {
match level {
FfiLogLevel::Error => log::Level::Error,
FfiLogLevel::Warn => log::Level::Warn,
FfiLogLevel::Info => log::Level::Info,
FfiLogLevel::Debug => log::Level::Debug,
FfiLogLevel::Trace => log::Level::Trace,
}
}
/// Resolves the textual component type sent by a plugin to a [`ComponentType`].
///
/// Matching is exact (case-sensitive). `"query"` and `"reaction"` map to their own
/// variants; `"source"` and any unrecognised string map to [`ComponentType::Source`].
fn parse_component_type(s: &str) -> ComponentType {
    match s {
        "query" => ComponentType::Query,
        "reaction" => ComponentType::Reaction,
        // "source" itself and every unknown name fall back to `Source`.
        _ => ComponentType::Source,
    }
}
fn ffi_lifecycle_to_component_status(event_type: FfiLifecycleEventType) -> ComponentStatus {
match event_type {
FfiLifecycleEventType::Starting => ComponentStatus::Starting,
FfiLifecycleEventType::Started => ComponentStatus::Running,
FfiLifecycleEventType::Stopping => ComponentStatus::Stopping,
FfiLifecycleEventType::Stopped => ComponentStatus::Stopped,
FfiLifecycleEventType::Error => ComponentStatus::Error,
}
}
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn default_log_callback(ctx: *mut c_void, entry: *const FfiLogEntry) {
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let entry = unsafe { &*entry };
let plugin_id = unsafe { entry.plugin_id.to_string() };
let message = unsafe { entry.message.to_string() };
let instance_id = unsafe { entry.instance_id.to_string() };
let component_id = unsafe { entry.component_id.to_string() };
let level = entry.level;
log::log!(
ffi_log_level_to_std_level(level),
"[plugin:{}] {}",
if component_id.is_empty() {
&plugin_id
} else {
&component_id
},
message
);
if let Ok(mut logs) = captured_logs().lock() {
logs.push(CapturedLog {
level,
plugin_id: plugin_id.clone(),
message: message.clone(),
});
}
if !ctx.is_null() && !instance_id.is_empty() && !component_id.is_empty() {
let context = unsafe { CallbackContext::from_raw_ref(ctx) };
let log_message = LogMessage::with_instance(
ffi_log_level_to_log_level(level),
message,
&instance_id,
&component_id,
ComponentType::Source, );
let registry = context.log_registry.clone();
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
registry.try_log(log_message);
}));
}
}));
}
/// Default lifecycle callback handed to plugins.
///
/// For every event it:
/// 1. writes a `debug`-level line through the `log` crate;
/// 2. appends a [`CapturedLifecycle`] to [`captured_lifecycles`] (skipped if that mutex is
///    poisoned);
/// 3. if `ctx` is non-null, records a [`ComponentEvent`] in the matching history of the
///    [`CallbackContext`] behind `ctx`: `reaction_event_history` for reactions,
///    `source_event_history` for every other component type.
///
/// The history is written with a non-blocking `try_write`; if the lock is currently held
/// the event is dropped and a warning is logged. A null `event` is ignored. Panics raised
/// while handling the event are caught so they never unwind across the FFI boundary.
///
/// `ctx` must be null or a pointer obtained from `CallbackContext::into_raw` whose context
/// is still alive; `event` must be null or point to a valid `FfiLifecycleEvent` for the
/// duration of the call.
// Safe `extern "C" fn` taking raw pointers by design; the pointers are null-checked before
// being dereferenced, hence the lint exemption.
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn default_lifecycle_callback(ctx: *mut c_void, event: *const FfiLifecycleEvent) {
    // One catch_unwind around the whole body is enough; a nested guard adds no protection.
    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        // Dereferencing a null pointer is undefined behaviour that catch_unwind cannot
        // intercept, so reject it up front.
        if event.is_null() {
            return;
        }
        // SAFETY: `event` is non-null (checked above) and the caller promises it points to
        // a valid `FfiLifecycleEvent` for the duration of this call.
        let event = unsafe { &*event };
        // SAFETY (next three conversions): rely on the plugin having filled the event's
        // string fields with valid data, per the FFI contract.
        let component_id = unsafe { event.component_id.to_string() };
        let component_type_str = unsafe { event.component_type.to_string() };
        let message = unsafe { event.message.to_string() };
        let event_type = event.event_type;

        log::debug!("Lifecycle: {component_id} ({component_type_str}) {event_type:?} {message}");

        if let Ok(mut events) = captured_lifecycles().lock() {
            events.push(CapturedLifecycle {
                component_id: component_id.clone(),
                event_type,
                message: message.clone(),
            });
        }

        if ctx.is_null() {
            return;
        }
        // SAFETY: `ctx` is non-null and, per this callback's contract, was produced by
        // `CallbackContext::into_raw` and is still alive.
        let context = unsafe { CallbackContext::from_raw_ref(ctx) };

        let component_type = parse_component_type(&component_type_str);
        // Only reactions have a dedicated history; everything else shares the source one.
        let event_history = match component_type {
            ComponentType::Reaction => &context.reaction_event_history,
            _ => &context.source_event_history,
        };
        let component_event = ComponentEvent {
            component_id,
            component_type,
            status: ffi_lifecycle_to_component_status(event_type),
            timestamp: chrono::Utc::now(),
            // An empty message from the plugin means "no message".
            message: (!message.is_empty()).then_some(message),
        };

        // Never block inside an FFI callback: take the lock only if it is free right now.
        if let Ok(mut history) = event_history.try_write() {
            history.record_event(component_event);
        } else {
            log::warn!(
                "Dropped lifecycle event for {}: event history is locked",
                component_event.component_id
            );
        }
    }));
}
/// Returns `default_log_callback` as a `LogCallbackFn` function pointer.
pub fn default_log_callback_fn() -> LogCallbackFn {
default_log_callback
}
/// Returns `default_lifecycle_callback` as a `LifecycleCallbackFn` function pointer.
pub fn default_lifecycle_callback_fn() -> LifecycleCallbackFn {
default_lifecycle_callback
}
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn instance_log_callback(ctx: *mut c_void, entry: *const FfiLogEntry) {
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let entry = unsafe { &*entry };
let plugin_id = unsafe { entry.plugin_id.to_string() };
let message = unsafe { entry.message.to_string() };
let instance_id = unsafe { entry.instance_id.to_string() };
let component_id = unsafe { entry.component_id.to_string() };
let level = entry.level;
log::log!(
ffi_log_level_to_std_level(level),
"[plugin:{}] {}",
if component_id.is_empty() {
&plugin_id
} else {
&component_id
},
message
);
if let Ok(mut logs) = captured_logs().lock() {
logs.push(CapturedLog {
level,
plugin_id: plugin_id.clone(),
message: message.clone(),
});
}
if !ctx.is_null() {
let context = unsafe { InstanceCallbackContext::from_raw_ref(ctx) };
let log_instance_id = if instance_id.is_empty() {
&context.instance_id
} else {
&instance_id
};
let log_component_id = if component_id.is_empty() {
&plugin_id
} else {
&component_id
};
let log_message = LogMessage::with_instance(
ffi_log_level_to_log_level(level),
message,
log_instance_id,
log_component_id,
ComponentType::Source,
);
let registry = context.log_registry.clone();
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
registry.try_log(log_message);
}));
}
}));
}
/// Per-instance lifecycle callback handed to plugins.
///
/// For every event it:
/// 1. writes a `debug`-level line through the `log` crate;
/// 2. appends a [`CapturedLifecycle`] to [`captured_lifecycles`] (skipped if that mutex is
///    poisoned);
/// 3. if `ctx` is non-null, publishes a `ComponentUpdate::Status` on the `update_tx` of the
///    [`InstanceCallbackContext`] behind `ctx` using a non-blocking `try_send`; a failed
///    send is logged at `error` level and the update is dropped.
///
/// A null `event` is ignored. Panics raised while handling the event are caught so they
/// never unwind across the FFI boundary.
///
/// `ctx` must be null or a pointer obtained from `InstanceCallbackContext::into_raw` whose
/// context is still alive; `event` must be null or point to a valid `FfiLifecycleEvent`
/// for the duration of the call.
// Safe `extern "C" fn` taking raw pointers by design; the pointers are null-checked before
// being dereferenced, hence the lint exemption.
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn instance_lifecycle_callback(ctx: *mut c_void, event: *const FfiLifecycleEvent) {
    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        // Dereferencing a null pointer is undefined behaviour that catch_unwind cannot
        // intercept, so reject it up front.
        if event.is_null() {
            return;
        }
        // SAFETY: `event` is non-null (checked above) and the caller promises it points to
        // a valid `FfiLifecycleEvent` for the duration of this call.
        let event = unsafe { &*event };
        // SAFETY (next three conversions): rely on the plugin having filled the event's
        // string fields with valid data, per the FFI contract.
        let component_id = unsafe { event.component_id.to_string() };
        let component_type_str = unsafe { event.component_type.to_string() };
        let message = unsafe { event.message.to_string() };
        let event_type = event.event_type;

        log::debug!(
            "Lifecycle [instance]: {component_id} ({component_type_str}) {event_type:?} {message}"
        );

        if let Ok(mut events) = captured_lifecycles().lock() {
            events.push(CapturedLifecycle {
                component_id: component_id.clone(),
                event_type,
                message: message.clone(),
            });
        }

        if ctx.is_null() {
            return;
        }
        // SAFETY: `ctx` is non-null and, per this callback's contract, was produced by
        // `InstanceCallbackContext::into_raw` and is still alive.
        let context = unsafe { InstanceCallbackContext::from_raw_ref(ctx) };

        let update = ComponentUpdate::Status {
            component_id,
            status: ffi_lifecycle_to_component_status(event_type),
            // An empty message from the plugin means "no message".
            message: (!message.is_empty()).then_some(message),
        };

        // Never block inside an FFI callback: hand the update over only if it can be
        // accepted immediately.
        let tx = context.update_tx.clone();
        if let Err(e) = tx.try_send(update) {
            log::error!("Failed to send lifecycle event: {e}");
        }
    }));
}
/// Returns `instance_log_callback` as a `LogCallbackFn` function pointer.
pub fn instance_log_callback_fn() -> LogCallbackFn {
instance_log_callback
}
/// Returns `instance_lifecycle_callback` as a `LifecycleCallbackFn` function pointer.
pub fn instance_lifecycle_callback_fn() -> LifecycleCallbackFn {
instance_lifecycle_callback
}