Skip to main content

nautilus_backtest/
execution_client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `BacktestExecutionClient` implementation for backtesting.
17
18use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use async_trait::async_trait;
21use nautilus_common::{
22    cache::Cache,
23    clients::ExecutionClient,
24    clock::Clock,
25    factories::OrderEventFactory,
26    messages::execution::{
27        BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
28        SubmitOrder, SubmitOrderList, TradingCommand,
29    },
30    msgbus::{self, MessagingSwitchboard},
31};
32use nautilus_core::{SharedCell, UnixNanos, WeakCell};
33use nautilus_execution::client::core::ExecutionClientCore;
34use nautilus_model::{
35    accounts::AccountAny,
36    enums::OmsType,
37    events::OrderEventAny,
38    identifiers::{AccountId, ClientId, ClientOrderId, TraderId, Venue},
39    orders::OrderAny,
40    types::{AccountBalance, MarginBalance},
41};
42
43use crate::exchange::SimulatedExchange;
44
45/// Execution client implementation for backtesting trading operations.
46///
47/// The `BacktestExecutionClient` provides an execution client interface for
48/// backtesting environments, handling order management and trade execution
49/// through simulated exchanges. It processes trading commands and coordinates
50/// with the simulation infrastructure to provide realistic execution behavior.
51#[derive(Clone)]
52pub struct BacktestExecutionClient {
53    core: ExecutionClientCore,
54    factory: OrderEventFactory,
55    cache: Rc<RefCell<Cache>>,
56    clock: Rc<RefCell<dyn Clock>>,
57    exchange: WeakCell<SimulatedExchange>,
58    /// Buffered order events for deferred processing.
59    ///
60    /// Events like `OrderSubmitted` cannot be sent synchronously through
61    /// the msgbus during `submit_order` because the exec engine holds a
62    /// borrow via its `execute` handler. Instead, events are buffered here
63    /// and drained by the engine after the execute borrow is released.
64    queued_events: Rc<RefCell<Vec<OrderEventAny>>>,
65    routing: bool,
66    _frozen_account: bool,
67}
68
69impl Debug for BacktestExecutionClient {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct(stringify!(BacktestExecutionClient))
72            .field("client_id", &self.core.client_id)
73            .field("routing", &self.routing)
74            .finish()
75    }
76}
77
78impl BacktestExecutionClient {
79    /// Creates a new [`BacktestExecutionClient`] instance.
80    #[must_use]
81    #[allow(clippy::too_many_arguments)]
82    pub fn new(
83        trader_id: TraderId,
84        account_id: AccountId,
85        exchange: Rc<RefCell<SimulatedExchange>>,
86        cache: Rc<RefCell<Cache>>,
87        clock: Rc<RefCell<dyn Clock>>,
88        routing: Option<bool>,
89        frozen_account: Option<bool>,
90    ) -> Self {
91        let routing = routing.unwrap_or(false);
92        let frozen_account = frozen_account.unwrap_or(false);
93        let exchange_shared: SharedCell<SimulatedExchange> = SharedCell::from(exchange.clone());
94        let exchange_id = exchange_shared.borrow().id;
95        let account_type = exchange.borrow().account_type;
96        let base_currency = exchange.borrow().base_currency;
97
98        let core = ExecutionClientCore::new(
99            trader_id,
100            ClientId::from(exchange_id.as_str()),
101            Venue::from(exchange_id.as_str()),
102            exchange.borrow().oms_type,
103            account_id,
104            account_type,
105            base_currency,
106            cache.clone(),
107        );
108
109        let factory = OrderEventFactory::new(trader_id, account_id, account_type, base_currency);
110
111        if !frozen_account {
112            // TODO Register calculated account
113        }
114
115        Self {
116            core,
117            factory,
118            exchange: exchange_shared.downgrade(),
119            cache,
120            clock,
121            queued_events: Rc::new(RefCell::new(Vec::new())),
122            routing,
123            _frozen_account: frozen_account,
124        }
125    }
126
127    fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
128        self.cache
129            .borrow()
130            .order(client_order_id)
131            .cloned()
132            .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
133    }
134
135    /// Drain buffered order events, sending each to the exec engine.
136    pub fn drain_queued_events(&self) {
137        let events: Vec<OrderEventAny> = self.queued_events.borrow_mut().drain(..).collect();
138        let endpoint = MessagingSwitchboard::exec_engine_process();
139        for event in events {
140            msgbus::send_order_event(endpoint, event);
141        }
142    }
143}
144
145#[async_trait(?Send)]
146impl ExecutionClient for BacktestExecutionClient {
147    fn is_connected(&self) -> bool {
148        self.core.is_connected()
149    }
150
151    fn client_id(&self) -> ClientId {
152        self.core.client_id
153    }
154
155    fn account_id(&self) -> AccountId {
156        self.core.account_id
157    }
158
159    fn venue(&self) -> Venue {
160        self.core.venue
161    }
162
163    fn oms_type(&self) -> OmsType {
164        self.core.oms_type
165    }
166
167    fn get_account(&self) -> Option<AccountAny> {
168        self.cache.borrow().account(&self.core.account_id).cloned()
169    }
170
171    fn generate_account_state(
172        &self,
173        balances: Vec<AccountBalance>,
174        margins: Vec<MarginBalance>,
175        reported: bool,
176        ts_event: UnixNanos,
177    ) -> anyhow::Result<()> {
178        let ts_init = self.clock.borrow().timestamp_ns();
179        let state = self
180            .factory
181            .generate_account_state(balances, margins, reported, ts_event, ts_init);
182        let endpoint = MessagingSwitchboard::portfolio_update_account();
183        msgbus::send_account_state(endpoint, &state);
184        Ok(())
185    }
186
187    fn start(&mut self) -> anyhow::Result<()> {
188        self.core.set_connected();
189        log::info!("Backtest execution client started");
190        Ok(())
191    }
192
193    fn stop(&mut self) -> anyhow::Result<()> {
194        self.core.set_disconnected();
195        log::info!("Backtest execution client stopped");
196        Ok(())
197    }
198
199    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
200        // Buffer the OrderSubmitted event for deferred processing to avoid
201        // RefCell re-entrancy (exec_engine holds a borrow during execute)
202        let order = self.get_order(&cmd.client_order_id)?;
203        let ts_init = self.clock.borrow().timestamp_ns();
204        let event = self.factory.generate_order_submitted(&order, ts_init);
205        self.queued_events.borrow_mut().push(event);
206
207        if let Some(exchange) = self.exchange.upgrade() {
208            exchange
209                .borrow_mut()
210                .send(TradingCommand::SubmitOrder(cmd.clone()));
211        } else {
212            log::error!("submit_order: SimulatedExchange has been dropped");
213        }
214        Ok(())
215    }
216
217    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
218        let ts_init = self.clock.borrow().timestamp_ns();
219
220        let orders: Vec<OrderAny> = self
221            .cache
222            .borrow()
223            .orders_for_ids(&cmd.order_list.client_order_ids, cmd);
224
225        // Buffer events for deferred processing
226        let mut queued = self.queued_events.borrow_mut();
227        for order in &orders {
228            let event = self.factory.generate_order_submitted(order, ts_init);
229            queued.push(event);
230        }
231        drop(queued);
232
233        if let Some(exchange) = self.exchange.upgrade() {
234            exchange
235                .borrow_mut()
236                .send(TradingCommand::SubmitOrderList(cmd.clone()));
237        } else {
238            log::error!("submit_order_list: SimulatedExchange has been dropped");
239        }
240        Ok(())
241    }
242
243    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
244        if let Some(exchange) = self.exchange.upgrade() {
245            exchange
246                .borrow_mut()
247                .send(TradingCommand::ModifyOrder(cmd.clone()));
248        } else {
249            log::error!("modify_order: SimulatedExchange has been dropped");
250        }
251        Ok(())
252    }
253
254    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
255        if let Some(exchange) = self.exchange.upgrade() {
256            exchange
257                .borrow_mut()
258                .send(TradingCommand::CancelOrder(cmd.clone()));
259        } else {
260            log::error!("cancel_order: SimulatedExchange has been dropped");
261        }
262        Ok(())
263    }
264
265    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
266        if let Some(exchange) = self.exchange.upgrade() {
267            exchange
268                .borrow_mut()
269                .send(TradingCommand::CancelAllOrders(cmd.clone()));
270        } else {
271            log::error!("cancel_all_orders: SimulatedExchange has been dropped");
272        }
273        Ok(())
274    }
275
276    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
277        if let Some(exchange) = self.exchange.upgrade() {
278            exchange
279                .borrow_mut()
280                .send(TradingCommand::BatchCancelOrders(cmd.clone()));
281        } else {
282            log::error!("batch_cancel_orders: SimulatedExchange has been dropped");
283        }
284        Ok(())
285    }
286
287    fn query_account(&self, cmd: &QueryAccount) -> anyhow::Result<()> {
288        if let Some(exchange) = self.exchange.upgrade() {
289            exchange
290                .borrow_mut()
291                .send(TradingCommand::QueryAccount(cmd.clone()));
292        } else {
293            log::error!("query_account: SimulatedExchange has been dropped");
294        }
295        Ok(())
296    }
297
298    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
299        if let Some(exchange) = self.exchange.upgrade() {
300            exchange
301                .borrow_mut()
302                .send(TradingCommand::QueryOrder(cmd.clone()));
303        } else {
304            log::error!("query_order: SimulatedExchange has been dropped");
305        }
306        Ok(())
307    }
308}