#![allow(unsafe_code)]
use std::sync::{
Mutex, MutexGuard, OnceLock,
atomic::{AtomicPtr, AtomicU64, Ordering},
};
use nautilus_core::{UUID4, UnixNanos};
use nautilus_model::{
enums::{OrderSide, TimeInForce},
identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId},
orders::{MarketOrder, OrderAny},
types::Quantity,
};
use nautilus_plugin::{
NAUTILUS_PLUGIN_ABI_VERSION,
boundary::{BorrowedStr, OwnedBytes, PluginResult, Slice},
host::{HostContext, HostLogLevel, HostVTable},
surfaces::{
commands::{
CancelAllOrdersCommand, CancelAllOrdersHandle, CancelOrderCommand, CancelOrderHandle,
CancelOrdersCommand, CancelOrdersHandle, CloseAllPositionsCommand,
CloseAllPositionsHandle, ClosePositionCommand, ClosePositionHandle, ModifyOrderCommand,
ModifyOrderHandle, QueryAccountCommand, QueryAccountHandle, QueryOrderCommand,
QueryOrderHandle, SubmitOrderCommand, SubmitOrderHandle, SubmitOrderListCommand,
SubmitOrderListHandle,
},
strategy::{PluginStrategy, strategy_vtable},
},
};
use rstest::rstest;
/// Reads the optional function pointer stored in field `$slot` of `$vtable` and unwraps it.
///
/// Panics with `"generated vtable includes <slot>"` when the field is `None`.
macro_rules! generated_slot {
    ($vtable:expr, $slot:ident) => {{
        let slot = ($vtable).$slot;
        slot.expect(concat!("generated vtable includes ", stringify!($slot)))
    }};
}
/// One variant per execution entry point of the host vtable exercised by these tests.
///
/// The discriminant is used as an index into the per-hook bookkeeping arrays, so the
/// values are spelled out and must stay dense, starting at zero, with `QueryOrder` last.
#[repr(usize)]
#[derive(Clone, Copy, Debug)]
enum ExecHook {
    Submit = 0,
    Cancel = 1,
    Modify = 2,
    SubmitList = 3,
    CancelOrders = 4,
    CancelAll = 5,
    ClosePosition = 6,
    CloseAll = 7,
    QueryAccount = 8,
    QueryOrder = 9,
}
/// Number of `ExecHook` variants, derived from the last variant's discriminant.
const HOOK_COUNT: usize = 1 + ExecHook::QueryOrder as usize;

/// Per-hook invocation counters, indexed by `ExecHook as usize`.
static HOOK_CALLS: [AtomicU64; HOOK_COUNT] = [const { AtomicU64::new(0) }; HOOK_COUNT];

/// Context pointer most recently passed to each hook, indexed by `ExecHook as usize`;
/// null until the hook has been recorded.
static LAST_CTX: [AtomicPtr<HostContext>; HOOK_COUNT] =
    [const { AtomicPtr::new(std::ptr::null_mut::<HostContext>()) }; HOOK_COUNT];
/// Acquires the process-wide lock that tests hold while they touch the shared hook
/// bookkeeping.
///
/// A poisoned lock (a previous holder panicked) is recovered rather than propagated,
/// so one failing test does not cascade into every later one.
fn dispatch_lock() -> MutexGuard<'static, ()> {
    static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
    let mutex = LOCK.get_or_init(Mutex::default);
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Clears the shared bookkeeping: every call counter goes back to zero, then every
/// recorded context pointer goes back to null.
fn reset_all() {
    HOOK_CALLS
        .iter()
        .for_each(|calls| calls.store(0, Ordering::SeqCst));
    LAST_CTX
        .iter()
        .for_each(|seen| seen.store(std::ptr::null_mut(), Ordering::SeqCst));
}
/// Notes one invocation of `hook`: bumps its call counter and remembers the context
/// pointer it was invoked with. The pointer is only stored, never dereferenced.
fn record(ctx: *const HostContext, hook: ExecHook) {
    let slot = hook as usize;
    let (calls, last_ctx) = (&HOOK_CALLS[slot], &LAST_CTX[slot]);
    calls.fetch_add(1, Ordering::SeqCst);
    last_ctx.store(ctx.cast_mut(), Ordering::SeqCst);
}
/// Asserts that `expected` was recorded exactly once and that no other hook was
/// recorded at all. Counters are checked in index order.
fn assert_only_hook(expected: ExecHook) {
    let target = expected as usize;
    for (i, counter) in HOOK_CALLS.iter().enumerate() {
        let v = counter.load(Ordering::SeqCst);
        if i != target {
            assert_eq!(
                v, 0,
                "hook at index {i} fired but {expected:?} was expected",
            );
            continue;
        }
        assert_eq!(v, 1, "hook {expected:?} should have fired exactly once");
    }
}
/// Asserts that the context pointer last recorded for `hook` is address-identical to
/// `expected`.
fn assert_ctx(hook: ExecHook, expected: *const HostContext) {
    let last = LAST_CTX[hook as usize]
        .load(Ordering::SeqCst)
        .cast_const();
    let threaded = std::ptr::eq(last, expected);
    assert!(
        threaded,
        "host context not threaded through to {hook:?}: expected {expected:?}, was {last:?}",
    );
}
/// Stand-in for the host clock slot: always reports timestamp zero.
unsafe extern "C" fn stub_clock_now_ns() -> u64 {
    0
}
/// Stand-in for the host logging slot: discards the level, target and message.
unsafe extern "C" fn stub_log(
    _level: HostLogLevel,
    _target: BorrowedStr<'_>,
    _message: BorrowedStr<'_>,
) {
}
macro_rules! stub_bytes_handler {
($name:ident) => {
unsafe extern "C" fn $name(
_ctx: *const HostContext,
_arg: BorrowedStr<'_>,
) -> PluginResult<OwnedBytes> {
PluginResult::Ok(OwnedBytes::empty())
}
};
}
stub_bytes_handler!(stub_cache_instrument);
stub_bytes_handler!(stub_cache_account);
stub_bytes_handler!(stub_cache_order);
stub_bytes_handler!(stub_cache_position);
stub_bytes_handler!(stub_cache_orders_for_strategy);
stub_bytes_handler!(stub_cache_positions_for_strategy);
macro_rules! stub_subscription_handler {
($name:ident) => {
unsafe extern "C" fn $name(
_ctx: *const HostContext,
_id: BorrowedStr<'_>,
_client_id: BorrowedStr<'_>,
_params_json: BorrowedStr<'_>,
) -> PluginResult<()> {
PluginResult::Ok(())
}
};
}
stub_subscription_handler!(stub_subscribe_quotes);
stub_subscription_handler!(stub_unsubscribe_quotes);
stub_subscription_handler!(stub_subscribe_trades);
stub_subscription_handler!(stub_unsubscribe_trades);
stub_subscription_handler!(stub_subscribe_bars);
stub_subscription_handler!(stub_unsubscribe_bars);
stub_subscription_handler!(stub_unsubscribe_book_deltas);
/// No-op stand-in for the host's `subscribe_book_deltas` slot: ignores every argument
/// and reports success.
unsafe extern "C" fn stub_subscribe_book_deltas(
    _ctx: *const HostContext,
    _instrument_id: BorrowedStr<'_>,
    _book_type: u8,
    _depth: usize,
    _client_id: BorrowedStr<'_>,
    _managed: u8,
    _params_json: BorrowedStr<'_>,
) -> PluginResult<()> {
    PluginResult::Ok(())
}
/// No-op stand-in for the host's `subscribe_book_at_interval` slot: ignores every
/// argument and reports success.
unsafe extern "C" fn stub_subscribe_book_at_interval(
    _ctx: *const HostContext,
    _instrument_id: BorrowedStr<'_>,
    _book_type: u8,
    _depth: usize,
    _interval_ms: usize,
    _client_id: BorrowedStr<'_>,
    _params_json: BorrowedStr<'_>,
) -> PluginResult<()> {
    PluginResult::Ok(())
}
/// No-op stand-in for the host's `unsubscribe_book_at_interval` slot: ignores every
/// argument and reports success.
unsafe extern "C" fn stub_unsubscribe_book_at_interval(
    _ctx: *const HostContext,
    _instrument_id: BorrowedStr<'_>,
    _interval_ms: usize,
    _client_id: BorrowedStr<'_>,
    _params_json: BorrowedStr<'_>,
) -> PluginResult<()> {
    PluginResult::Ok(())
}
/// No-op stand-in for the host's `msgbus_publish` slot: drops the topic and payload
/// and reports success.
unsafe extern "C" fn stub_msgbus_publish(
    _ctx: *const HostContext,
    _topic: BorrowedStr<'_>,
    _payload: Slice<'_, u8>,
) -> PluginResult<()> {
    PluginResult::Ok(())
}
/// No-op stand-in for the host's `set_time_alert` slot: schedules nothing and reports
/// success.
unsafe extern "C" fn stub_set_time_alert(
    _ctx: *const HostContext,
    _name: BorrowedStr<'_>,
    _alert_time_ns: u64,
    _allow_past: u8,
) -> PluginResult<()> {
    PluginResult::Ok(())
}
/// No-op stand-in for the host's `set_timer` slot: schedules nothing and reports
/// success.
unsafe extern "C" fn stub_set_timer(
    _ctx: *const HostContext,
    _name: BorrowedStr<'_>,
    _interval_ns: u64,
    _start_time_ns: u64,
    _stop_time_ns: u64,
    _allow_past: u8,
    _fire_immediately: u8,
) -> PluginResult<()> {
    PluginResult::Ok(())
}
/// No-op stand-in for the host's `cancel_timer` slot: cancels nothing and reports
/// success.
unsafe extern "C" fn stub_cancel_timer(
    _ctx: *const HostContext,
    _name: BorrowedStr<'_>,
) -> PluginResult<()> {
    PluginResult::Ok(())
}
unsafe extern "C" fn recording_submit_order(
ctx: *const HostContext,
_command: *const SubmitOrderHandle,
) -> PluginResult<()> {
record(ctx, ExecHook::Submit);
PluginResult::Ok(())
}
unsafe extern "C" fn recording_cancel_order(
ctx: *const HostContext,
_command: *const CancelOrderHandle,
) -> PluginResult<()> {
record(ctx, ExecHook::Cancel);
PluginResult::Ok(())
}
unsafe extern "C" fn recording_modify_order(
ctx: *const HostContext,
_command: *const ModifyOrderHandle,
) -> PluginResult<()> {
record(ctx, ExecHook::Modify);
PluginResult::Ok(())
}
unsafe extern "C" fn recording_submit_order_list(
ctx: *const HostContext,
_command: *const SubmitOrderListHandle,
) -> PluginResult<()> {
record(ctx, ExecHook::SubmitList);
PluginResult::Ok(())
}
unsafe extern "C" fn recording_cancel_orders(
ctx: *const HostContext,
_command: *const CancelOrdersHandle,
) -> PluginResult<()> {
record(ctx, ExecHook::CancelOrders);
PluginResult::Ok(())
}
unsafe extern "C" fn recording_cancel_all_orders(
ctx: *const HostContext,
_command: *const CancelAllOrdersHandle,
) -> PluginResult<()> {
record(ctx, ExecHook::CancelAll);
PluginResult::Ok(())
}
unsafe extern "C" fn recording_close_position(
ctx: *const HostContext,
_command: *const ClosePositionHandle,
) -> PluginResult<()> {
record(ctx, ExecHook::ClosePosition);
PluginResult::Ok(())
}
unsafe extern "C" fn recording_close_all_positions(
ctx: *const HostContext,
_command: *const CloseAllPositionsHandle,
) -> PluginResult<()> {
record(ctx, ExecHook::CloseAll);
PluginResult::Ok(())
}
unsafe extern "C" fn recording_query_account(
ctx: *const HostContext,
_command: *const QueryAccountHandle,
) -> PluginResult<()> {
record(ctx, ExecHook::QueryAccount);
PluginResult::Ok(())
}
unsafe extern "C" fn recording_query_order(
ctx: *const HostContext,
_command: *const QueryOrderHandle,
) -> PluginResult<()> {
record(ctx, ExecHook::QueryOrder);
PluginResult::Ok(())
}
/// Builds the fixed market order used as command payload: a GTC buy of quantity
/// `1.0` on `ETH-USDT.BINANCE` for trader `TRADER-001`, strategy `S-001`, client
/// order id `O-1`, with a fresh `UUID4` and a default timestamp. All remaining
/// constructor arguments are `false` or `None`.
fn make_market_order() -> OrderAny {
    let order = MarketOrder::new(
        TraderId::from("TRADER-001"),
        StrategyId::from("S-001"),
        InstrumentId::from("ETH-USDT.BINANCE"),
        ClientOrderId::from("O-1"),
        OrderSide::Buy,
        Quantity::from("1.0"),
        TimeInForce::Gtc,
        UUID4::new(),
        UnixNanos::default(),
        false,
        false,
        None,
        None,
        None,
        None,
        None,
        None,
        None,
        None,
    );
    OrderAny::Market(order)
}
// Host vtable handed to the strategy under test. Every execution slot points at a
// `recording_*` function so the tests can observe which one was called and with which
// context; every other slot points at a no-op stub.
static TEST_HOST: HostVTable = HostVTable {
abi_version: NAUTILUS_PLUGIN_ABI_VERSION,
// Clock and logging: inert stubs.
clock_now_ns: stub_clock_now_ns,
log: stub_log,
// Cache lookups: always answer with empty bytes.
cache_instrument: stub_cache_instrument,
cache_account: stub_cache_account,
cache_order: stub_cache_order,
cache_position: stub_cache_position,
cache_orders_for_strategy: stub_cache_orders_for_strategy,
cache_positions_for_strategy: stub_cache_positions_for_strategy,
// Data subscriptions: accepted and ignored.
subscribe_quotes: stub_subscribe_quotes,
unsubscribe_quotes: stub_unsubscribe_quotes,
subscribe_trades: stub_subscribe_trades,
unsubscribe_trades: stub_unsubscribe_trades,
subscribe_bars: stub_subscribe_bars,
unsubscribe_bars: stub_unsubscribe_bars,
subscribe_book_deltas: stub_subscribe_book_deltas,
unsubscribe_book_deltas: stub_unsubscribe_book_deltas,
subscribe_book_at_interval: stub_subscribe_book_at_interval,
unsubscribe_book_at_interval: stub_unsubscribe_book_at_interval,
// Message bus and timers: accepted and ignored.
msgbus_publish: stub_msgbus_publish,
set_time_alert: stub_set_time_alert,
set_timer: stub_set_timer,
cancel_timer: stub_cancel_timer,
// Execution hooks: the slots under test, each recording its own `ExecHook`.
submit_order: recording_submit_order,
cancel_order: recording_cancel_order,
modify_order: recording_modify_order,
submit_order_list: recording_submit_order_list,
cancel_orders: recording_cancel_orders,
cancel_all_orders: recording_cancel_all_orders,
close_position: recording_close_position,
close_all_positions: recording_close_all_positions,
query_account: recording_query_account,
query_order: recording_query_order,
};
// Per-thread selector naming the host execution slot that `ExecStrategy::on_start`
// should call. Starts out as `ExecHook::Submit`; tests overwrite it through
// `set_target` before driving the callback on the same thread.
thread_local! {
static TARGET: std::cell::Cell<ExecHook> = const { std::cell::Cell::new(ExecHook::Submit) };
}
/// Chooses, for the current thread, which host execution slot the next
/// `ExecStrategy::on_start` call will invoke.
fn set_target(hook: ExecHook) {
    TARGET.set(hook);
}
/// Minimal strategy used to drive the generated strategy vtable: it keeps the two
/// pointers it was constructed with and, in `on_start`, calls exactly one execution
/// slot of the host vtable with the stored context.
struct ExecStrategy {
/// Host vtable pointer exactly as passed to `new`; dereferenced in `on_start`.
host: *const HostVTable,
/// Host context pointer exactly as passed to `new`; forwarded verbatim to the host
/// slot and never dereferenced by this type.
ctx: *const HostContext,
}
// SAFETY: the raw-pointer fields make this type `!Send` by default. In this file both
// pointers are only ever derived from `static` items (`TEST_HOST` and the
// `HostContextPad` statics), so moving the value to another thread cannot invalidate
// them. NOTE(review): presumably `PluginStrategy` requires `Send` — confirm against
// the trait definition.
unsafe impl Send for ExecStrategy {}
impl PluginStrategy for ExecStrategy {
    const TYPE_NAME: &'static str = "ExecStrategy";

    /// Stores the host vtable and context pointers unchanged; the config JSON is ignored.
    fn new(host: *const HostVTable, ctx: *const HostContext, _config_json: &str) -> Self {
        Self { host, ctx }
    }

    /// Builds a throwaway command for the hook selected by the thread-local `TARGET`,
    /// wraps it in its handle type and invokes the matching host slot with the stored
    /// context pointer.
    ///
    /// # Errors
    ///
    /// Returns the host's error message as an `anyhow` error when the slot reports
    /// failure.
    fn on_start(&mut self) -> anyhow::Result<()> {
        // SAFETY: `self.host` is the pointer handed to `new`; in this file that is
        // always the address of the `TEST_HOST` static.
        let vtable = unsafe { &*self.host };
        let ctx = self.ctx;
        let selected = TARGET.get();

        // In every arm the handle lives on this stack frame until the host call has
        // returned, so the pointer passed to the slot is valid for the whole call.
        let outcome = match selected {
            ExecHook::Submit => {
                let command = SubmitOrderCommand::new(make_market_order(), None, None, None);
                let handle = SubmitOrderHandle::new(command);
                unsafe { (vtable.submit_order)(ctx, &raw const handle) }
            }
            ExecHook::Cancel => {
                let command = CancelOrderCommand::new(ClientOrderId::from("O-1"), None, None);
                let handle = CancelOrderHandle::new(command);
                unsafe { (vtable.cancel_order)(ctx, &raw const handle) }
            }
            ExecHook::Modify => {
                let command = ModifyOrderCommand::new(
                    ClientOrderId::from("O-1"),
                    None,
                    None,
                    None,
                    None,
                    None,
                );
                let handle = ModifyOrderHandle::new(command);
                unsafe { (vtable.modify_order)(ctx, &raw const handle) }
            }
            ExecHook::SubmitList => {
                let command =
                    SubmitOrderListCommand::new(vec![make_market_order()], None, None, None);
                let handle = SubmitOrderListHandle::new(command);
                unsafe { (vtable.submit_order_list)(ctx, &raw const handle) }
            }
            ExecHook::CancelOrders => {
                let command =
                    CancelOrdersCommand::new(vec![ClientOrderId::from("O-1")], None, None);
                let handle = CancelOrdersHandle::new(command);
                unsafe { (vtable.cancel_orders)(ctx, &raw const handle) }
            }
            ExecHook::CancelAll => {
                let command = CancelAllOrdersCommand::new(
                    InstrumentId::from("ETH-USDT.BINANCE"),
                    None,
                    None,
                    None,
                );
                let handle = CancelAllOrdersHandle::new(command);
                unsafe { (vtable.cancel_all_orders)(ctx, &raw const handle) }
            }
            ExecHook::ClosePosition => {
                let command = ClosePositionCommand::new(
                    PositionId::from("P-001"),
                    None,
                    None,
                    None,
                    None,
                    None,
                );
                let handle = ClosePositionHandle::new(command);
                unsafe { (vtable.close_position)(ctx, &raw const handle) }
            }
            ExecHook::CloseAll => {
                let command = CloseAllPositionsCommand::new(
                    InstrumentId::from("ETH-USDT.BINANCE"),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                );
                let handle = CloseAllPositionsHandle::new(command);
                unsafe { (vtable.close_all_positions)(ctx, &raw const handle) }
            }
            ExecHook::QueryAccount => {
                let command =
                    QueryAccountCommand::new(AccountId::from("BINANCE-001"), None, None);
                let handle = QueryAccountHandle::new(command);
                unsafe { (vtable.query_account)(ctx, &raw const handle) }
            }
            ExecHook::QueryOrder => {
                let command = QueryOrderCommand::new(ClientOrderId::from("O-1"), None, None);
                let handle = QueryOrderHandle::new(command);
                unsafe { (vtable.query_order)(ctx, &raw const handle) }
            }
        };

        outcome
            .into_result()
            .map_err(|err| anyhow::anyhow!(err.message_string()))
    }
}
/// One-byte placeholder whose address is cast to `*const HostContext` to serve as a
/// distinguishable context pointer. Within this file the resulting pointers are only
/// stored, forwarded and compared by address, never dereferenced.
#[repr(transparent)]
struct HostContextPad {
_filler: u8,
}
// Context used by the parameterized single-instance test (via `sentinel_ctx`).
static SENTINEL_CTX: HostContextPad = HostContextPad { _filler: 0 };
// Two further contexts, one per strategy instance, for the distinct-contexts test.
static CTX_A: HostContextPad = HostContextPad { _filler: 1 };
static CTX_B: HostContextPad = HostContextPad { _filler: 2 };
fn sentinel_ctx() -> *const HostContext {
std::ptr::from_ref(&SENTINEL_CTX).cast::<HostContext>()
}
#[rstest]
#[case::submit_order(ExecHook::Submit)]
#[case::cancel_order(ExecHook::Cancel)]
#[case::modify_order(ExecHook::Modify)]
#[case::submit_order_list(ExecHook::SubmitList)]
#[case::cancel_orders(ExecHook::CancelOrders)]
#[case::cancel_all_orders(ExecHook::CancelAll)]
#[case::close_position(ExecHook::ClosePosition)]
#[case::close_all_positions(ExecHook::CloseAll)]
#[case::query_account(ExecHook::QueryAccount)]
#[case::query_order(ExecHook::QueryOrder)]
fn strategy_callback_invokes_bound_execution_method_with_stored_context(#[case] hook: ExecHook) {
let _g = dispatch_lock();
reset_all();
set_target(hook);
let vt_ptr = strategy_vtable::<ExecStrategy>();
let vt = unsafe { &*vt_ptr };
let host = std::ptr::from_ref(&TEST_HOST);
let ctx = sentinel_ctx();
let handle = unsafe { generated_slot!(vt, create)(host, ctx, BorrowedStr::empty()) };
assert!(!handle.is_null(), "create returned null");
let r = unsafe { generated_slot!(vt, on_start)(handle) };
r.into_result().expect("strategy callback");
assert_only_hook(hook);
assert_ctx(hook, ctx);
unsafe {
generated_slot!(vt, drop_handle)(handle);
};
}
#[rstest]
fn distinct_strategy_instances_carry_distinct_contexts_to_the_host() {
let _g = dispatch_lock();
reset_all();
let vt_ptr = strategy_vtable::<ExecStrategy>();
let vt = unsafe { &*vt_ptr };
let ctx_a = std::ptr::from_ref(&CTX_A).cast::<HostContext>();
let ctx_b = std::ptr::from_ref(&CTX_B).cast::<HostContext>();
let host = std::ptr::from_ref(&TEST_HOST);
let h_a = unsafe { generated_slot!(vt, create)(host, ctx_a, BorrowedStr::empty()) };
let h_b = unsafe { generated_slot!(vt, create)(host, ctx_b, BorrowedStr::empty()) };
set_target(ExecHook::Submit);
let r = unsafe { generated_slot!(vt, on_start)(h_a) };
r.into_result().expect("on_start on instance A");
assert_ctx(ExecHook::Submit, ctx_a);
set_target(ExecHook::Modify);
let r = unsafe { generated_slot!(vt, on_start)(h_b) };
r.into_result().expect("on_start on instance B");
assert_ctx(ExecHook::Modify, ctx_b);
unsafe {
generated_slot!(vt, drop_handle)(h_a);
};
unsafe {
generated_slot!(vt, drop_handle)(h_b);
};
}