#![allow(unsafe_code)]
use std::{
cell::RefCell,
rc::Rc,
sync::{
Mutex, MutexGuard,
atomic::{AtomicU64, Ordering},
},
};
static COUNTER_LOCK: Mutex<()> = Mutex::new(());
fn lock_counters() -> MutexGuard<'static, ()> {
COUNTER_LOCK
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
}
use nautilus_common::{
actor::{DataActor, registry::register_actor},
cache::Cache,
clock::{Clock, TestClock},
component::Component,
messages::{
data::{DataCommand, SubscribeCommand, UnsubscribeCommand},
execution::TradingCommand,
},
msgbus::{
self, MessagingSwitchboard, ShareableMessageHandler, TypedIntoHandler,
switchboard::{get_bars_topic, get_quotes_topic, get_trades_topic},
},
signal::Signal,
timer::{TimeEvent, TimeEventCallback},
};
use nautilus_core::{Params, UUID4, UnixNanos};
use nautilus_live::plugin::{
CancelAllOrdersCommand, CancelAllOrdersHandle, CancelOrderCommand, CancelOrderHandle,
CancelOrdersCommand, CancelOrdersHandle, CloseAllPositionsCommand, CloseAllPositionsHandle,
ClosePositionCommand, ClosePositionHandle, HostContextInner, ModifyOrderCommand,
ModifyOrderHandle, PluginActorAdapter, PluginStrategyAdapter, QueryAccountCommand,
QueryAccountHandle, QueryOrderCommand, QueryOrderHandle, SubmitOrderCommand, SubmitOrderHandle,
SubmitOrderListCommand, SubmitOrderListHandle, host_vtable, register_custom_data_from_manifest,
};
use nautilus_model::{
accounts::{AccountAny, MarginAccount},
data::{
Bar, BarSpecification, BarType, CustomData, Data, FundingRateUpdate, IndexPriceUpdate,
InstrumentClose, InstrumentStatus, MarkPriceUpdate, OptionChainSlice, OptionGreeks,
OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick,
registry::deserialize_custom_from_json,
stubs::{stub_custom_data, stub_deltas},
},
enums::{
AccountType, AggregationSource, AggressorSide, BarAggregation, BookType, GreeksConvention,
InstrumentCloseType, LiquiditySide, MarketStatusAction, OmsType, OrderSide, OrderType,
PositionSide, PriceType, TimeInForce,
},
events::{
AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied,
OrderEmulated, OrderExpired, OrderFilled, OrderInitialized, OrderModifyRejected,
OrderPendingCancel, OrderPendingUpdate, OrderRejected, OrderReleased, OrderSubmitted,
OrderTriggered, OrderUpdated, PositionChanged, PositionClosed, PositionOpened,
},
identifiers::{
AccountId, ActorId, ClientId, ClientOrderId, InstrumentId, OptionSeriesId, PositionId,
StrategyId, TradeId, TraderId, Venue, VenueOrderId,
},
instruments::{Instrument, InstrumentAny, stubs::currency_pair_ethusdt},
orderbook::OrderBook,
orders::{Order, OrderAny},
position::Position,
types::{AccountBalance, Currency, Money, Price, Quantity},
};
use nautilus_plugin::{
NAUTILUS_PLUGIN_ABI_VERSION,
boundary::{BorrowedStr, Slice},
host::{HostContext, HostVTable},
manifest::{
CustomDataRegistration, PluginBuildId, PluginManifest, ValidatedActorVTable,
ValidatedPluginManifest, ValidatedStrategyVTable,
},
surfaces::{
actor::{PluginActor, actor_vtable},
custom_data::{PluginCustomData, PluginCustomDataRef, custom_data_vtable},
strategy::{PluginStrategy, strategy_vtable},
},
};
use nautilus_portfolio::portfolio::Portfolio;
use nautilus_trading::strategy::{Strategy, StrategyConfig};
use rstest::rstest;
static A_START: AtomicU64 = AtomicU64::new(0);
static A_STOP: AtomicU64 = AtomicU64::new(0);
static A_RESUME: AtomicU64 = AtomicU64::new(0);
static A_RESET: AtomicU64 = AtomicU64::new(0);
static A_DISPOSE: AtomicU64 = AtomicU64::new(0);
static A_DEGRADE: AtomicU64 = AtomicU64::new(0);
static A_FAULT: AtomicU64 = AtomicU64::new(0);
static A_TIME: AtomicU64 = AtomicU64::new(0);
static A_DATA: AtomicU64 = AtomicU64::new(0);
static A_DATA_VALUE: AtomicU64 = AtomicU64::new(0);
static A_INSTRUMENT: AtomicU64 = AtomicU64::new(0);
static A_BOOK_DELTAS: AtomicU64 = AtomicU64::new(0);
static A_BOOK: AtomicU64 = AtomicU64::new(0);
static A_HIST_BOOK_DELTAS: AtomicU64 = AtomicU64::new(0);
static A_HIST_QUOTES: AtomicU64 = AtomicU64::new(0);
static A_HIST_TRADES: AtomicU64 = AtomicU64::new(0);
static A_HIST_BARS: AtomicU64 = AtomicU64::new(0);
static A_HIST_MARKS: AtomicU64 = AtomicU64::new(0);
static A_HIST_INDEXES: AtomicU64 = AtomicU64::new(0);
static A_HIST_FUNDING: AtomicU64 = AtomicU64::new(0);
static A_QUOTE: AtomicU64 = AtomicU64::new(0);
static A_TRADE: AtomicU64 = AtomicU64::new(0);
static A_BAR: AtomicU64 = AtomicU64::new(0);
static A_MARK: AtomicU64 = AtomicU64::new(0);
static A_INDEX: AtomicU64 = AtomicU64::new(0);
static A_FUNDING: AtomicU64 = AtomicU64::new(0);
static A_OPTION_GREEKS: AtomicU64 = AtomicU64::new(0);
static A_OPTION_CHAIN: AtomicU64 = AtomicU64::new(0);
static A_INSTR_STATUS: AtomicU64 = AtomicU64::new(0);
static A_INSTR_CLOSE: AtomicU64 = AtomicU64::new(0);
static A_ORDER_FILLED: AtomicU64 = AtomicU64::new(0);
static A_ORDER_CANCELED: AtomicU64 = AtomicU64::new(0);
static A_SIGNAL: AtomicU64 = AtomicU64::new(0);
fn a_reset() {
for c in [
&A_START,
&A_STOP,
&A_RESUME,
&A_RESET,
&A_DISPOSE,
&A_DEGRADE,
&A_FAULT,
&A_TIME,
&A_DATA,
&A_DATA_VALUE,
&A_INSTRUMENT,
&A_BOOK_DELTAS,
&A_BOOK,
&A_HIST_BOOK_DELTAS,
&A_HIST_QUOTES,
&A_HIST_TRADES,
&A_HIST_BARS,
&A_HIST_MARKS,
&A_HIST_INDEXES,
&A_HIST_FUNDING,
&A_QUOTE,
&A_TRADE,
&A_BAR,
&A_MARK,
&A_INDEX,
&A_FUNDING,
&A_OPTION_GREEKS,
&A_OPTION_CHAIN,
&A_INSTR_STATUS,
&A_INSTR_CLOSE,
&A_ORDER_FILLED,
&A_ORDER_CANCELED,
&A_SIGNAL,
] {
c.store(0, Ordering::SeqCst);
}
}
struct CountingActor;
#[derive(Clone, PartialEq)]
struct DispatchCustomData {
value: u64,
ts_event: u64,
ts_init: u64,
}
impl PluginCustomData for DispatchCustomData {
const TYPE_NAME: &'static str = "DispatchCustomData";
fn ts_event(&self) -> u64 {
self.ts_event
}
fn ts_init(&self) -> u64 {
self.ts_init
}
fn to_json(&self) -> anyhow::Result<Vec<u8>> {
Ok(serde_json::json!({
"value": self.value,
"ts_event": self.ts_event,
"ts_init": self.ts_init,
})
.to_string()
.into_bytes())
}
fn from_json(payload: &[u8]) -> anyhow::Result<Self> {
let value: serde_json::Value = serde_json::from_slice(payload)?;
Ok(Self {
value: value
.get("value")
.and_then(serde_json::Value::as_u64)
.unwrap_or_default(),
ts_event: value
.get("ts_event")
.and_then(serde_json::Value::as_u64)
.unwrap_or_default(),
ts_init: value
.get("ts_init")
.and_then(serde_json::Value::as_u64)
.unwrap_or_default(),
})
}
fn schema_ipc() -> anyhow::Result<Vec<u8>> {
Ok(Vec::new())
}
fn encode_batch(_items: &[&Self]) -> anyhow::Result<Vec<u8>> {
Ok(Vec::new())
}
fn decode_batch(
_ipc_bytes: &[u8],
_metadata: &[(String, String)],
) -> anyhow::Result<Vec<Self>> {
Ok(Vec::new())
}
}
impl PluginActor for CountingActor {
const TYPE_NAME: &'static str = "CountingActor";
fn new(_host: *const HostVTable, _ctx: *const HostContext, _config_json: &str) -> Self {
Self
}
fn on_start(&mut self) -> anyhow::Result<()> {
A_START.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_stop(&mut self) -> anyhow::Result<()> {
A_STOP.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_resume(&mut self) -> anyhow::Result<()> {
A_RESUME.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_reset(&mut self) -> anyhow::Result<()> {
A_RESET.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_dispose(&mut self) -> anyhow::Result<()> {
A_DISPOSE.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_degrade(&mut self) -> anyhow::Result<()> {
A_DEGRADE.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_fault(&mut self) -> anyhow::Result<()> {
A_FAULT.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
A_TIME.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_data(&mut self, data: PluginCustomDataRef) -> anyhow::Result<()> {
let Some(value) = data.downcast_ref::<DispatchCustomData>() else {
anyhow::bail!("expected DispatchCustomData, was {}", data.type_name());
};
A_DATA_VALUE.store(value.value, Ordering::SeqCst);
A_DATA.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_instrument(&mut self, _: &InstrumentAny) -> anyhow::Result<()> {
A_INSTRUMENT.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_book_deltas(&mut self, _: &OrderBookDeltas) -> anyhow::Result<()> {
A_BOOK_DELTAS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_book(&mut self, _: &OrderBook) -> anyhow::Result<()> {
A_BOOK.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_book_deltas(&mut self, _: &[OrderBookDelta]) -> anyhow::Result<()> {
A_HIST_BOOK_DELTAS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_quotes(&mut self, _: &[QuoteTick]) -> anyhow::Result<()> {
A_HIST_QUOTES.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_trades(&mut self, _: &[TradeTick]) -> anyhow::Result<()> {
A_HIST_TRADES.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_bars(&mut self, _: &[Bar]) -> anyhow::Result<()> {
A_HIST_BARS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_mark_prices(&mut self, _: &[MarkPriceUpdate]) -> anyhow::Result<()> {
A_HIST_MARKS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_index_prices(&mut self, _: &[IndexPriceUpdate]) -> anyhow::Result<()> {
A_HIST_INDEXES.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_funding_rates(&mut self, _: &[FundingRateUpdate]) -> anyhow::Result<()> {
A_HIST_FUNDING.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_quote(&mut self, _quote: &QuoteTick) -> anyhow::Result<()> {
A_QUOTE.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_trade(&mut self, _trade: &TradeTick) -> anyhow::Result<()> {
A_TRADE.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_bar(&mut self, _bar: &Bar) -> anyhow::Result<()> {
A_BAR.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_mark_price(&mut self, _: &MarkPriceUpdate) -> anyhow::Result<()> {
A_MARK.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_index_price(&mut self, _: &IndexPriceUpdate) -> anyhow::Result<()> {
A_INDEX.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_funding_rate(&mut self, _: &FundingRateUpdate) -> anyhow::Result<()> {
A_FUNDING.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_option_greeks(&mut self, _: &OptionGreeks) -> anyhow::Result<()> {
A_OPTION_GREEKS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_option_chain(&mut self, _: &OptionChainSlice) -> anyhow::Result<()> {
A_OPTION_CHAIN.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_instrument_status(&mut self, _: &InstrumentStatus) -> anyhow::Result<()> {
A_INSTR_STATUS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_instrument_close(&mut self, _: &InstrumentClose) -> anyhow::Result<()> {
A_INSTR_CLOSE.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_filled(&mut self, _: &OrderFilled) -> anyhow::Result<()> {
A_ORDER_FILLED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_canceled(&mut self, _: &OrderCanceled) -> anyhow::Result<()> {
A_ORDER_CANCELED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_signal(&mut self, _: &Signal) -> anyhow::Result<()> {
A_SIGNAL.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
static S_START: AtomicU64 = AtomicU64::new(0);
static S_STOP: AtomicU64 = AtomicU64::new(0);
static S_RESUME: AtomicU64 = AtomicU64::new(0);
static S_RESET: AtomicU64 = AtomicU64::new(0);
static S_DISPOSE: AtomicU64 = AtomicU64::new(0);
static S_DEGRADE: AtomicU64 = AtomicU64::new(0);
static S_FAULT: AtomicU64 = AtomicU64::new(0);
static S_TIME: AtomicU64 = AtomicU64::new(0);
static S_DATA: AtomicU64 = AtomicU64::new(0);
static S_DATA_VALUE: AtomicU64 = AtomicU64::new(0);
static S_INSTRUMENT: AtomicU64 = AtomicU64::new(0);
static S_BOOK_DELTAS: AtomicU64 = AtomicU64::new(0);
static S_BOOK: AtomicU64 = AtomicU64::new(0);
static S_HIST_BOOK_DELTAS: AtomicU64 = AtomicU64::new(0);
static S_HIST_QUOTES: AtomicU64 = AtomicU64::new(0);
static S_HIST_TRADES: AtomicU64 = AtomicU64::new(0);
static S_HIST_BARS: AtomicU64 = AtomicU64::new(0);
static S_HIST_MARKS: AtomicU64 = AtomicU64::new(0);
static S_HIST_INDEXES: AtomicU64 = AtomicU64::new(0);
static S_HIST_FUNDING: AtomicU64 = AtomicU64::new(0);
static S_QUOTE: AtomicU64 = AtomicU64::new(0);
static S_TRADE: AtomicU64 = AtomicU64::new(0);
static S_BAR: AtomicU64 = AtomicU64::new(0);
static S_MARK: AtomicU64 = AtomicU64::new(0);
static S_INDEX: AtomicU64 = AtomicU64::new(0);
static S_FUNDING: AtomicU64 = AtomicU64::new(0);
static S_OPTION_GREEKS: AtomicU64 = AtomicU64::new(0);
static S_OPTION_CHAIN: AtomicU64 = AtomicU64::new(0);
static S_INSTR_STATUS: AtomicU64 = AtomicU64::new(0);
static S_INSTR_CLOSE: AtomicU64 = AtomicU64::new(0);
static S_ORDER_FILLED: AtomicU64 = AtomicU64::new(0);
static S_ORDER_CANCELED: AtomicU64 = AtomicU64::new(0);
static S_SIGNAL: AtomicU64 = AtomicU64::new(0);
static S_ORDER_INITIALIZED: AtomicU64 = AtomicU64::new(0);
static S_ORDER_SUBMITTED: AtomicU64 = AtomicU64::new(0);
static S_ORDER_ACCEPTED: AtomicU64 = AtomicU64::new(0);
static S_ORDER_REJECTED: AtomicU64 = AtomicU64::new(0);
static S_ORDER_EXPIRED: AtomicU64 = AtomicU64::new(0);
static S_ORDER_TRIGGERED: AtomicU64 = AtomicU64::new(0);
static S_ORDER_DENIED: AtomicU64 = AtomicU64::new(0);
static S_ORDER_EMULATED: AtomicU64 = AtomicU64::new(0);
static S_ORDER_RELEASED: AtomicU64 = AtomicU64::new(0);
static S_ORDER_PENDING_UPDATE: AtomicU64 = AtomicU64::new(0);
static S_ORDER_PENDING_CANCEL: AtomicU64 = AtomicU64::new(0);
static S_ORDER_MODIFY_REJECTED: AtomicU64 = AtomicU64::new(0);
static S_ORDER_CANCEL_REJECTED: AtomicU64 = AtomicU64::new(0);
static S_ORDER_UPDATED: AtomicU64 = AtomicU64::new(0);
static S_POSITION_OPENED: AtomicU64 = AtomicU64::new(0);
static S_POSITION_CHANGED: AtomicU64 = AtomicU64::new(0);
static S_POSITION_CLOSED: AtomicU64 = AtomicU64::new(0);
fn s_reset() {
for c in [
&S_START,
&S_STOP,
&S_RESUME,
&S_RESET,
&S_DISPOSE,
&S_DEGRADE,
&S_FAULT,
&S_TIME,
&S_DATA,
&S_DATA_VALUE,
&S_INSTRUMENT,
&S_BOOK_DELTAS,
&S_BOOK,
&S_HIST_BOOK_DELTAS,
&S_HIST_QUOTES,
&S_HIST_TRADES,
&S_HIST_BARS,
&S_HIST_MARKS,
&S_HIST_INDEXES,
&S_HIST_FUNDING,
&S_QUOTE,
&S_TRADE,
&S_BAR,
&S_MARK,
&S_INDEX,
&S_FUNDING,
&S_OPTION_GREEKS,
&S_OPTION_CHAIN,
&S_INSTR_STATUS,
&S_INSTR_CLOSE,
&S_ORDER_FILLED,
&S_ORDER_CANCELED,
&S_SIGNAL,
&S_ORDER_INITIALIZED,
&S_ORDER_SUBMITTED,
&S_ORDER_ACCEPTED,
&S_ORDER_REJECTED,
&S_ORDER_EXPIRED,
&S_ORDER_TRIGGERED,
&S_ORDER_DENIED,
&S_ORDER_EMULATED,
&S_ORDER_RELEASED,
&S_ORDER_PENDING_UPDATE,
&S_ORDER_PENDING_CANCEL,
&S_ORDER_MODIFY_REJECTED,
&S_ORDER_CANCEL_REJECTED,
&S_ORDER_UPDATED,
&S_POSITION_OPENED,
&S_POSITION_CHANGED,
&S_POSITION_CLOSED,
] {
c.store(0, Ordering::SeqCst);
}
}
struct CountingStrategy;
unsafe impl Send for CountingStrategy {}
impl PluginStrategy for CountingStrategy {
const TYPE_NAME: &'static str = "CountingStrategy";
fn new(_host: *const HostVTable, _ctx: *const HostContext, _config_json: &str) -> Self {
Self
}
fn on_start(&mut self) -> anyhow::Result<()> {
S_START.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_stop(&mut self) -> anyhow::Result<()> {
S_STOP.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_resume(&mut self) -> anyhow::Result<()> {
S_RESUME.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_reset(&mut self) -> anyhow::Result<()> {
S_RESET.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_dispose(&mut self) -> anyhow::Result<()> {
S_DISPOSE.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_degrade(&mut self) -> anyhow::Result<()> {
S_DEGRADE.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_fault(&mut self) -> anyhow::Result<()> {
S_FAULT.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_time_event(&mut self, _: &TimeEvent) -> anyhow::Result<()> {
S_TIME.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_data(&mut self, data: PluginCustomDataRef) -> anyhow::Result<()> {
let Some(value) = data.downcast_ref::<DispatchCustomData>() else {
anyhow::bail!("expected DispatchCustomData, was {}", data.type_name());
};
S_DATA_VALUE.store(value.value, Ordering::SeqCst);
S_DATA.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_instrument(&mut self, _: &InstrumentAny) -> anyhow::Result<()> {
S_INSTRUMENT.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_book_deltas(&mut self, _: &OrderBookDeltas) -> anyhow::Result<()> {
S_BOOK_DELTAS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_book(&mut self, _: &OrderBook) -> anyhow::Result<()> {
S_BOOK.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_book_deltas(&mut self, _: &[OrderBookDelta]) -> anyhow::Result<()> {
S_HIST_BOOK_DELTAS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_quotes(&mut self, _: &[QuoteTick]) -> anyhow::Result<()> {
S_HIST_QUOTES.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_trades(&mut self, _: &[TradeTick]) -> anyhow::Result<()> {
S_HIST_TRADES.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_bars(&mut self, _: &[Bar]) -> anyhow::Result<()> {
S_HIST_BARS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_mark_prices(&mut self, _: &[MarkPriceUpdate]) -> anyhow::Result<()> {
S_HIST_MARKS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_index_prices(&mut self, _: &[IndexPriceUpdate]) -> anyhow::Result<()> {
S_HIST_INDEXES.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_historical_funding_rates(&mut self, _: &[FundingRateUpdate]) -> anyhow::Result<()> {
S_HIST_FUNDING.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_quote(&mut self, _: &QuoteTick) -> anyhow::Result<()> {
S_QUOTE.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_trade(&mut self, _: &TradeTick) -> anyhow::Result<()> {
S_TRADE.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_bar(&mut self, _: &Bar) -> anyhow::Result<()> {
S_BAR.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_mark_price(&mut self, _: &MarkPriceUpdate) -> anyhow::Result<()> {
S_MARK.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_index_price(&mut self, _: &IndexPriceUpdate) -> anyhow::Result<()> {
S_INDEX.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_funding_rate(&mut self, _: &FundingRateUpdate) -> anyhow::Result<()> {
S_FUNDING.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_option_greeks(&mut self, _: &OptionGreeks) -> anyhow::Result<()> {
S_OPTION_GREEKS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_option_chain(&mut self, _: &OptionChainSlice) -> anyhow::Result<()> {
S_OPTION_CHAIN.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_instrument_status(&mut self, _: &InstrumentStatus) -> anyhow::Result<()> {
S_INSTR_STATUS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_instrument_close(&mut self, _: &InstrumentClose) -> anyhow::Result<()> {
S_INSTR_CLOSE.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_filled(&mut self, _: &OrderFilled) -> anyhow::Result<()> {
S_ORDER_FILLED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_canceled(&mut self, _: &OrderCanceled) -> anyhow::Result<()> {
S_ORDER_CANCELED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_signal(&mut self, _: &Signal) -> anyhow::Result<()> {
S_SIGNAL.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_initialized(&mut self, _: &OrderInitialized) -> anyhow::Result<()> {
S_ORDER_INITIALIZED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_submitted(&mut self, _: &OrderSubmitted) -> anyhow::Result<()> {
S_ORDER_SUBMITTED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_accepted(&mut self, _: &OrderAccepted) -> anyhow::Result<()> {
S_ORDER_ACCEPTED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_rejected(&mut self, _: &OrderRejected) -> anyhow::Result<()> {
S_ORDER_REJECTED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_expired(&mut self, _: &OrderExpired) -> anyhow::Result<()> {
S_ORDER_EXPIRED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_triggered(&mut self, _: &OrderTriggered) -> anyhow::Result<()> {
S_ORDER_TRIGGERED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_denied(&mut self, _: &OrderDenied) -> anyhow::Result<()> {
S_ORDER_DENIED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_emulated(&mut self, _: &OrderEmulated) -> anyhow::Result<()> {
S_ORDER_EMULATED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_released(&mut self, _: &OrderReleased) -> anyhow::Result<()> {
S_ORDER_RELEASED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_pending_update(&mut self, _: &OrderPendingUpdate) -> anyhow::Result<()> {
S_ORDER_PENDING_UPDATE.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_pending_cancel(&mut self, _: &OrderPendingCancel) -> anyhow::Result<()> {
S_ORDER_PENDING_CANCEL.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_modify_rejected(&mut self, _: &OrderModifyRejected) -> anyhow::Result<()> {
S_ORDER_MODIFY_REJECTED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_cancel_rejected(&mut self, _: &OrderCancelRejected) -> anyhow::Result<()> {
S_ORDER_CANCEL_REJECTED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_order_updated(&mut self, _: &OrderUpdated) -> anyhow::Result<()> {
S_ORDER_UPDATED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_position_opened(&mut self, _: &PositionOpened) -> anyhow::Result<()> {
S_POSITION_OPENED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_position_changed(&mut self, _: &PositionChanged) -> anyhow::Result<()> {
S_POSITION_CHANGED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn on_position_closed(&mut self, _: &PositionClosed) -> anyhow::Result<()> {
S_POSITION_CLOSED.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
fn instrument_id() -> InstrumentId {
InstrumentId::from("BTCUSDT.BINANCE")
}
fn make_quote() -> QuoteTick {
QuoteTick::new(
instrument_id(),
Price::from("1500.00"),
Price::from("1500.05"),
Quantity::from("1.0"),
Quantity::from("1.0"),
UnixNanos::from(1u64),
UnixNanos::from(1u64),
)
}
fn make_order_book() -> OrderBook {
OrderBook::new(instrument_id(), BookType::L2_MBP)
}
fn make_trade() -> TradeTick {
TradeTick::new(
instrument_id(),
Price::from("1500.00"),
Quantity::from("1.0"),
AggressorSide::Buyer,
TradeId::from("T-001"),
UnixNanos::from(1u64),
UnixNanos::from(1u64),
)
}
fn make_bar() -> Bar {
let spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
let bar_type = BarType::Standard {
instrument_id: instrument_id(),
spec,
aggregation_source: AggregationSource::External,
};
Bar::new(
bar_type,
Price::from("1.0"),
Price::from("2.0"),
Price::from("0.5"),
Price::from("1.5"),
Quantity::from("1.0"),
UnixNanos::from(1u64),
UnixNanos::from(1u64),
)
}
fn make_mark_price() -> MarkPriceUpdate {
MarkPriceUpdate::new(
instrument_id(),
Price::from("1500.0"),
UnixNanos::from(1u64),
UnixNanos::from(1u64),
)
}
fn make_index_price() -> IndexPriceUpdate {
IndexPriceUpdate::new(
instrument_id(),
Price::from("1500.0"),
UnixNanos::from(1u64),
UnixNanos::from(1u64),
)
}
fn make_funding_rate() -> FundingRateUpdate {
FundingRateUpdate::new(
instrument_id(),
"0.0001".parse().unwrap(),
None,
None,
UnixNanos::from(1u64),
UnixNanos::from(1u64),
)
}
fn make_option_greeks() -> OptionGreeks {
OptionGreeks {
instrument_id: instrument_id(),
convention: GreeksConvention::BlackScholes,
greeks: Default::default(),
mark_iv: Some(0.25),
bid_iv: Some(0.24),
ask_iv: Some(0.26),
underlying_price: Some(1500.0),
open_interest: Some(1000.0),
ts_event: UnixNanos::from(1u64),
ts_init: UnixNanos::from(1u64),
}
}
fn make_instrument_status() -> InstrumentStatus {
InstrumentStatus::new(
instrument_id(),
MarketStatusAction::Trading,
UnixNanos::from(1u64),
UnixNanos::from(1u64),
None,
None,
None,
None,
None,
)
}
fn make_instrument_close() -> InstrumentClose {
InstrumentClose::new(
instrument_id(),
Price::from("1500.0"),
InstrumentCloseType::EndOfSession,
UnixNanos::from(1u64),
UnixNanos::from(1u64),
)
}
fn make_instrument() -> InstrumentAny {
InstrumentAny::CurrencyPair(currency_pair_ethusdt())
}
fn make_option_chain() -> OptionChainSlice {
OptionChainSlice::new(OptionSeriesId::new(
Venue::new("DERIBIT"),
ustr::Ustr::from("BTC"),
ustr::Ustr::from("BTC"),
UnixNanos::from(1_700_000_000_000_000_000u64),
))
}
fn make_time_event() -> TimeEvent {
TimeEvent::new(
ustr::Ustr::from("test-time"),
UUID4::new(),
UnixNanos::from(1u64),
UnixNanos::from(1u64),
)
}
fn make_signal() -> Signal {
Signal::new(
ustr::Ustr::from("TestSignal"),
"1.0".to_string(),
UnixNanos::from(1u64),
UnixNanos::from(1u64),
)
}
fn make_order_filled() -> OrderFilled {
OrderFilled {
trader_id: TraderId::from("TRADER-001"),
strategy_id: StrategyId::from("S-001"),
instrument_id: instrument_id(),
client_order_id: ClientOrderId::from("O-001"),
venue_order_id: VenueOrderId::from("V-001"),
account_id: AccountId::from("BINANCE-001"),
trade_id: TradeId::from("T-001"),
position_id: None,
order_side: OrderSide::Buy,
order_type: OrderType::Market,
last_qty: Quantity::from("1.0"),
last_px: Price::from("1500.0"),
currency: Currency::USDT(),
commission: None,
liquidity_side: LiquiditySide::Taker,
event_id: UUID4::new(),
ts_event: UnixNanos::from(1u64),
ts_init: UnixNanos::from(1u64),
reconciliation: false,
causation_id: None,
}
}
fn make_order_canceled() -> OrderCanceled {
OrderCanceled {
trader_id: TraderId::from("TRADER-001"),
strategy_id: StrategyId::from("S-001"),
instrument_id: instrument_id(),
client_order_id: ClientOrderId::from("O-001"),
venue_order_id: Some(VenueOrderId::from("V-001")),
account_id: Some(AccountId::from("BINANCE-001")),
event_id: UUID4::new(),
ts_event: UnixNanos::from(1u64),
ts_init: UnixNanos::from(1u64),
reconciliation: false,
causation_id: None,
}
}
fn make_position_opened() -> PositionOpened {
PositionOpened {
trader_id: TraderId::from("TRADER-001"),
strategy_id: StrategyId::from("S-001"),
instrument_id: instrument_id(),
position_id: PositionId::from("P-19700101-0000-000-000-1"),
account_id: AccountId::from("BINANCE-001"),
opening_order_id: ClientOrderId::from("O-001"),
entry: OrderSide::Buy,
side: PositionSide::Long,
signed_qty: 1.0,
quantity: Quantity::from("1.0"),
last_qty: Quantity::from("1.0"),
last_px: Price::from("1500.0"),
currency: Currency::USDT(),
avg_px_open: 1500.0,
event_id: UUID4::new(),
ts_event: UnixNanos::from(1u64),
ts_init: UnixNanos::from(1u64),
}
}
fn make_position_changed() -> PositionChanged {
PositionChanged {
trader_id: TraderId::from("TRADER-001"),
strategy_id: StrategyId::from("S-001"),
instrument_id: instrument_id(),
position_id: PositionId::from("P-1"),
account_id: AccountId::from("BINANCE-001"),
opening_order_id: ClientOrderId::from("O-001"),
entry: OrderSide::Buy,
side: PositionSide::Long,
signed_qty: 2.0,
quantity: Quantity::from("2.0"),
peak_quantity: Quantity::from("2.0"),
last_qty: Quantity::from("1.0"),
last_px: Price::from("1500.0"),
currency: Currency::USDT(),
avg_px_open: 1500.0,
avg_px_close: None,
realized_return: 0.0,
realized_pnl: None,
unrealized_pnl: Money::new(0.0, Currency::USDT()),
event_id: UUID4::new(),
ts_opened: UnixNanos::from(1u64),
ts_event: UnixNanos::from(2u64),
ts_init: UnixNanos::from(2u64),
}
}
fn make_position_closed() -> PositionClosed {
PositionClosed {
trader_id: TraderId::from("TRADER-001"),
strategy_id: StrategyId::from("S-001"),
instrument_id: instrument_id(),
position_id: PositionId::from("P-1"),
account_id: AccountId::from("BINANCE-001"),
opening_order_id: ClientOrderId::from("O-001"),
closing_order_id: Some(ClientOrderId::from("O-002")),
entry: OrderSide::Buy,
side: PositionSide::Flat,
signed_qty: 0.0,
quantity: Quantity::from("0.0"),
peak_quantity: Quantity::from("2.0"),
last_qty: Quantity::from("2.0"),
last_px: Price::from("1500.0"),
currency: Currency::USDT(),
avg_px_open: 1500.0,
avg_px_close: Some(1500.0),
realized_return: 0.0,
realized_pnl: Some(Money::new(0.0, Currency::USDT())),
unrealized_pnl: Money::new(0.0, Currency::USDT()),
duration: 0,
event_id: UUID4::new(),
ts_opened: UnixNanos::from(1u64),
ts_closed: Some(UnixNanos::from(2u64)),
ts_event: UnixNanos::from(2u64),
ts_init: UnixNanos::from(2u64),
}
}
fn build_actor_adapter(actor_id: &str) -> PluginActorAdapter {
let vtable =
unsafe { ValidatedActorVTable::from_raw_unchecked(actor_vtable::<CountingActor>()) };
unsafe {
PluginActorAdapter::new(
ActorId::from(actor_id),
"in-process",
CountingActor::TYPE_NAME,
vtable,
host_vtable(),
"{}",
)
}
.expect("actor adapter construction")
}
fn build_strategy_adapter(strategy_id: &str) -> PluginStrategyAdapter {
let config = StrategyConfig::builder()
.strategy_id(StrategyId::from(strategy_id))
.order_id_tag("001".to_string())
.build();
let vtable = unsafe {
ValidatedStrategyVTable::from_raw_unchecked(strategy_vtable::<CountingStrategy>())
};
unsafe {
PluginStrategyAdapter::new(
config,
"in-process",
CountingStrategy::TYPE_NAME,
vtable,
host_vtable(),
"{}",
)
}
.expect("strategy adapter construction")
}
#[rstest]
fn actor_adapter_lifecycle_hooks_dispatch_to_plugin() {
let _lock = lock_counters();
a_reset();
let mut a = build_actor_adapter("CountingActor-Life");
DataActor::on_start(&mut a).unwrap();
DataActor::on_stop(&mut a).unwrap();
DataActor::on_resume(&mut a).unwrap();
DataActor::on_reset(&mut a).unwrap();
DataActor::on_dispose(&mut a).unwrap();
DataActor::on_degrade(&mut a).unwrap();
DataActor::on_fault(&mut a).unwrap();
assert_eq!(A_START.load(Ordering::SeqCst), 1);
assert_eq!(A_STOP.load(Ordering::SeqCst), 1);
assert_eq!(A_RESUME.load(Ordering::SeqCst), 1);
assert_eq!(A_RESET.load(Ordering::SeqCst), 1);
assert_eq!(A_DISPOSE.load(Ordering::SeqCst), 1);
assert_eq!(A_DEGRADE.load(Ordering::SeqCst), 1);
assert_eq!(A_FAULT.load(Ordering::SeqCst), 1);
}
#[rstest]
fn actor_adapter_market_data_hooks_dispatch_to_plugin() {
let _lock = lock_counters();
a_reset();
let mut a = build_actor_adapter("CountingActor-Data");
DataActor::on_instrument(&mut a, &make_instrument()).unwrap();
DataActor::on_book_deltas(&mut a, &stub_deltas()).unwrap();
DataActor::on_book(&mut a, &make_order_book()).unwrap();
DataActor::on_quote(&mut a, &make_quote()).unwrap();
DataActor::on_trade(&mut a, &make_trade()).unwrap();
DataActor::on_bar(&mut a, &make_bar()).unwrap();
DataActor::on_mark_price(&mut a, &make_mark_price()).unwrap();
DataActor::on_index_price(&mut a, &make_index_price()).unwrap();
DataActor::on_funding_rate(&mut a, &make_funding_rate()).unwrap();
DataActor::on_option_greeks(&mut a, &make_option_greeks()).unwrap();
DataActor::on_option_chain(&mut a, &make_option_chain()).unwrap();
DataActor::on_instrument_status(&mut a, &make_instrument_status()).unwrap();
DataActor::on_instrument_close(&mut a, &make_instrument_close()).unwrap();
DataActor::on_time_event(&mut a, &make_time_event()).unwrap();
DataActor::on_signal(&mut a, &make_signal()).unwrap();
assert_eq!(A_INSTRUMENT.load(Ordering::SeqCst), 1);
assert_eq!(A_BOOK_DELTAS.load(Ordering::SeqCst), 1);
assert_eq!(A_BOOK.load(Ordering::SeqCst), 1);
assert_eq!(A_QUOTE.load(Ordering::SeqCst), 1);
assert_eq!(A_TRADE.load(Ordering::SeqCst), 1);
assert_eq!(A_BAR.load(Ordering::SeqCst), 1);
assert_eq!(A_MARK.load(Ordering::SeqCst), 1);
assert_eq!(A_INDEX.load(Ordering::SeqCst), 1);
assert_eq!(A_FUNDING.load(Ordering::SeqCst), 1);
assert_eq!(A_OPTION_GREEKS.load(Ordering::SeqCst), 1);
assert_eq!(A_OPTION_CHAIN.load(Ordering::SeqCst), 1);
assert_eq!(A_INSTR_STATUS.load(Ordering::SeqCst), 1);
assert_eq!(A_INSTR_CLOSE.load(Ordering::SeqCst), 1);
assert_eq!(A_TIME.load(Ordering::SeqCst), 1);
assert_eq!(A_SIGNAL.load(Ordering::SeqCst), 1);
}
#[rstest]
fn actor_adapter_historical_slice_hooks_dispatch_to_plugin() {
let _lock = lock_counters();
a_reset();
let mut a = build_actor_adapter("CountingActor-HistSlices");
let deltas = stub_deltas();
let quotes = [make_quote()];
let trades = [make_trade()];
let bars = [make_bar()];
let mark_prices = [make_mark_price()];
let index_prices = [make_index_price()];
let funding_rates = [make_funding_rate()];
DataActor::on_historical_book_deltas(&mut a, &deltas.deltas).unwrap();
DataActor::on_historical_quotes(&mut a, "es).unwrap();
DataActor::on_historical_trades(&mut a, &trades).unwrap();
DataActor::on_historical_bars(&mut a, &bars).unwrap();
DataActor::on_historical_mark_prices(&mut a, &mark_prices).unwrap();
DataActor::on_historical_index_prices(&mut a, &index_prices).unwrap();
DataActor::on_historical_funding_rates(&mut a, &funding_rates).unwrap();
assert_eq!(A_HIST_BOOK_DELTAS.load(Ordering::SeqCst), 1);
assert_eq!(A_HIST_QUOTES.load(Ordering::SeqCst), 1);
assert_eq!(A_HIST_TRADES.load(Ordering::SeqCst), 1);
assert_eq!(A_HIST_BARS.load(Ordering::SeqCst), 1);
assert_eq!(A_HIST_MARKS.load(Ordering::SeqCst), 1);
assert_eq!(A_HIST_INDEXES.load(Ordering::SeqCst), 1);
assert_eq!(A_HIST_FUNDING.load(Ordering::SeqCst), 1);
}
#[rstest]
fn actor_adapter_custom_data_dispatches_to_plugin_on_data() {
let _lock = lock_counters();
a_reset();
let mut a = build_actor_adapter("CountingActor-CustomData");
let custom = plugin_dispatch_custom_data(42);
DataActor::on_data(&mut a, &custom).unwrap();
assert_eq!(A_DATA.load(Ordering::SeqCst), 1);
assert_eq!(A_DATA_VALUE.load(Ordering::SeqCst), 42);
}
#[rstest]
fn actor_adapter_historical_custom_data_dispatches_to_plugin_on_data() {
let _lock = lock_counters();
a_reset();
let mut a = build_actor_adapter("CountingActor-HistoricalCustomData");
let custom = plugin_dispatch_custom_data(44);
DataActor::on_historical_data(&mut a, &custom).unwrap();
assert_eq!(A_DATA.load(Ordering::SeqCst), 1);
assert_eq!(A_DATA_VALUE.load(Ordering::SeqCst), 44);
}
#[rstest]
fn actor_adapter_non_plugin_custom_data_is_ignored() {
let _lock = lock_counters();
a_reset();
let mut a = build_actor_adapter("CountingActor-ForeignCustomData");
let custom = stub_custom_data(1, 42, None, None);
DataActor::on_data(&mut a, &custom).unwrap();
assert_eq!(A_DATA.load(Ordering::SeqCst), 0);
assert_eq!(A_DATA_VALUE.load(Ordering::SeqCst), 0);
}
#[rstest]
fn actor_adapter_historical_non_plugin_custom_data_is_ignored() {
let _lock = lock_counters();
a_reset();
let mut a = build_actor_adapter("CountingActor-HistoricalForeignCustomData");
let custom = stub_custom_data(1, 42, None, None);
DataActor::on_historical_data(&mut a, &custom).unwrap();
assert_eq!(A_DATA.load(Ordering::SeqCst), 0);
assert_eq!(A_DATA_VALUE.load(Ordering::SeqCst), 0);
}
#[rstest]
fn actor_adapter_historical_non_custom_data_is_ignored() {
let _lock = lock_counters();
a_reset();
let mut a = build_actor_adapter("CountingActor-HistoricalNonCustomData");
let payload = 42_u64;
DataActor::on_historical_data(&mut a, &payload).unwrap();
assert_eq!(A_DATA.load(Ordering::SeqCst), 0);
assert_eq!(A_DATA_VALUE.load(Ordering::SeqCst), 0);
}
#[rstest]
fn actor_adapter_order_event_hooks_dispatch_to_plugin() {
let _lock = lock_counters();
a_reset();
let mut a = build_actor_adapter("CountingActor-Orders");
DataActor::on_order_filled(&mut a, &make_order_filled()).unwrap();
DataActor::on_order_canceled(&mut a, &make_order_canceled()).unwrap();
assert_eq!(A_ORDER_FILLED.load(Ordering::SeqCst), 1);
assert_eq!(A_ORDER_CANCELED.load(Ordering::SeqCst), 1);
}
#[rstest]
fn strategy_adapter_lifecycle_hooks_dispatch_to_plugin() {
let _lock = lock_counters();
s_reset();
let mut s = build_strategy_adapter("CountingStrategy-Life");
register_strategy_adapter(&mut s);
DataActor::on_start(&mut s).unwrap();
DataActor::on_stop(&mut s).unwrap();
DataActor::on_resume(&mut s).unwrap();
DataActor::on_reset(&mut s).unwrap();
DataActor::on_dispose(&mut s).unwrap();
DataActor::on_degrade(&mut s).unwrap();
DataActor::on_fault(&mut s).unwrap();
assert_eq!(S_START.load(Ordering::SeqCst), 1);
assert_eq!(S_STOP.load(Ordering::SeqCst), 1);
assert_eq!(S_RESUME.load(Ordering::SeqCst), 1);
assert_eq!(S_RESET.load(Ordering::SeqCst), 1);
assert_eq!(S_DISPOSE.load(Ordering::SeqCst), 1);
assert_eq!(S_DEGRADE.load(Ordering::SeqCst), 1);
assert_eq!(S_FAULT.load(Ordering::SeqCst), 1);
}
#[rstest]
fn strategy_adapter_actor_callbacks_dispatch_to_plugin() {
let _lock = lock_counters();
s_reset();
let mut s = build_strategy_adapter("CountingStrategy-Data");
DataActor::on_instrument(&mut s, &make_instrument()).unwrap();
DataActor::on_book_deltas(&mut s, &stub_deltas()).unwrap();
DataActor::on_book(&mut s, &make_order_book()).unwrap();
DataActor::on_quote(&mut s, &make_quote()).unwrap();
DataActor::on_trade(&mut s, &make_trade()).unwrap();
DataActor::on_bar(&mut s, &make_bar()).unwrap();
DataActor::on_mark_price(&mut s, &make_mark_price()).unwrap();
DataActor::on_index_price(&mut s, &make_index_price()).unwrap();
DataActor::on_funding_rate(&mut s, &make_funding_rate()).unwrap();
DataActor::on_option_greeks(&mut s, &make_option_greeks()).unwrap();
DataActor::on_option_chain(&mut s, &make_option_chain()).unwrap();
DataActor::on_instrument_status(&mut s, &make_instrument_status()).unwrap();
DataActor::on_instrument_close(&mut s, &make_instrument_close()).unwrap();
DataActor::on_order_filled(&mut s, &make_order_filled()).unwrap();
DataActor::on_order_canceled(&mut s, &make_order_canceled()).unwrap();
DataActor::on_time_event(&mut s, &make_time_event()).unwrap();
DataActor::on_signal(&mut s, &make_signal()).unwrap();
assert_eq!(S_INSTRUMENT.load(Ordering::SeqCst), 1);
assert_eq!(S_BOOK_DELTAS.load(Ordering::SeqCst), 1);
assert_eq!(S_BOOK.load(Ordering::SeqCst), 1);
assert_eq!(S_QUOTE.load(Ordering::SeqCst), 1);
assert_eq!(S_TRADE.load(Ordering::SeqCst), 1);
assert_eq!(S_BAR.load(Ordering::SeqCst), 1);
assert_eq!(S_MARK.load(Ordering::SeqCst), 1);
assert_eq!(S_INDEX.load(Ordering::SeqCst), 1);
assert_eq!(S_FUNDING.load(Ordering::SeqCst), 1);
assert_eq!(S_OPTION_GREEKS.load(Ordering::SeqCst), 1);
assert_eq!(S_OPTION_CHAIN.load(Ordering::SeqCst), 1);
assert_eq!(S_INSTR_STATUS.load(Ordering::SeqCst), 1);
assert_eq!(S_INSTR_CLOSE.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_FILLED.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_CANCELED.load(Ordering::SeqCst), 1);
assert_eq!(S_TIME.load(Ordering::SeqCst), 1);
assert_eq!(S_SIGNAL.load(Ordering::SeqCst), 1);
}
#[rstest]
fn strategy_adapter_historical_slice_hooks_dispatch_to_plugin() {
let _lock = lock_counters();
s_reset();
let mut s = build_strategy_adapter("CountingStrategy-HistSlices");
let deltas = stub_deltas();
let quotes = [make_quote()];
let trades = [make_trade()];
let bars = [make_bar()];
let mark_prices = [make_mark_price()];
let index_prices = [make_index_price()];
let funding_rates = [make_funding_rate()];
DataActor::on_historical_book_deltas(&mut s, &deltas.deltas).unwrap();
DataActor::on_historical_quotes(&mut s, "es).unwrap();
DataActor::on_historical_trades(&mut s, &trades).unwrap();
DataActor::on_historical_bars(&mut s, &bars).unwrap();
DataActor::on_historical_mark_prices(&mut s, &mark_prices).unwrap();
DataActor::on_historical_index_prices(&mut s, &index_prices).unwrap();
DataActor::on_historical_funding_rates(&mut s, &funding_rates).unwrap();
assert_eq!(S_HIST_BOOK_DELTAS.load(Ordering::SeqCst), 1);
assert_eq!(S_HIST_QUOTES.load(Ordering::SeqCst), 1);
assert_eq!(S_HIST_TRADES.load(Ordering::SeqCst), 1);
assert_eq!(S_HIST_BARS.load(Ordering::SeqCst), 1);
assert_eq!(S_HIST_MARKS.load(Ordering::SeqCst), 1);
assert_eq!(S_HIST_INDEXES.load(Ordering::SeqCst), 1);
assert_eq!(S_HIST_FUNDING.load(Ordering::SeqCst), 1);
}
#[rstest]
fn strategy_adapter_custom_data_dispatches_to_plugin_on_data() {
let _lock = lock_counters();
s_reset();
let mut s = build_strategy_adapter("CountingStrategy-CustomData");
let custom = plugin_dispatch_custom_data(43);
DataActor::on_data(&mut s, &custom).unwrap();
assert_eq!(S_DATA.load(Ordering::SeqCst), 1);
assert_eq!(S_DATA_VALUE.load(Ordering::SeqCst), 43);
}
#[rstest]
fn strategy_adapter_historical_custom_data_dispatches_to_plugin_on_data() {
let _lock = lock_counters();
s_reset();
let mut s = build_strategy_adapter("CountingStrategy-HistoricalCustomData");
let custom = plugin_dispatch_custom_data(45);
DataActor::on_historical_data(&mut s, &custom).unwrap();
assert_eq!(S_DATA.load(Ordering::SeqCst), 1);
assert_eq!(S_DATA_VALUE.load(Ordering::SeqCst), 45);
}
#[rstest]
fn strategy_adapter_non_plugin_custom_data_is_ignored() {
let _lock = lock_counters();
s_reset();
let mut s = build_strategy_adapter("CountingStrategy-ForeignCustomData");
let custom = stub_custom_data(1, 42, None, None);
DataActor::on_data(&mut s, &custom).unwrap();
assert_eq!(S_DATA.load(Ordering::SeqCst), 0);
assert_eq!(S_DATA_VALUE.load(Ordering::SeqCst), 0);
}
#[rstest]
fn strategy_adapter_historical_non_plugin_custom_data_is_ignored() {
let _lock = lock_counters();
s_reset();
let mut s = build_strategy_adapter("CountingStrategy-HistoricalForeignCustomData");
let custom = stub_custom_data(1, 42, None, None);
DataActor::on_historical_data(&mut s, &custom).unwrap();
assert_eq!(S_DATA.load(Ordering::SeqCst), 0);
assert_eq!(S_DATA_VALUE.load(Ordering::SeqCst), 0);
}
#[rstest]
fn strategy_adapter_historical_non_custom_data_is_ignored() {
let _lock = lock_counters();
s_reset();
let mut s = build_strategy_adapter("CountingStrategy-HistoricalNonCustomData");
let payload = 43_u64;
DataActor::on_historical_data(&mut s, &payload).unwrap();
assert_eq!(S_DATA.load(Ordering::SeqCst), 0);
assert_eq!(S_DATA_VALUE.load(Ordering::SeqCst), 0);
}
fn plugin_dispatch_custom_data(value: u64) -> CustomData {
let custom_data = Box::leak(Box::new([CustomDataRegistration {
type_name: BorrowedStr::from_str(DispatchCustomData::TYPE_NAME),
vtable: custom_data_vtable::<DispatchCustomData>(),
}]));
let manifest = PluginManifest {
abi_version: NAUTILUS_PLUGIN_ABI_VERSION,
plugin_name: BorrowedStr::from_str("plugin-dispatch-test"),
plugin_vendor: BorrowedStr::from_str("nautech"),
plugin_version: BorrowedStr::from_str("0.0.0"),
build_id: PluginBuildId::current(),
custom_data: Slice::from_slice(custom_data),
actors: Slice::empty(),
strategies: Slice::empty(),
controllers: Slice::empty(),
};
let manifest =
ValidatedPluginManifest::new(&manifest).expect("test manifest passes validation");
register_custom_data_from_manifest(manifest).expect("custom data registration succeeds");
let envelope = serde_json::json!({
"type": "Custom",
"data_type": {
"type_name": DispatchCustomData::TYPE_NAME,
},
"payload": {
"value": value,
"ts_event": 10,
"ts_init": 11,
},
});
let data = deserialize_custom_from_json(DispatchCustomData::TYPE_NAME, &envelope)
.expect("deserializer succeeds")
.expect("custom data type is registered");
let Data::Custom(custom) = data else {
panic!("expected Custom variant");
};
custom
}
#[rstest]
fn strategy_adapter_order_event_hooks_dispatch_to_plugin() {
let _lock = lock_counters();
s_reset();
let mut s = build_strategy_adapter("CountingStrategy-Orders");
Strategy::on_order_initialized(&mut s, OrderInitialized::default());
Strategy::on_order_submitted(&mut s, OrderSubmitted::default());
Strategy::on_order_accepted(&mut s, OrderAccepted::default());
Strategy::on_order_rejected(&mut s, OrderRejected::default());
Strategy::on_order_expired(&mut s, OrderExpired::default());
Strategy::on_order_triggered(&mut s, OrderTriggered::default());
Strategy::on_order_denied(&mut s, OrderDenied::default());
Strategy::on_order_emulated(&mut s, OrderEmulated::default());
Strategy::on_order_released(&mut s, OrderReleased::default());
Strategy::on_order_pending_update(&mut s, OrderPendingUpdate::default());
Strategy::on_order_pending_cancel(&mut s, OrderPendingCancel::default());
Strategy::on_order_modify_rejected(&mut s, OrderModifyRejected::default());
Strategy::on_order_cancel_rejected(&mut s, OrderCancelRejected::default());
Strategy::on_order_updated(&mut s, OrderUpdated::default());
assert_eq!(S_ORDER_INITIALIZED.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_SUBMITTED.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_ACCEPTED.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_REJECTED.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_EXPIRED.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_TRIGGERED.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_DENIED.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_EMULATED.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_RELEASED.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_PENDING_UPDATE.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_PENDING_CANCEL.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_MODIFY_REJECTED.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_CANCEL_REJECTED.load(Ordering::SeqCst), 1);
assert_eq!(S_ORDER_UPDATED.load(Ordering::SeqCst), 1);
}
#[rstest]
fn strategy_adapter_position_event_hooks_dispatch_to_plugin() {
let _lock = lock_counters();
s_reset();
let mut s = build_strategy_adapter("CountingStrategy-Positions");
Strategy::on_position_opened(&mut s, make_position_opened());
Strategy::on_position_changed(&mut s, make_position_changed());
Strategy::on_position_closed(&mut s, make_position_closed());
assert_eq!(S_POSITION_OPENED.load(Ordering::SeqCst), 1);
assert_eq!(S_POSITION_CHANGED.load(Ordering::SeqCst), 1);
assert_eq!(S_POSITION_CLOSED.load(Ordering::SeqCst), 1);
}
fn register_strategy_adapter(adapter: &mut PluginStrategyAdapter) {
let _ = register_strategy_adapter_with_cache(adapter);
}
fn register_strategy_adapter_with_cache(adapter: &mut PluginStrategyAdapter) -> Rc<RefCell<Cache>> {
let (_, cache) = register_strategy_adapter_with_clock_and_cache(adapter);
cache
}
fn register_strategy_adapter_with_clock_and_cache(
adapter: &mut PluginStrategyAdapter,
) -> (Rc<RefCell<TestClock>>, Rc<RefCell<Cache>>) {
let trader_id = TraderId::from("TRADER-001");
let clock = Rc::new(RefCell::new(TestClock::new()));
clock
.borrow_mut()
.register_default_handler(TimeEventCallback::from(|_event: TimeEvent| {}));
let cache = Rc::new(RefCell::new(Cache::default()));
let portfolio = Rc::new(RefCell::new(Portfolio::new(
cache.clone(),
clock.clone(),
None,
)));
adapter
.core_mut()
.register(trader_id, clock.clone(), cache.clone(), portfolio)
.expect("strategy register");
adapter.initialize().expect("strategy initialize");
(clock, cache)
}
#[rstest]
fn strategy_adapter_on_start_composes_strategy_default_before_forward() {
let _lock = lock_counters();
s_reset();
let mut s = build_strategy_adapter("CountingStrategy-Start");
register_strategy_adapter(&mut s);
DataActor::on_start(&mut s).expect("on_start succeeds end-to-end");
assert_eq!(S_START.load(Ordering::SeqCst), 1);
}
fn make_initialized_market_order(client_order_id: &str, strategy_id: &str) -> OrderAny {
use nautilus_model::{enums::TimeInForce, orders::MarketOrder};
OrderAny::Market(MarketOrder::new(
TraderId::from("TRADER-001"),
StrategyId::from(strategy_id),
instrument_id(),
ClientOrderId::from(client_order_id),
OrderSide::Buy,
Quantity::from("1.0"),
TimeInForce::Gtc,
UUID4::new(),
UnixNanos::default(),
false,
false,
None,
None,
None,
None,
None,
None,
None,
None,
))
}
static RISK_COMMAND_COUNT: AtomicU64 = AtomicU64::new(0);
#[rstest]
fn host_submit_order_routes_through_registered_strategy_adapter() {
let _lock = lock_counters();
let strategy_id = "PluginEnd2End-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
RISK_COMMAND_COUNT.store(0, Ordering::SeqCst);
let captured: std::sync::Arc<std::sync::Mutex<Option<TradingCommand>>> =
std::sync::Arc::new(std::sync::Mutex::new(None));
let captured_clone = std::sync::Arc::clone(&captured);
let risk_handler = TypedIntoHandler::from_with_id(
"PluginRiskProbe.execute",
move |command: TradingCommand| {
assert!(matches!(command, TradingCommand::SubmitOrder(_)));
*captured_clone.lock().unwrap() = Some(command);
RISK_COMMAND_COUNT.fetch_add(1, Ordering::SeqCst);
},
);
msgbus::register_trading_command_endpoint(
MessagingSwitchboard::risk_engine_queue_execute(),
risk_handler,
);
let order = make_initialized_market_order("O-PLUGIN-E2E-1", strategy_id);
let order_client_order_id = order.client_order_id();
let expected_position_id = PositionId::from("P-PLUGIN-E2E-POS");
let handle = SubmitOrderHandle::new(SubmitOrderCommand::new(
order,
Some(expected_position_id),
None,
None,
));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe { (v.submit_order)(ctx, &raw const handle) };
r.into_result().expect("host submit_order should succeed");
assert_eq!(RISK_COMMAND_COUNT.load(Ordering::SeqCst), 1);
let captured = captured.lock().unwrap().take().expect("command captured");
match captured {
TradingCommand::SubmitOrder(cmd) => {
assert_eq!(cmd.client_order_id, order_client_order_id);
assert_eq!(cmd.position_id, Some(expected_position_id));
}
other => panic!("expected SubmitOrder, was {other:?}"),
}
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_cancel_order_rejects_missing_order() {
let _lock = lock_counters();
let strategy_id = "PluginCancelOrder-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let handle = CancelOrderHandle::new(CancelOrderCommand::new(
ClientOrderId::from("O-PLUGIN-CANCEL-MISSING"),
None,
None,
));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let err = unsafe { (v.cancel_order)(ctx, &raw const handle) }
.into_result()
.expect_err("missing order should bail in Strategy::cancel_order");
assert_eq!(err.code, nautilus_plugin::PluginErrorCode::Generic);
assert!(
err.message_string().contains("not found in cache"),
"expected Strategy-level cache miss, was: {}",
err.message_string()
);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_modify_order_rejects_missing_order() {
let _lock = lock_counters();
let strategy_id = "PluginModifyOrder-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let handle = ModifyOrderHandle::new(ModifyOrderCommand::new(
ClientOrderId::from("O-PLUGIN-MODIFY-MISSING"),
Some(Quantity::from("2.0")),
None,
None,
None,
None,
));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let err = unsafe { (v.modify_order)(ctx, &raw const handle) }
.into_result()
.expect_err("missing order should bail in Strategy::modify_order");
assert_eq!(err.code, nautilus_plugin::PluginErrorCode::Generic);
assert!(
err.message_string().contains("not found in cache"),
"expected Strategy-level cache miss, was: {}",
err.message_string()
);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
#[case::submit_order("submit_order", |v: &HostVTable, ctx| unsafe {
(v.submit_order)(ctx, std::ptr::null())
})]
#[case::cancel_order("cancel_order", |v: &HostVTable, ctx| unsafe {
(v.cancel_order)(ctx, std::ptr::null())
})]
#[case::modify_order("modify_order", |v: &HostVTable, ctx| unsafe {
(v.modify_order)(ctx, std::ptr::null())
})]
#[case::submit_order_list("submit_order_list", |v: &HostVTable, ctx| unsafe {
(v.submit_order_list)(ctx, std::ptr::null())
})]
#[case::cancel_orders("cancel_orders", |v: &HostVTable, ctx| unsafe {
(v.cancel_orders)(ctx, std::ptr::null())
})]
#[case::cancel_all_orders("cancel_all_orders", |v: &HostVTable, ctx| unsafe {
(v.cancel_all_orders)(ctx, std::ptr::null())
})]
#[case::close_position("close_position", |v: &HostVTable, ctx| unsafe {
(v.close_position)(ctx, std::ptr::null())
})]
#[case::close_all_positions("close_all_positions", |v: &HostVTable, ctx| unsafe {
(v.close_all_positions)(ctx, std::ptr::null())
})]
#[case::query_account("query_account", |v: &HostVTable, ctx| unsafe {
(v.query_account)(ctx, std::ptr::null())
})]
#[case::query_order("query_order", |v: &HostVTable, ctx| unsafe {
(v.query_order)(ctx, std::ptr::null())
})]
fn host_execution_slot_rejects_null_handle(
#[case] slot: &'static str,
#[case] invoke: fn(&HostVTable, *const HostContext) -> nautilus_plugin::PluginResult<()>,
) {
let _lock = lock_counters();
let strategy_id = format!("PluginNullHandle-{slot}");
let mut adapter = build_strategy_adapter(&strategy_id);
register_strategy_adapter(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let err = invoke(v, ctx)
.into_result()
.expect_err("null command handle should be rejected");
assert_eq!(err.code, nautilus_plugin::PluginErrorCode::InvalidArgument);
assert!(
err.message_string().contains("null command handle"),
"{slot}: unexpected message: {}",
err.message_string()
);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_cache_order_reads_registered_strategy_cache() {
let strategy_id = "PluginPhaseTwoCache-001";
let mut adapter = build_strategy_adapter(strategy_id);
let cache = register_strategy_adapter_with_cache(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let order = make_initialized_market_order("O-PHASE2-CACHE-1", strategy_id);
cache
.borrow_mut()
.add_order(order, None, None, false)
.expect("cache accepts order");
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let order_id = BorrowedStr::from_str("O-PHASE2-CACHE-1");
let encoded = unsafe { (v.cache_order)(ctx, order_id) }
.into_result()
.expect("cache_order succeeds");
let decoded: OrderAny = serde_json::from_slice(unsafe { encoded.as_bytes() }).unwrap();
assert_eq!(
decoded.client_order_id(),
ClientOrderId::from("O-PHASE2-CACHE-1")
);
assert_eq!(decoded.strategy_id(), StrategyId::from(strategy_id));
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_cache_instrument_reads_registered_strategy_cache() {
let mut adapter = build_strategy_adapter("PluginPhaseTwoInstrument-001");
let cache = register_strategy_adapter_with_cache(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let instrument = InstrumentAny::CurrencyPair(currency_pair_ethusdt());
let instrument_id = instrument.id();
cache
.borrow_mut()
.add_instrument(instrument)
.expect("cache accepts instrument");
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let encoded =
unsafe { (v.cache_instrument)(ctx, BorrowedStr::from_str(&instrument_id.to_string())) }
.into_result()
.expect("cache_instrument succeeds");
let decoded: InstrumentAny = serde_json::from_slice(unsafe { encoded.as_bytes() }).unwrap();
let missing = unsafe { (v.cache_instrument)(ctx, BorrowedStr::from_str("MISSING.BINANCE")) }
.into_result()
.expect("cache_instrument miss succeeds");
assert_eq!(decoded.id(), instrument_id);
assert!(unsafe { missing.as_bytes() }.is_empty());
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_cache_account_reads_registered_strategy_cache() {
let mut adapter = build_strategy_adapter("PluginPhaseTwoAccount-001");
let cache = register_strategy_adapter_with_cache(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let account_id = AccountId::from("BINANCE-001");
cache
.borrow_mut()
.add_account(make_margin_account(account_id))
.expect("cache accepts account");
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let encoded = unsafe { (v.cache_account)(ctx, BorrowedStr::from_str("BINANCE-001")) }
.into_result()
.expect("cache_account succeeds");
let decoded: AccountAny = serde_json::from_slice(unsafe { encoded.as_bytes() }).unwrap();
let missing = unsafe { (v.cache_account)(ctx, BorrowedStr::from_str("BINANCE-999")) }
.into_result()
.expect("cache_account miss succeeds");
assert_eq!(decoded.id(), account_id);
assert!(unsafe { missing.as_bytes() }.is_empty());
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_cache_position_reads_registered_strategy_cache() {
let strategy_id = "PluginPhaseTwoPosition-001";
let mut adapter = build_strategy_adapter(strategy_id);
let cache = register_strategy_adapter_with_cache(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let position = make_position_for_strategy("P-19700101-0000-000-000-2", strategy_id);
let position_id = position.id;
cache
.borrow_mut()
.add_position(&position, OmsType::Hedging)
.expect("cache accepts position");
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let encoded = unsafe { (v.cache_position)(ctx, BorrowedStr::from_str(position_id.as_ref())) }
.into_result()
.expect("cache_position succeeds");
let decoded: Position = serde_json::from_slice(unsafe { encoded.as_bytes() }).unwrap();
let missing =
unsafe { (v.cache_position)(ctx, BorrowedStr::from_str("P-19700101-0000-000-000-9")) }
.into_result()
.expect("cache_position miss succeeds");
assert_eq!(decoded.id, position_id);
assert_eq!(decoded.strategy_id, StrategyId::from(strategy_id));
assert!(unsafe { missing.as_bytes() }.is_empty());
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_cache_strategy_lists_default_to_calling_strategy() {
let strategy_id = "PluginPhaseTwoLists-001";
let other_strategy_id = "PluginPhaseTwoLists-Other";
let mut adapter = build_strategy_adapter(strategy_id);
let cache = register_strategy_adapter_with_cache(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let order = make_initialized_market_order("O-PHASE2-LIST-1", strategy_id);
let other_order = make_initialized_market_order("O-PHASE2-LIST-2", other_strategy_id);
let position = make_position_for_strategy("P-19700101-0000-000-000-3", strategy_id);
let other_position = make_position_for_strategy("P-19700101-0000-000-000-4", other_strategy_id);
cache
.borrow_mut()
.add_order(order, None, None, false)
.expect("cache accepts strategy order");
cache
.borrow_mut()
.add_order(other_order, None, None, false)
.expect("cache accepts other strategy order");
cache
.borrow_mut()
.add_position(&position, OmsType::Hedging)
.expect("cache accepts strategy position");
cache
.borrow_mut()
.add_position(&other_position, OmsType::Hedging)
.expect("cache accepts other strategy position");
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let orders = unsafe { (v.cache_orders_for_strategy)(ctx, BorrowedStr::empty()) }
.into_result()
.expect("cache_orders_for_strategy succeeds");
let positions = unsafe { (v.cache_positions_for_strategy)(ctx, BorrowedStr::empty()) }
.into_result()
.expect("cache_positions_for_strategy succeeds");
let decoded_orders: Vec<OrderAny> =
serde_json::from_slice(unsafe { orders.as_bytes() }).unwrap();
let decoded_positions: Vec<Position> =
serde_json::from_slice(unsafe { positions.as_bytes() }).unwrap();
assert_eq!(decoded_orders.len(), 1);
assert_eq!(
decoded_orders[0].client_order_id(),
ClientOrderId::from("O-PHASE2-LIST-1")
);
assert_eq!(decoded_positions.len(), 1);
assert_eq!(decoded_positions[0].id, position.id);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_subscribe_quotes_routes_msgbus_events_to_registered_strategy_adapter() {
let _lock = lock_counters();
s_reset();
let strategy_id = "PluginPhaseTwoSub-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
Component::start(&mut adapter).expect("strategy starts");
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let instrument = instrument_id().to_string();
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe {
(v.subscribe_quotes)(
ctx,
BorrowedStr::from_str(&instrument),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
};
r.into_result().expect("subscribe_quotes succeeds");
msgbus::publish_quote(get_quotes_topic(instrument_id()), &make_quote());
assert_eq!(S_QUOTE.load(Ordering::SeqCst), 1);
let r = unsafe {
(v.unsubscribe_quotes)(
ctx,
BorrowedStr::from_str(&instrument),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
};
r.into_result().expect("unsubscribe_quotes succeeds");
msgbus::publish_quote(get_quotes_topic(instrument_id()), &make_quote());
assert_eq!(S_QUOTE.load(Ordering::SeqCst), 1);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_subscribe_trades_routes_msgbus_events_to_registered_strategy_adapter() {
let _lock = lock_counters();
s_reset();
let strategy_id = "PluginPhaseTwoTrades-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
Component::start(&mut adapter).expect("strategy starts");
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let instrument = instrument_id().to_string();
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe {
(v.subscribe_trades)(
ctx,
BorrowedStr::from_str(&instrument),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
};
r.into_result().expect("subscribe_trades succeeds");
msgbus::publish_trade(get_trades_topic(instrument_id()), &make_trade());
assert_eq!(S_TRADE.load(Ordering::SeqCst), 1);
let r = unsafe {
(v.unsubscribe_trades)(
ctx,
BorrowedStr::from_str(&instrument),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
};
r.into_result().expect("unsubscribe_trades succeeds");
msgbus::publish_trade(get_trades_topic(instrument_id()), &make_trade());
assert_eq!(S_TRADE.load(Ordering::SeqCst), 1);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_subscribe_bars_routes_msgbus_events_to_registered_strategy_adapter() {
let _lock = lock_counters();
s_reset();
let strategy_id = "PluginPhaseTwoBars-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
Component::start(&mut adapter).expect("strategy starts");
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let bar = make_bar();
let bar_type = bar.bar_type;
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe {
(v.subscribe_bars)(
ctx,
BorrowedStr::from_str(&bar_type.to_string()),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
};
r.into_result().expect("subscribe_bars succeeds");
msgbus::publish_bar(get_bars_topic(bar_type), &bar);
assert_eq!(S_BAR.load(Ordering::SeqCst), 1);
let r = unsafe {
(v.unsubscribe_bars)(
ctx,
BorrowedStr::from_str(&bar_type.to_string()),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
};
r.into_result().expect("unsubscribe_bars succeeds");
msgbus::publish_bar(get_bars_topic(bar_type), &bar);
assert_eq!(S_BAR.load(Ordering::SeqCst), 1);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_subscriptions_route_msgbus_events_to_registered_actor_adapter() {
let _lock = lock_counters();
a_reset();
let mut adapter = build_actor_adapter("PluginPhaseTwoActorSub-001");
adapter
.register(
TraderId::from("TRADER-001"),
Rc::new(RefCell::new(TestClock::new())),
Rc::new(RefCell::new(Cache::default())),
)
.expect("actor register");
Component::start(&mut adapter).expect("actor starts");
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: false,
});
let instrument = instrument_id().to_string();
let bar = make_bar();
let bar_type = bar.bar_type;
let p = host_vtable();
let v = unsafe { &*p };
unsafe {
(v.subscribe_quotes)(
ctx,
BorrowedStr::from_str(&instrument),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
}
.into_result()
.expect("subscribe_quotes succeeds");
unsafe {
(v.subscribe_trades)(
ctx,
BorrowedStr::from_str(&instrument),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
}
.into_result()
.expect("subscribe_trades succeeds");
unsafe {
(v.subscribe_bars)(
ctx,
BorrowedStr::from_str(&bar_type.to_string()),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
}
.into_result()
.expect("subscribe_bars succeeds");
msgbus::publish_quote(get_quotes_topic(instrument_id()), &make_quote());
msgbus::publish_trade(get_trades_topic(instrument_id()), &make_trade());
msgbus::publish_bar(get_bars_topic(bar_type), &bar);
assert_eq!(A_QUOTE.load(Ordering::SeqCst), 1);
assert_eq!(A_TRADE.load(Ordering::SeqCst), 1);
assert_eq!(A_BAR.load(Ordering::SeqCst), 1);
unsafe {
(v.unsubscribe_quotes)(
ctx,
BorrowedStr::from_str(&instrument),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
}
.into_result()
.expect("unsubscribe_quotes succeeds");
unsafe {
(v.unsubscribe_trades)(
ctx,
BorrowedStr::from_str(&instrument),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
}
.into_result()
.expect("unsubscribe_trades succeeds");
unsafe {
(v.unsubscribe_bars)(
ctx,
BorrowedStr::from_str(&bar_type.to_string()),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
}
.into_result()
.expect("unsubscribe_bars succeeds");
msgbus::publish_quote(get_quotes_topic(instrument_id()), &make_quote());
msgbus::publish_trade(get_trades_topic(instrument_id()), &make_trade());
msgbus::publish_bar(get_bars_topic(bar_type), &bar);
assert_eq!(A_QUOTE.load(Ordering::SeqCst), 1);
assert_eq!(A_TRADE.load(Ordering::SeqCst), 1);
assert_eq!(A_BAR.load(Ordering::SeqCst), 1);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_book_subscription_callbacks_emit_data_commands() {
let strategy_id = "PluginPhaseTwoBookCmd-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let commands = Rc::new(RefCell::new(Vec::new()));
let commands_clone = commands.clone();
let handler = TypedIntoHandler::from(move |command: DataCommand| {
commands_clone.borrow_mut().push(command);
});
msgbus::register_data_command_endpoint(
MessagingSwitchboard::data_engine_queue_execute(),
handler,
);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let instrument = instrument_id().to_string();
let interval_ms = 1_000;
let p = host_vtable();
let v = unsafe { &*p };
unsafe {
(v.subscribe_book_deltas)(
ctx,
BorrowedStr::from_str(&instrument),
BookType::L2_MBP as u8,
5,
BorrowedStr::empty(),
1,
BorrowedStr::empty(),
)
}
.into_result()
.expect("subscribe_book_deltas succeeds");
unsafe {
(v.unsubscribe_book_deltas)(
ctx,
BorrowedStr::from_str(&instrument),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
}
.into_result()
.expect("unsubscribe_book_deltas succeeds");
unsafe {
(v.subscribe_book_at_interval)(
ctx,
BorrowedStr::from_str(&instrument),
BookType::L2_MBP as u8,
7,
interval_ms,
BorrowedStr::empty(),
BorrowedStr::empty(),
)
}
.into_result()
.expect("subscribe_book_at_interval succeeds");
unsafe {
(v.unsubscribe_book_at_interval)(
ctx,
BorrowedStr::from_str(&instrument),
interval_ms,
BorrowedStr::empty(),
BorrowedStr::empty(),
)
}
.into_result()
.expect("unsubscribe_book_at_interval succeeds");
let commands = commands.borrow();
assert_eq!(commands.len(), 4);
match &commands[0] {
DataCommand::Subscribe(SubscribeCommand::BookDeltas(command)) => {
assert_eq!(command.instrument_id, instrument_id());
assert_eq!(command.book_type, BookType::L2_MBP);
assert_eq!(command.depth.map(std::num::NonZeroUsize::get), Some(5));
assert!(command.managed);
}
other => panic!("expected SubscribeCommand::BookDeltas, was {other:?}"),
}
match &commands[1] {
DataCommand::Unsubscribe(UnsubscribeCommand::BookDeltas(command)) => {
assert_eq!(command.instrument_id, instrument_id());
}
other => panic!("expected UnsubscribeCommand::BookDeltas, was {other:?}"),
}
match &commands[2] {
DataCommand::Subscribe(SubscribeCommand::BookSnapshots(command)) => {
assert_eq!(command.instrument_id, instrument_id());
assert_eq!(command.book_type, BookType::L2_MBP);
assert_eq!(command.depth.map(std::num::NonZeroUsize::get), Some(7));
assert_eq!(command.interval_ms.get(), interval_ms);
}
other => panic!("expected SubscribeCommand::BookSnapshots, was {other:?}"),
}
match &commands[3] {
DataCommand::Unsubscribe(UnsubscribeCommand::BookSnapshots(command)) => {
assert_eq!(command.instrument_id, instrument_id());
assert_eq!(command.interval_ms.get(), interval_ms);
}
other => panic!("expected UnsubscribeCommand::BookSnapshots, was {other:?}"),
}
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_msgbus_publish_routes_bytes_to_any_subscribers() {
let topic = "plugin.phase2.bytes";
let received = Rc::new(RefCell::new(Vec::new()));
let received_clone = received.clone();
let handler = ShareableMessageHandler::from_typed(move |payload: &Vec<u8>| {
received_clone.borrow_mut().push(payload.clone());
});
msgbus::subscribe_any(topic.into(), handler.clone(), None);
let mut adapter = build_strategy_adapter("PluginPhaseTwoMsgbus-001");
register_strategy_adapter(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let payload = [1_u8, 2, 3, 5, 8];
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe {
(v.msgbus_publish)(
ctx,
BorrowedStr::from_str(topic),
Slice::from_slice(&payload),
)
};
r.into_result().expect("msgbus_publish succeeds");
msgbus::unsubscribe_any(topic.into(), &handler);
assert_eq!(received.borrow().as_slice(), &[payload.to_vec()]);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_msgbus_publish_rejects_unregistered_context() {
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from("PluginPhaseTwoMsgbus-Missing"),
is_strategy: true,
});
let payload = [1_u8];
let p = host_vtable();
let v = unsafe { &*p };
let err = unsafe {
(v.msgbus_publish)(
ctx,
BorrowedStr::from_str("plugin.phase2.missing"),
Slice::from_slice(&payload),
)
}
.into_result()
.expect_err("msgbus_publish rejects missing adapter");
assert_eq!(err.code, nautilus_plugin::PluginErrorCode::Generic);
assert!(err.message_string().contains("could not resolve"));
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_timer_callbacks_register_on_strategy_clock() {
let strategy_id = "PluginPhaseTwoTimer-001";
let mut adapter = build_strategy_adapter(strategy_id);
let (clock, _) = register_strategy_adapter_with_clock_and_cache(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe { (v.set_timer)(ctx, BorrowedStr::from_str("PHASE2_TIMER"), 10, 0, 0, 1, 0) };
r.into_result().expect("set_timer succeeds");
assert!(
clock
.borrow()
.timer_exists(&ustr::Ustr::from("PHASE2_TIMER"))
);
let r = unsafe { (v.cancel_timer)(ctx, BorrowedStr::from_str("PHASE2_TIMER")) };
r.into_result().expect("cancel_timer succeeds");
assert_eq!(clock.borrow().timer_count(), 0);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_time_alert_callback_registers_on_strategy_clock() {
let strategy_id = "PluginPhaseTwoAlert-001";
let mut adapter = build_strategy_adapter(strategy_id);
let (clock, _) = register_strategy_adapter_with_clock_and_cache(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe { (v.set_time_alert)(ctx, BorrowedStr::from_str("PHASE2_ALERT"), 10, 1) };
r.into_result().expect("set_time_alert succeeds");
assert!(
clock
.borrow()
.timer_exists(&ustr::Ustr::from("PHASE2_ALERT"))
);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_subscription_callbacks_reject_invalid_boundary_inputs() {
let p = host_vtable();
let v = unsafe { &*p };
let cases = [
(
"invalid instrument_id",
unsafe {
(v.subscribe_quotes)(
std::ptr::null(),
BorrowedStr::empty(),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
},
),
(
"invalid bar_type",
unsafe {
(v.subscribe_bars)(
std::ptr::null(),
BorrowedStr::from_str("not-a-bar-type"),
BorrowedStr::empty(),
BorrowedStr::empty(),
)
},
),
(
"invalid book_type",
unsafe {
(v.subscribe_book_deltas)(
std::ptr::null(),
BorrowedStr::from_str("BTCUSDT.BINANCE"),
255,
0,
BorrowedStr::empty(),
0,
BorrowedStr::empty(),
)
},
),
(
"invalid params_json",
unsafe {
(v.subscribe_quotes)(
std::ptr::null(),
BorrowedStr::from_str("BTCUSDT.BINANCE"),
BorrowedStr::empty(),
BorrowedStr::from_str("{"),
)
},
),
];
for (expected_message, result) in cases {
let err = result
.into_result()
.expect_err("invalid input should be rejected before dispatch");
assert_eq!(err.code, nautilus_plugin::PluginErrorCode::InvalidArgument);
assert!(
err.message_string().contains(expected_message),
"expected message containing '{expected_message}', was: {}",
err.message_string(),
);
}
}
#[rstest]
#[case::subscribe(true)]
#[case::unsubscribe(false)]
fn host_book_interval_callbacks_reject_zero_interval(#[case] subscribe: bool) {
let p = host_vtable();
let v = unsafe { &*p };
let instrument = BorrowedStr::from_str("BTCUSDT.BINANCE");
let result = if subscribe {
unsafe {
(v.subscribe_book_at_interval)(
std::ptr::null(),
instrument,
BookType::L2_MBP as u8,
0,
0,
BorrowedStr::empty(),
BorrowedStr::empty(),
)
}
} else {
unsafe {
(v.unsubscribe_book_at_interval)(
std::ptr::null(),
instrument,
0,
BorrowedStr::empty(),
BorrowedStr::empty(),
)
}
};
let err = result
.into_result()
.expect_err("zero interval should be rejected before dispatch");
assert_eq!(err.code, nautilus_plugin::PluginErrorCode::InvalidArgument);
assert_eq!(
err.message_string(),
"interval_ms must be greater than zero"
);
}
fn make_margin_account(account_id: AccountId) -> AccountAny {
let account_state = AccountState::new(
account_id,
AccountType::Margin,
vec![AccountBalance::new(
Money::from("1000000 USDT"),
Money::from("0 USDT"),
Money::from("1000000 USDT"),
)],
vec![],
true,
UUID4::new(),
UnixNanos::default(),
UnixNanos::default(),
Some(Currency::USDT()),
);
AccountAny::Margin(MarginAccount::new(account_state, true))
}
fn make_position_for_strategy(position_id: &str, strategy_id: &str) -> Position {
let instrument = InstrumentAny::CurrencyPair(currency_pair_ethusdt());
let mut fill = make_order_filled();
fill.strategy_id = StrategyId::from(strategy_id);
fill.instrument_id = InstrumentId::from("ETHUSDT.BINANCE");
fill.position_id = Some(PositionId::from(position_id));
Position::new(&instrument, fill)
}
#[rstest]
fn host_submit_order_list_routes_through_registered_strategy_adapter() {
let _lock = lock_counters();
let strategy_id = "PluginSubmitList-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
RISK_COMMAND_COUNT.store(0, Ordering::SeqCst);
let captured: std::sync::Arc<std::sync::Mutex<Option<TradingCommand>>> =
std::sync::Arc::new(std::sync::Mutex::new(None));
let captured_clone = std::sync::Arc::clone(&captured);
let risk_handler = TypedIntoHandler::from_with_id(
"PluginRiskProbeList.execute",
move |command: TradingCommand| {
assert!(matches!(command, TradingCommand::SubmitOrderList(_)));
*captured_clone.lock().unwrap() = Some(command);
RISK_COMMAND_COUNT.fetch_add(1, Ordering::SeqCst);
},
);
msgbus::register_trading_command_endpoint(
MessagingSwitchboard::risk_engine_queue_execute(),
risk_handler,
);
let expected_ids = [
ClientOrderId::from("O-PLUGIN-LIST-1"),
ClientOrderId::from("O-PLUGIN-LIST-2"),
];
let handle = SubmitOrderListHandle::new(SubmitOrderListCommand::new(
vec![
make_initialized_market_order("O-PLUGIN-LIST-1", strategy_id),
make_initialized_market_order("O-PLUGIN-LIST-2", strategy_id),
],
None,
None,
None,
));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe { (v.submit_order_list)(ctx, &raw const handle) };
r.into_result()
.expect("host submit_order_list should succeed");
assert_eq!(RISK_COMMAND_COUNT.load(Ordering::SeqCst), 1);
let captured = captured.lock().unwrap().take().expect("command captured");
match captured {
TradingCommand::SubmitOrderList(cmd) => {
assert_eq!(cmd.order_list.client_order_ids.len(), 2);
assert_eq!(cmd.order_list.client_order_ids[0], expected_ids[0]);
assert_eq!(cmd.order_list.client_order_ids[1], expected_ids[1]);
assert_eq!(cmd.order_inits.len(), 2);
}
other => panic!("expected SubmitOrderList, was {other:?}"),
}
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_submit_order_list_rejects_empty_orders() {
let _lock = lock_counters();
let strategy_id = "PluginSubmitListEmpty-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let handle =
SubmitOrderListHandle::new(SubmitOrderListCommand::new(Vec::new(), None, None, None));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let err = unsafe { (v.submit_order_list)(ctx, &raw const handle) }
.into_result()
.expect_err("empty order list should be rejected");
assert_ne!(err.code, nautilus_plugin::PluginErrorCode::NotImplemented);
assert!(
err.message_string().contains("no orders to submit"),
"unexpected error: {}",
err.message_string()
);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_cancel_orders_dispatches_to_strategy_through_cache_lookup() {
let _lock = lock_counters();
let strategy_id = "PluginCancelBatch-001";
let mut adapter = build_strategy_adapter(strategy_id);
let cache = register_strategy_adapter_with_cache(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
for id in ["O-PLUGIN-BATCH-1", "O-PLUGIN-BATCH-2"] {
let order = make_initialized_market_order(id, strategy_id);
cache
.borrow_mut()
.add_order(order, None, None, false)
.expect("cache accepts order");
}
let _registered = register_actor(adapter);
let handle = CancelOrdersHandle::new(CancelOrdersCommand::new(
vec![
ClientOrderId::from("O-PLUGIN-BATCH-1"),
ClientOrderId::from("O-PLUGIN-BATCH-2"),
],
None,
None,
));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let err = unsafe { (v.cancel_orders)(ctx, &raw const handle) }
.into_result()
.expect_err("Initialized orders should be rejected as local by Strategy");
assert_ne!(err.code, nautilus_plugin::PluginErrorCode::NotImplemented);
assert!(
err.message_string().contains("emulated or local"),
"expected Strategy-level rejection, was: {}",
err.message_string()
);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_cancel_all_orders_dispatches_to_strategy_with_empty_cache_path() {
let _lock = lock_counters();
let strategy_id = "PluginCancelAll-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let handle = CancelAllOrdersHandle::new(CancelAllOrdersCommand::new(
instrument_id(),
Some(OrderSide::Buy),
None,
None,
));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe { (v.cancel_all_orders)(ctx, &raw const handle) };
r.into_result()
.expect("host cancel_all_orders should succeed");
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_close_position_routes_through_registered_strategy_adapter() {
let _lock = lock_counters();
let strategy_id = "PluginClosePos-001";
let mut adapter = build_strategy_adapter(strategy_id);
let cache = register_strategy_adapter_with_cache(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let instrument = InstrumentAny::CurrencyPair(currency_pair_ethusdt());
cache
.borrow_mut()
.add_instrument(instrument)
.expect("cache accepts instrument");
let position = make_position_for_strategy("P-PLUGIN-CLOSE-1", strategy_id);
cache
.borrow_mut()
.add_position(&position, OmsType::Netting)
.expect("cache accepts position");
let _registered = register_actor(adapter);
RISK_COMMAND_COUNT.store(0, Ordering::SeqCst);
let captured: std::sync::Arc<std::sync::Mutex<Option<TradingCommand>>> =
std::sync::Arc::new(std::sync::Mutex::new(None));
let captured_clone = std::sync::Arc::clone(&captured);
let risk_handler = TypedIntoHandler::from_with_id(
"PluginRiskProbeClosePos.execute",
move |command: TradingCommand| {
assert!(matches!(command, TradingCommand::SubmitOrder(_)));
*captured_clone.lock().unwrap() = Some(command);
RISK_COMMAND_COUNT.fetch_add(1, Ordering::SeqCst);
},
);
msgbus::register_trading_command_endpoint(
MessagingSwitchboard::risk_engine_queue_execute(),
risk_handler,
);
let expected_tag = ustr::Ustr::from("plugin-exit");
let handle = ClosePositionHandle::new(ClosePositionCommand::new(
PositionId::from("P-PLUGIN-CLOSE-1"),
None,
Some(vec![expected_tag]),
Some(TimeInForce::Ioc),
None,
None,
));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe { (v.close_position)(ctx, &raw const handle) };
r.into_result().expect("host close_position should succeed");
assert_eq!(RISK_COMMAND_COUNT.load(Ordering::SeqCst), 1);
let captured = captured.lock().unwrap().take().expect("command captured");
match captured {
TradingCommand::SubmitOrder(cmd) => {
assert_eq!(cmd.position_id, Some(PositionId::from("P-PLUGIN-CLOSE-1")));
let order_tags = cmd.order_init.tags.expect("closing order carries tags");
assert!(
order_tags.contains(&expected_tag),
"expected tag {expected_tag} to flow into closing order, was: {order_tags:?}",
);
assert_eq!(cmd.order_init.time_in_force, TimeInForce::Ioc);
}
other => panic!("expected SubmitOrder, was {other:?}"),
}
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_close_position_rejects_missing_position() {
let _lock = lock_counters();
let strategy_id = "PluginClosePosMissing-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let handle = ClosePositionHandle::new(ClosePositionCommand::new(
PositionId::from("P-PLUGIN-MISSING"),
None,
None,
None,
None,
None,
));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let err = unsafe { (v.close_position)(ctx, &raw const handle) }
.into_result()
.expect_err("missing position should surface as an error");
assert!(
err.message_string().contains("not found in cache"),
"unexpected error: {}",
err.message_string()
);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_close_all_positions_routes_through_registered_strategy_adapter() {
let _lock = lock_counters();
let strategy_id = "PluginCloseAll-001";
let mut adapter = build_strategy_adapter(strategy_id);
let cache = register_strategy_adapter_with_cache(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let instrument = InstrumentAny::CurrencyPair(currency_pair_ethusdt());
let target_instrument_id = instrument.id();
cache
.borrow_mut()
.add_instrument(instrument)
.expect("cache accepts instrument");
let position = make_position_for_strategy("P-PLUGIN-CLOSE-ALL-1", strategy_id);
cache
.borrow_mut()
.add_position(&position, OmsType::Netting)
.expect("cache accepts position");
let _registered = register_actor(adapter);
RISK_COMMAND_COUNT.store(0, Ordering::SeqCst);
let captured: std::sync::Arc<std::sync::Mutex<Option<TradingCommand>>> =
std::sync::Arc::new(std::sync::Mutex::new(None));
let captured_clone = std::sync::Arc::clone(&captured);
let risk_handler = TypedIntoHandler::from_with_id(
"PluginRiskProbeCloseAll.execute",
move |command: TradingCommand| {
assert!(matches!(command, TradingCommand::SubmitOrder(_)));
*captured_clone.lock().unwrap() = Some(command);
RISK_COMMAND_COUNT.fetch_add(1, Ordering::SeqCst);
},
);
msgbus::register_trading_command_endpoint(
MessagingSwitchboard::risk_engine_queue_execute(),
risk_handler,
);
let expected_tag = ustr::Ustr::from("plugin-flatten");
let handle = CloseAllPositionsHandle::new(CloseAllPositionsCommand::new(
target_instrument_id,
Some(position.side),
None,
Some(vec![expected_tag]),
Some(TimeInForce::Ioc),
None,
None,
));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe { (v.close_all_positions)(ctx, &raw const handle) };
r.into_result()
.expect("host close_all_positions should succeed");
assert_eq!(RISK_COMMAND_COUNT.load(Ordering::SeqCst), 1);
let captured = captured.lock().unwrap().take().expect("command captured");
match captured {
TradingCommand::SubmitOrder(cmd) => {
assert_eq!(cmd.instrument_id, target_instrument_id);
let order_tags = cmd.order_init.tags.expect("closing order carries tags");
assert!(
order_tags.contains(&expected_tag),
"expected tag {expected_tag} to flow into closing order, was: {order_tags:?}",
);
assert_eq!(cmd.order_init.time_in_force, TimeInForce::Ioc);
}
other => panic!("expected SubmitOrder, was {other:?}"),
}
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_query_account_routes_through_registered_strategy_adapter() {
let _lock = lock_counters();
let strategy_id = "PluginQueryAccount-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
RISK_COMMAND_COUNT.store(0, Ordering::SeqCst);
let captured: std::sync::Arc<std::sync::Mutex<Option<TradingCommand>>> =
std::sync::Arc::new(std::sync::Mutex::new(None));
let captured_clone = std::sync::Arc::clone(&captured);
let exec_handler = TypedIntoHandler::from_with_id(
"PluginExecProbeQueryAccount.execute",
move |command: TradingCommand| {
assert!(matches!(command, TradingCommand::QueryAccount(_)));
*captured_clone.lock().unwrap() = Some(command);
RISK_COMMAND_COUNT.fetch_add(1, Ordering::SeqCst);
},
);
msgbus::register_trading_command_endpoint(
MessagingSwitchboard::exec_engine_queue_execute(),
exec_handler,
);
let mut params = Params::new();
params.insert(
"marker".to_string(),
serde_json::Value::String("plugin-query-account".to_string()),
);
let expected_client_id = ClientId::from("BINANCE");
let expected_params = params.clone();
let handle = QueryAccountHandle::new(QueryAccountCommand::new(
AccountId::from("BINANCE-001"),
Some(expected_client_id),
Some(params),
));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe { (v.query_account)(ctx, &raw const handle) };
r.into_result().expect("host query_account should succeed");
assert_eq!(RISK_COMMAND_COUNT.load(Ordering::SeqCst), 1);
let captured = captured.lock().unwrap().take().expect("command captured");
match captured {
TradingCommand::QueryAccount(cmd) => {
assert_eq!(cmd.account_id, AccountId::from("BINANCE-001"));
assert_eq!(cmd.client_id, Some(expected_client_id));
assert_eq!(cmd.params, Some(expected_params));
}
other => panic!("expected QueryAccount, was {other:?}"),
}
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_query_order_routes_through_registered_strategy_adapter() {
let _lock = lock_counters();
let strategy_id = "PluginQueryOrder-001";
let mut adapter = build_strategy_adapter(strategy_id);
let cache = register_strategy_adapter_with_cache(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let order = make_initialized_market_order("O-PLUGIN-QUERY-1", strategy_id);
cache
.borrow_mut()
.add_order(order, None, None, false)
.expect("cache accepts order");
let _registered = register_actor(adapter);
RISK_COMMAND_COUNT.store(0, Ordering::SeqCst);
let captured: std::sync::Arc<std::sync::Mutex<Option<TradingCommand>>> =
std::sync::Arc::new(std::sync::Mutex::new(None));
let captured_clone = std::sync::Arc::clone(&captured);
let exec_handler = TypedIntoHandler::from_with_id(
"PluginExecProbeQueryOrder.execute",
move |command: TradingCommand| {
assert!(matches!(command, TradingCommand::QueryOrder(_)));
*captured_clone.lock().unwrap() = Some(command);
RISK_COMMAND_COUNT.fetch_add(1, Ordering::SeqCst);
},
);
msgbus::register_trading_command_endpoint(
MessagingSwitchboard::exec_engine_queue_execute(),
exec_handler,
);
let mut params = Params::new();
params.insert(
"marker".to_string(),
serde_json::Value::String("plugin-query-order".to_string()),
);
let expected_params = params.clone();
let handle = QueryOrderHandle::new(QueryOrderCommand::new(
ClientOrderId::from("O-PLUGIN-QUERY-1"),
None,
Some(params),
));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let r = unsafe { (v.query_order)(ctx, &raw const handle) };
r.into_result().expect("host query_order should succeed");
assert_eq!(RISK_COMMAND_COUNT.load(Ordering::SeqCst), 1);
let captured = captured.lock().unwrap().take().expect("command captured");
match captured {
TradingCommand::QueryOrder(cmd) => {
assert_eq!(cmd.client_order_id, ClientOrderId::from("O-PLUGIN-QUERY-1"));
assert_eq!(cmd.params, Some(expected_params));
}
other => panic!("expected QueryOrder, was {other:?}"),
}
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn host_query_order_rejects_missing_order() {
let _lock = lock_counters();
let strategy_id = "PluginQueryOrderMissing-001";
let mut adapter = build_strategy_adapter(strategy_id);
register_strategy_adapter(&mut adapter);
let actor_id_ustr = adapter.actor_id().inner();
let _registered = register_actor(adapter);
let handle = QueryOrderHandle::new(QueryOrderCommand::new(
ClientOrderId::from("O-PLUGIN-MISSING"),
None,
None,
));
let ctx = nautilus_live::plugin::registry::leak_host_context(HostContextInner {
actor_id: ActorId::from(actor_id_ustr.as_str()),
is_strategy: true,
});
let p = host_vtable();
let v = unsafe { &*p };
let err = unsafe { (v.query_order)(ctx, &raw const handle) }
.into_result()
.expect_err("missing order should surface as an error");
assert!(
err.message_string().contains("not found in cache"),
"unexpected error: {}",
err.message_string()
);
unsafe { nautilus_live::plugin::registry::drop_host_context(ctx) };
}
#[rstest]
fn strategy_adapter_on_time_event_composes_strategy_default_before_forward() {
let _lock = lock_counters();
s_reset();
a_reset();
let mut s = build_strategy_adapter("CountingStrategy-Time");
register_strategy_adapter(&mut s);
let event = TimeEvent::new(
ustr::Ustr::from("PLUGIN-TEST-TIMER"),
UUID4::new(),
UnixNanos::from(1u64),
UnixNanos::from(1u64),
);
let mut a = build_actor_adapter("CountingActor-Probe");
DataActor::on_time_event(&mut s, &event).expect("on_time_event succeeds");
DataActor::on_time_event(&mut a, &event).expect("actor forwards too");
}