#![allow(unsafe_code)]
#![allow(
clippy::multiple_unsafe_ops_per_block,
reason = "vtable deref and FFI call form a single boundary callback; \
SAFETY comments cover both ops together"
)]
use std::{
any::Any,
fmt::Debug,
panic::{AssertUnwindSafe, catch_unwind},
};
use nautilus_common::{
actor::{DataActor, DataActorConfig, DataActorCore},
nautilus_actor,
signal::Signal,
timer::TimeEvent,
};
use nautilus_model::{
data::{
Bar, CustomData, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus,
MarkPriceUpdate, OptionChainSlice, OptionGreeks, OrderBookDelta, OrderBookDeltas,
OrderBookDepth10, QuoteTick, TradeTick,
},
events::{OrderCanceled, OrderFilled},
identifiers::ActorId,
instruments::InstrumentAny,
orderbook::OrderBook,
};
use crate::{
boundary::{BorrowedStr, PluginResult, Slice},
bridge::{
custom_data::{try_custom_data_boundary_ref, try_historical_custom_data_boundary_ref},
registry::{HostContextInner, drop_host_context, leak_host_context},
},
host::{HostContext, HostVTable},
manifest::ValidatedActorVTable,
surfaces::{
actor::PluginActorHandle,
book::{OrderBookDeltasHandle, OrderBookHandle},
custom_data::PluginCustomDataRef,
instrument::InstrumentAnyHandle,
option_chain::OptionChainSliceHandle,
},
};
/// Adapter that exposes a plug-in actor, reached through a
/// [`ValidatedActorVTable`], to the host as a [`DataActor`].
///
/// The adapter owns both the plug-in handle and the host context: each is
/// acquired in [`PluginActorAdapter::new`] and released in `Drop`.
pub struct PluginActorAdapter {
/// Actor core built in `new` from a `DataActorConfig` carrying the actor id.
core: DataActorCore,
/// Plug-in name; used in log lines, error messages and the `Debug` output.
plugin_name: String,
/// Plug-in actor type name; used in log lines, error messages and the `Debug` output.
type_name: String,
/// Vtable from which every plug-in callback slot is resolved.
vtable: ValidatedActorVTable,
/// Handle returned by the plug-in's `create` slot; passed to every callback
/// and released through the `drop_handle` slot in `Drop`.
handle: *mut PluginActorHandle,
/// Host context allocated with `leak_host_context` in `new` and released
/// with `drop_host_context` in `Drop`.
ctx: *const HostContext,
}
// The raw-pointer fields (`handle`, `ctx`) suppress the automatic `Send`
// implementation, so it is asserted manually here.
// SAFETY: NOTE(review): this relies on the plug-in handle and the host context
// being usable from whichever thread currently owns the adapter; nothing in
// this file enforces that, so confirm it against the plug-in ABI contract.
unsafe impl Send for PluginActorAdapter {}
impl Debug for PluginActorAdapter {
    /// Renders the plug-in name, type name and actor id.
    ///
    /// The vtable and the raw `handle` / `ctx` pointers are left out of the
    /// output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct(stringify!(PluginActorAdapter));
        out.field("plugin_name", &self.plugin_name);
        out.field("type_name", &self.type_name);
        out.field("actor_id", &self.core.actor_id());
        out.finish()
    }
}
impl PluginActorAdapter {
pub unsafe fn new(
actor_id: ActorId,
plugin_name: impl Into<String>,
type_name: impl Into<String>,
vtable: ValidatedActorVTable,
host: *const HostVTable,
config_json: &str,
) -> anyhow::Result<Self> {
let plugin_name = plugin_name.into();
let type_name = type_name.into();
let create = unsafe { validated_slot!(ActorVTable, vtable.as_ptr(), create) };
let ctx = leak_host_context(HostContextInner {
actor_id,
is_strategy: false,
});
let cfg = BorrowedStr::from_str(config_json);
let handle = guard_call(&plugin_name, &type_name, "create", || unsafe {
create(host, ctx, cfg)
})
.ok_or_else(|| {
unsafe { drop_host_context(ctx) };
anyhow::anyhow!("plug-in actor '{type_name}' panicked in create")
})?;
if handle.is_null() {
unsafe { drop_host_context(ctx) };
anyhow::bail!("plug-in actor '{type_name}' returned a null handle from create");
}
let core = DataActorCore::new(DataActorConfig {
actor_id: Some(actor_id),
log_events: true,
log_commands: true,
});
Ok(Self {
core,
plugin_name,
type_name,
vtable,
handle,
ctx,
})
}
#[must_use]
pub fn type_name(&self) -> &str {
&self.type_name
}
#[must_use]
pub fn plugin_name(&self) -> &str {
&self.plugin_name
}
}
impl Drop for PluginActorAdapter {
fn drop(&mut self) {
if !self.handle.is_null() {
let _ = catch_unwind(AssertUnwindSafe(|| {
unsafe {
validated_slot!(ActorVTable, self.vtable.as_ptr(), drop_handle)(self.handle);
};
}));
self.handle = std::ptr::null_mut();
}
unsafe { drop_host_context(self.ctx) };
self.ctx = std::ptr::null();
}
}
// NOTE(review): `nautilus_actor!` is imported from `nautilus_common`; it
// presumably generates the actor boilerplate that ties `PluginActorAdapter`
// (and its `core` field) into the actor framework — confirm against the macro
// definition.
nautilus_actor!(PluginActorAdapter);
impl DataActor for PluginActorAdapter {
fn on_start(&mut self) -> anyhow::Result<()> {
invoke_lifecycle(self, "on_start", |adapter| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_start)(adapter.handle)
})
}
fn on_stop(&mut self) -> anyhow::Result<()> {
invoke_lifecycle(self, "on_stop", |adapter| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_stop)(adapter.handle)
})
}
fn on_resume(&mut self) -> anyhow::Result<()> {
invoke_lifecycle(self, "on_resume", |adapter| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_resume)(adapter.handle)
})
}
fn on_reset(&mut self) -> anyhow::Result<()> {
invoke_lifecycle(self, "on_reset", |adapter| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_reset)(adapter.handle)
})
}
fn on_dispose(&mut self) -> anyhow::Result<()> {
invoke_lifecycle(self, "on_dispose", |adapter| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_dispose)(adapter.handle)
})
}
fn on_degrade(&mut self) -> anyhow::Result<()> {
invoke_lifecycle(self, "on_degrade", |adapter| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_degrade)(adapter.handle)
})
}
fn on_fault(&mut self) -> anyhow::Result<()> {
invoke_lifecycle(self, "on_fault", |adapter| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_fault)(adapter.handle)
})
}
fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
invoke_event(self, "on_time_event", event, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_time_event)(adapter.handle, p)
})
}
fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
let Some(data_ref) = try_custom_data_boundary_ref(data) else {
return Ok(());
};
invoke_custom_data(self, "on_data", data_ref, |adapter, value| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_data)(adapter.handle, value)
})
}
fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
let handle = InstrumentAnyHandle::new(instrument.clone());
invoke_event(self, "on_instrument", &handle, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_instrument)(adapter.handle, p)
})
}
fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
let handle = OrderBookDeltasHandle::new(deltas.clone());
invoke_event(self, "on_book_deltas", &handle, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_book_deltas)(adapter.handle, p)
})
}
fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
let handle = OrderBookHandle::new(book.clone());
invoke_event(self, "on_book", &handle, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_book)(adapter.handle, p)
})
}
fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
invoke_event(self, "on_quote", quote, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_quote)(adapter.handle, p)
})
}
fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
invoke_event(self, "on_trade", trade, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_trade)(adapter.handle, p)
})
}
fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
invoke_event(self, "on_bar", bar, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_bar)(adapter.handle, p)
})
}
fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
invoke_event(self, "on_mark_price", mark_price, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_mark_price)(adapter.handle, p)
})
}
fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
invoke_event(self, "on_index_price", index_price, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_index_price)(adapter.handle, p)
})
}
fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
invoke_event(self, "on_funding_rate", funding_rate, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_funding_rate)(
adapter.handle,
p,
)
})
}
fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
invoke_event(self, "on_option_greeks", greeks, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_option_greeks)(
adapter.handle,
p,
)
})
}
fn on_option_chain(&mut self, chain: &OptionChainSlice) -> anyhow::Result<()> {
let handle = OptionChainSliceHandle::new(chain.clone());
invoke_event(self, "on_option_chain", &handle, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_option_chain)(
adapter.handle,
p,
)
})
}
fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
invoke_event(self, "on_instrument_status", data, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_instrument_status)(
adapter.handle,
p,
)
})
}
fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
invoke_event(self, "on_instrument_close", update, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_instrument_close)(
adapter.handle,
p,
)
})
}
fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
invoke_event(self, "on_order_filled", event, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_order_filled)(
adapter.handle,
p,
)
})
}
fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
invoke_event(self, "on_order_canceled", event, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_order_canceled)(
adapter.handle,
p,
)
})
}
fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
invoke_event(self, "on_signal", signal, |adapter, p| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_signal)(adapter.handle, p)
})
}
fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
let Some(data_ref) = try_historical_custom_data_boundary_ref(data) else {
return Ok(());
};
invoke_custom_data(
self,
"on_historical_data",
data_ref,
|adapter, value| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_data)(
adapter.handle,
value,
)
},
)
}
fn on_historical_book_deltas(&mut self, deltas: &[OrderBookDelta]) -> anyhow::Result<()> {
invoke_slice(
self,
"on_historical_book_deltas",
deltas,
|adapter, s| unsafe {
validated_slot!(
ActorVTable,
adapter.vtable.as_ptr(),
on_historical_book_deltas
)(adapter.handle, s)
},
)
}
fn on_historical_book_depth(&mut self, depths: &[OrderBookDepth10]) -> anyhow::Result<()> {
invoke_slice(
self,
"on_historical_book_depth",
depths,
|adapter, s| unsafe {
validated_slot!(
ActorVTable,
adapter.vtable.as_ptr(),
on_historical_book_depth
)(adapter.handle, s)
},
)
}
fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
invoke_slice(self, "on_historical_quotes", quotes, |adapter, s| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_historical_quotes)(
adapter.handle,
s,
)
})
}
fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
invoke_slice(self, "on_historical_trades", trades, |adapter, s| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_historical_trades)(
adapter.handle,
s,
)
})
}
fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
invoke_slice(self, "on_historical_bars", bars, |adapter, s| unsafe {
validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_historical_bars)(
adapter.handle,
s,
)
})
}
fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
invoke_slice(
self,
"on_historical_mark_prices",
mark_prices,
|adapter, s| unsafe {
validated_slot!(
ActorVTable,
adapter.vtable.as_ptr(),
on_historical_mark_prices
)(adapter.handle, s)
},
)
}
fn on_historical_index_prices(
&mut self,
index_prices: &[IndexPriceUpdate],
) -> anyhow::Result<()> {
invoke_slice(
self,
"on_historical_index_prices",
index_prices,
|adapter, s| unsafe {
validated_slot!(
ActorVTable,
adapter.vtable.as_ptr(),
on_historical_index_prices
)(adapter.handle, s)
},
)
}
fn on_historical_funding_rates(
&mut self,
funding_rates: &[FundingRateUpdate],
) -> anyhow::Result<()> {
invoke_slice(
self,
"on_historical_funding_rates",
funding_rates,
|adapter, s| unsafe {
validated_slot!(
ActorVTable,
adapter.vtable.as_ptr(),
on_historical_funding_rates
)(adapter.handle, s)
},
)
}
}
/// Runs `f`, containing any panic it raises.
///
/// Returns `Some(value)` when `f` completes and `None` when it panics. On a
/// panic an error is logged under the `nautilus_plugin` target naming the
/// plug-in, the actor type and the method, followed by the panic message when
/// the payload is a `&str` or a `String` (the payload types produced by
/// `panic!`).
fn guard_call<R>(plugin: &str, type_name: &str, method: &str, f: impl FnOnce() -> R) -> Option<R> {
    match catch_unwind(AssertUnwindSafe(f)) {
        Ok(value) => Some(value),
        Err(payload) => {
            // Surface the panic message instead of discarding the payload, so
            // the log says why the plug-in failed and not only where.
            let reason = payload
                .downcast_ref::<&str>()
                .copied()
                .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
                .unwrap_or("<non-string panic payload>");
            log::error!(
                target: "nautilus_plugin",
                "plug-in '{plugin}' ({type_name}) panicked in {method}: {reason}",
            );
            None
        }
    }
}
/// Invokes a payload-free plug-in callback and converts its outcome.
///
/// `f` receives the adapter and performs the actual vtable call. A panic is
/// contained by `guard_call`; `finish` then maps a panic or an error result
/// to an `anyhow` error that names the plug-in, the actor type and `method`.
///
/// The adapter is only borrowed immutably, so its names are borrowed directly
/// rather than cloned for each call.
fn invoke_lifecycle(
    adapter: &PluginActorAdapter,
    method: &str,
    f: impl FnOnce(&PluginActorAdapter) -> PluginResult<()>,
) -> anyhow::Result<()> {
    let result = guard_call(&adapter.plugin_name, &adapter.type_name, method, || {
        f(adapter)
    });
    finish(result, &adapter.plugin_name, &adapter.type_name, method)
}
/// Invokes a plug-in callback that takes a single payload by pointer.
///
/// `payload` is converted to a `*const T` that `f` passes on to the vtable
/// slot; the reference is held for the whole call, so the pointer stays valid
/// while the plug-in runs. A panic is contained by `guard_call`; `finish`
/// then maps a panic or an error result to an `anyhow` error.
///
/// This sits on the per-event path, so the adapter's names are borrowed
/// directly rather than cloned for each call.
fn invoke_event<T>(
    adapter: &PluginActorAdapter,
    method: &str,
    payload: &T,
    f: impl FnOnce(&PluginActorAdapter, *const T) -> PluginResult<()>,
) -> anyhow::Result<()> {
    let ptr: *const T = payload;
    let result = guard_call(&adapter.plugin_name, &adapter.type_name, method, || {
        f(adapter, ptr)
    });
    finish(result, &adapter.plugin_name, &adapter.type_name, method)
}
/// Invokes a plug-in callback that takes a `PluginCustomDataRef` by value.
///
/// A panic is contained by `guard_call`; `finish` then maps a panic or an
/// error result to an `anyhow` error that names the plug-in, the actor type
/// and `method`.
///
/// The adapter is only borrowed immutably, so its names are borrowed directly
/// rather than cloned for each call.
fn invoke_custom_data(
    adapter: &PluginActorAdapter,
    method: &str,
    payload: PluginCustomDataRef,
    f: impl FnOnce(&PluginActorAdapter, PluginCustomDataRef) -> PluginResult<()>,
) -> anyhow::Result<()> {
    let result = guard_call(&adapter.plugin_name, &adapter.type_name, method, || {
        f(adapter, payload)
    });
    finish(result, &adapter.plugin_name, &adapter.type_name, method)
}
/// Invokes a plug-in callback that takes a batch of items as a boundary
/// `Slice`.
///
/// `payload` is wrapped with `Slice::from_slice` before being handed to `f`.
/// A panic is contained by `guard_call`; `finish` then maps a panic or an
/// error result to an `anyhow` error that names the plug-in, the actor type
/// and `method`.
///
/// The adapter is only borrowed immutably, so its names are borrowed directly
/// rather than cloned for each call.
fn invoke_slice<T>(
    adapter: &PluginActorAdapter,
    method: &str,
    payload: &[T],
    f: impl FnOnce(&PluginActorAdapter, Slice<'_, T>) -> PluginResult<()>,
) -> anyhow::Result<()> {
    let slice = Slice::from_slice(payload);
    let result = guard_call(&adapter.plugin_name, &adapter.type_name, method, || {
        f(adapter, slice)
    });
    finish(result, &adapter.plugin_name, &adapter.type_name, method)
}
fn finish(
result: Option<PluginResult<()>>,
plugin_name: &str,
type_name: &str,
method: &str,
) -> anyhow::Result<()> {
match result {
Some(r) => r.into_result().map_err(|e| {
anyhow::anyhow!(
"plug-in '{plugin_name}' ({type_name}) {method} returned error: {}",
e.message_string()
)
}),
None => anyhow::bail!("plug-in '{plugin_name}' ({type_name}) panicked in {method}"),
}
}
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::{
        bridge::{
            host::host_vtable,
            registry::{host_context_live_count, host_context_test_lock},
        },
        surfaces::actor::{PluginActor, actor_vtable},
    };

    /// Stateless plug-in actor that overrides no callbacks; it exists only so
    /// an adapter can be built and dropped.
    struct DropTestActor;

    impl PluginActor for DropTestActor {
        const TYPE_NAME: &'static str = "DropTestActor";

        fn new(_host: *const HostVTable, _ctx: *const HostContext, _config_json: &str) -> Self {
            DropTestActor
        }
    }

    /// Builds the vtable for [`DropTestActor`], bypassing validation.
    fn drop_test_actor_vtable() -> ValidatedActorVTable {
        unsafe {
            let raw = actor_vtable::<DropTestActor>();
            ValidatedActorVTable::from_raw_unchecked(raw)
        }
    }

    /// Constructing an adapter adds exactly one live host context, and
    /// dropping the adapter removes it again.
    #[rstest]
    fn drop_frees_host_context() {
        // Held for the whole test because the live count is read twice and
        // compared against a baseline.
        let _guard = host_context_test_lock();
        let baseline = host_context_live_count();

        let built = unsafe {
            PluginActorAdapter::new(
                ActorId::from("DropTestActor-001"),
                "plug-in",
                DropTestActor::TYPE_NAME,
                drop_test_actor_vtable(),
                host_vtable(),
                "{}",
            )
        };
        let adapter = built.expect("adapter construction");
        assert_eq!(host_context_live_count(), baseline + 1);

        drop(adapter);
        assert_eq!(host_context_live_count(), baseline);
    }
}