1use ahash::AHashSet;
17use nautilus_common::{actor::DataActor, enums::LogColor, log_info, log_warn, timer::TimeEvent};
18use nautilus_core::{UnixNanos, datetime::secs_to_nanos_unchecked};
19use nautilus_model::{
20 data::{Bar, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick},
21 enums::{ContingencyType, OrderSide, OrderType, TimeInForce},
22 identifiers::{ClientId, ClientOrderId, InstrumentId, StrategyId},
23 instruments::{Instrument, InstrumentAny},
24 orderbook::OrderBook,
25 orders::{Order, OrderAny},
26 types::{Price, price::PriceRaw},
27};
28use nautilus_trading::{
29 nautilus_strategy,
30 strategy::{Strategy, StrategyCore},
31};
32use rust_decimal::{Decimal, prelude::ToPrimitive};
33
34use super::config::ExecTesterConfig;
35
36#[derive(Debug)]
45pub struct ExecTester {
46 pub(super) core: StrategyCore,
47 pub(super) config: ExecTesterConfig,
48 pub(super) instrument: Option<InstrumentAny>,
49 pub(super) price_offset: Option<u64>,
50 pub(super) preinitialized_market_data: bool,
51
52 pub(super) buy_order: Option<OrderAny>,
54 pub(super) sell_order: Option<OrderAny>,
55 pub(super) buy_stop_order: Option<OrderAny>,
56 pub(super) sell_stop_order: Option<OrderAny>,
57 pub(super) open_position_submitted: bool,
58
59 pub(super) modify_rejected_attempted: bool,
62 pub(super) pending_open_position_qty: Option<Decimal>,
63 pub(super) buy_cancel_replace_attempted: bool,
64 pub(super) sell_cancel_replace_attempted: bool,
65 pub(super) buy_stop_cancel_replace_attempted: bool,
66 pub(super) sell_stop_cancel_replace_attempted: bool,
67}
68
69nautilus_strategy!(ExecTester, {
70 fn external_order_claims(&self) -> Option<Vec<InstrumentId>> {
71 self.config.base.external_order_claims.clone()
72 }
73});
74
75impl DataActor for ExecTester {
76 fn on_start(&mut self) -> anyhow::Result<()> {
77 Strategy::on_start(self)?;
78
79 let instrument_id = self.config.instrument_id;
80 let client_id = self.config.client_id;
81
82 let instrument = {
83 let cache = self.cache();
84 cache.instrument(&instrument_id).cloned()
85 };
86
87 if let Some(inst) = instrument {
88 self.initialize_with_instrument(inst, true)?;
89 } else {
90 log::info!("Instrument {instrument_id} not in cache, subscribing...");
91 self.subscribe_instrument(instrument_id, client_id, None);
92
93 if self.config.subscribe_quotes {
96 self.subscribe_quotes(instrument_id, client_id, None);
97 }
98
99 if self.config.subscribe_trades {
100 self.subscribe_trades(instrument_id, client_id, None);
101 }
102 self.preinitialized_market_data =
103 self.config.subscribe_quotes || self.config.subscribe_trades;
104 }
105
106 Ok(())
107 }
108
109 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
110 if instrument.id() == self.config.instrument_id && self.instrument.is_none() {
111 let id = instrument.id();
112 log::info!("Received instrument {id}, initializing...");
113 self.initialize_with_instrument(instrument.clone(), !self.preinitialized_market_data)?;
114 }
115 Ok(())
116 }
117
118 fn on_stop(&mut self) -> anyhow::Result<()> {
119 if self.config.dry_run {
120 log_warn!("Dry run mode, skipping cancel all orders and close all positions");
121 return Ok(());
122 }
123
124 let instrument_id = self.config.instrument_id;
125 let client_id = self.config.client_id;
126 let strategy_id = StrategyId::from(self.core.actor_id.inner().as_str());
127
128 if self.config.cancel_orders_on_stop {
129 self.cancel_active_orders(instrument_id, strategy_id, client_id);
130 }
131
132 if self.config.close_positions_on_stop {
133 let time_in_force = self
134 .config
135 .close_positions_time_in_force
136 .or(Some(TimeInForce::Gtc));
137
138 if let Err(e) = self.close_all_positions(
139 instrument_id,
140 None,
141 client_id,
142 None,
143 time_in_force,
144 Some(self.config.reduce_only_on_stop),
145 None,
146 ) {
147 log::error!("Failed to close all positions: {e}");
148 }
149 }
150
151 if self.config.can_unsubscribe && self.instrument.is_some() {
152 if self.config.subscribe_quotes {
153 self.unsubscribe_quotes(instrument_id, client_id, None);
154 }
155
156 if self.config.subscribe_trades {
157 self.unsubscribe_trades(instrument_id, client_id, None);
158 }
159
160 if self.config.subscribe_book {
161 self.unsubscribe_book_at_interval(
162 instrument_id,
163 self.config.book_interval_ms,
164 client_id,
165 None,
166 );
167 }
168 }
169
170 Ok(())
171 }
172
173 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
174 if self.config.log_data {
175 log_info!("{quote:?}", color = LogColor::Cyan);
176 }
177
178 if quote.instrument_id == self.config.instrument_id
179 && self.config.open_position_on_first_quote
180 {
181 self.submit_pending_open_position();
182 }
183
184 self.maintain_orders(quote.bid_price, quote.ask_price);
185 Ok(())
186 }
187
188 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
189 if self.config.log_data {
190 log_info!("{trade:?}", color = LogColor::Cyan);
191 }
192 Ok(())
193 }
194
195 fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
196 if self.config.log_data {
197 let num_levels = self.config.book_levels_to_print;
198 let instrument_id = book.instrument_id;
199 let book_str = book.pprint(num_levels, None);
200 log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
201
202 if self.is_registered() {
204 let cache = self.cache();
205 if let Some(own_book) = cache.own_order_book(&instrument_id) {
206 let own_book_str = own_book.pprint(num_levels, None);
207 log_info!(
208 "\n{instrument_id} (own)\n{own_book_str}",
209 color = LogColor::Magenta
210 );
211 }
212 }
213 }
214
215 let Some(best_bid) = book.best_bid_price() else {
216 return Ok(()); };
218 let Some(best_ask) = book.best_ask_price() else {
219 return Ok(()); };
221
222 self.maintain_orders(best_bid, best_ask);
223 Ok(())
224 }
225
226 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
227 if self.config.log_data {
228 log_info!("{deltas:?}", color = LogColor::Cyan);
229 }
230 Ok(())
231 }
232
233 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
234 if self.config.log_data {
235 log_info!("{bar:?}", color = LogColor::Cyan);
236 }
237 Ok(())
238 }
239
240 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
241 if self.config.log_data {
242 log_info!("{mark_price:?}", color = LogColor::Cyan);
243 }
244 Ok(())
245 }
246
247 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
248 if self.config.log_data {
249 log_info!("{index_price:?}", color = LogColor::Cyan);
250 }
251 Ok(())
252 }
253
254 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
255 Strategy::on_time_event(self, event)
256 }
257}
258
259impl ExecTester {
260 #[must_use]
262 pub fn new(config: ExecTesterConfig) -> Self {
263 let pending_open_position_qty = config.open_position_on_start_qty;
264
265 Self {
266 core: StrategyCore::new(config.base.clone()),
267 config,
268 instrument: None,
269 price_offset: None,
270 preinitialized_market_data: false,
271 buy_order: None,
272 sell_order: None,
273 buy_stop_order: None,
274 sell_stop_order: None,
275 open_position_submitted: false,
276 modify_rejected_attempted: false,
277 pending_open_position_qty,
278 buy_cancel_replace_attempted: false,
279 sell_cancel_replace_attempted: false,
280 buy_stop_cancel_replace_attempted: false,
281 sell_stop_cancel_replace_attempted: false,
282 }
283 }
284
285 fn initialize_with_instrument(
286 &mut self,
287 instrument: InstrumentAny,
288 subscribe_market_data: bool,
289 ) -> anyhow::Result<()> {
290 let instrument_id = self.config.instrument_id;
291 let client_id = self.config.client_id;
292
293 self.price_offset = Some(self.get_price_offset(&instrument));
294 self.instrument = Some(instrument);
295
296 if subscribe_market_data && self.config.subscribe_quotes {
297 self.subscribe_quotes(instrument_id, client_id, None);
298 }
299
300 if subscribe_market_data && self.config.subscribe_trades {
301 self.subscribe_trades(instrument_id, client_id, None);
302 }
303
304 if self.config.subscribe_book {
305 self.subscribe_book_at_interval(
306 instrument_id,
307 self.config.book_type,
308 self.config.book_depth,
309 self.config.book_interval_ms,
310 client_id,
311 None,
312 );
313 }
314
315 if let Some(qty) = self.pending_open_position_qty {
316 let quote_ready = {
317 let cache = self.cache();
318 cache.quote(&instrument_id).is_some()
319 };
320
321 if self.config.open_position_on_first_quote
322 && self.config.subscribe_quotes
323 && !quote_ready
324 {
325 log::info!("Waiting for first quote before opening {instrument_id} position");
326 } else {
327 self.pending_open_position_qty = None;
328 self.open_position(qty)?;
329 self.open_position_submitted = true;
330 }
331 }
332
333 Ok(())
334 }
335
336 pub(super) fn get_price_offset(&self, _instrument: &InstrumentAny) -> u64 {
337 self.config.tob_offset_ticks
338 }
339
340 fn expire_time_from_delta(&self, mins: u64) -> UnixNanos {
341 let current_ns = self.timestamp_ns();
342 let delta_ns = secs_to_nanos_unchecked((mins * 60) as f64);
343 UnixNanos::from(current_ns.as_u64() + delta_ns)
344 }
345
346 fn resolve_time_in_force(
347 &self,
348 tif_override: Option<TimeInForce>,
349 ) -> (TimeInForce, Option<UnixNanos>) {
350 match (tif_override, self.config.order_expire_time_delta_mins) {
351 (Some(TimeInForce::Gtd), Some(mins)) => {
352 (TimeInForce::Gtd, Some(self.expire_time_from_delta(mins)))
353 }
354 (Some(TimeInForce::Gtd), None) => {
355 log_warn!(
356 "GTD time in force requires order_expire_time_delta_mins, falling back to GTC"
357 );
358 (TimeInForce::Gtc, None)
359 }
360 (Some(tif), _) => (tif, None),
361 (None, Some(mins)) => (TimeInForce::Gtd, Some(self.expire_time_from_delta(mins))),
362 (None, None) => (TimeInForce::Gtc, None),
363 }
364 }
365
366 fn submit_pending_open_position(&mut self) {
367 if self.instrument.is_none() {
368 return;
369 }
370
371 let Some(qty) = self.pending_open_position_qty.take() else {
372 return;
373 };
374
375 if let Err(e) = self.open_position(qty) {
376 log::error!("Failed to submit pending open position: {e}");
377 } else {
378 self.open_position_submitted = true;
379 }
380 }
381
382 pub(super) fn is_order_active(&self, order: &OrderAny) -> bool {
383 order.is_active_local() || order.is_inflight() || order.is_open()
384 }
385
386 pub(super) fn limit_order_is_one_shot(&self) -> bool {
387 self.config.test_reject_post_only
388 || self.config.limit_aggressive
389 || self.config.order_expire_time_delta_mins.is_some()
390 || matches!(
391 self.config.limit_time_in_force,
392 Some(TimeInForce::Ioc | TimeInForce::Fok)
393 )
394 }
395
396 pub(super) fn stop_order_is_one_shot(&self) -> bool {
397 self.config.order_expire_time_delta_mins.is_some()
398 || matches!(
399 self.config.stop_time_in_force,
400 Some(TimeInForce::Ioc | TimeInForce::Fok)
401 )
402 || matches!(self.config.stop_order_type, OrderType::TrailingStopMarket)
403 }
404
405 pub(super) fn get_order_trigger_price(&self, order: &OrderAny) -> Option<Price> {
406 order.trigger_price()
407 }
408
409 fn modify_stop_order(
410 &mut self,
411 order: &OrderAny,
412 trigger_price: Price,
413 limit_price: Option<Price>,
414 ) -> anyhow::Result<()> {
415 let client_id = self.config.client_id;
416
417 match order {
418 OrderAny::StopMarket(_)
419 | OrderAny::MarketIfTouched(_)
420 | OrderAny::TrailingStopMarket(_) => self.modify_order(
421 order.client_order_id(),
422 None,
423 None,
424 Some(trigger_price),
425 client_id,
426 None,
427 ),
428 OrderAny::StopLimit(_) | OrderAny::LimitIfTouched(_) => self.modify_order(
429 order.client_order_id(),
430 None,
431 limit_price,
432 Some(trigger_price),
433 client_id,
434 None,
435 ),
436 _ => {
437 log_warn!("Cannot modify order of type {:?}", order.order_type());
438 Ok(())
439 }
440 }
441 }
442
443 fn submit_order_apply_params(&mut self, order: OrderAny) -> anyhow::Result<()> {
445 let client_id = self.config.client_id;
446 if let Some(params) = &self.config.order_params {
447 self.submit_order(order, None, client_id, Some(params.clone()))
448 } else {
449 self.submit_order(order, None, client_id, None)
450 }
451 }
452
453 pub(super) fn maintain_orders(&mut self, best_bid: Price, best_ask: Price) {
455 if self.instrument.is_none() || self.config.dry_run {
456 return;
457 }
458
459 if self.config.batch_submit_limit_pair
460 && self.config.enable_limit_buys
461 && self.config.enable_limit_sells
462 {
463 self.maintain_batch_limit_pair(best_bid, best_ask);
464 return;
465 }
466
467 if self.config.enable_limit_buys {
468 self.maintain_buy_orders(best_bid, best_ask);
469 }
470
471 if self.config.enable_limit_sells {
472 self.maintain_sell_orders(best_bid, best_ask);
473 }
474
475 if self.config.enable_stop_buys {
476 self.maintain_stop_buy_orders(best_bid, best_ask);
477 }
478
479 if self.config.enable_stop_sells {
480 self.maintain_stop_sell_orders(best_bid, best_ask);
481 }
482 }
483
484 fn refresh_tracked_order(&mut self, side: OrderSide) {
488 let cid = match side {
489 OrderSide::Buy => self.buy_order.as_ref().map(|o| o.client_order_id()),
490 OrderSide::Sell => self.sell_order.as_ref().map(|o| o.client_order_id()),
491 _ => None,
492 };
493 let Some(cid) = cid else {
494 return;
495 };
496 let latest = self.cache().order(&cid).map(|o| o.clone());
497 if let Some(latest) = latest {
498 match side {
499 OrderSide::Buy => self.buy_order = Some(latest),
500 OrderSide::Sell => self.sell_order = Some(latest),
501 _ => {}
502 }
503 }
504 }
505
506 fn refresh_tracked_stop_order(&mut self, side: OrderSide) {
507 let cid = match side {
508 OrderSide::Buy => self.buy_stop_order.as_ref().map(|o| o.client_order_id()),
509 OrderSide::Sell => self.sell_stop_order.as_ref().map(|o| o.client_order_id()),
510 _ => None,
511 };
512 let Some(cid) = cid else {
513 return;
514 };
515 let latest = self.cache().order(&cid).map(|o| o.clone());
516 if let Some(latest) = latest {
517 match side {
518 OrderSide::Buy => self.buy_stop_order = Some(latest),
519 OrderSide::Sell => self.sell_stop_order = Some(latest),
520 _ => {}
521 }
522 }
523 }
524
525 fn maintain_buy_orders(&mut self, best_bid: Price, best_ask: Price) {
527 self.refresh_tracked_order(OrderSide::Buy);
531
532 let Some(instrument) = &self.instrument else {
533 return;
534 };
535 let Some(price_offset_ticks) = self.price_offset else {
536 return;
537 };
538
539 let increment = instrument.price_increment();
540 let precision = instrument.price_precision();
541
542 let cross_spread = self.config.test_reject_post_only || self.config.limit_aggressive;
547 let raw_price = if cross_spread {
548 add_price_ticks(best_ask, increment, price_offset_ticks, precision)
549 } else {
550 sub_price_ticks(best_bid, increment, price_offset_ticks, precision)
551 };
552 let price = clamp_price_to_range(
553 raw_price,
554 instrument,
555 self.config.clamp_to_instrument_price_range,
556 );
557
558 let needs_new_order = match &self.buy_order {
559 None => true,
560 Some(order) => !self.is_order_active(order) && !self.limit_order_is_one_shot(),
561 };
562
563 if needs_new_order {
564 let result = if self.config.enable_brackets {
565 self.submit_bracket_order(OrderSide::Buy, price)
566 } else {
567 self.submit_limit_order(OrderSide::Buy, price)
568 };
569
570 if let Err(e) = result {
571 log::error!("Failed to submit buy order: {e}");
572 }
573 } else if let Some(order) = &self.buy_order
574 && order.venue_order_id().is_some()
575 && !order.is_pending_update()
576 && !order.is_pending_cancel()
577 {
578 let client_id = self.config.client_id;
579
580 if self.config.test_modify_rejected && !self.modify_rejected_attempted {
583 self.modify_rejected_attempted = true;
584 let order_clone = order.clone();
585 let bumped = clamp_price_to_range(
586 add_price_ticks(price, increment, 1, precision),
587 instrument,
588 self.config.clamp_to_instrument_price_range,
589 );
590
591 if let Err(e) = self.modify_order(
592 order_clone.client_order_id(),
593 None,
594 Some(bumped),
595 None,
596 client_id,
597 None,
598 ) {
599 log::error!("Failed to submit test modify on buy order: {e}");
600 }
601 return;
602 }
603
604 if let Some(order_price) = order.price()
605 && order_price < price
606 {
607 if self.config.modify_orders_to_maintain_tob_offset {
608 let order_clone = order.clone();
609 if let Err(e) = self.modify_order(
610 order_clone.client_order_id(),
611 None,
612 Some(price),
613 None,
614 client_id,
615 None,
616 ) {
617 log::error!("Failed to modify buy order: {e}");
618 }
619 } else if self.config.cancel_replace_orders_to_maintain_tob_offset
620 && !self.buy_cancel_replace_attempted
621 {
622 self.buy_cancel_replace_attempted = true;
623 let order_clone = order.clone();
624 let _ = self.cancel_order(order_clone.client_order_id(), client_id, None);
625
626 if let Err(e) = self.submit_limit_order(OrderSide::Buy, price) {
627 log::error!("Failed to submit replacement buy order: {e}");
628 }
629 }
630 }
631 }
632 }
633
634 fn maintain_sell_orders(&mut self, best_bid: Price, best_ask: Price) {
636 self.refresh_tracked_order(OrderSide::Sell);
639
640 let Some(instrument) = &self.instrument else {
641 return;
642 };
643 let Some(price_offset_ticks) = self.price_offset else {
644 return;
645 };
646
647 let increment = instrument.price_increment();
648 let precision = instrument.price_precision();
649
650 let cross_spread = self.config.test_reject_post_only || self.config.limit_aggressive;
652 let raw_price = if cross_spread {
653 sub_price_ticks(best_bid, increment, price_offset_ticks, precision)
654 } else {
655 add_price_ticks(best_ask, increment, price_offset_ticks, precision)
656 };
657 let price = clamp_price_to_range(
658 raw_price,
659 instrument,
660 self.config.clamp_to_instrument_price_range,
661 );
662
663 let needs_new_order = match &self.sell_order {
664 None => true,
665 Some(order) => !self.is_order_active(order) && !self.limit_order_is_one_shot(),
666 };
667
668 if needs_new_order {
669 let result = if self.config.enable_brackets {
670 self.submit_bracket_order(OrderSide::Sell, price)
671 } else {
672 self.submit_limit_order(OrderSide::Sell, price)
673 };
674
675 if let Err(e) = result {
676 log::error!("Failed to submit sell order: {e}");
677 }
678 } else if let Some(order) = &self.sell_order
679 && order.venue_order_id().is_some()
680 && !order.is_pending_update()
681 && !order.is_pending_cancel()
682 {
683 let client_id = self.config.client_id;
684
685 if self.config.test_modify_rejected && !self.modify_rejected_attempted {
687 self.modify_rejected_attempted = true;
688 let order_clone = order.clone();
689 let bumped = clamp_price_to_range(
690 sub_price_ticks(price, increment, 1, precision),
691 instrument,
692 self.config.clamp_to_instrument_price_range,
693 );
694
695 if let Err(e) = self.modify_order(
696 order_clone.client_order_id(),
697 None,
698 Some(bumped),
699 None,
700 client_id,
701 None,
702 ) {
703 log::error!("Failed to submit test modify on sell order: {e}");
704 }
705 return;
706 }
707
708 if let Some(order_price) = order.price()
709 && order_price > price
710 {
711 if self.config.modify_orders_to_maintain_tob_offset {
712 let order_clone = order.clone();
713 if let Err(e) = self.modify_order(
714 order_clone.client_order_id(),
715 None,
716 Some(price),
717 None,
718 client_id,
719 None,
720 ) {
721 log::error!("Failed to modify sell order: {e}");
722 }
723 } else if self.config.cancel_replace_orders_to_maintain_tob_offset
724 && !self.sell_cancel_replace_attempted
725 {
726 self.sell_cancel_replace_attempted = true;
727 let order_clone = order.clone();
728 let _ = self.cancel_order(order_clone.client_order_id(), client_id, None);
729
730 if let Err(e) = self.submit_limit_order(OrderSide::Sell, price) {
731 log::error!("Failed to submit replacement sell order: {e}");
732 }
733 }
734 }
735 }
736 }
737
738 fn maintain_batch_limit_pair(&mut self, best_bid: Price, best_ask: Price) {
740 self.refresh_tracked_order(OrderSide::Buy);
744 self.refresh_tracked_order(OrderSide::Sell);
745
746 let Some(instrument) = &self.instrument else {
747 return;
748 };
749 let Some(price_offset_ticks) = self.price_offset else {
750 return;
751 };
752
753 let buy_needs = match &self.buy_order {
754 None => true,
755 Some(order) => !self.is_order_active(order) && !self.limit_order_is_one_shot(),
756 };
757 let sell_needs = match &self.sell_order {
758 None => true,
759 Some(order) => !self.is_order_active(order) && !self.limit_order_is_one_shot(),
760 };
761
762 if !buy_needs || !sell_needs {
763 return;
764 }
765
766 let increment = instrument.price_increment();
767 let precision = instrument.price_precision();
768
769 let cross_spread = self.config.test_reject_post_only || self.config.limit_aggressive;
773 let (raw_buy_price, raw_sell_price) = if cross_spread {
774 (
775 add_price_ticks(best_ask, increment, price_offset_ticks, precision),
776 sub_price_ticks(best_bid, increment, price_offset_ticks, precision),
777 )
778 } else {
779 (
780 sub_price_ticks(best_bid, increment, price_offset_ticks, precision),
781 add_price_ticks(best_ask, increment, price_offset_ticks, precision),
782 )
783 };
784 let clamp = self.config.clamp_to_instrument_price_range;
785 let buy_price = clamp_price_to_range(raw_buy_price, instrument, clamp);
786 let sell_price = clamp_price_to_range(raw_sell_price, instrument, clamp);
787 let quantity = instrument.make_qty(self.config.order_qty.as_f64(), None);
788 let (time_in_force, expire_time) =
789 self.resolve_time_in_force(self.config.limit_time_in_force);
790
791 let buy_order = self.core.order_factory().limit(
792 self.config.instrument_id,
793 OrderSide::Buy,
794 quantity,
795 buy_price,
796 Some(time_in_force),
797 expire_time,
798 Some(self.config.use_post_only || self.config.test_reject_post_only),
799 None,
800 Some(self.config.use_quote_quantity),
801 self.config.order_display_qty,
802 self.config.emulation_trigger,
803 None,
804 None,
805 None,
806 None,
807 None,
808 );
809
810 let sell_order = self.core.order_factory().limit(
811 self.config.instrument_id,
812 OrderSide::Sell,
813 quantity,
814 sell_price,
815 Some(time_in_force),
816 expire_time,
817 Some(self.config.use_post_only || self.config.test_reject_post_only),
818 None,
819 Some(self.config.use_quote_quantity),
820 self.config.order_display_qty,
821 self.config.emulation_trigger,
822 None,
823 None,
824 None,
825 None,
826 None,
827 );
828
829 self.buy_order = Some(buy_order.clone());
830 self.sell_order = Some(sell_order.clone());
831
832 let client_id = self.config.client_id;
833 if let Err(e) = self.submit_order_list(vec![buy_order, sell_order], None, client_id, None) {
834 log::error!("Failed to submit batch limit pair: {e}");
835 }
836 }
837
838 fn maintain_stop_buy_orders(&mut self, best_bid: Price, best_ask: Price) {
840 self.refresh_tracked_stop_order(OrderSide::Buy);
841
842 let Some(instrument) = &self.instrument else {
843 return;
844 };
845
846 let increment = instrument.price_increment();
847 let precision = instrument.price_precision();
848 let stop_offset_ticks = self.config.stop_offset_ticks;
849
850 let raw_trigger_price = if matches!(
852 self.config.stop_order_type,
853 OrderType::LimitIfTouched | OrderType::MarketIfTouched | OrderType::TrailingStopMarket
854 ) {
855 sub_price_ticks(best_bid, increment, stop_offset_ticks, precision)
857 } else {
858 add_price_ticks(best_ask, increment, stop_offset_ticks, precision)
860 };
861 let clamp = self.config.clamp_to_instrument_price_range;
862 let trigger_price = clamp_price_to_range(raw_trigger_price, instrument, clamp);
863
864 let limit_price = if matches!(
866 self.config.stop_order_type,
867 OrderType::StopLimit | OrderType::LimitIfTouched
868 ) {
869 let raw_limit = if let Some(limit_offset_ticks) = self.config.stop_limit_offset_ticks {
870 add_price_ticks(trigger_price, increment, limit_offset_ticks, precision)
872 } else {
873 trigger_price
874 };
875 Some(clamp_price_to_range(raw_limit, instrument, clamp))
876 } else {
877 None
878 };
879
880 let needs_new_order = match &self.buy_stop_order {
881 None => true,
882 Some(order) => !self.is_order_active(order) && !self.stop_order_is_one_shot(),
883 };
884
885 if needs_new_order {
886 if let Err(e) = self.submit_stop_order(OrderSide::Buy, trigger_price, limit_price) {
887 log::error!("Failed to submit buy stop order: {e}");
888 }
889 } else if let Some(order) = &self.buy_stop_order
890 && order.venue_order_id().is_some()
891 && !order.is_pending_update()
892 && !order.is_pending_cancel()
893 {
894 let current_trigger = self.get_order_trigger_price(order);
895 if current_trigger.is_some() && current_trigger != Some(trigger_price) {
896 if self.config.modify_stop_orders_to_maintain_offset {
897 let order_clone = order.clone();
898 if let Err(e) = self.modify_stop_order(&order_clone, trigger_price, limit_price)
899 {
900 log::error!("Failed to modify buy stop order: {e}");
901 }
902 } else if self.config.cancel_replace_stop_orders_to_maintain_offset
903 && !self.buy_stop_cancel_replace_attempted
904 {
905 self.buy_stop_cancel_replace_attempted = true;
906 let order_clone = order.clone();
907 let _ = self.cancel_order(
908 order_clone.client_order_id(),
909 self.config.client_id,
910 None,
911 );
912
913 if let Err(e) =
914 self.submit_stop_order(OrderSide::Buy, trigger_price, limit_price)
915 {
916 log::error!("Failed to submit replacement buy stop order: {e}");
917 }
918 }
919 }
920 }
921 }
922
923 fn maintain_stop_sell_orders(&mut self, best_bid: Price, best_ask: Price) {
925 self.refresh_tracked_stop_order(OrderSide::Sell);
926
927 let Some(instrument) = &self.instrument else {
928 return;
929 };
930
931 let increment = instrument.price_increment();
932 let precision = instrument.price_precision();
933 let stop_offset_ticks = self.config.stop_offset_ticks;
934
935 let raw_trigger_price = if matches!(
937 self.config.stop_order_type,
938 OrderType::LimitIfTouched | OrderType::MarketIfTouched | OrderType::TrailingStopMarket
939 ) {
940 add_price_ticks(best_ask, increment, stop_offset_ticks, precision)
942 } else {
943 sub_price_ticks(best_bid, increment, stop_offset_ticks, precision)
945 };
946 let clamp = self.config.clamp_to_instrument_price_range;
947 let trigger_price = clamp_price_to_range(raw_trigger_price, instrument, clamp);
948
949 let limit_price = if matches!(
951 self.config.stop_order_type,
952 OrderType::StopLimit | OrderType::LimitIfTouched
953 ) {
954 let raw_limit = if let Some(limit_offset_ticks) = self.config.stop_limit_offset_ticks {
955 sub_price_ticks(trigger_price, increment, limit_offset_ticks, precision)
957 } else {
958 trigger_price
959 };
960 Some(clamp_price_to_range(raw_limit, instrument, clamp))
961 } else {
962 None
963 };
964
965 let needs_new_order = match &self.sell_stop_order {
966 None => true,
967 Some(order) => !self.is_order_active(order) && !self.stop_order_is_one_shot(),
968 };
969
970 if needs_new_order {
971 if let Err(e) = self.submit_stop_order(OrderSide::Sell, trigger_price, limit_price) {
972 log::error!("Failed to submit sell stop order: {e}");
973 }
974 } else if let Some(order) = &self.sell_stop_order
975 && order.venue_order_id().is_some()
976 && !order.is_pending_update()
977 && !order.is_pending_cancel()
978 {
979 let current_trigger = self.get_order_trigger_price(order);
980 if current_trigger.is_some() && current_trigger != Some(trigger_price) {
981 if self.config.modify_stop_orders_to_maintain_offset {
982 let order_clone = order.clone();
983 if let Err(e) = self.modify_stop_order(&order_clone, trigger_price, limit_price)
984 {
985 log::error!("Failed to modify sell stop order: {e}");
986 }
987 } else if self.config.cancel_replace_stop_orders_to_maintain_offset
988 && !self.sell_stop_cancel_replace_attempted
989 {
990 self.sell_stop_cancel_replace_attempted = true;
991 let order_clone = order.clone();
992 let _ = self.cancel_order(
993 order_clone.client_order_id(),
994 self.config.client_id,
995 None,
996 );
997
998 if let Err(e) =
999 self.submit_stop_order(OrderSide::Sell, trigger_price, limit_price)
1000 {
1001 log::error!("Failed to submit replacement sell stop order: {e}");
1002 }
1003 }
1004 }
1005 }
1006 }
1007
1008 pub(super) fn submit_limit_order(
1014 &mut self,
1015 order_side: OrderSide,
1016 price: Price,
1017 ) -> anyhow::Result<()> {
1018 let Some(instrument) = &self.instrument else {
1019 anyhow::bail!("No instrument loaded");
1020 };
1021
1022 if self.config.dry_run {
1023 log_warn!("Dry run, skipping create {order_side:?} order");
1024 return Ok(());
1025 }
1026
1027 if order_side == OrderSide::Buy && !self.config.enable_limit_buys {
1028 log_warn!("BUY orders not enabled, skipping");
1029 return Ok(());
1030 } else if order_side == OrderSide::Sell && !self.config.enable_limit_sells {
1031 log_warn!("SELL orders not enabled, skipping");
1032 return Ok(());
1033 }
1034
1035 let (time_in_force, expire_time) =
1036 self.resolve_time_in_force(self.config.limit_time_in_force);
1037
1038 let quantity = instrument.make_qty(self.config.order_qty.as_f64(), None);
1039
1040 let order = self.core.order_factory().limit(
1041 self.config.instrument_id,
1042 order_side,
1043 quantity,
1044 price,
1045 Some(time_in_force),
1046 expire_time,
1047 Some(self.config.use_post_only || self.config.test_reject_post_only),
1048 None, Some(self.config.use_quote_quantity),
1050 self.config.order_display_qty,
1051 self.config.emulation_trigger,
1052 None, None, None, None, None, );
1058
1059 if order_side == OrderSide::Buy {
1060 self.buy_order = Some(order.clone());
1061 } else {
1062 self.sell_order = Some(order.clone());
1063 }
1064
1065 self.submit_order_apply_params(order)
1066 }
1067
1068 pub(super) fn submit_stop_order(
1074 &mut self,
1075 order_side: OrderSide,
1076 trigger_price: Price,
1077 limit_price: Option<Price>,
1078 ) -> anyhow::Result<()> {
1079 let Some(instrument) = &self.instrument else {
1080 anyhow::bail!("No instrument loaded");
1081 };
1082
1083 if self.config.dry_run {
1084 log_warn!("Dry run, skipping create {order_side:?} stop order");
1085 return Ok(());
1086 }
1087
1088 if order_side == OrderSide::Buy && !self.config.enable_stop_buys {
1089 log_warn!("BUY stop orders not enabled, skipping");
1090 return Ok(());
1091 } else if order_side == OrderSide::Sell && !self.config.enable_stop_sells {
1092 log_warn!("SELL stop orders not enabled, skipping");
1093 return Ok(());
1094 }
1095
1096 let (time_in_force, expire_time) =
1097 self.resolve_time_in_force(self.config.stop_time_in_force);
1098
1099 let quantity = instrument.make_qty(self.config.order_qty.as_f64(), None);
1101
1102 let factory = self.core.order_factory();
1103
1104 let mut order: OrderAny = match self.config.stop_order_type {
1105 OrderType::StopMarket => factory.stop_market(
1106 self.config.instrument_id,
1107 order_side,
1108 quantity,
1109 trigger_price,
1110 Some(self.config.stop_trigger_type),
1111 Some(time_in_force),
1112 expire_time,
1113 None, Some(self.config.use_quote_quantity),
1115 None, self.config.emulation_trigger,
1117 None, None, None, None, None, ),
1123 OrderType::StopLimit => {
1124 let Some(limit_price) = limit_price else {
1125 anyhow::bail!("STOP_LIMIT order requires limit_price");
1126 };
1127 factory.stop_limit(
1128 self.config.instrument_id,
1129 order_side,
1130 quantity,
1131 limit_price,
1132 trigger_price,
1133 Some(self.config.stop_trigger_type),
1134 Some(time_in_force),
1135 expire_time,
1136 None, None, Some(self.config.use_quote_quantity),
1139 self.config.order_display_qty,
1140 self.config.emulation_trigger,
1141 None, None, None, None, None, )
1147 }
1148 OrderType::MarketIfTouched => factory.market_if_touched(
1149 self.config.instrument_id,
1150 order_side,
1151 quantity,
1152 trigger_price,
1153 Some(self.config.stop_trigger_type),
1154 Some(time_in_force),
1155 expire_time,
1156 None, Some(self.config.use_quote_quantity),
1158 self.config.emulation_trigger,
1159 None, None, None, None, None, ),
1165 OrderType::LimitIfTouched => {
1166 let Some(limit_price) = limit_price else {
1167 anyhow::bail!("LIMIT_IF_TOUCHED order requires limit_price");
1168 };
1169 factory.limit_if_touched(
1170 self.config.instrument_id,
1171 order_side,
1172 quantity,
1173 limit_price,
1174 trigger_price,
1175 Some(self.config.stop_trigger_type),
1176 Some(time_in_force),
1177 expire_time,
1178 None, None, Some(self.config.use_quote_quantity),
1181 self.config.order_display_qty,
1182 self.config.emulation_trigger,
1183 None, None, None, None, None, )
1189 }
1190 OrderType::TrailingStopMarket => {
1191 let Some(trailing_offset) = self.config.trailing_offset else {
1192 anyhow::bail!("TRAILING_STOP_MARKET order requires trailing_offset config");
1193 };
1194 factory.trailing_stop_market(
1195 self.config.instrument_id,
1196 order_side,
1197 quantity,
1198 trailing_offset,
1199 Some(self.config.trailing_offset_type),
1200 None,
1201 Some(trigger_price),
1202 Some(self.config.stop_trigger_type),
1203 Some(time_in_force),
1204 expire_time,
1205 None, Some(self.config.use_quote_quantity),
1207 None, self.config.emulation_trigger,
1209 None, None, None, None, None, )
1215 }
1216 _ => {
1217 anyhow::bail!("Unknown stop order type: {:?}", self.config.stop_order_type);
1218 }
1219 };
1220
1221 if let OrderAny::TrailingStopMarket(order) = &mut order {
1222 order.activation_price = Some(trigger_price);
1223 }
1224
1225 if order_side == OrderSide::Buy {
1226 self.buy_stop_order = Some(order.clone());
1227 } else {
1228 self.sell_stop_order = Some(order.clone());
1229 }
1230
1231 self.submit_order_apply_params(order)
1232 }
1233
1234 pub(super) fn submit_bracket_order(
1240 &mut self,
1241 order_side: OrderSide,
1242 entry_price: Price,
1243 ) -> anyhow::Result<()> {
1244 let Some(instrument) = &self.instrument else {
1245 anyhow::bail!("No instrument loaded");
1246 };
1247
1248 if self.config.dry_run {
1249 log_warn!("Dry run, skipping create {order_side:?} bracket order");
1250 return Ok(());
1251 }
1252
1253 if self.config.bracket_entry_order_type != OrderType::Limit {
1254 anyhow::bail!(
1255 "Only Limit entry orders are supported for brackets, was {:?}",
1256 self.config.bracket_entry_order_type
1257 );
1258 }
1259
1260 if order_side == OrderSide::Buy && !self.config.enable_limit_buys {
1261 log_warn!("BUY orders not enabled, skipping bracket");
1262 return Ok(());
1263 } else if order_side == OrderSide::Sell && !self.config.enable_limit_sells {
1264 log_warn!("SELL orders not enabled, skipping bracket");
1265 return Ok(());
1266 }
1267
1268 let (time_in_force, expire_time) =
1269 self.resolve_time_in_force(self.config.limit_time_in_force);
1270 let sl_time_in_force = self.config.stop_time_in_force.unwrap_or(TimeInForce::Gtc);
1271 if sl_time_in_force == TimeInForce::Gtd {
1272 anyhow::bail!("GTD time in force not supported for bracket stop-loss legs");
1273 }
1274
1275 let quantity = instrument.make_qty(self.config.order_qty.as_f64(), None);
1276 let increment = instrument.price_increment();
1277 let precision = instrument.price_precision();
1278 let bracket_offset_ticks = self.config.bracket_offset_ticks;
1279
1280 let (raw_tp_price, raw_sl_trigger_price) = match order_side {
1281 OrderSide::Buy => {
1282 let tp = add_price_ticks(entry_price, increment, bracket_offset_ticks, precision);
1283 let sl = sub_price_ticks(entry_price, increment, bracket_offset_ticks, precision);
1284 (tp, sl)
1285 }
1286 OrderSide::Sell => {
1287 let tp = sub_price_ticks(entry_price, increment, bracket_offset_ticks, precision);
1288 let sl = add_price_ticks(entry_price, increment, bracket_offset_ticks, precision);
1289 (tp, sl)
1290 }
1291 _ => anyhow::bail!("Invalid order side for bracket: {order_side:?}"),
1292 };
1293 let clamp = self.config.clamp_to_instrument_price_range;
1294 let tp_price = clamp_price_to_range(raw_tp_price, instrument, clamp);
1295 let sl_trigger_price = clamp_price_to_range(raw_sl_trigger_price, instrument, clamp);
1296
1297 let entry_post_only = self.config.use_post_only || self.config.test_reject_post_only;
1298 let orders = self
1299 .core
1300 .order_factory()
1301 .bracket()
1302 .instrument_id(self.config.instrument_id)
1303 .order_side(order_side)
1304 .quantity(quantity)
1305 .quote_quantity(self.config.use_quote_quantity)
1306 .entry_order_type(OrderType::Limit)
1307 .entry_price(entry_price)
1308 .time_in_force(time_in_force)
1309 .entry_post_only(entry_post_only)
1310 .maybe_emulation_trigger(self.config.emulation_trigger)
1311 .maybe_expire_time(expire_time)
1312 .tp_price(tp_price)
1313 .tp_post_only(entry_post_only)
1314 .tp_time_in_force(time_in_force)
1315 .sl_trigger_price(sl_trigger_price)
1316 .sl_trigger_type(self.config.stop_trigger_type)
1317 .sl_time_in_force(sl_time_in_force)
1318 .call();
1319
1320 if let Some(entry_order) = orders.first() {
1321 if order_side == OrderSide::Buy {
1322 self.buy_order = Some(entry_order.clone());
1323 } else {
1324 self.sell_order = Some(entry_order.clone());
1325 }
1326 }
1327
1328 let client_id = self.config.client_id;
1329 if let Some(params) = &self.config.order_params {
1330 self.submit_order_list(orders, None, client_id, Some(params.clone()))
1331 } else {
1332 self.submit_order_list(orders, None, client_id, None)
1333 }
1334 }
1335
1336 pub(super) fn open_position(&mut self, net_qty: Decimal) -> anyhow::Result<()> {
1342 let Some(instrument) = &self.instrument else {
1343 anyhow::bail!("No instrument loaded");
1344 };
1345
1346 if net_qty == Decimal::ZERO {
1347 log_warn!("Open position with zero quantity, skipping");
1348 return Ok(());
1349 }
1350
1351 let order_side = if net_qty > Decimal::ZERO {
1352 OrderSide::Buy
1353 } else {
1354 OrderSide::Sell
1355 };
1356
1357 let quantity = instrument.make_qty(net_qty.abs().to_f64().unwrap_or(0.0), None);
1358
1359 let reduce_only = if self.config.test_reject_reduce_only {
1361 Some(true)
1362 } else {
1363 None
1364 };
1365
1366 let order = self.core.order_factory().market(
1367 self.config.instrument_id,
1368 order_side,
1369 quantity,
1370 Some(self.config.open_position_time_in_force),
1371 reduce_only,
1372 Some(self.config.use_quote_quantity),
1373 None, None, None, None, );
1378
1379 self.submit_order_apply_params(order)
1380 }
1381
1382 pub(super) fn cancel_active_orders(
1383 &mut self,
1384 instrument_id: InstrumentId,
1385 strategy_id: StrategyId,
1386 client_id: Option<ClientId>,
1387 ) {
1388 let bracket_targets: Vec<ClientOrderId> = {
1391 let cache = self.cache();
1392 let mut targets = Vec::new();
1393
1394 for order_list in
1395 cache.order_lists(None, Some(&instrument_id), Some(&strategy_id), None)
1396 {
1397 let is_bracket = order_list.client_order_ids.iter().any(|cid| {
1398 cache
1399 .order(cid)
1400 .is_some_and(|o| is_in_contingency_group(&o))
1401 });
1402
1403 if !is_bracket {
1404 continue;
1405 }
1406
1407 for cid in &order_list.client_order_ids {
1408 if let Some(order) = cache.order(cid)
1409 && !order.is_closed()
1410 && !order.is_pending_cancel()
1411 {
1412 targets.push(*cid);
1413 }
1414 }
1415 }
1416 targets
1417 };
1418
1419 for cid in bracket_targets {
1420 if let Err(e) = self.cancel_order(cid, client_id, None) {
1421 log::error!("Failed to cancel bracket leg {cid}: {e}");
1422 }
1423 }
1424
1425 if self.config.use_individual_cancels_on_stop {
1426 for cid in self.collect_cancellable_order_ids(instrument_id, strategy_id) {
1427 if let Err(e) = self.cancel_order(cid, client_id, None) {
1428 log::error!("Failed to cancel order {cid}: {e}");
1429 }
1430 }
1431 } else if self.config.use_batch_cancel_on_stop {
1432 let candidates = self.collect_cancellable_orders(instrument_id, strategy_id);
1433 let mut batchable: Vec<ClientOrderId> = Vec::new();
1434
1435 for order in candidates {
1436 let cid = order.client_order_id();
1437 if order.is_emulated() || order.is_active_local() {
1438 if let Err(e) = self.cancel_order(cid, client_id, None) {
1439 log::error!("Failed to cancel local order {cid}: {e}");
1440 }
1441 } else {
1442 batchable.push(cid);
1443 }
1444 }
1445
1446 if !batchable.is_empty()
1447 && let Err(e) = self.cancel_orders(batchable, client_id, None)
1448 {
1449 log::error!("Failed to batch cancel orders: {e}");
1450 }
1451 } else {
1452 let local_ids: Vec<ClientOrderId> = {
1455 let cache = self.cache();
1456 cache
1457 .orders_active_local(None, Some(&instrument_id), Some(&strategy_id), None, None)
1458 .into_iter()
1459 .filter(|o| {
1460 !o.is_closed() && !o.is_pending_cancel() && !is_in_contingency_group(o)
1461 })
1462 .map(|o| o.client_order_id())
1463 .collect()
1464 };
1465
1466 for cid in local_ids {
1467 if let Err(e) = self.cancel_order(cid, client_id, None) {
1468 log::error!("Failed to cancel active-local order {cid}: {e}");
1469 }
1470 }
1471
1472 if let Err(e) = self.cancel_all_orders(instrument_id, None, client_id, None) {
1473 log::error!("Failed to cancel all orders: {e}");
1474 }
1475 }
1476 }
1477
1478 pub(super) fn collect_cancellable_orders(
1479 &self,
1480 instrument_id: InstrumentId,
1481 strategy_id: StrategyId,
1482 ) -> Vec<OrderAny> {
1483 let cache = self.cache();
1484 let mut seen: AHashSet<ClientOrderId> = AHashSet::new();
1485 let mut candidates: Vec<OrderAny> = Vec::new();
1486 let sources = [
1489 cache.orders_active_local(None, Some(&instrument_id), Some(&strategy_id), None, None),
1490 cache.orders_emulated(None, Some(&instrument_id), Some(&strategy_id), None, None),
1491 cache.orders_inflight(None, Some(&instrument_id), Some(&strategy_id), None, None),
1492 cache.orders_open(None, Some(&instrument_id), Some(&strategy_id), None, None),
1493 ];
1494
1495 for orders in sources {
1496 for order in orders {
1497 if order.is_closed() || order.is_pending_cancel() || is_in_contingency_group(&order)
1498 {
1499 continue;
1500 }
1501 let cid = order.client_order_id();
1502 if seen.insert(cid) {
1503 candidates.push(order.cloned());
1504 }
1505 }
1506 }
1507 candidates
1508 }
1509
1510 pub(super) fn collect_cancellable_order_ids(
1511 &self,
1512 instrument_id: InstrumentId,
1513 strategy_id: StrategyId,
1514 ) -> Vec<ClientOrderId> {
1515 self.collect_cancellable_orders(instrument_id, strategy_id)
1516 .into_iter()
1517 .map(|o| o.client_order_id())
1518 .collect()
1519 }
1520}
1521
1522fn add_price_ticks(base: Price, increment: Price, ticks: u64, precision: u8) -> Price {
1523 let offset_raw = increment.raw * ticks as PriceRaw;
1524 Price::from_raw(base.raw + offset_raw, precision)
1525}
1526
1527fn sub_price_ticks(base: Price, increment: Price, ticks: u64, precision: u8) -> Price {
1528 let offset_raw = increment.raw * ticks as PriceRaw;
1529 Price::from_raw(base.raw - offset_raw, precision)
1530}
1531
1532fn is_in_contingency_group(order: &OrderAny) -> bool {
1535 matches!(
1536 order.contingency_type(),
1537 Some(ContingencyType::Oto | ContingencyType::Oco | ContingencyType::Ouo)
1538 )
1539}
1540
1541fn clamp_price_to_range(price: Price, instrument: &InstrumentAny, enabled: bool) -> Price {
1542 if !enabled {
1543 return price;
1544 }
1545 let mut clamped = price;
1546 if let Some(min) = instrument.min_price()
1547 && clamped < min
1548 {
1549 clamped = min;
1550 }
1551
1552 if let Some(max) = instrument.max_price()
1553 && clamped > max
1554 {
1555 clamped = max;
1556 }
1557
1558 clamped
1559}