1use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use async_trait::async_trait;
21use nautilus_common::{
22 cache::Cache,
23 clients::ExecutionClient,
24 clock::Clock,
25 factories::OrderEventFactory,
26 messages::execution::{
27 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
28 SubmitOrder, SubmitOrderList, TradingCommand,
29 },
30 msgbus::{self, MessagingSwitchboard},
31};
32use nautilus_core::{SharedCell, UnixNanos, WeakCell};
33use nautilus_execution::client::core::ExecutionClientCore;
34use nautilus_model::{
35 accounts::AccountAny,
36 enums::OmsType,
37 events::OrderEventAny,
38 identifiers::{AccountId, ClientId, ClientOrderId, TraderId, Venue},
39 orders::OrderAny,
40 types::{AccountBalance, MarginBalance},
41};
42
43use crate::exchange::SimulatedExchange;
44
45#[derive(Clone)]
52pub struct BacktestExecutionClient {
53 core: ExecutionClientCore,
54 factory: OrderEventFactory,
55 cache: Rc<RefCell<Cache>>,
56 clock: Rc<RefCell<dyn Clock>>,
57 exchange: WeakCell<SimulatedExchange>,
58 queued_events: Rc<RefCell<Vec<OrderEventAny>>>,
65 routing: bool,
66 _frozen_account: bool,
67}
68
69impl Debug for BacktestExecutionClient {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct(stringify!(BacktestExecutionClient))
72 .field("client_id", &self.core.client_id)
73 .field("routing", &self.routing)
74 .finish()
75 }
76}
77
78impl BacktestExecutionClient {
79 #[must_use]
81 pub fn new(
82 trader_id: TraderId,
83 account_id: AccountId,
84 exchange: &Rc<RefCell<SimulatedExchange>>,
85 cache: Rc<RefCell<Cache>>,
86 clock: Rc<RefCell<dyn Clock>>,
87 routing: Option<bool>,
88 frozen_account: Option<bool>,
89 ) -> Self {
90 let routing = routing.unwrap_or(false);
91 let frozen_account = frozen_account.unwrap_or(false);
92 let exchange_shared: SharedCell<SimulatedExchange> = SharedCell::from(exchange.clone());
93 let exchange_id = exchange_shared.borrow().id;
94 let account_type = exchange.borrow().account_type;
95 let base_currency = exchange.borrow().base_currency;
96
97 let core = ExecutionClientCore::new(
98 trader_id,
99 ClientId::from(exchange_id.as_str()),
100 Venue::from(exchange_id.as_str()),
101 exchange.borrow().oms_type,
102 account_id,
103 account_type,
104 base_currency,
105 cache.clone(),
106 );
107
108 let factory = OrderEventFactory::new(trader_id, account_id, account_type, base_currency);
109
110 Self {
111 core,
112 factory,
113 exchange: exchange_shared.downgrade(),
114 cache,
115 clock,
116 queued_events: Rc::new(RefCell::new(Vec::new())),
117 routing,
118 _frozen_account: frozen_account,
119 }
120 }
121
122 fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
123 self.cache
124 .borrow()
125 .order(client_order_id)
126 .map(|o| o.clone())
127 .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
128 }
129
130 pub fn drain_queued_events(&self) {
132 let events: Vec<OrderEventAny> = self.queued_events.borrow_mut().drain(..).collect();
133 let endpoint = MessagingSwitchboard::exec_engine_process();
134 for event in events {
135 msgbus::send_order_event(endpoint, event);
136 }
137 }
138}
139
140#[async_trait(?Send)]
141impl ExecutionClient for BacktestExecutionClient {
142 fn is_connected(&self) -> bool {
143 self.core.is_connected()
144 }
145
146 fn client_id(&self) -> ClientId {
147 self.core.client_id
148 }
149
150 fn account_id(&self) -> AccountId {
151 self.core.account_id
152 }
153
154 fn venue(&self) -> Venue {
155 self.core.venue
156 }
157
158 fn oms_type(&self) -> OmsType {
159 self.core.oms_type
160 }
161
162 fn get_account(&self) -> Option<AccountAny> {
163 self.cache.borrow().account_owned(&self.core.account_id)
164 }
165
166 fn generate_account_state(
167 &self,
168 balances: Vec<AccountBalance>,
169 margins: Vec<MarginBalance>,
170 reported: bool,
171 ts_event: UnixNanos,
172 ) -> anyhow::Result<()> {
173 let ts_init = self.clock.borrow().timestamp_ns();
174 let state = self
175 .factory
176 .generate_account_state(balances, margins, reported, ts_event, ts_init);
177 let endpoint = MessagingSwitchboard::portfolio_update_account();
178 msgbus::send_account_state(endpoint, &state);
179 Ok(())
180 }
181
182 fn start(&mut self) -> anyhow::Result<()> {
183 self.core.set_connected();
184 log::info!("Backtest execution client started");
185 Ok(())
186 }
187
188 fn stop(&mut self) -> anyhow::Result<()> {
189 self.core.set_disconnected();
190 log::info!("Backtest execution client stopped");
191 Ok(())
192 }
193
194 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
195 let order = self.get_order(&cmd.client_order_id)?;
198 let ts_init = self.clock.borrow().timestamp_ns();
199 let event = self.factory.generate_order_submitted(&order, ts_init);
200 self.queued_events.borrow_mut().push(event);
201
202 if let Some(exchange) = self.exchange.upgrade() {
203 exchange.borrow_mut().send(TradingCommand::SubmitOrder(cmd));
204 } else {
205 log::error!("submit_order: SimulatedExchange has been dropped");
206 }
207 Ok(())
208 }
209
210 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
211 let ts_init = self.clock.borrow().timestamp_ns();
212
213 let orders: Vec<OrderAny> = self
214 .cache
215 .borrow()
216 .orders_for_ids(&cmd.order_list.client_order_ids, &cmd);
217
218 let mut queued = self.queued_events.borrow_mut();
220
221 for order in &orders {
222 let event = self.factory.generate_order_submitted(order, ts_init);
223 queued.push(event);
224 }
225 drop(queued);
226
227 if let Some(exchange) = self.exchange.upgrade() {
228 exchange
229 .borrow_mut()
230 .send(TradingCommand::SubmitOrderList(cmd));
231 } else {
232 log::error!("submit_order_list: SimulatedExchange has been dropped");
233 }
234 Ok(())
235 }
236
237 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
238 if let Some(exchange) = self.exchange.upgrade() {
239 exchange.borrow_mut().send(TradingCommand::ModifyOrder(cmd));
240 } else {
241 log::error!("modify_order: SimulatedExchange has been dropped");
242 }
243 Ok(())
244 }
245
246 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
247 if let Some(exchange) = self.exchange.upgrade() {
248 exchange.borrow_mut().send(TradingCommand::CancelOrder(cmd));
249 } else {
250 log::error!("cancel_order: SimulatedExchange has been dropped");
251 }
252 Ok(())
253 }
254
255 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
256 if let Some(exchange) = self.exchange.upgrade() {
257 exchange
258 .borrow_mut()
259 .send(TradingCommand::CancelAllOrders(cmd));
260 } else {
261 log::error!("cancel_all_orders: SimulatedExchange has been dropped");
262 }
263 Ok(())
264 }
265
266 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
267 if let Some(exchange) = self.exchange.upgrade() {
268 exchange
269 .borrow_mut()
270 .send(TradingCommand::BatchCancelOrders(cmd));
271 } else {
272 log::error!("batch_cancel_orders: SimulatedExchange has been dropped");
273 }
274 Ok(())
275 }
276
277 fn query_account(&self, cmd: QueryAccount) -> anyhow::Result<()> {
278 if let Some(exchange) = self.exchange.upgrade() {
279 exchange
280 .borrow_mut()
281 .send(TradingCommand::QueryAccount(cmd));
282 } else {
283 log::error!("query_account: SimulatedExchange has been dropped");
284 }
285 Ok(())
286 }
287
288 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
289 if let Some(exchange) = self.exchange.upgrade() {
290 exchange.borrow_mut().send(TradingCommand::QueryOrder(cmd));
291 } else {
292 log::error!("query_order: SimulatedExchange has been dropped");
293 }
294 Ok(())
295 }
296}