Skip to main content

drasi_host_sdk/
callbacks.rs

// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Log and lifecycle callback wiring for host-side plugin management.
//!
//! The host creates a [`CallbackContext`] per DrasiLib instance and passes it
//! as an opaque `*mut c_void` to each plugin. The callbacks then route logs
//! and lifecycle events into the DrasiLib systems that the REST API reads from.
//! Per-source/reaction-instance wiring uses [`InstanceCallbackContext`] instead,
//! whose lifecycle callback forwards status changes to the ComponentGraph.

21use std::ffi::c_void;
22use std::sync::{Arc, Mutex, OnceLock};
23
24use drasi_lib::channels::events::{ComponentEvent, ComponentStatus, ComponentType};
25use drasi_lib::component_graph::{ComponentUpdate, ComponentUpdateSender};
26use drasi_lib::managers::{ComponentEventHistory, ComponentLogRegistry, LogLevel, LogMessage};
27use drasi_plugin_sdk::ffi::{
28    FfiLifecycleEvent, FfiLifecycleEventType, FfiLogEntry, FfiLogLevel, LifecycleCallbackFn,
29    LogCallbackFn,
30};
31use tokio::sync::RwLock;
32
33/// Spawn an async future on the host tokio runtime.
34///
35/// Callbacks are `extern "C"` functions invoked from within a plugin's own tokio
36/// runtime, so we cannot call `block_on` on the host runtime. Instead we
37/// Host-side callback context that routes plugin logs and events into DrasiLib.
38///
39/// One context is created per DrasiLib instance. The host passes a raw pointer
40/// to this struct as the `ctx` argument when setting callbacks on plugins.
41pub struct CallbackContext {
42    /// The DrasiLib instance ID that owns the plugins using this context.
43    pub instance_id: String,
44    /// Handle to the host tokio runtime for dispatching async callback work.
45    pub runtime_handle: tokio::runtime::Handle,
46    /// The global log registry (shared across all DrasiLib instances).
47    pub log_registry: Arc<ComponentLogRegistry>,
48    /// The event history for the owning DrasiLib instance's sources.
49    pub source_event_history: Arc<RwLock<ComponentEventHistory>>,
50    /// The event history for the owning DrasiLib instance's reactions.
51    pub reaction_event_history: Arc<RwLock<ComponentEventHistory>>,
52}
53
54// Safety: CallbackContext only contains Arc/RwLock types which are Send+Sync.
55unsafe impl Send for CallbackContext {}
56unsafe impl Sync for CallbackContext {}
57
58impl CallbackContext {
59    /// Convert to a raw pointer for passing through FFI.
60    /// The caller must ensure the context lives as long as plugins use it.
61    pub fn into_raw(self: Arc<Self>) -> *mut c_void {
62        Arc::into_raw(self) as *mut c_void
63    }
64
65    /// Reconstruct from a raw pointer (does NOT take ownership — just borrows).
66    ///
67    /// # Safety
68    /// The pointer must have been created by `into_raw` and the `Arc` must still be alive.
69    unsafe fn from_raw_ref<'a>(ptr: *mut c_void) -> &'a Self {
70        &*(ptr as *const Self)
71    }
72}
73
74/// Per-source/reaction-instance callback context.
75///
76/// Created during `SourceProxy.initialize()` / `ReactionProxy.initialize()`.
77/// Uses the `ComponentUpdateSender` channel from the runtime context so
78/// lifecycle events flow through the ComponentGraph update loop.
79pub struct InstanceCallbackContext {
80    /// The DrasiLib instance ID.
81    pub instance_id: String,
82    /// Handle to the host tokio runtime for dispatching async callback work.
83    pub runtime_handle: tokio::runtime::Handle,
84    /// The global log registry.
85    pub log_registry: Arc<ComponentLogRegistry>,
86    /// Channel for status updates to the ComponentGraph.
87    pub update_tx: ComponentUpdateSender,
88}
89
90// Safety: contains only Arc and tokio mpsc::Sender (which is Send+Sync).
91unsafe impl Send for InstanceCallbackContext {}
92unsafe impl Sync for InstanceCallbackContext {}
93
94impl InstanceCallbackContext {
95    pub fn into_raw(self: Arc<Self>) -> *mut c_void {
96        Arc::into_raw(self) as *mut c_void
97    }
98
99    unsafe fn from_raw_ref<'a>(ptr: *mut c_void) -> &'a Self {
100        &*(ptr as *const Self)
101    }
102}
103
104/// A captured log entry from a plugin (for testing/diagnostics).
105#[derive(Debug, Clone)]
106pub struct CapturedLog {
107    pub level: FfiLogLevel,
108    pub plugin_id: String,
109    pub message: String,
110}
111
112/// A captured lifecycle event from a plugin (for testing/diagnostics).
113#[derive(Debug, Clone)]
114pub struct CapturedLifecycle {
115    pub component_id: String,
116    pub event_type: FfiLifecycleEventType,
117    pub message: String,
118}
119
120/// Access the global captured log store (for testing/diagnostics).
121pub fn captured_logs() -> &'static Mutex<Vec<CapturedLog>> {
122    static LOGS: OnceLock<Mutex<Vec<CapturedLog>>> = OnceLock::new();
123    LOGS.get_or_init(|| Mutex::new(Vec::new()))
124}
125
126/// Access the global captured lifecycle event store (for testing/diagnostics).
127pub fn captured_lifecycles() -> &'static Mutex<Vec<CapturedLifecycle>> {
128    static EVENTS: OnceLock<Mutex<Vec<CapturedLifecycle>>> = OnceLock::new();
129    EVENTS.get_or_init(|| Mutex::new(Vec::new()))
130}
131
132fn ffi_log_level_to_log_level(level: FfiLogLevel) -> LogLevel {
133    match level {
134        FfiLogLevel::Error => LogLevel::Error,
135        FfiLogLevel::Warn => LogLevel::Warn,
136        FfiLogLevel::Info => LogLevel::Info,
137        FfiLogLevel::Debug => LogLevel::Debug,
138        FfiLogLevel::Trace => LogLevel::Trace,
139    }
140}
141
142fn ffi_log_level_to_std_level(level: FfiLogLevel) -> log::Level {
143    match level {
144        FfiLogLevel::Error => log::Level::Error,
145        FfiLogLevel::Warn => log::Level::Warn,
146        FfiLogLevel::Info => log::Level::Info,
147        FfiLogLevel::Debug => log::Level::Debug,
148        FfiLogLevel::Trace => log::Level::Trace,
149    }
150}
151
152fn parse_component_type(s: &str) -> ComponentType {
153    match s {
154        "source" => ComponentType::Source,
155        "query" => ComponentType::Query,
156        "reaction" => ComponentType::Reaction,
157        _ => ComponentType::Source, // default for "plugin" or unknown
158    }
159}
160
161fn ffi_lifecycle_to_component_status(event_type: FfiLifecycleEventType) -> ComponentStatus {
162    match event_type {
163        FfiLifecycleEventType::Starting => ComponentStatus::Starting,
164        FfiLifecycleEventType::Started => ComponentStatus::Running,
165        FfiLifecycleEventType::Stopping => ComponentStatus::Stopping,
166        FfiLifecycleEventType::Stopped => ComponentStatus::Stopped,
167        FfiLifecycleEventType::Error => ComponentStatus::Error,
168    }
169}
170
171/// Host log callback that routes plugin logs into the DrasiLib ComponentLogRegistry.
172///
173/// When `ctx` is non-null and points to a valid [`CallbackContext`], AND the
174/// FfiLogEntry carries a non-empty `instance_id` and `component_id`, logs are
175/// pushed into the registry with the correct composite key so they appear in
176/// the REST API's log streaming endpoints.
177/// # Safety
178/// `entry` must be a valid pointer to an `FfiLogEntry`. `ctx` may be null (logs
179/// are still forwarded to the host log framework), or must point to a valid
180/// `CallbackContext` for registry routing.
181///
182/// This function's signature matches `LogCallbackFn` (non-unsafe `extern "C"`).
183/// Raw pointer dereferences are guarded by `unsafe` blocks inside the body.
184#[allow(clippy::not_unsafe_ptr_arg_deref)]
185pub extern "C" fn default_log_callback(ctx: *mut c_void, entry: *const FfiLogEntry) {
186    // Wrap the entire body in catch_unwind: this is an extern "C" function called
187    // from plugin code, so any unwinding panic across the FFI boundary causes a
188    // non-unwinding abort. We must absorb panics here.
189    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
190        let entry = unsafe { &*entry };
191        let plugin_id = unsafe { entry.plugin_id.to_string() };
192        let message = unsafe { entry.message.to_string() };
193        let instance_id = unsafe { entry.instance_id.to_string() };
194        let component_id = unsafe { entry.component_id.to_string() };
195        let level = entry.level;
196
197        // Always forward to host's log framework
198        log::log!(
199            ffi_log_level_to_std_level(level),
200            "[plugin:{}] {}",
201            if component_id.is_empty() {
202                &plugin_id
203            } else {
204                &component_id
205            },
206            message
207        );
208
209        // Always capture for diagnostics (use `ok()` to avoid panicking in extern "C"
210        // if the Mutex was poisoned by a prior test/thread panic — a panic here would
211        // be a non-unwinding abort since this is an extern "C" function)
212        if let Ok(mut logs) = captured_logs().lock() {
213            logs.push(CapturedLog {
214                level,
215                plugin_id: plugin_id.clone(),
216                message: message.clone(),
217            });
218        }
219
220        // Route into DrasiLib's ComponentLogRegistry if we have both context and instance info
221        if !ctx.is_null() && !instance_id.is_empty() && !component_id.is_empty() {
222            let context = unsafe { CallbackContext::from_raw_ref(ctx) };
223            let log_message = LogMessage::with_instance(
224                ffi_log_level_to_log_level(level),
225                message,
226                &instance_id,
227                &component_id,
228                ComponentType::Source, // TODO: parse from entry if available
229            );
230            let registry = context.log_registry.clone();
231            // try_log may panic from inside tokio's RwLock::try_write under
232            // certain race conditions; catch_unwind above absorbs that.
233            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
234                registry.try_log(log_message);
235            }));
236        }
237    }));
238}
239
240/// Host lifecycle callback that routes plugin events into DrasiLib's ComponentEventHistory.
241/// # Safety
242/// `event` must be a valid pointer to an `FfiLifecycleEvent`. `ctx` may be null
243/// or must point to a valid `CallbackContext`.
244#[allow(clippy::not_unsafe_ptr_arg_deref)]
245pub extern "C" fn default_lifecycle_callback(ctx: *mut c_void, event: *const FfiLifecycleEvent) {
246    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
247        let event = unsafe { &*event };
248        let component_id = unsafe { event.component_id.to_string() };
249        let component_type_str = unsafe { event.component_type.to_string() };
250        let message = unsafe { event.message.to_string() };
251        let event_type = event.event_type;
252
253        log::debug!("Lifecycle: {component_id} ({component_type_str}) {event_type:?} {message}");
254
255        // Always capture for diagnostics (use `ok()` to avoid panicking in extern "C")
256        if let Ok(mut events) = captured_lifecycles().lock() {
257            events.push(CapturedLifecycle {
258                component_id: component_id.clone(),
259                event_type,
260                message: message.clone(),
261            });
262        }
263
264        // Route into DrasiLib's ComponentEventHistory if context is available
265        if !ctx.is_null() {
266            let context = unsafe { CallbackContext::from_raw_ref(ctx) };
267            let component_type = parse_component_type(&component_type_str);
268            let status = ffi_lifecycle_to_component_status(event_type);
269
270            let component_event = ComponentEvent {
271                component_id,
272                component_type: component_type.clone(),
273                status,
274                timestamp: chrono::Utc::now(),
275                message: if message.is_empty() {
276                    None
277                } else {
278                    Some(message)
279                },
280            };
281
282            // Use try_write to avoid spawning async tasks that block the scheduler
283            let event_history = match component_type {
284                ComponentType::Reaction => context.reaction_event_history.clone(),
285                _ => context.source_event_history.clone(),
286            };
287            // try_write on tokio RwLock may panic with `unreachable!` under certain
288            // race conditions; absorb that panic to keep FFI safe.
289            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
290                if let Ok(mut history) = event_history.try_write() {
291                    history.record_event(component_event);
292                }
293            }));
294        }
295    }));
296}
297
298/// Get the default log callback function pointer.
299pub fn default_log_callback_fn() -> LogCallbackFn {
300    default_log_callback
301}
302
303/// Get the default lifecycle callback function pointer.
304pub fn default_lifecycle_callback_fn() -> LifecycleCallbackFn {
305    default_lifecycle_callback
306}
307
// ============================================================================
// Per-instance callbacks (used by SourceProxy/ReactionProxy)
// ============================================================================
311
312/// Per-instance log callback that routes logs using InstanceCallbackContext.
313///
314/// This callback is set during SourceProxy.initialize() via FfiRuntimeContext.
315/// It uses the `instance_id` and `component_id` from the FfiLogEntry (set by
316/// the plugin's TLS-aware FfiLogger) to construct the correct ComponentLogKey.
317/// # Safety
318/// `entry` must be a valid pointer to an `FfiLogEntry`. `ctx` may be null or
319/// must point to a valid `InstanceCallbackContext`.
320#[allow(clippy::not_unsafe_ptr_arg_deref)]
321pub extern "C" fn instance_log_callback(ctx: *mut c_void, entry: *const FfiLogEntry) {
322    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
323        let entry = unsafe { &*entry };
324        let plugin_id = unsafe { entry.plugin_id.to_string() };
325        let message = unsafe { entry.message.to_string() };
326        let instance_id = unsafe { entry.instance_id.to_string() };
327        let component_id = unsafe { entry.component_id.to_string() };
328        let level = entry.level;
329
330        // Forward to host log framework
331        log::log!(
332            ffi_log_level_to_std_level(level),
333            "[plugin:{}] {}",
334            if component_id.is_empty() {
335                &plugin_id
336            } else {
337                &component_id
338            },
339            message
340        );
341
342        // Capture for diagnostics (use `ok()` to avoid panicking in extern "C")
343        if let Ok(mut logs) = captured_logs().lock() {
344            logs.push(CapturedLog {
345                level,
346                plugin_id: plugin_id.clone(),
347                message: message.clone(),
348            });
349        }
350
351        // Route into ComponentLogRegistry
352        if !ctx.is_null() {
353            let context = unsafe { InstanceCallbackContext::from_raw_ref(ctx) };
354            // Use instance_id/component_id from the log entry (set by TLS in plugin)
355            // Fall back to context's instance_id if entry doesn't have them
356            let log_instance_id = if instance_id.is_empty() {
357                &context.instance_id
358            } else {
359                &instance_id
360            };
361            let log_component_id = if component_id.is_empty() {
362                &plugin_id
363            } else {
364                &component_id
365            };
366            let log_message = LogMessage::with_instance(
367                ffi_log_level_to_log_level(level),
368                message,
369                log_instance_id,
370                log_component_id,
371                ComponentType::Source,
372            );
373            let registry = context.log_registry.clone();
374            // Use try_log (non-blocking) to avoid spawning async tasks that can
375            // block the current_thread scheduler during drop sequences.
376            // try_log internally calls tokio's RwLock::try_write which can panic
377            // with `unreachable!` under certain races; absorb that panic.
378            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
379                registry.try_log(log_message);
380            }));
381        }
382    }));
383}
384
385/// Per-instance lifecycle callback that sends events through the SourceManager's
386/// event channel, so they flow through the same path as static source events.
387/// # Safety
388/// `event` must be a valid pointer to an `FfiLifecycleEvent`. `ctx` may be null
389/// or must point to a valid `InstanceCallbackContext`.
390#[allow(clippy::not_unsafe_ptr_arg_deref)]
391pub extern "C" fn instance_lifecycle_callback(ctx: *mut c_void, event: *const FfiLifecycleEvent) {
392    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
393        let event = unsafe { &*event };
394        let component_id = unsafe { event.component_id.to_string() };
395        let component_type_str = unsafe { event.component_type.to_string() };
396        let message = unsafe { event.message.to_string() };
397        let event_type = event.event_type;
398
399        log::debug!(
400            "Lifecycle [instance]: {component_id} ({component_type_str}) {event_type:?} {message}"
401        );
402
403        // Capture for diagnostics (use `ok()` to avoid panicking in extern "C")
404        if let Ok(mut events) = captured_lifecycles().lock() {
405            events.push(CapturedLifecycle {
406                component_id: component_id.clone(),
407                event_type,
408                message: message.clone(),
409            });
410        }
411
412        // Send through the component graph update channel
413        if !ctx.is_null() {
414            let context = unsafe { InstanceCallbackContext::from_raw_ref(ctx) };
415            let status = ffi_lifecycle_to_component_status(event_type);
416
417            let update = ComponentUpdate::Status {
418                component_id,
419                status,
420                message: if message.is_empty() {
421                    None
422                } else {
423                    Some(message)
424                },
425            };
426
427            let tx = context.update_tx.clone();
428            // Use try_send to avoid spawning an async task that may block
429            // the host runtime's current_thread scheduler during drop sequences.
430            if let Err(e) = tx.try_send(update) {
431                log::error!("Failed to send lifecycle event: {e}");
432            }
433        }
434    }));
435}
436
437/// Get the per-instance log callback function pointer.
438pub fn instance_log_callback_fn() -> LogCallbackFn {
439    instance_log_callback
440}
441
442/// Get the per-instance lifecycle callback function pointer.
443pub fn instance_lifecycle_callback_fn() -> LifecycleCallbackFn {
444    instance_lifecycle_callback
445}