1use std::ffi::c_void;
22use std::sync::{Arc, Mutex, OnceLock};
23
24use drasi_lib::channels::events::{ComponentEvent, ComponentStatus, ComponentType};
25use drasi_lib::component_graph::{ComponentUpdate, ComponentUpdateSender};
26use drasi_lib::managers::{ComponentEventHistory, ComponentLogRegistry, LogLevel, LogMessage};
27use drasi_plugin_sdk::ffi::{
28 FfiLifecycleEvent, FfiLifecycleEventType, FfiLogEntry, FfiLogLevel, LifecycleCallbackFn,
29 LogCallbackFn,
30};
31use tokio::sync::RwLock;
32
33pub struct CallbackContext {
42 pub instance_id: String,
44 pub runtime_handle: tokio::runtime::Handle,
46 pub log_registry: Arc<ComponentLogRegistry>,
48 pub source_event_history: Arc<RwLock<ComponentEventHistory>>,
50 pub reaction_event_history: Arc<RwLock<ComponentEventHistory>>,
52}
53
54unsafe impl Send for CallbackContext {}
56unsafe impl Sync for CallbackContext {}
57
58impl CallbackContext {
59 pub fn into_raw(self: Arc<Self>) -> *mut c_void {
62 Arc::into_raw(self) as *mut c_void
63 }
64
65 unsafe fn from_raw_ref<'a>(ptr: *mut c_void) -> &'a Self {
70 &*(ptr as *const Self)
71 }
72}
73
74pub struct InstanceCallbackContext {
80 pub instance_id: String,
82 pub runtime_handle: tokio::runtime::Handle,
84 pub log_registry: Arc<ComponentLogRegistry>,
86 pub update_tx: ComponentUpdateSender,
88}
89
90unsafe impl Send for InstanceCallbackContext {}
92unsafe impl Sync for InstanceCallbackContext {}
93
94impl InstanceCallbackContext {
95 pub fn into_raw(self: Arc<Self>) -> *mut c_void {
96 Arc::into_raw(self) as *mut c_void
97 }
98
99 unsafe fn from_raw_ref<'a>(ptr: *mut c_void) -> &'a Self {
100 &*(ptr as *const Self)
101 }
102}
103
104#[derive(Debug, Clone)]
106pub struct CapturedLog {
107 pub level: FfiLogLevel,
108 pub plugin_id: String,
109 pub message: String,
110}
111
112#[derive(Debug, Clone)]
114pub struct CapturedLifecycle {
115 pub component_id: String,
116 pub event_type: FfiLifecycleEventType,
117 pub message: String,
118}
119
120pub fn captured_logs() -> &'static Mutex<Vec<CapturedLog>> {
122 static LOGS: OnceLock<Mutex<Vec<CapturedLog>>> = OnceLock::new();
123 LOGS.get_or_init(|| Mutex::new(Vec::new()))
124}
125
126pub fn captured_lifecycles() -> &'static Mutex<Vec<CapturedLifecycle>> {
128 static EVENTS: OnceLock<Mutex<Vec<CapturedLifecycle>>> = OnceLock::new();
129 EVENTS.get_or_init(|| Mutex::new(Vec::new()))
130}
131
132fn ffi_log_level_to_log_level(level: FfiLogLevel) -> LogLevel {
133 match level {
134 FfiLogLevel::Error => LogLevel::Error,
135 FfiLogLevel::Warn => LogLevel::Warn,
136 FfiLogLevel::Info => LogLevel::Info,
137 FfiLogLevel::Debug => LogLevel::Debug,
138 FfiLogLevel::Trace => LogLevel::Trace,
139 }
140}
141
142fn ffi_log_level_to_std_level(level: FfiLogLevel) -> log::Level {
143 match level {
144 FfiLogLevel::Error => log::Level::Error,
145 FfiLogLevel::Warn => log::Level::Warn,
146 FfiLogLevel::Info => log::Level::Info,
147 FfiLogLevel::Debug => log::Level::Debug,
148 FfiLogLevel::Trace => log::Level::Trace,
149 }
150}
151
152fn parse_component_type(s: &str) -> ComponentType {
153 match s {
154 "source" => ComponentType::Source,
155 "query" => ComponentType::Query,
156 "reaction" => ComponentType::Reaction,
157 _ => ComponentType::Source, }
159}
160
161fn ffi_lifecycle_to_component_status(event_type: FfiLifecycleEventType) -> ComponentStatus {
162 match event_type {
163 FfiLifecycleEventType::Starting => ComponentStatus::Starting,
164 FfiLifecycleEventType::Started => ComponentStatus::Running,
165 FfiLifecycleEventType::Stopping => ComponentStatus::Stopping,
166 FfiLifecycleEventType::Stopped => ComponentStatus::Stopped,
167 FfiLifecycleEventType::Error => ComponentStatus::Error,
168 }
169}
170
171#[allow(clippy::not_unsafe_ptr_arg_deref)]
185pub extern "C" fn default_log_callback(ctx: *mut c_void, entry: *const FfiLogEntry) {
186 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
190 let entry = unsafe { &*entry };
191 let plugin_id = unsafe { entry.plugin_id.to_string() };
192 let message = unsafe { entry.message.to_string() };
193 let instance_id = unsafe { entry.instance_id.to_string() };
194 let component_id = unsafe { entry.component_id.to_string() };
195 let level = entry.level;
196
197 log::log!(
199 ffi_log_level_to_std_level(level),
200 "[plugin:{}] {}",
201 if component_id.is_empty() {
202 &plugin_id
203 } else {
204 &component_id
205 },
206 message
207 );
208
209 if let Ok(mut logs) = captured_logs().lock() {
213 logs.push(CapturedLog {
214 level,
215 plugin_id: plugin_id.clone(),
216 message: message.clone(),
217 });
218 }
219
220 if !ctx.is_null() && !instance_id.is_empty() && !component_id.is_empty() {
222 let context = unsafe { CallbackContext::from_raw_ref(ctx) };
223 let log_message = LogMessage::with_instance(
224 ffi_log_level_to_log_level(level),
225 message,
226 &instance_id,
227 &component_id,
228 ComponentType::Source, );
230 let registry = context.log_registry.clone();
231 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
234 registry.try_log(log_message);
235 }));
236 }
237 }));
238}
239
240#[allow(clippy::not_unsafe_ptr_arg_deref)]
245pub extern "C" fn default_lifecycle_callback(ctx: *mut c_void, event: *const FfiLifecycleEvent) {
246 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
247 let event = unsafe { &*event };
248 let component_id = unsafe { event.component_id.to_string() };
249 let component_type_str = unsafe { event.component_type.to_string() };
250 let message = unsafe { event.message.to_string() };
251 let event_type = event.event_type;
252
253 log::debug!("Lifecycle: {component_id} ({component_type_str}) {event_type:?} {message}");
254
255 if let Ok(mut events) = captured_lifecycles().lock() {
257 events.push(CapturedLifecycle {
258 component_id: component_id.clone(),
259 event_type,
260 message: message.clone(),
261 });
262 }
263
264 if !ctx.is_null() {
266 let context = unsafe { CallbackContext::from_raw_ref(ctx) };
267 let component_type = parse_component_type(&component_type_str);
268 let status = ffi_lifecycle_to_component_status(event_type);
269
270 let component_event = ComponentEvent {
271 component_id,
272 component_type: component_type.clone(),
273 status,
274 timestamp: chrono::Utc::now(),
275 message: if message.is_empty() {
276 None
277 } else {
278 Some(message)
279 },
280 };
281
282 let event_history = match component_type {
284 ComponentType::Reaction => context.reaction_event_history.clone(),
285 _ => context.source_event_history.clone(),
286 };
287 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
290 if let Ok(mut history) = event_history.try_write() {
291 history.record_event(component_event);
292 }
293 }));
294 }
295 }));
296}
297
298pub fn default_log_callback_fn() -> LogCallbackFn {
300 default_log_callback
301}
302
303pub fn default_lifecycle_callback_fn() -> LifecycleCallbackFn {
305 default_lifecycle_callback
306}
307
308#[allow(clippy::not_unsafe_ptr_arg_deref)]
321pub extern "C" fn instance_log_callback(ctx: *mut c_void, entry: *const FfiLogEntry) {
322 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
323 let entry = unsafe { &*entry };
324 let plugin_id = unsafe { entry.plugin_id.to_string() };
325 let message = unsafe { entry.message.to_string() };
326 let instance_id = unsafe { entry.instance_id.to_string() };
327 let component_id = unsafe { entry.component_id.to_string() };
328 let level = entry.level;
329
330 log::log!(
332 ffi_log_level_to_std_level(level),
333 "[plugin:{}] {}",
334 if component_id.is_empty() {
335 &plugin_id
336 } else {
337 &component_id
338 },
339 message
340 );
341
342 if let Ok(mut logs) = captured_logs().lock() {
344 logs.push(CapturedLog {
345 level,
346 plugin_id: plugin_id.clone(),
347 message: message.clone(),
348 });
349 }
350
351 if !ctx.is_null() {
353 let context = unsafe { InstanceCallbackContext::from_raw_ref(ctx) };
354 let log_instance_id = if instance_id.is_empty() {
357 &context.instance_id
358 } else {
359 &instance_id
360 };
361 let log_component_id = if component_id.is_empty() {
362 &plugin_id
363 } else {
364 &component_id
365 };
366 let log_message = LogMessage::with_instance(
367 ffi_log_level_to_log_level(level),
368 message,
369 log_instance_id,
370 log_component_id,
371 ComponentType::Source,
372 );
373 let registry = context.log_registry.clone();
374 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
379 registry.try_log(log_message);
380 }));
381 }
382 }));
383}
384
385#[allow(clippy::not_unsafe_ptr_arg_deref)]
391pub extern "C" fn instance_lifecycle_callback(ctx: *mut c_void, event: *const FfiLifecycleEvent) {
392 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
393 let event = unsafe { &*event };
394 let component_id = unsafe { event.component_id.to_string() };
395 let component_type_str = unsafe { event.component_type.to_string() };
396 let message = unsafe { event.message.to_string() };
397 let event_type = event.event_type;
398
399 log::debug!(
400 "Lifecycle [instance]: {component_id} ({component_type_str}) {event_type:?} {message}"
401 );
402
403 if let Ok(mut events) = captured_lifecycles().lock() {
405 events.push(CapturedLifecycle {
406 component_id: component_id.clone(),
407 event_type,
408 message: message.clone(),
409 });
410 }
411
412 if !ctx.is_null() {
414 let context = unsafe { InstanceCallbackContext::from_raw_ref(ctx) };
415 let status = ffi_lifecycle_to_component_status(event_type);
416
417 let update = ComponentUpdate::Status {
418 component_id,
419 status,
420 message: if message.is_empty() {
421 None
422 } else {
423 Some(message)
424 },
425 };
426
427 let tx = context.update_tx.clone();
428 if let Err(e) = tx.try_send(update) {
431 log::error!("Failed to send lifecycle event: {e}");
432 }
433 }
434 }));
435}
436
437pub fn instance_log_callback_fn() -> LogCallbackFn {
439 instance_log_callback
440}
441
442pub fn instance_lifecycle_callback_fn() -> LifecycleCallbackFn {
444 instance_lifecycle_callback
445}