Skip to main content

nautilus_plugin/bridge/
actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Host-side adapter that wraps a plug-in actor cdylib as a [`DataActor`].
17//!
18//! Owns the plug-in's opaque handle plus a pointer to its static
19//! [`crate::surfaces::actor::ActorVTable`] and forwards every callback
20//! the surface ships in v1 through the vtable. The live engine sees a normal
21//! `DataActor`; the plug-in never crosses the FFI boundary except as the typed
22//! event payload pointer the engine already has from the cache.
23
24#![allow(unsafe_code)]
25#![allow(
26    clippy::multiple_unsafe_ops_per_block,
27    reason = "vtable deref and FFI call form a single boundary callback; \
28              SAFETY comments cover both ops together"
29)]
30
31use std::{
32    any::Any,
33    fmt::Debug,
34    panic::{AssertUnwindSafe, catch_unwind},
35};
36
37use nautilus_common::{
38    actor::{DataActor, DataActorConfig, DataActorCore},
39    nautilus_actor,
40    signal::Signal,
41    timer::TimeEvent,
42};
43use nautilus_model::{
44    data::{
45        Bar, CustomData, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus,
46        MarkPriceUpdate, OptionChainSlice, OptionGreeks, OrderBookDelta, OrderBookDeltas,
47        OrderBookDepth10, QuoteTick, TradeTick,
48    },
49    events::{OrderCanceled, OrderFilled},
50    identifiers::ActorId,
51    instruments::InstrumentAny,
52    orderbook::OrderBook,
53};
54
55use crate::{
56    boundary::{BorrowedStr, PluginResult, Slice},
57    bridge::{
58        custom_data::{try_custom_data_boundary_ref, try_historical_custom_data_boundary_ref},
59        registry::{HostContextInner, drop_host_context, leak_host_context},
60    },
61    host::{HostContext, HostVTable},
62    manifest::ValidatedActorVTable,
63    surfaces::{
64        actor::PluginActorHandle,
65        book::{OrderBookDeltasHandle, OrderBookHandle},
66        custom_data::PluginCustomDataRef,
67        instrument::InstrumentAnyHandle,
68        option_chain::OptionChainSliceHandle,
69    },
70};
71
72/// Adapts a plug-in actor (vtable + handle from a cdylib) into a host-side
73/// [`DataActor`] the live node can register and dispatch into.
74pub struct PluginActorAdapter {
75    core: DataActorCore,
76    plugin_name: String,
77    type_name: String,
78    vtable: ValidatedActorVTable,
79    handle: *mut PluginActorHandle,
80    ctx: *const HostContext,
81}
82
83// SAFETY: the adapter owns the plug-in handle exclusively and never aliases
84// it across threads. The vtable pointer is process-lifetime static. The
85// engine drives the adapter from a single trader thread; the bound is only
86// required to satisfy `DataActor: 'static + Send` blanket bounds.
87unsafe impl Send for PluginActorAdapter {}
88
89impl Debug for PluginActorAdapter {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        f.debug_struct(stringify!(PluginActorAdapter))
92            .field("plugin_name", &self.plugin_name)
93            .field("type_name", &self.type_name)
94            .field("actor_id", &self.core.actor_id())
95            .finish()
96    }
97}
98
99impl PluginActorAdapter {
100    /// Constructs a new adapter by calling the plug-in's `create` thunk.
101    ///
102    /// `host` must be the same vtable pointer the host handed the plug-in at
103    /// load time. `actor_id` becomes the adapter's identity in the actor
104    /// registry. `config_json` is forwarded verbatim to the plug-in's
105    /// `PluginActor::new` so the cdylib can read instance-specific config.
106    ///
107    /// # Errors
108    ///
109    /// Returns an error if the plug-in's `create` thunk returns a null handle.
110    ///
111    /// # Safety
112    ///
113    /// `host` must be the same vtable pointer the host registered with the
114    /// plug-in at load time.
115    pub unsafe fn new(
116        actor_id: ActorId,
117        plugin_name: impl Into<String>,
118        type_name: impl Into<String>,
119        vtable: ValidatedActorVTable,
120        host: *const HostVTable,
121        config_json: &str,
122    ) -> anyhow::Result<Self> {
123        let plugin_name = plugin_name.into();
124        let type_name = type_name.into();
125        // SAFETY: vtable comes from a validated manifest entry.
126        let create = unsafe { validated_slot!(ActorVTable, vtable.as_ptr(), create) };
127
128        let ctx = leak_host_context(HostContextInner {
129            actor_id,
130            is_strategy: false,
131        });
132
133        let cfg = BorrowedStr::from_str(config_json);
134        // SAFETY: vtable is non-null, host outlives the adapter, ctx + cfg
135        // are live across the call.
136        let handle = guard_call(&plugin_name, &type_name, "create", || unsafe {
137            create(host, ctx, cfg)
138        })
139        .ok_or_else(|| {
140            // SAFETY: ctx came from leak_host_context above.
141            unsafe { drop_host_context(ctx) };
142            anyhow::anyhow!("plug-in actor '{type_name}' panicked in create")
143        })?;
144
145        if handle.is_null() {
146            // SAFETY: ctx came from leak_host_context above.
147            unsafe { drop_host_context(ctx) };
148            anyhow::bail!("plug-in actor '{type_name}' returned a null handle from create");
149        }
150
151        let core = DataActorCore::new(DataActorConfig {
152            actor_id: Some(actor_id),
153            log_events: true,
154            log_commands: true,
155        });
156
157        Ok(Self {
158            core,
159            plugin_name,
160            type_name,
161            vtable,
162            handle,
163            ctx,
164        })
165    }
166
167    /// Returns the canonical type name reported by the plug-in.
168    #[must_use]
169    pub fn type_name(&self) -> &str {
170        &self.type_name
171    }
172
173    /// Returns the plug-in name (manifest `name`) the adapter wraps.
174    #[must_use]
175    pub fn plugin_name(&self) -> &str {
176        &self.plugin_name
177    }
178}
179
180impl Drop for PluginActorAdapter {
181    fn drop(&mut self) {
182        if !self.handle.is_null() {
183            let _ = catch_unwind(AssertUnwindSafe(|| {
184                // SAFETY: vtable + handle are live; drop_handle ignores null.
185                unsafe {
186                    validated_slot!(ActorVTable, self.vtable.as_ptr(), drop_handle)(self.handle);
187                };
188            }));
189            self.handle = std::ptr::null_mut();
190        }
191        // SAFETY: ctx originated from leak_host_context in `new`.
192        unsafe { drop_host_context(self.ctx) };
193        self.ctx = std::ptr::null();
194    }
195}
196
197nautilus_actor!(PluginActorAdapter);
198
199impl DataActor for PluginActorAdapter {
200    fn on_start(&mut self) -> anyhow::Result<()> {
201        invoke_lifecycle(self, "on_start", |adapter| unsafe {
202            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_start)(adapter.handle)
203        })
204    }
205
206    fn on_stop(&mut self) -> anyhow::Result<()> {
207        invoke_lifecycle(self, "on_stop", |adapter| unsafe {
208            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_stop)(adapter.handle)
209        })
210    }
211
212    fn on_resume(&mut self) -> anyhow::Result<()> {
213        invoke_lifecycle(self, "on_resume", |adapter| unsafe {
214            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_resume)(adapter.handle)
215        })
216    }
217
218    fn on_reset(&mut self) -> anyhow::Result<()> {
219        invoke_lifecycle(self, "on_reset", |adapter| unsafe {
220            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_reset)(adapter.handle)
221        })
222    }
223
224    fn on_dispose(&mut self) -> anyhow::Result<()> {
225        invoke_lifecycle(self, "on_dispose", |adapter| unsafe {
226            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_dispose)(adapter.handle)
227        })
228    }
229
230    fn on_degrade(&mut self) -> anyhow::Result<()> {
231        invoke_lifecycle(self, "on_degrade", |adapter| unsafe {
232            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_degrade)(adapter.handle)
233        })
234    }
235
236    fn on_fault(&mut self) -> anyhow::Result<()> {
237        invoke_lifecycle(self, "on_fault", |adapter| unsafe {
238            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_fault)(adapter.handle)
239        })
240    }
241
242    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
243        invoke_event(self, "on_time_event", event, |adapter, p| unsafe {
244            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_time_event)(adapter.handle, p)
245        })
246    }
247
248    fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
249        let Some(data_ref) = try_custom_data_boundary_ref(data) else {
250            return Ok(());
251        };
252        invoke_custom_data(self, "on_data", data_ref, |adapter, value| unsafe {
253            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_data)(adapter.handle, value)
254        })
255    }
256
257    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
258        let handle = InstrumentAnyHandle::new(instrument.clone());
259        invoke_event(self, "on_instrument", &handle, |adapter, p| unsafe {
260            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_instrument)(adapter.handle, p)
261        })
262    }
263
264    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
265        let handle = OrderBookDeltasHandle::new(deltas.clone());
266        invoke_event(self, "on_book_deltas", &handle, |adapter, p| unsafe {
267            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_book_deltas)(adapter.handle, p)
268        })
269    }
270
271    fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
272        let handle = OrderBookHandle::new(book.clone());
273        invoke_event(self, "on_book", &handle, |adapter, p| unsafe {
274            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_book)(adapter.handle, p)
275        })
276    }
277
278    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
279        invoke_event(self, "on_quote", quote, |adapter, p| unsafe {
280            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_quote)(adapter.handle, p)
281        })
282    }
283
284    fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
285        invoke_event(self, "on_trade", trade, |adapter, p| unsafe {
286            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_trade)(adapter.handle, p)
287        })
288    }
289
290    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
291        invoke_event(self, "on_bar", bar, |adapter, p| unsafe {
292            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_bar)(adapter.handle, p)
293        })
294    }
295
296    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
297        invoke_event(self, "on_mark_price", mark_price, |adapter, p| unsafe {
298            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_mark_price)(adapter.handle, p)
299        })
300    }
301
302    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
303        invoke_event(self, "on_index_price", index_price, |adapter, p| unsafe {
304            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_index_price)(adapter.handle, p)
305        })
306    }
307
308    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
309        invoke_event(self, "on_funding_rate", funding_rate, |adapter, p| unsafe {
310            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_funding_rate)(
311                adapter.handle,
312                p,
313            )
314        })
315    }
316
317    fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
318        invoke_event(self, "on_option_greeks", greeks, |adapter, p| unsafe {
319            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_option_greeks)(
320                adapter.handle,
321                p,
322            )
323        })
324    }
325
326    fn on_option_chain(&mut self, chain: &OptionChainSlice) -> anyhow::Result<()> {
327        let handle = OptionChainSliceHandle::new(chain.clone());
328        invoke_event(self, "on_option_chain", &handle, |adapter, p| unsafe {
329            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_option_chain)(
330                adapter.handle,
331                p,
332            )
333        })
334    }
335
336    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
337        invoke_event(self, "on_instrument_status", data, |adapter, p| unsafe {
338            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_instrument_status)(
339                adapter.handle,
340                p,
341            )
342        })
343    }
344
345    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
346        invoke_event(self, "on_instrument_close", update, |adapter, p| unsafe {
347            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_instrument_close)(
348                adapter.handle,
349                p,
350            )
351        })
352    }
353
354    fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
355        invoke_event(self, "on_order_filled", event, |adapter, p| unsafe {
356            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_order_filled)(
357                adapter.handle,
358                p,
359            )
360        })
361    }
362
363    fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
364        invoke_event(self, "on_order_canceled", event, |adapter, p| unsafe {
365            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_order_canceled)(
366                adapter.handle,
367                p,
368            )
369        })
370    }
371
372    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
373        invoke_event(self, "on_signal", signal, |adapter, p| unsafe {
374            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_signal)(adapter.handle, p)
375        })
376    }
377
378    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
379        let Some(data_ref) = try_historical_custom_data_boundary_ref(data) else {
380            return Ok(());
381        };
382        invoke_custom_data(
383            self,
384            "on_historical_data",
385            data_ref,
386            |adapter, value| unsafe {
387                validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_data)(
388                    adapter.handle,
389                    value,
390                )
391            },
392        )
393    }
394
395    fn on_historical_book_deltas(&mut self, deltas: &[OrderBookDelta]) -> anyhow::Result<()> {
396        invoke_slice(
397            self,
398            "on_historical_book_deltas",
399            deltas,
400            |adapter, s| unsafe {
401                validated_slot!(
402                    ActorVTable,
403                    adapter.vtable.as_ptr(),
404                    on_historical_book_deltas
405                )(adapter.handle, s)
406            },
407        )
408    }
409
410    fn on_historical_book_depth(&mut self, depths: &[OrderBookDepth10]) -> anyhow::Result<()> {
411        invoke_slice(
412            self,
413            "on_historical_book_depth",
414            depths,
415            |adapter, s| unsafe {
416                validated_slot!(
417                    ActorVTable,
418                    adapter.vtable.as_ptr(),
419                    on_historical_book_depth
420                )(adapter.handle, s)
421            },
422        )
423    }
424
425    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
426        invoke_slice(self, "on_historical_quotes", quotes, |adapter, s| unsafe {
427            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_historical_quotes)(
428                adapter.handle,
429                s,
430            )
431        })
432    }
433
434    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
435        invoke_slice(self, "on_historical_trades", trades, |adapter, s| unsafe {
436            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_historical_trades)(
437                adapter.handle,
438                s,
439            )
440        })
441    }
442
443    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
444        invoke_slice(self, "on_historical_bars", bars, |adapter, s| unsafe {
445            validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_historical_bars)(
446                adapter.handle,
447                s,
448            )
449        })
450    }
451
452    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
453        invoke_slice(
454            self,
455            "on_historical_mark_prices",
456            mark_prices,
457            |adapter, s| unsafe {
458                validated_slot!(
459                    ActorVTable,
460                    adapter.vtable.as_ptr(),
461                    on_historical_mark_prices
462                )(adapter.handle, s)
463            },
464        )
465    }
466
467    fn on_historical_index_prices(
468        &mut self,
469        index_prices: &[IndexPriceUpdate],
470    ) -> anyhow::Result<()> {
471        invoke_slice(
472            self,
473            "on_historical_index_prices",
474            index_prices,
475            |adapter, s| unsafe {
476                validated_slot!(
477                    ActorVTable,
478                    adapter.vtable.as_ptr(),
479                    on_historical_index_prices
480                )(adapter.handle, s)
481            },
482        )
483    }
484
485    fn on_historical_funding_rates(
486        &mut self,
487        funding_rates: &[FundingRateUpdate],
488    ) -> anyhow::Result<()> {
489        invoke_slice(
490            self,
491            "on_historical_funding_rates",
492            funding_rates,
493            |adapter, s| unsafe {
494                validated_slot!(
495                    ActorVTable,
496                    adapter.vtable.as_ptr(),
497                    on_historical_funding_rates
498                )(adapter.handle, s)
499            },
500        )
501    }
502}
503
504/// Wraps a call into the plug-in cdylib in `catch_unwind` so a plug-in panic
505/// surfaces as `None` here instead of unwinding across the FFI boundary
506/// (undefined behaviour). The plug-in's own macro-generated thunks `catch_unwind`
507/// internally, so this guard is defence in depth.
508fn guard_call<R>(plugin: &str, type_name: &str, method: &str, f: impl FnOnce() -> R) -> Option<R> {
509    match catch_unwind(AssertUnwindSafe(f)) {
510        Ok(r) => Some(r),
511        Err(_payload) => {
512            log::error!(
513                target: "nautilus_plugin",
514                "plug-in '{plugin}' ({type_name}) panicked in {method}",
515            );
516            None
517        }
518    }
519}
520
521fn invoke_lifecycle(
522    adapter: &PluginActorAdapter,
523    method: &str,
524    f: impl FnOnce(&PluginActorAdapter) -> PluginResult<()>,
525) -> anyhow::Result<()> {
526    let plugin_name = adapter.plugin_name.clone();
527    let type_name = adapter.type_name.clone();
528    let result = guard_call(&plugin_name, &type_name, method, || f(adapter));
529    finish(result, &plugin_name, &type_name, method)
530}
531
532fn invoke_event<T>(
533    adapter: &PluginActorAdapter,
534    method: &str,
535    payload: &T,
536    f: impl FnOnce(&PluginActorAdapter, *const T) -> PluginResult<()>,
537) -> anyhow::Result<()> {
538    let plugin_name = adapter.plugin_name.clone();
539    let type_name = adapter.type_name.clone();
540    let ptr: *const T = payload;
541    let result = guard_call(&plugin_name, &type_name, method, || f(adapter, ptr));
542    finish(result, &plugin_name, &type_name, method)
543}
544
545fn invoke_custom_data(
546    adapter: &PluginActorAdapter,
547    method: &str,
548    payload: PluginCustomDataRef,
549    f: impl FnOnce(&PluginActorAdapter, PluginCustomDataRef) -> PluginResult<()>,
550) -> anyhow::Result<()> {
551    let plugin_name = adapter.plugin_name.clone();
552    let type_name = adapter.type_name.clone();
553    let result = guard_call(&plugin_name, &type_name, method, || f(adapter, payload));
554    finish(result, &plugin_name, &type_name, method)
555}
556
557fn invoke_slice<T>(
558    adapter: &PluginActorAdapter,
559    method: &str,
560    payload: &[T],
561    f: impl FnOnce(&PluginActorAdapter, Slice<'_, T>) -> PluginResult<()>,
562) -> anyhow::Result<()> {
563    let plugin_name = adapter.plugin_name.clone();
564    let type_name = adapter.type_name.clone();
565    let slice = Slice::from_slice(payload);
566    let result = guard_call(&plugin_name, &type_name, method, || f(adapter, slice));
567    finish(result, &plugin_name, &type_name, method)
568}
569
570fn finish(
571    result: Option<PluginResult<()>>,
572    plugin_name: &str,
573    type_name: &str,
574    method: &str,
575) -> anyhow::Result<()> {
576    match result {
577        Some(r) => r.into_result().map_err(|e| {
578            anyhow::anyhow!(
579                "plug-in '{plugin_name}' ({type_name}) {method} returned error: {}",
580                e.message_string()
581            )
582        }),
583        None => anyhow::bail!("plug-in '{plugin_name}' ({type_name}) panicked in {method}"),
584    }
585}
586
587#[cfg(test)]
588mod tests {
589    use rstest::rstest;
590
591    use super::*;
592    use crate::{
593        bridge::{
594            host::host_vtable,
595            registry::{host_context_live_count, host_context_test_lock},
596        },
597        surfaces::actor::{PluginActor, actor_vtable},
598    };
599
600    struct DropTestActor;
601
602    impl PluginActor for DropTestActor {
603        const TYPE_NAME: &'static str = "DropTestActor";
604
605        fn new(_host: *const HostVTable, _ctx: *const HostContext, _config_json: &str) -> Self {
606            Self
607        }
608    }
609
610    fn drop_test_actor_vtable() -> ValidatedActorVTable {
611        // SAFETY: generated vtables are process-lifetime static and fill
612        // every required actor slot.
613        unsafe { ValidatedActorVTable::from_raw_unchecked(actor_vtable::<DropTestActor>()) }
614    }
615
616    #[rstest]
617    fn drop_frees_host_context() {
618        let _guard = host_context_test_lock();
619        let before = host_context_live_count();
620        // SAFETY: host_vtable is process-lifetime static.
621        let adapter = unsafe {
622            PluginActorAdapter::new(
623                ActorId::from("DropTestActor-001"),
624                "plug-in",
625                DropTestActor::TYPE_NAME,
626                drop_test_actor_vtable(),
627                host_vtable(),
628                "{}",
629            )
630        }
631        .expect("adapter construction");
632        assert_eq!(host_context_live_count(), before + 1);
633
634        drop(adapter);
635        assert_eq!(host_context_live_count(), before);
636    }
637}