drasi-host-sdk 0.6.2

Host-side SDK for loading and interacting with Drasi cdylib plugins
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Host-side proxy for Reaction and ReactionPluginDescriptor.

use std::collections::HashMap;
use std::ffi::c_void;
use std::sync::Arc;

use async_trait::async_trait;

use drasi_lib::reactions::Reaction;
use drasi_lib::{ComponentStatus, ReactionRuntimeContext};
use drasi_plugin_sdk::descriptor::ReactionPluginDescriptor;
use drasi_plugin_sdk::ffi::{
    FfiComponentStatus, FfiRuntimeContext, FfiStr, ReactionPluginVtable, ReactionVtable,
};
use libloading::Library;

use crate::state_store_bridge::StateStoreVtableBuilder;

/// Wraps a `ReactionVtable` into a DrasiLib `Reaction` trait implementation.
///
/// Every trait method is forwarded to the matching function pointer in
/// `vtable`, passing `vtable.state` as the plugin-side receiver. The id and
/// type name are read once at construction and served from the cached copies.
pub struct ReactionProxy {
    /// Function-pointer table plus opaque `state` pointer supplied by the plugin.
    vtable: ReactionVtable,
    /// Shared handle to the plugin's dynamic library. Never read; it is only
    /// held for as long as the proxy exists.
    _library: Arc<Library>,
    /// Reaction id, fetched from the plugin once in `new`.
    cached_id: String,
    /// Reaction type name, fetched from the plugin once in `new`.
    cached_type_name: String,
    /// Per-instance callback context for plugin-emitted log/lifecycle callbacks.
    ///
    /// Stored as an `Arc` whose strong count was bumped by `Arc::into_raw` when
    /// the raw pointer was handed to the plugin, so there are at least two
    /// strong references: the one kept here and the one owned by the raw
    /// pointer. On Drop the host's `Arc` is `mem::forget`-ed so any **late**
    /// log/lifecycle callback emitted by the plugin (after `stop()` returns)
    /// still finds a valid pointer. The cdylib itself is intentionally leaked
    /// process-wide (see `host-sdk/src/loader.rs`), so the small per-instance
    /// `Arc` leak is acceptable in exchange for closing the late-callback
    /// use-after-free window.
    _callback_ctx: std::sync::Mutex<Option<Arc<crate::callbacks::InstanceCallbackContext>>>,
    /// Sending half of the bounded channel used for push-based result delivery.
    /// Created on start, closed (set to `None`) on stop/drop.
    result_tx:
        std::sync::Mutex<Option<std::sync::mpsc::SyncSender<drasi_lib::channels::QueryResult>>>,
    /// Owner of the `ResultPushContext` whose raw pointer is handed to the
    /// plugin's forwarder in `start()`; keeps that context alive for the
    /// lifetime of the forwarder.
    _push_ctx: std::sync::Mutex<Option<Arc<ResultPushContext>>>,
}

/// Context for the push-based result callback.
///
/// A raw pointer to this struct is passed to the plugin as the `ctx` argument
/// of `result_push_callback`.
struct ResultPushContext {
    /// Receiving half of the result channel. The callback locks this mutex and
    /// blocks in `recv()` while holding it.
    rx: std::sync::Mutex<Option<std::sync::mpsc::Receiver<drasi_lib::channels::QueryResult>>>,
    /// Signaled when the plugin-side forwarder task has fully exited its loop
    /// and will no longer access the `ReactionWrapper`. The forwarder signals
    /// this by calling the callback one final time with the sentinel parameter,
    /// AFTER breaking out of its processing loop.
    forwarder_done: std::sync::Mutex<bool>,
    /// Condition variable paired with `forwarder_done`; notified whenever the
    /// flag is set so that a waiting `Drop` can proceed.
    forwarder_done_cv: std::sync::Condvar,
}

/// Marks the forwarder as finished and wakes every thread waiting on
/// `forwarder_done_cv`.
///
/// If the `forwarder_done` mutex is poisoned the call is a no-op: the flag is
/// left untouched and nobody is notified.
fn signal_forwarder_done(context: &ResultPushContext) {
    let Ok(mut finished) = context.forwarder_done.lock() else {
        return;
    };
    *finished = true;
    // Notify while the flag's lock is still held.
    context.forwarder_done_cv.notify_all();
}

/// Callback invoked by the plugin's forwarder task to receive the next QueryResult.
/// Blocks until a result is available. Returns null on channel close (shutdown).
///
/// The `sentinel` parameter serves dual purpose:
/// - `null`: Normal mode — block on recv() and return the next QueryResult.
/// - Non-null: **Forwarder-exit sentinel** — the forwarder has fully exited its
///   processing loop and will not access the ReactionWrapper again. Signal
///   `forwarder_done` so the host can safely free the wrapper.
///
/// A null `ctx` is rejected up front and answered with null (the same value
/// the forwarder receives on shutdown), because there is no context to read
/// from or to signal.
///
/// Wrapped in `catch_unwind` because this is `extern "C"` — panics unwinding
/// across the FFI boundary are undefined behavior.
extern "C" fn result_push_callback(ctx: *mut c_void, sentinel: *mut c_void) -> *mut c_void {
    // Both the normal path and the panic fallback dereference `ctx`, so a null
    // pointer coming back across the FFI boundary must never reach them.
    if ctx.is_null() {
        return std::ptr::null_mut();
    }

    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        result_push_callback_inner(ctx, sentinel)
    }));

    match outcome {
        Ok(ptr) => ptr,
        Err(_) => {
            // On panic, signal done so drop() doesn't wait for a sentinel
            // that may never arrive.
            // SAFETY: `ctx` is non-null (checked above). It is expected to be
            // the pointer produced by `Arc::as_ptr` in `ReactionProxy::start`,
            // whose `Arc` is stored in `_push_ctx` and leaked rather than
            // freed when the proxy is dropped.
            let context = unsafe { &*(ctx as *const ResultPushContext) };
            signal_forwarder_done(context);
            std::ptr::null_mut()
        }
    }
}

fn result_push_callback_inner(ctx: *mut c_void, sentinel: *mut c_void) -> *mut c_void {
    let context = unsafe { &*(ctx as *const ResultPushContext) };

    // Sentinel call: the forwarder task has exited its loop and will not
    // access the ReactionWrapper again.  Signal completion so Drop can
    // safely call drop_fn.
    if !sentinel.is_null() {
        signal_forwarder_done(context);
        return std::ptr::null_mut();
    }

    let guard = context
        .rx
        .lock()
        .expect("result_push_callback lock poisoned");
    if let Some(ref rx) = *guard {
        match rx.recv() {
            Ok(result) => Box::into_raw(Box::new(result)) as *mut c_void,
            Err(_) => {
                // Channel closed — return null so the forwarder breaks.
                // Do NOT signal forwarder_done here; the forwarder will
                // send a sentinel callback after it has fully exited.
                std::ptr::null_mut()
            }
        }
    } else {
        // rx already taken — return null (forwarder will send sentinel after exiting)
        std::ptr::null_mut()
    }
}

// NOTE(review): these impls are written by hand, presumably because `vtable`
// carries a raw `state` pointer. They assert that the plugin's vtable
// functions may be called from any thread, including concurrently through
// `&self`. That contract is not visible in this file — confirm it against the
// plugin SDK.
unsafe impl Send for ReactionProxy {}
unsafe impl Sync for ReactionProxy {}

impl ReactionProxy {
    /// Builds a proxy around a plugin-provided reaction vtable.
    ///
    /// The reaction's id and type name are requested from the plugin here,
    /// exactly once, and cached as owned strings. The callback context, result
    /// sender and push context all start out empty.
    pub fn new(vtable: ReactionVtable, library: Arc<Library>) -> Self {
        let state = vtable.state as *const c_void;
        let id = unsafe { (vtable.id_fn)(state).to_string() };
        let type_name = unsafe { (vtable.type_name_fn)(state).to_string() };

        Self {
            vtable,
            _library: library,
            cached_id: id,
            cached_type_name: type_name,
            _callback_ctx: std::sync::Mutex::new(None),
            result_tx: std::sync::Mutex::new(None),
            _push_ctx: std::sync::Mutex::new(None),
        }
    }
}

#[async_trait]
impl Reaction for ReactionProxy {
    /// Returns the reaction id cached when the proxy was constructed.
    fn id(&self) -> &str {
        &self.cached_id
    }

    /// Returns the reaction type name cached when the proxy was constructed.
    fn type_name(&self) -> &str {
        &self.cached_type_name
    }

    /// Fetches the plugin's properties as a JSON string and parses them.
    ///
    /// A parse failure is logged as a warning and yields an empty map.
    fn properties(&self) -> HashMap<String, serde_json::Value> {
        let owned = (self.vtable.properties_fn)(self.vtable.state as *const c_void);
        let json_str = unsafe { owned.into_string() };
        match serde_json::from_str(&json_str) {
            Ok(props) => props,
            Err(e) => {
                log::warn!(
                    "Failed to parse plugin properties for '{}': {e}",
                    self.cached_id
                );
                HashMap::new()
            }
        }
    }

    /// Returns the query ids reported by the plugin.
    fn query_ids(&self) -> Vec<String> {
        let arr = (self.vtable.query_ids_fn)(self.vtable.state as *const c_void);

        unsafe { arr.into_vec() }
    }

    /// Returns the plugin's auto-start flag.
    fn auto_start(&self) -> bool {
        (self.vtable.auto_start_fn)(self.vtable.state as *const c_void)
    }

    /// Translates the runtime context into its FFI form and passes it to the
    /// plugin's `initialize_fn`.
    ///
    /// Must run inside a Tokio runtime: the current runtime handle is captured
    /// for the per-instance callback context.
    async fn initialize(&self, context: ReactionRuntimeContext) {
        let state_store_vtable = context
            .state_store
            .as_ref()
            .map(|ss| StateStoreVtableBuilder::build(ss.clone()));

        // These owned strings back the borrowed `FfiStr` views below and stay
        // alive until after `initialize_fn` returns.
        let instance_id_str = context.instance_id.clone();
        let component_id_str = context.reaction_id.clone();

        let instance_id_ffi = FfiStr::from_str(&instance_id_str);
        let component_id_ffi = FfiStr::from_str(&component_id_str);

        // The boxed vtable is handed over as a raw pointer (null when there is
        // no state store); this function does not reclaim it.
        let ss_ptr = state_store_vtable
            .map(|v| Box::into_raw(Box::new(v)) as *const _)
            .unwrap_or(std::ptr::null());

        // Create per-instance callback context for this reaction
        let per_instance_ctx = Arc::new(crate::callbacks::InstanceCallbackContext {
            instance_id: instance_id_str.clone(),
            runtime_handle: tokio::runtime::Handle::current(),
            log_registry: drasi_lib::managers::get_or_init_global_registry(),
            update_tx: context.update_tx.clone(),
        });

        // Hand the plugin its own strong reference (`Arc::into_raw` keeps the
        // refcount bumped) so log/lifecycle callbacks emitted late by the
        // plugin (e.g. from inside stop_fn or from internal tasks shutting
        // down) do not deref freed memory. That reference is never reclaimed,
        // and Drop additionally `mem::forget`s the host's copy.
        let ctx_for_plugin = per_instance_ctx.clone();
        let ctx_ptr = Arc::into_raw(ctx_for_plugin) as *mut c_void;

        if let Ok(mut guard) = self._callback_ctx.lock() {
            *guard = Some(per_instance_ctx);
        }

        let identity_vtable = context
            .identity_provider
            .as_ref()
            .map(|ip| crate::identity_bridge::IdentityProviderVtableBuilder::build(ip.clone()));

        // Same hand-over as the state store vtable: raw pointer, or null.
        let ip_ptr = identity_vtable
            .map(|v| Box::into_raw(Box::new(v)) as *const _)
            .unwrap_or(std::ptr::null());

        let ffi_ctx = FfiRuntimeContext {
            instance_id: instance_id_ffi,
            component_id: component_id_ffi,
            state_store: ss_ptr,
            identity_provider: ip_ptr,
            log_callback: Some(crate::callbacks::instance_log_callback),
            log_ctx: ctx_ptr,
            lifecycle_callback: Some(crate::callbacks::instance_lifecycle_callback),
            lifecycle_ctx: ctx_ptr,
        };

        (self.vtable.initialize_fn)(self.vtable.state, &ffi_ctx as *const FfiRuntimeContext);
    }

    /// Creates the result channel, starts the plugin's forwarder, then starts
    /// the reaction itself.
    ///
    /// May be called again after `stop()`: a fresh channel and push context
    /// are installed, and the previous push context is leaked (not freed)
    /// because the earlier forwarder may still call back with its pointer.
    ///
    /// # Errors
    /// Returns an error if the thread running `start_fn` panics or if the
    /// plugin reports a failure.
    ///
    /// # Panics
    /// Panics if the `result_tx` or `_push_ctx` mutex is poisoned.
    async fn start(&self) -> anyhow::Result<()> {
        // Set up push-based result channel before starting the reaction
        let (tx, rx) = std::sync::mpsc::sync_channel::<drasi_lib::channels::QueryResult>(256);
        {
            let mut guard = self.result_tx.lock().expect("result_tx lock poisoned");
            *guard = Some(tx);
        }

        let push_ctx = Arc::new(ResultPushContext {
            rx: std::sync::Mutex::new(Some(rx)),
            forwarder_done: std::sync::Mutex::new(false),
            forwarder_done_cv: std::sync::Condvar::new(),
        });
        // Use Arc::as_ptr — the Arc stays alive in _push_ctx for the lifetime of the proxy
        let ctx_ptr = Arc::as_ptr(&push_ctx) as *mut c_void;
        {
            let mut guard = self._push_ctx.lock().expect("_push_ctx lock poisoned");
            if let Some(previous) = guard.replace(push_ctx) {
                // A forwarder from an earlier start() only received a borrowed
                // raw pointer to `previous`. It may still be inside the
                // callback or about to send its exit sentinel, so the old
                // context must not be freed here. Leak it, exactly as Drop
                // does for the current one.
                std::mem::forget(previous);
            }
        }

        // Start the plugin's forwarder task
        (self.vtable.start_result_push_fn)(self.vtable.state, result_push_callback, ctx_ptr);

        // Start the reaction itself on a dedicated OS thread, joined right away.
        let state = drasi_plugin_sdk::ffi::SendMutPtr(self.vtable.state);
        let start_fn = self.vtable.start_fn;
        let result = std::thread::spawn(move || (start_fn)(state.as_ptr()))
            .join()
            .map_err(|_| anyhow::anyhow!("Thread panicked"))?;
        unsafe { result.into_result().map_err(|e| anyhow::anyhow!(e)) }
    }

    /// Closes the result channel and calls the plugin's `stop_fn`.
    ///
    /// # Errors
    /// Returns an error if the thread running `stop_fn` panics or if the
    /// plugin reports a failure.
    ///
    /// # Panics
    /// Panics if the `result_tx` mutex is poisoned.
    async fn stop(&self) -> anyhow::Result<()> {
        // Close ONLY the sender. Dropping the sender is sufficient to unblock
        // the forwarder's `rx.recv()` (it returns Err, the callback returns
        // null, and the forwarder breaks). Do NOT also drop the receiver
        // here — the callback may still be holding `context.rx.lock()` for a
        // recv() that is racing this stop, and removing the receiver
        // mid-flight creates a race against the forwarder's in-flight
        // `enqueue_query_result(qr).await` against the reaction's
        // shutting-down priority queue.
        {
            let mut guard = self.result_tx.lock().expect("result_tx lock poisoned");
            *guard = None;
        }

        let state = drasi_plugin_sdk::ffi::SendMutPtr(self.vtable.state);
        let stop_fn = self.vtable.stop_fn;
        let result = std::thread::spawn(move || (stop_fn)(state.as_ptr()))
            .join()
            .map_err(|_| anyhow::anyhow!("Thread panicked"))?;
        unsafe { result.into_result().map_err(|e| anyhow::anyhow!(e)) }
    }

    /// Queries the plugin's status and maps it onto `ComponentStatus`.
    async fn status(&self) -> ComponentStatus {
        let s = (self.vtable.status_fn)(self.vtable.state as *const c_void);
        match s {
            FfiComponentStatus::Starting => ComponentStatus::Starting,
            FfiComponentStatus::Running => ComponentStatus::Running,
            FfiComponentStatus::Stopping => ComponentStatus::Stopping,
            FfiComponentStatus::Stopped => ComponentStatus::Stopped,
            FfiComponentStatus::Reconfiguring => ComponentStatus::Reconfiguring,
            FfiComponentStatus::Error => ComponentStatus::Error,
            FfiComponentStatus::Added => ComponentStatus::Added,
            FfiComponentStatus::Removed => ComponentStatus::Removed,
        }
    }

    /// Queues a query result for delivery to the plugin's forwarder.
    ///
    /// The channel is bounded (capacity 256, see `start`), so this call blocks
    /// the calling thread while the queue is full.
    ///
    /// # Errors
    /// Returns an error if the reaction has not been started (or has been
    /// stopped), or if the receiving side of the channel is gone.
    ///
    /// # Panics
    /// Panics if the `result_tx` mutex is poisoned.
    async fn enqueue_query_result(
        &self,
        result: drasi_lib::channels::QueryResult,
    ) -> anyhow::Result<()> {
        // Clone the sender and release the lock BEFORE the potentially
        // blocking send. Holding `result_tx` across a send on a full queue
        // would stop `stop()` from taking the lock to close the channel.
        let sender = {
            let guard = self.result_tx.lock().expect("result_tx lock poisoned");
            (*guard).clone()
        };

        match sender {
            Some(tx) => tx
                .send(result)
                .map_err(|_| anyhow::anyhow!("Result channel closed")),
            None => Err(anyhow::anyhow!(
                "Reaction not started — result channel not initialized"
            )),
        }
    }

    /// Calls the plugin's `deprovision_fn` on a dedicated thread and converts
    /// its FFI result.
    ///
    /// # Errors
    /// Returns an error if that thread panics or if the plugin reports a
    /// failure.
    async fn deprovision(&self) -> anyhow::Result<()> {
        let state = drasi_plugin_sdk::ffi::SendMutPtr(self.vtable.state);
        let deprovision_fn = self.vtable.deprovision_fn;
        let result = std::thread::spawn(move || (deprovision_fn)(state.as_ptr()))
            .join()
            .map_err(|_| anyhow::anyhow!("Thread panicked"))?;
        unsafe { result.into_result().map_err(|e| anyhow::anyhow!(e)) }
    }
}

impl Drop for ReactionProxy {
    /// Shuts down result delivery, waits (bounded) for the plugin's forwarder
    /// to exit, then frees the plugin-side reaction if that is safe.
    ///
    /// This destructor never panics: a poisoned lock is handled by taking the
    /// conservative path (leak instead of free).
    fn drop(&mut self) {
        // Upper bound on how long drop waits for the forwarder's exit sentinel.
        const FORWARDER_EXIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

        // Close the result channel sender to unblock the forwarder's callback.
        // The callback's rx.recv() will return Err, causing it to return null.
        // The forwarder then breaks out of its loop and sends a sentinel
        // callback to signal forwarder_done. The slot holds plain data, so a
        // poisoned lock is simply recovered.
        *self
            .result_tx
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
        // Do NOT drop the receiver here. Sender drop alone unblocks recv();
        // leaving the receiver in place avoids racing a callback that is
        // currently holding `context.rx.lock()`. The receiver stays inside
        // the push context, which is leaked below.

        // Wait for the forwarder task to fully exit its processing loop.
        //
        // Safety argument: the forwarder sends a sentinel callback AFTER
        // breaking out of its loop. At that point, all enqueue_query_result()
        // calls have finished and the forwarder will NOT access the
        // ReactionWrapper again. Therefore, after this signal fires,
        // it is safe to free the ReactionWrapper.
        let forwarder_exited = match self._push_ctx.lock() {
            Ok(slot) => match &*slot {
                Some(ctx) => {
                    // A poisoned flag lock or condvar wait counts as "not
                    // confirmed", which leads to the leak path below rather
                    // than a panic inside drop.
                    let exit_signal = ctx.forwarder_done.lock().ok().and_then(|done| {
                        ctx.forwarder_done_cv
                            .wait_timeout_while(done, FORWARDER_EXIT_TIMEOUT, |flag| !*flag)
                            .ok()
                    });
                    match exit_signal {
                        Some((done, wait)) => !wait.timed_out() && *done,
                        None => false,
                    }
                }
                None => true, // No push context → forwarder was never started
            },
            Err(_) => false, // Lock poisoned
        };

        if forwarder_exited {
            // Safe to free the ReactionWrapper — forwarder won't access it.
            let drop_fn = self.vtable.drop_fn;
            let state = drasi_plugin_sdk::ffi::SendMutPtr(self.vtable.state);
            let _ = std::thread::spawn(move || (drop_fn)(state.as_ptr())).join();
        } else {
            // Timeout or error — leak the ReactionWrapper to prevent UAF.
            // Memory leak is preferable to undefined behavior.
            log::warn!(
                "ReactionProxy::drop: forwarder did not exit within timeout; \
                 leaking ReactionWrapper to prevent use-after-free"
            );
        }

        // Leak the push context Arc — on the timeout path the forwarder's
        // callback may still reference it. On the success path this is
        // unnecessary but harmless, and keeps the logic simple. A poisoned
        // lock is recovered so the leak really is unconditional; otherwise
        // the field's own drop would free the context.
        let push_ctx = self
            ._push_ctx
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take();
        std::mem::forget(push_ctx);

        // Leak the per-instance callback context Arc unconditionally.
        // The strong reference handed to the plugin via `Arc::into_raw` in
        // initialize() is never reclaimed — late log/lifecycle callbacks
        // emitted by the plugin (during stop_fn or from internal tasks) must
        // still find a valid pointer. The cdylib itself is intentionally
        // leaked process-wide (see host-sdk/src/loader.rs), so this small
        // per-instance Arc leak is the price of closing the late-callback
        // UAF window.
        let callback_ctx = self
            ._callback_ctx
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take();
        std::mem::forget(callback_ctx);
    }
}

// ============================================================================
// ReactionPluginProxy — wraps ReactionPluginVtable into ReactionPluginDescriptor
// ============================================================================

/// Wraps a `ReactionPluginVtable` (factory) into a `ReactionPluginDescriptor`.
///
/// The kind, config version and config schema name are read from the plugin
/// once at construction and served from cached copies afterwards.
pub struct ReactionPluginProxy {
    /// Factory function-pointer table plus opaque `state` pointer from the plugin.
    vtable: ReactionPluginVtable,
    /// Shared library handle; a clone is given to every `ReactionProxy`
    /// created by `create_reaction`.
    library: Arc<Library>,
    /// Plugin kind, fetched once in `new`.
    cached_kind: String,
    /// Config version, fetched once in `new`.
    cached_config_version: String,
    /// Config schema name, fetched once in `new`.
    cached_config_schema_name: String,
    /// Identifier of the providing plugin; empty until `set_plugin_id` is called.
    plugin_id: String,
}

// NOTE(review): these impls are written by hand, presumably because `vtable`
// carries a raw `state` pointer. They assert that the plugin's factory
// functions may be called from any thread, including concurrently through
// `&self`. That contract is not visible in this file — confirm it against the
// plugin SDK.
unsafe impl Send for ReactionPluginProxy {}
unsafe impl Sync for ReactionPluginProxy {}

impl ReactionPluginProxy {
    /// Builds a descriptor proxy around a plugin-provided factory vtable.
    ///
    /// Kind, config version and config schema name are queried from the plugin
    /// here and cached. The plugin id starts out empty.
    pub fn new(vtable: ReactionPluginVtable, library: Arc<Library>) -> Self {
        let state = vtable.state as *const c_void;
        let kind = unsafe { (vtable.kind_fn)(state).to_string() };
        let config_version = unsafe { (vtable.config_version_fn)(state).to_string() };
        let config_schema_name = unsafe { (vtable.config_schema_name_fn)(state).to_string() };

        Self {
            vtable,
            library,
            cached_kind: kind,
            cached_config_version: config_version,
            cached_config_schema_name: config_schema_name,
            plugin_id: String::new(),
        }
    }

    /// Returns the identifier of the plugin that supplied this descriptor.
    pub fn plugin_id(&self) -> &str {
        self.plugin_id.as_str()
    }

    /// Records which plugin supplied this descriptor.
    pub fn set_plugin_id(&mut self, id: String) {
        self.plugin_id = id;
    }
}

#[async_trait]
impl ReactionPluginDescriptor for ReactionPluginProxy {
    fn kind(&self) -> &str {
        &self.cached_kind
    }

    fn config_version(&self) -> &str {
        &self.cached_config_version
    }

    fn config_schema_json(&self) -> String {
        unsafe {
            (self.vtable.config_schema_json_fn)(self.vtable.state as *const c_void).into_string()
        }
    }

    fn config_schema_name(&self) -> &str {
        &self.cached_config_schema_name
    }

    async fn create_reaction(
        &self,
        id: &str,
        query_ids: Vec<String>,
        config_json: &serde_json::Value,
        auto_start: bool,
    ) -> anyhow::Result<Box<dyn Reaction>> {
        let config_str = serde_json::to_string(config_json)?;
        let query_ids_str = serde_json::to_string(&query_ids)?;
        let id_ffi = FfiStr::from_str(id);
        let query_ids_ffi = FfiStr::from_str(&query_ids_str);
        let config_ffi = FfiStr::from_str(&config_str);

        let state = self.vtable.state;
        let create_fn = self.vtable.create_reaction_fn;
        let result = (create_fn)(state, id_ffi, query_ids_ffi, config_ffi, auto_start);

        let vtable_ptr = unsafe {
            result
                .into_result::<ReactionVtable>()
                .map_err(|msg| anyhow::anyhow!("{msg}"))?
        };

        if vtable_ptr.is_null() {
            return Err(anyhow::anyhow!(
                "Plugin factory returned null for reaction '{id}'"
            ));
        }

        let vtable = unsafe { *Box::from_raw(vtable_ptr) };
        Ok(Box::new(ReactionProxy::new(vtable, self.library.clone())))
    }
}

impl Drop for ReactionPluginProxy {
    /// Releases the plugin-side factory state by running the vtable's
    /// `drop_fn` on a dedicated thread and waiting for it to finish. The join
    /// result (including a panic on that thread) is ignored.
    fn drop(&mut self) {
        let plugin_state = drasi_plugin_sdk::ffi::SendMutPtr(self.vtable.state);
        let release = self.vtable.drop_fn;
        let worker = std::thread::spawn(move || (release)(plugin_state.as_ptr()));
        let _ = worker.join();
    }
}