Skip to main content

nautilus_backtest/
execution_client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `BacktestExecutionClient` implementation for backtesting.
17
18use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use async_trait::async_trait;
21use nautilus_common::{
22    cache::Cache,
23    clients::ExecutionClient,
24    clock::Clock,
25    factories::OrderEventFactory,
26    messages::execution::{
27        BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
28        SubmitOrder, SubmitOrderList, TradingCommand,
29    },
30    msgbus::{self, MessagingSwitchboard},
31};
32use nautilus_core::{SharedCell, UnixNanos, WeakCell};
33use nautilus_execution::client::core::ExecutionClientCore;
34use nautilus_model::{
35    accounts::AccountAny,
36    enums::OmsType,
37    events::OrderEventAny,
38    identifiers::{AccountId, ClientId, ClientOrderId, TraderId, Venue},
39    orders::OrderAny,
40    types::{AccountBalance, MarginBalance},
41};
42
43use crate::exchange::SimulatedExchange;
44
45/// Execution client implementation for backtesting trading operations.
46///
47/// The `BacktestExecutionClient` provides an execution client interface for
48/// backtesting environments, handling order management and trade execution
49/// through simulated exchanges. It processes trading commands and coordinates
50/// with the simulation infrastructure to provide realistic execution behavior.
51#[derive(Clone)]
52pub struct BacktestExecutionClient {
53    core: ExecutionClientCore,
54    factory: OrderEventFactory,
55    cache: Rc<RefCell<Cache>>,
56    clock: Rc<RefCell<dyn Clock>>,
57    exchange: WeakCell<SimulatedExchange>,
58    /// Buffered order events for deferred processing.
59    ///
60    /// Events like `OrderSubmitted` cannot be sent synchronously through
61    /// the msgbus during `submit_order` because the exec engine holds a
62    /// borrow via its `execute` handler. Instead, events are buffered here
63    /// and drained by the engine after the execute borrow is released.
64    queued_events: Rc<RefCell<Vec<OrderEventAny>>>,
65    routing: bool,
66    _frozen_account: bool,
67}
68
69impl Debug for BacktestExecutionClient {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct(stringify!(BacktestExecutionClient))
72            .field("client_id", &self.core.client_id)
73            .field("routing", &self.routing)
74            .finish()
75    }
76}
77
78impl BacktestExecutionClient {
79    /// Creates a new [`BacktestExecutionClient`] instance.
80    #[must_use]
81    pub fn new(
82        trader_id: TraderId,
83        account_id: AccountId,
84        exchange: &Rc<RefCell<SimulatedExchange>>,
85        cache: Rc<RefCell<Cache>>,
86        clock: Rc<RefCell<dyn Clock>>,
87        routing: Option<bool>,
88        frozen_account: Option<bool>,
89    ) -> Self {
90        let routing = routing.unwrap_or(false);
91        let frozen_account = frozen_account.unwrap_or(false);
92        let exchange_shared: SharedCell<SimulatedExchange> = SharedCell::from(exchange.clone());
93        let exchange_id = exchange_shared.borrow().id;
94        let account_type = exchange.borrow().account_type;
95        let base_currency = exchange.borrow().base_currency;
96
97        let core = ExecutionClientCore::new(
98            trader_id,
99            ClientId::from(exchange_id.as_str()),
100            Venue::from(exchange_id.as_str()),
101            exchange.borrow().oms_type,
102            account_id,
103            account_type,
104            base_currency,
105            cache.clone(),
106        );
107
108        let factory = OrderEventFactory::new(trader_id, account_id, account_type, base_currency);
109
110        Self {
111            core,
112            factory,
113            exchange: exchange_shared.downgrade(),
114            cache,
115            clock,
116            queued_events: Rc::new(RefCell::new(Vec::new())),
117            routing,
118            _frozen_account: frozen_account,
119        }
120    }
121
122    fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
123        self.cache
124            .borrow()
125            .order(client_order_id)
126            .map(|o| o.clone())
127            .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
128    }
129
130    /// Drain buffered order events, sending each to the exec engine.
131    pub fn drain_queued_events(&self) {
132        let events: Vec<OrderEventAny> = self.queued_events.borrow_mut().drain(..).collect();
133        let endpoint = MessagingSwitchboard::exec_engine_process();
134        for event in events {
135            msgbus::send_order_event(endpoint, event);
136        }
137    }
138}
139
140#[async_trait(?Send)]
141impl ExecutionClient for BacktestExecutionClient {
142    fn is_connected(&self) -> bool {
143        self.core.is_connected()
144    }
145
146    fn client_id(&self) -> ClientId {
147        self.core.client_id
148    }
149
150    fn account_id(&self) -> AccountId {
151        self.core.account_id
152    }
153
154    fn venue(&self) -> Venue {
155        self.core.venue
156    }
157
158    fn oms_type(&self) -> OmsType {
159        self.core.oms_type
160    }
161
162    fn get_account(&self) -> Option<AccountAny> {
163        self.cache.borrow().account_owned(&self.core.account_id)
164    }
165
166    fn generate_account_state(
167        &self,
168        balances: Vec<AccountBalance>,
169        margins: Vec<MarginBalance>,
170        reported: bool,
171        ts_event: UnixNanos,
172    ) -> anyhow::Result<()> {
173        let ts_init = self.clock.borrow().timestamp_ns();
174        let state = self
175            .factory
176            .generate_account_state(balances, margins, reported, ts_event, ts_init);
177        let endpoint = MessagingSwitchboard::portfolio_update_account();
178        msgbus::send_account_state(endpoint, &state);
179        Ok(())
180    }
181
182    fn start(&mut self) -> anyhow::Result<()> {
183        self.core.set_connected();
184        log::info!("Backtest execution client started");
185        Ok(())
186    }
187
188    fn stop(&mut self) -> anyhow::Result<()> {
189        self.core.set_disconnected();
190        log::info!("Backtest execution client stopped");
191        Ok(())
192    }
193
194    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
195        // Buffer the OrderSubmitted event for deferred processing to avoid
196        // RefCell re-entrancy (exec_engine holds a borrow during execute)
197        let order = self.get_order(&cmd.client_order_id)?;
198        let ts_init = self.clock.borrow().timestamp_ns();
199        let event = self.factory.generate_order_submitted(&order, ts_init);
200        self.queued_events.borrow_mut().push(event);
201
202        if let Some(exchange) = self.exchange.upgrade() {
203            exchange.borrow_mut().send(TradingCommand::SubmitOrder(cmd));
204        } else {
205            log::error!("submit_order: SimulatedExchange has been dropped");
206        }
207        Ok(())
208    }
209
210    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
211        let ts_init = self.clock.borrow().timestamp_ns();
212
213        let orders: Vec<OrderAny> = self
214            .cache
215            .borrow()
216            .orders_for_ids(&cmd.order_list.client_order_ids, &cmd);
217
218        // Buffer events for deferred processing
219        let mut queued = self.queued_events.borrow_mut();
220
221        for order in &orders {
222            let event = self.factory.generate_order_submitted(order, ts_init);
223            queued.push(event);
224        }
225        drop(queued);
226
227        if let Some(exchange) = self.exchange.upgrade() {
228            exchange
229                .borrow_mut()
230                .send(TradingCommand::SubmitOrderList(cmd));
231        } else {
232            log::error!("submit_order_list: SimulatedExchange has been dropped");
233        }
234        Ok(())
235    }
236
237    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
238        if let Some(exchange) = self.exchange.upgrade() {
239            exchange.borrow_mut().send(TradingCommand::ModifyOrder(cmd));
240        } else {
241            log::error!("modify_order: SimulatedExchange has been dropped");
242        }
243        Ok(())
244    }
245
246    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
247        if let Some(exchange) = self.exchange.upgrade() {
248            exchange.borrow_mut().send(TradingCommand::CancelOrder(cmd));
249        } else {
250            log::error!("cancel_order: SimulatedExchange has been dropped");
251        }
252        Ok(())
253    }
254
255    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
256        if let Some(exchange) = self.exchange.upgrade() {
257            exchange
258                .borrow_mut()
259                .send(TradingCommand::CancelAllOrders(cmd));
260        } else {
261            log::error!("cancel_all_orders: SimulatedExchange has been dropped");
262        }
263        Ok(())
264    }
265
266    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
267        if let Some(exchange) = self.exchange.upgrade() {
268            exchange
269                .borrow_mut()
270                .send(TradingCommand::BatchCancelOrders(cmd));
271        } else {
272            log::error!("batch_cancel_orders: SimulatedExchange has been dropped");
273        }
274        Ok(())
275    }
276
277    fn query_account(&self, cmd: QueryAccount) -> anyhow::Result<()> {
278        if let Some(exchange) = self.exchange.upgrade() {
279            exchange
280                .borrow_mut()
281                .send(TradingCommand::QueryAccount(cmd));
282        } else {
283            log::error!("query_account: SimulatedExchange has been dropped");
284        }
285        Ok(())
286    }
287
288    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
289        if let Some(exchange) = self.exchange.upgrade() {
290            exchange.borrow_mut().send(TradingCommand::QueryOrder(cmd));
291        } else {
292            log::error!("query_order: SimulatedExchange has been dropped");
293        }
294        Ok(())
295    }
296}