1use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use async_trait::async_trait;
21use nautilus_common::{
22 cache::Cache,
23 clients::ExecutionClient,
24 clock::Clock,
25 factories::OrderEventFactory,
26 messages::execution::{
27 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
28 SubmitOrder, SubmitOrderList, TradingCommand,
29 },
30 msgbus::{self, MessagingSwitchboard},
31};
32use nautilus_core::{SharedCell, UnixNanos, WeakCell};
33use nautilus_execution::client::core::ExecutionClientCore;
34use nautilus_model::{
35 accounts::AccountAny,
36 enums::OmsType,
37 events::OrderEventAny,
38 identifiers::{AccountId, ClientId, ClientOrderId, TraderId, Venue},
39 orders::OrderAny,
40 types::{AccountBalance, MarginBalance},
41};
42
43use crate::exchange::SimulatedExchange;
44
45#[derive(Clone)]
52pub struct BacktestExecutionClient {
53 core: ExecutionClientCore,
54 factory: OrderEventFactory,
55 cache: Rc<RefCell<Cache>>,
56 clock: Rc<RefCell<dyn Clock>>,
57 exchange: WeakCell<SimulatedExchange>,
58 queued_events: Rc<RefCell<Vec<OrderEventAny>>>,
65 routing: bool,
66 _frozen_account: bool,
67}
68
69impl Debug for BacktestExecutionClient {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct(stringify!(BacktestExecutionClient))
72 .field("client_id", &self.core.client_id)
73 .field("routing", &self.routing)
74 .finish()
75 }
76}
77
78impl BacktestExecutionClient {
79 #[must_use]
81 #[allow(clippy::too_many_arguments)]
82 pub fn new(
83 trader_id: TraderId,
84 account_id: AccountId,
85 exchange: Rc<RefCell<SimulatedExchange>>,
86 cache: Rc<RefCell<Cache>>,
87 clock: Rc<RefCell<dyn Clock>>,
88 routing: Option<bool>,
89 frozen_account: Option<bool>,
90 ) -> Self {
91 let routing = routing.unwrap_or(false);
92 let frozen_account = frozen_account.unwrap_or(false);
93 let exchange_shared: SharedCell<SimulatedExchange> = SharedCell::from(exchange.clone());
94 let exchange_id = exchange_shared.borrow().id;
95 let account_type = exchange.borrow().account_type;
96 let base_currency = exchange.borrow().base_currency;
97
98 let core = ExecutionClientCore::new(
99 trader_id,
100 ClientId::from(exchange_id.as_str()),
101 Venue::from(exchange_id.as_str()),
102 exchange.borrow().oms_type,
103 account_id,
104 account_type,
105 base_currency,
106 cache.clone(),
107 );
108
109 let factory = OrderEventFactory::new(trader_id, account_id, account_type, base_currency);
110
111 if !frozen_account {
112 }
114
115 Self {
116 core,
117 factory,
118 exchange: exchange_shared.downgrade(),
119 cache,
120 clock,
121 queued_events: Rc::new(RefCell::new(Vec::new())),
122 routing,
123 _frozen_account: frozen_account,
124 }
125 }
126
127 fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
128 self.cache
129 .borrow()
130 .order(client_order_id)
131 .cloned()
132 .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
133 }
134
135 pub fn drain_queued_events(&self) {
137 let events: Vec<OrderEventAny> = self.queued_events.borrow_mut().drain(..).collect();
138 let endpoint = MessagingSwitchboard::exec_engine_process();
139 for event in events {
140 msgbus::send_order_event(endpoint, event);
141 }
142 }
143}
144
145#[async_trait(?Send)]
146impl ExecutionClient for BacktestExecutionClient {
147 fn is_connected(&self) -> bool {
148 self.core.is_connected()
149 }
150
151 fn client_id(&self) -> ClientId {
152 self.core.client_id
153 }
154
155 fn account_id(&self) -> AccountId {
156 self.core.account_id
157 }
158
159 fn venue(&self) -> Venue {
160 self.core.venue
161 }
162
163 fn oms_type(&self) -> OmsType {
164 self.core.oms_type
165 }
166
167 fn get_account(&self) -> Option<AccountAny> {
168 self.cache.borrow().account(&self.core.account_id).cloned()
169 }
170
171 fn generate_account_state(
172 &self,
173 balances: Vec<AccountBalance>,
174 margins: Vec<MarginBalance>,
175 reported: bool,
176 ts_event: UnixNanos,
177 ) -> anyhow::Result<()> {
178 let ts_init = self.clock.borrow().timestamp_ns();
179 let state = self
180 .factory
181 .generate_account_state(balances, margins, reported, ts_event, ts_init);
182 let endpoint = MessagingSwitchboard::portfolio_update_account();
183 msgbus::send_account_state(endpoint, &state);
184 Ok(())
185 }
186
187 fn start(&mut self) -> anyhow::Result<()> {
188 self.core.set_connected();
189 log::info!("Backtest execution client started");
190 Ok(())
191 }
192
193 fn stop(&mut self) -> anyhow::Result<()> {
194 self.core.set_disconnected();
195 log::info!("Backtest execution client stopped");
196 Ok(())
197 }
198
199 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
200 let order = self.get_order(&cmd.client_order_id)?;
203 let ts_init = self.clock.borrow().timestamp_ns();
204 let event = self.factory.generate_order_submitted(&order, ts_init);
205 self.queued_events.borrow_mut().push(event);
206
207 if let Some(exchange) = self.exchange.upgrade() {
208 exchange
209 .borrow_mut()
210 .send(TradingCommand::SubmitOrder(cmd.clone()));
211 } else {
212 log::error!("submit_order: SimulatedExchange has been dropped");
213 }
214 Ok(())
215 }
216
217 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
218 let ts_init = self.clock.borrow().timestamp_ns();
219
220 let orders: Vec<OrderAny> = self
221 .cache
222 .borrow()
223 .orders_for_ids(&cmd.order_list.client_order_ids, cmd);
224
225 let mut queued = self.queued_events.borrow_mut();
227 for order in &orders {
228 let event = self.factory.generate_order_submitted(order, ts_init);
229 queued.push(event);
230 }
231 drop(queued);
232
233 if let Some(exchange) = self.exchange.upgrade() {
234 exchange
235 .borrow_mut()
236 .send(TradingCommand::SubmitOrderList(cmd.clone()));
237 } else {
238 log::error!("submit_order_list: SimulatedExchange has been dropped");
239 }
240 Ok(())
241 }
242
243 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
244 if let Some(exchange) = self.exchange.upgrade() {
245 exchange
246 .borrow_mut()
247 .send(TradingCommand::ModifyOrder(cmd.clone()));
248 } else {
249 log::error!("modify_order: SimulatedExchange has been dropped");
250 }
251 Ok(())
252 }
253
254 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
255 if let Some(exchange) = self.exchange.upgrade() {
256 exchange
257 .borrow_mut()
258 .send(TradingCommand::CancelOrder(cmd.clone()));
259 } else {
260 log::error!("cancel_order: SimulatedExchange has been dropped");
261 }
262 Ok(())
263 }
264
265 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
266 if let Some(exchange) = self.exchange.upgrade() {
267 exchange
268 .borrow_mut()
269 .send(TradingCommand::CancelAllOrders(cmd.clone()));
270 } else {
271 log::error!("cancel_all_orders: SimulatedExchange has been dropped");
272 }
273 Ok(())
274 }
275
276 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
277 if let Some(exchange) = self.exchange.upgrade() {
278 exchange
279 .borrow_mut()
280 .send(TradingCommand::BatchCancelOrders(cmd.clone()));
281 } else {
282 log::error!("batch_cancel_orders: SimulatedExchange has been dropped");
283 }
284 Ok(())
285 }
286
287 fn query_account(&self, cmd: &QueryAccount) -> anyhow::Result<()> {
288 if let Some(exchange) = self.exchange.upgrade() {
289 exchange
290 .borrow_mut()
291 .send(TradingCommand::QueryAccount(cmd.clone()));
292 } else {
293 log::error!("query_account: SimulatedExchange has been dropped");
294 }
295 Ok(())
296 }
297
298 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
299 if let Some(exchange) = self.exchange.upgrade() {
300 exchange
301 .borrow_mut()
302 .send(TradingCommand::QueryOrder(cmd.clone()));
303 } else {
304 log::error!("query_order: SimulatedExchange has been dropped");
305 }
306 Ok(())
307 }
308}