1#![allow(unsafe_code)]
25#![allow(
26 clippy::multiple_unsafe_ops_per_block,
27 reason = "vtable deref and FFI call form a single boundary callback; \
28 SAFETY comments cover both ops together"
29)]
30
31use std::{
32 any::Any,
33 fmt::Debug,
34 panic::{AssertUnwindSafe, catch_unwind},
35};
36
37use nautilus_common::{
38 actor::{DataActor, DataActorConfig, DataActorCore},
39 nautilus_actor,
40 signal::Signal,
41 timer::TimeEvent,
42};
43use nautilus_model::{
44 data::{
45 Bar, CustomData, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus,
46 MarkPriceUpdate, OptionChainSlice, OptionGreeks, OrderBookDelta, OrderBookDeltas,
47 OrderBookDepth10, QuoteTick, TradeTick,
48 },
49 events::{OrderCanceled, OrderFilled},
50 identifiers::ActorId,
51 instruments::InstrumentAny,
52 orderbook::OrderBook,
53};
54
55use crate::{
56 boundary::{BorrowedStr, PluginResult, Slice},
57 bridge::{
58 custom_data::{try_custom_data_boundary_ref, try_historical_custom_data_boundary_ref},
59 registry::{HostContextInner, drop_host_context, leak_host_context},
60 },
61 host::{HostContext, HostVTable},
62 manifest::ValidatedActorVTable,
63 surfaces::{
64 actor::PluginActorHandle,
65 book::{OrderBookDeltasHandle, OrderBookHandle},
66 custom_data::PluginCustomDataRef,
67 instrument::InstrumentAnyHandle,
68 option_chain::OptionChainSliceHandle,
69 },
70};
71
72pub struct PluginActorAdapter {
75 core: DataActorCore,
76 plugin_name: String,
77 type_name: String,
78 vtable: ValidatedActorVTable,
79 handle: *mut PluginActorHandle,
80 ctx: *const HostContext,
81}
82
83unsafe impl Send for PluginActorAdapter {}
88
89impl Debug for PluginActorAdapter {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 f.debug_struct(stringify!(PluginActorAdapter))
92 .field("plugin_name", &self.plugin_name)
93 .field("type_name", &self.type_name)
94 .field("actor_id", &self.core.actor_id())
95 .finish()
96 }
97}
98
99impl PluginActorAdapter {
100 pub unsafe fn new(
116 actor_id: ActorId,
117 plugin_name: impl Into<String>,
118 type_name: impl Into<String>,
119 vtable: ValidatedActorVTable,
120 host: *const HostVTable,
121 config_json: &str,
122 ) -> anyhow::Result<Self> {
123 let plugin_name = plugin_name.into();
124 let type_name = type_name.into();
125 let create = unsafe { validated_slot!(ActorVTable, vtable.as_ptr(), create) };
127
128 let ctx = leak_host_context(HostContextInner {
129 actor_id,
130 is_strategy: false,
131 });
132
133 let cfg = BorrowedStr::from_str(config_json);
134 let handle = guard_call(&plugin_name, &type_name, "create", || unsafe {
137 create(host, ctx, cfg)
138 })
139 .ok_or_else(|| {
140 unsafe { drop_host_context(ctx) };
142 anyhow::anyhow!("plug-in actor '{type_name}' panicked in create")
143 })?;
144
145 if handle.is_null() {
146 unsafe { drop_host_context(ctx) };
148 anyhow::bail!("plug-in actor '{type_name}' returned a null handle from create");
149 }
150
151 let core = DataActorCore::new(DataActorConfig {
152 actor_id: Some(actor_id),
153 log_events: true,
154 log_commands: true,
155 });
156
157 Ok(Self {
158 core,
159 plugin_name,
160 type_name,
161 vtable,
162 handle,
163 ctx,
164 })
165 }
166
167 #[must_use]
169 pub fn type_name(&self) -> &str {
170 &self.type_name
171 }
172
173 #[must_use]
175 pub fn plugin_name(&self) -> &str {
176 &self.plugin_name
177 }
178}
179
180impl Drop for PluginActorAdapter {
181 fn drop(&mut self) {
182 if !self.handle.is_null() {
183 let _ = catch_unwind(AssertUnwindSafe(|| {
184 unsafe {
186 validated_slot!(ActorVTable, self.vtable.as_ptr(), drop_handle)(self.handle);
187 };
188 }));
189 self.handle = std::ptr::null_mut();
190 }
191 unsafe { drop_host_context(self.ctx) };
193 self.ctx = std::ptr::null();
194 }
195}
196
197nautilus_actor!(PluginActorAdapter);
198
199impl DataActor for PluginActorAdapter {
200 fn on_start(&mut self) -> anyhow::Result<()> {
201 invoke_lifecycle(self, "on_start", |adapter| unsafe {
202 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_start)(adapter.handle)
203 })
204 }
205
206 fn on_stop(&mut self) -> anyhow::Result<()> {
207 invoke_lifecycle(self, "on_stop", |adapter| unsafe {
208 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_stop)(adapter.handle)
209 })
210 }
211
212 fn on_resume(&mut self) -> anyhow::Result<()> {
213 invoke_lifecycle(self, "on_resume", |adapter| unsafe {
214 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_resume)(adapter.handle)
215 })
216 }
217
218 fn on_reset(&mut self) -> anyhow::Result<()> {
219 invoke_lifecycle(self, "on_reset", |adapter| unsafe {
220 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_reset)(adapter.handle)
221 })
222 }
223
224 fn on_dispose(&mut self) -> anyhow::Result<()> {
225 invoke_lifecycle(self, "on_dispose", |adapter| unsafe {
226 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_dispose)(adapter.handle)
227 })
228 }
229
230 fn on_degrade(&mut self) -> anyhow::Result<()> {
231 invoke_lifecycle(self, "on_degrade", |adapter| unsafe {
232 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_degrade)(adapter.handle)
233 })
234 }
235
236 fn on_fault(&mut self) -> anyhow::Result<()> {
237 invoke_lifecycle(self, "on_fault", |adapter| unsafe {
238 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_fault)(adapter.handle)
239 })
240 }
241
242 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
243 invoke_event(self, "on_time_event", event, |adapter, p| unsafe {
244 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_time_event)(adapter.handle, p)
245 })
246 }
247
248 fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
249 let Some(data_ref) = try_custom_data_boundary_ref(data) else {
250 return Ok(());
251 };
252 invoke_custom_data(self, "on_data", data_ref, |adapter, value| unsafe {
253 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_data)(adapter.handle, value)
254 })
255 }
256
257 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
258 let handle = InstrumentAnyHandle::new(instrument.clone());
259 invoke_event(self, "on_instrument", &handle, |adapter, p| unsafe {
260 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_instrument)(adapter.handle, p)
261 })
262 }
263
264 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
265 let handle = OrderBookDeltasHandle::new(deltas.clone());
266 invoke_event(self, "on_book_deltas", &handle, |adapter, p| unsafe {
267 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_book_deltas)(adapter.handle, p)
268 })
269 }
270
271 fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
272 let handle = OrderBookHandle::new(book.clone());
273 invoke_event(self, "on_book", &handle, |adapter, p| unsafe {
274 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_book)(adapter.handle, p)
275 })
276 }
277
278 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
279 invoke_event(self, "on_quote", quote, |adapter, p| unsafe {
280 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_quote)(adapter.handle, p)
281 })
282 }
283
284 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
285 invoke_event(self, "on_trade", trade, |adapter, p| unsafe {
286 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_trade)(adapter.handle, p)
287 })
288 }
289
290 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
291 invoke_event(self, "on_bar", bar, |adapter, p| unsafe {
292 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_bar)(adapter.handle, p)
293 })
294 }
295
296 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
297 invoke_event(self, "on_mark_price", mark_price, |adapter, p| unsafe {
298 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_mark_price)(adapter.handle, p)
299 })
300 }
301
302 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
303 invoke_event(self, "on_index_price", index_price, |adapter, p| unsafe {
304 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_index_price)(adapter.handle, p)
305 })
306 }
307
308 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
309 invoke_event(self, "on_funding_rate", funding_rate, |adapter, p| unsafe {
310 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_funding_rate)(
311 adapter.handle,
312 p,
313 )
314 })
315 }
316
317 fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
318 invoke_event(self, "on_option_greeks", greeks, |adapter, p| unsafe {
319 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_option_greeks)(
320 adapter.handle,
321 p,
322 )
323 })
324 }
325
326 fn on_option_chain(&mut self, chain: &OptionChainSlice) -> anyhow::Result<()> {
327 let handle = OptionChainSliceHandle::new(chain.clone());
328 invoke_event(self, "on_option_chain", &handle, |adapter, p| unsafe {
329 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_option_chain)(
330 adapter.handle,
331 p,
332 )
333 })
334 }
335
336 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
337 invoke_event(self, "on_instrument_status", data, |adapter, p| unsafe {
338 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_instrument_status)(
339 adapter.handle,
340 p,
341 )
342 })
343 }
344
345 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
346 invoke_event(self, "on_instrument_close", update, |adapter, p| unsafe {
347 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_instrument_close)(
348 adapter.handle,
349 p,
350 )
351 })
352 }
353
354 fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
355 invoke_event(self, "on_order_filled", event, |adapter, p| unsafe {
356 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_order_filled)(
357 adapter.handle,
358 p,
359 )
360 })
361 }
362
363 fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
364 invoke_event(self, "on_order_canceled", event, |adapter, p| unsafe {
365 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_order_canceled)(
366 adapter.handle,
367 p,
368 )
369 })
370 }
371
372 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
373 invoke_event(self, "on_signal", signal, |adapter, p| unsafe {
374 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_signal)(adapter.handle, p)
375 })
376 }
377
378 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
379 let Some(data_ref) = try_historical_custom_data_boundary_ref(data) else {
380 return Ok(());
381 };
382 invoke_custom_data(
383 self,
384 "on_historical_data",
385 data_ref,
386 |adapter, value| unsafe {
387 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_data)(
388 adapter.handle,
389 value,
390 )
391 },
392 )
393 }
394
395 fn on_historical_book_deltas(&mut self, deltas: &[OrderBookDelta]) -> anyhow::Result<()> {
396 invoke_slice(
397 self,
398 "on_historical_book_deltas",
399 deltas,
400 |adapter, s| unsafe {
401 validated_slot!(
402 ActorVTable,
403 adapter.vtable.as_ptr(),
404 on_historical_book_deltas
405 )(adapter.handle, s)
406 },
407 )
408 }
409
410 fn on_historical_book_depth(&mut self, depths: &[OrderBookDepth10]) -> anyhow::Result<()> {
411 invoke_slice(
412 self,
413 "on_historical_book_depth",
414 depths,
415 |adapter, s| unsafe {
416 validated_slot!(
417 ActorVTable,
418 adapter.vtable.as_ptr(),
419 on_historical_book_depth
420 )(adapter.handle, s)
421 },
422 )
423 }
424
425 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
426 invoke_slice(self, "on_historical_quotes", quotes, |adapter, s| unsafe {
427 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_historical_quotes)(
428 adapter.handle,
429 s,
430 )
431 })
432 }
433
434 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
435 invoke_slice(self, "on_historical_trades", trades, |adapter, s| unsafe {
436 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_historical_trades)(
437 adapter.handle,
438 s,
439 )
440 })
441 }
442
443 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
444 invoke_slice(self, "on_historical_bars", bars, |adapter, s| unsafe {
445 validated_slot!(ActorVTable, adapter.vtable.as_ptr(), on_historical_bars)(
446 adapter.handle,
447 s,
448 )
449 })
450 }
451
452 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
453 invoke_slice(
454 self,
455 "on_historical_mark_prices",
456 mark_prices,
457 |adapter, s| unsafe {
458 validated_slot!(
459 ActorVTable,
460 adapter.vtable.as_ptr(),
461 on_historical_mark_prices
462 )(adapter.handle, s)
463 },
464 )
465 }
466
467 fn on_historical_index_prices(
468 &mut self,
469 index_prices: &[IndexPriceUpdate],
470 ) -> anyhow::Result<()> {
471 invoke_slice(
472 self,
473 "on_historical_index_prices",
474 index_prices,
475 |adapter, s| unsafe {
476 validated_slot!(
477 ActorVTable,
478 adapter.vtable.as_ptr(),
479 on_historical_index_prices
480 )(adapter.handle, s)
481 },
482 )
483 }
484
485 fn on_historical_funding_rates(
486 &mut self,
487 funding_rates: &[FundingRateUpdate],
488 ) -> anyhow::Result<()> {
489 invoke_slice(
490 self,
491 "on_historical_funding_rates",
492 funding_rates,
493 |adapter, s| unsafe {
494 validated_slot!(
495 ActorVTable,
496 adapter.vtable.as_ptr(),
497 on_historical_funding_rates
498 )(adapter.handle, s)
499 },
500 )
501 }
502}
503
504fn guard_call<R>(plugin: &str, type_name: &str, method: &str, f: impl FnOnce() -> R) -> Option<R> {
509 match catch_unwind(AssertUnwindSafe(f)) {
510 Ok(r) => Some(r),
511 Err(_payload) => {
512 log::error!(
513 target: "nautilus_plugin",
514 "plug-in '{plugin}' ({type_name}) panicked in {method}",
515 );
516 None
517 }
518 }
519}
520
521fn invoke_lifecycle(
522 adapter: &PluginActorAdapter,
523 method: &str,
524 f: impl FnOnce(&PluginActorAdapter) -> PluginResult<()>,
525) -> anyhow::Result<()> {
526 let plugin_name = adapter.plugin_name.clone();
527 let type_name = adapter.type_name.clone();
528 let result = guard_call(&plugin_name, &type_name, method, || f(adapter));
529 finish(result, &plugin_name, &type_name, method)
530}
531
532fn invoke_event<T>(
533 adapter: &PluginActorAdapter,
534 method: &str,
535 payload: &T,
536 f: impl FnOnce(&PluginActorAdapter, *const T) -> PluginResult<()>,
537) -> anyhow::Result<()> {
538 let plugin_name = adapter.plugin_name.clone();
539 let type_name = adapter.type_name.clone();
540 let ptr: *const T = payload;
541 let result = guard_call(&plugin_name, &type_name, method, || f(adapter, ptr));
542 finish(result, &plugin_name, &type_name, method)
543}
544
545fn invoke_custom_data(
546 adapter: &PluginActorAdapter,
547 method: &str,
548 payload: PluginCustomDataRef,
549 f: impl FnOnce(&PluginActorAdapter, PluginCustomDataRef) -> PluginResult<()>,
550) -> anyhow::Result<()> {
551 let plugin_name = adapter.plugin_name.clone();
552 let type_name = adapter.type_name.clone();
553 let result = guard_call(&plugin_name, &type_name, method, || f(adapter, payload));
554 finish(result, &plugin_name, &type_name, method)
555}
556
557fn invoke_slice<T>(
558 adapter: &PluginActorAdapter,
559 method: &str,
560 payload: &[T],
561 f: impl FnOnce(&PluginActorAdapter, Slice<'_, T>) -> PluginResult<()>,
562) -> anyhow::Result<()> {
563 let plugin_name = adapter.plugin_name.clone();
564 let type_name = adapter.type_name.clone();
565 let slice = Slice::from_slice(payload);
566 let result = guard_call(&plugin_name, &type_name, method, || f(adapter, slice));
567 finish(result, &plugin_name, &type_name, method)
568}
569
570fn finish(
571 result: Option<PluginResult<()>>,
572 plugin_name: &str,
573 type_name: &str,
574 method: &str,
575) -> anyhow::Result<()> {
576 match result {
577 Some(r) => r.into_result().map_err(|e| {
578 anyhow::anyhow!(
579 "plug-in '{plugin_name}' ({type_name}) {method} returned error: {}",
580 e.message_string()
581 )
582 }),
583 None => anyhow::bail!("plug-in '{plugin_name}' ({type_name}) panicked in {method}"),
584 }
585}
586
587#[cfg(test)]
588mod tests {
589 use rstest::rstest;
590
591 use super::*;
592 use crate::{
593 bridge::{
594 host::host_vtable,
595 registry::{host_context_live_count, host_context_test_lock},
596 },
597 surfaces::actor::{PluginActor, actor_vtable},
598 };
599
600 struct DropTestActor;
601
602 impl PluginActor for DropTestActor {
603 const TYPE_NAME: &'static str = "DropTestActor";
604
605 fn new(_host: *const HostVTable, _ctx: *const HostContext, _config_json: &str) -> Self {
606 Self
607 }
608 }
609
610 fn drop_test_actor_vtable() -> ValidatedActorVTable {
611 unsafe { ValidatedActorVTable::from_raw_unchecked(actor_vtable::<DropTestActor>()) }
614 }
615
616 #[rstest]
617 fn drop_frees_host_context() {
618 let _guard = host_context_test_lock();
619 let before = host_context_live_count();
620 let adapter = unsafe {
622 PluginActorAdapter::new(
623 ActorId::from("DropTestActor-001"),
624 "plug-in",
625 DropTestActor::TYPE_NAME,
626 drop_test_actor_vtable(),
627 host_vtable(),
628 "{}",
629 )
630 }
631 .expect("adapter construction");
632 assert_eq!(host_context_live_count(), before + 1);
633
634 drop(adapter);
635 assert_eq!(host_context_live_count(), before);
636 }
637}