rustrade 0.2.1

Framework for building high-performance live-trading, paper-trading and back-testing systems
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
use crate::{
    EngineEvent, Sequence,
    engine::{
        action::{
            ActionOutput,
            cancel_orders::CancelOrders,
            close_positions::ClosePositions,
            generate_algo_orders::{GenerateAlgoOrders, GenerateAlgoOrdersOutput},
            send_requests::SendRequests,
        },
        audit::{AuditTick, Auditor, EngineAudit, ProcessAudit, context::EngineContext},
        clock::EngineClock,
        command::Command,
        execution_tx::ExecutionTxMap,
        state::{
            EngineState,
            instrument::data::InstrumentDataState,
            order::{in_flight_recorder::InFlightRequestRecorder, manager::OrderManager},
            position::{PositionExited, PositionId},
            trading::TradingState,
        },
    },
    execution::{AccountStreamEvent, request::ExecutionRequest},
    risk::RiskManager,
    shutdown::SyncShutdown,
    statistic::summary::TradingSummaryGenerator,
    strategy::{
        algo::AlgoStrategy, close_positions::ClosePositionsStrategy,
        on_disconnect::OnDisconnectStrategy, on_trading_disabled::OnTradingDisabled,
    },
};
use chrono::{DateTime, Utc};
use rust_decimal::Decimal;
use rustrade_data::{event::MarketEvent, streams::consumer::MarketStreamEvent};
use rustrade_execution::{
    AccountEvent,
    order::Order,
    trade::{AssetFees, Trade, TradeId},
};
use rustrade_instrument::{
    Side,
    asset::AssetIndex,
    exchange::ExchangeIndex,
    instrument::{InstrumentIndex, kind::option::OptionKind},
};
use rustrade_integration::channel::Tx;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use tracing::{info, warn};

/// Defines how the [`Engine`] actions a [`Command`], and the associated outputs.
pub mod action;

/// Defines an `Engine` audit types as well as utilities for handling the `Engine` `AuditStream`.
///
/// eg/ `StateReplicaManager` component can be used to maintain an `EngineState` replica.
pub mod audit;

/// Defines the [`EngineClock`] interface used to determine the current `Engine` time.
///
/// This flexibility enables back-testing runs to use approximately correct historical timestamps.
pub mod clock;

/// Defines an [`Engine`] [`Command`] - used to give trading directives to the `Engine` from an
/// external process (eg/ ClosePositions).
pub mod command;

/// Defines all possible errors that can occur in the [`Engine`].
pub mod error;

/// Defines the [`ExecutionTxMap`] interface that models a collection of transmitters used to route
/// can `ExecutionRequest` to the appropriate `ExecutionManagers`.
pub mod execution_tx;

/// Defines all state used by the`Engine` to algorithmically trade.
///
/// eg/ `ConnectivityStates`, `AssetStates`, `InstrumentStates`, `Position`, etc.
pub mod state;

/// `Engine` runners for processing input `Events`.
///
/// eg/ `fn sync_run`, `fn sync_run_with_audit`, `fn async_run`, `fn async_run_with_audit`,
pub mod run;

/// Defines how a component processing an input Event and generates an appropriate Audit.
pub trait Processor<Event> {
    type Audit;
    fn process(&mut self, event: Event) -> Self::Audit;
}

/// Process and `Event` with the `Engine` and product an [`AuditTick`] of work done.
pub fn process_with_audit<Event, Engine>(
    engine: &mut Engine,
    event: Event,
) -> AuditTick<Engine::Audit, EngineContext>
where
    Engine: Processor<Event> + Auditor<Engine::Audit, Context = EngineContext>,
{
    let output = engine.process(event);
    engine.audit(output)
}

/// Algorithmic trading `Engine`.
///
/// The `Engine`:
/// * Processes input [`EngineEvent`] (or custom events if implemented).
/// * Maintains the internal [`EngineState`] (instrument data state, open orders, positions, etc.).
/// * Generates algo orders (if `TradingState::Enabled`).
///
/// # Type Parameters
/// * `Clock` - [`EngineClock`] implementation.
/// * `State` - Engine `State` implementation (eg/ [`EngineState`]).
/// * `ExecutionTxs` - [`ExecutionTxMap`] implementation for sending execution requests.
/// * `Strategy` - Trading Strategy implementation (see [`super::strategy`]).
/// * `Risk` - [`RiskManager`] implementation.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Engine<Clock, State, ExecutionTxs, Strategy, Risk> {
    pub clock: Clock,
    pub meta: EngineMeta,
    pub state: State,
    pub execution_txs: ExecutionTxs,
    pub strategy: Strategy,
    pub risk: Risk,
}

/// Running [`Engine`] metadata.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub struct EngineMeta {
    /// [`EngineClock`] start timestamp of the current [`Engine`] `run`.
    pub time_start: DateTime<Utc>,
    /// Monotonically increasing [`Sequence`] associated with the number of events processed.
    pub sequence: Sequence,
}

impl<Clock, GlobalData, InstrumentData, ExecutionTxs, Strategy, Risk>
    Processor<EngineEvent<InstrumentData::MarketEventKind>>
    for Engine<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Strategy, Risk>
where
    Clock: EngineClock + for<'a> Processor<&'a EngineEvent<InstrumentData::MarketEventKind>>,
    InstrumentData: InstrumentDataState,
    GlobalData: for<'a> Processor<&'a AccountEvent>
        + for<'a> Processor<&'a MarketEvent<InstrumentIndex, InstrumentData::MarketEventKind>>,
    ExecutionTxs: ExecutionTxMap<ExchangeIndex, InstrumentIndex>,
    Strategy: OnTradingDisabled<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>
        + OnDisconnectStrategy<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>
        + AlgoStrategy<State = EngineState<GlobalData, InstrumentData>>
        + ClosePositionsStrategy<State = EngineState<GlobalData, InstrumentData>>,
    Risk: RiskManager<State = EngineState<GlobalData, InstrumentData>>,
{
    type Audit = EngineAudit<
        EngineEvent<InstrumentData::MarketEventKind>,
        EngineOutput<Strategy::OnTradingDisabled, Strategy::OnDisconnect>,
    >;

    fn process(&mut self, event: EngineEvent<InstrumentData::MarketEventKind>) -> Self::Audit {
        self.clock.process(&event);

        let process_audit = match &event {
            EngineEvent::Shutdown(_) => return EngineAudit::process(event),
            EngineEvent::Command(command) => {
                let output = self.action(command);

                if let Some(unrecoverable) = output.unrecoverable_errors() {
                    return EngineAudit::process_with_output_and_errs(event, unrecoverable, output);
                } else {
                    ProcessAudit::with_output(event, output)
                }
            }
            EngineEvent::TradingStateUpdate(trading_state) => {
                let trading_disabled = self.update_from_trading_state_update(*trading_state);
                ProcessAudit::with_trading_state_update(event, trading_disabled)
            }
            EngineEvent::Account(account) => {
                let output = self.update_from_account_stream(account);
                ProcessAudit::with_account_update(event, output)
            }
            EngineEvent::Market(market) => {
                let output = self.update_from_market_stream(market);
                ProcessAudit::with_market_update(event, output)
            }
            EngineEvent::ContractExpiry(key) => {
                let exited = self.process_contract_expiry(key);
                // Fold all closed positions into the audit as separate PositionExit outputs.
                // In Netting mode this is 0 or 1 entries; in Hedging mode it may be N.
                let mut audit = ProcessAudit::with_event(event);
                for position_exited in exited {
                    audit = audit.add_output(position_exited);
                }
                // ContractExpiry settles regardless of TradingState and does not
                // trigger algo order generation — return early before that check.
                return EngineAudit::from(audit);
            }
        };

        if let TradingState::Enabled = self.state.trading {
            let output = self.generate_algo_orders();

            if output.is_empty() {
                EngineAudit::from(process_audit)
            } else if let Some(unrecoverable) = output.unrecoverable_errors() {
                EngineAudit::Process(process_audit.add_errors(unrecoverable))
            } else {
                EngineAudit::from(process_audit.add_output(output))
            }
        } else {
            EngineAudit::from(process_audit)
        }
    }
}

impl<Clock, GlobalData, InstrumentData, ExecutionTxs, Strategy, Risk> SyncShutdown
    for Engine<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Strategy, Risk>
where
    ExecutionTxs: ExecutionTxMap,
{
    type Result = ();

    fn shutdown(&mut self) -> Self::Result {
        self.execution_txs.iter().for_each(|execution_tx| {
            let _send_result = execution_tx.send(ExecutionRequest::Shutdown);
        });
    }
}

impl<Clock, GlobalData, InstrumentData, ExecutionTxs, Strategy, Risk>
    Engine<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Strategy, Risk>
{
    /// Action an `Engine` [`Command`], producing an [`ActionOutput`] of work done.
    pub fn action(&mut self, command: &Command) -> ActionOutput
    where
        InstrumentData: InFlightRequestRecorder,
        ExecutionTxs: ExecutionTxMap,
        Strategy: ClosePositionsStrategy<State = EngineState<GlobalData, InstrumentData>>,
        Risk: RiskManager,
    {
        match &command {
            Command::SendCancelRequests(requests) => {
                info!(
                    ?requests,
                    "Engine actioning user Command::SendCancelRequests"
                );
                let output = self.send_requests(requests.clone());
                self.state.record_in_flight_cancels(&output.sent);
                ActionOutput::CancelOrders(output)
            }
            Command::SendOpenRequests(requests) => {
                info!(?requests, "Engine actioning user Command::SendOpenRequests");
                let output = self.send_requests(requests.clone());
                self.state.record_in_flight_opens(&output.sent);
                ActionOutput::OpenOrders(output)
            }
            Command::ClosePositions(filter) => {
                info!(?filter, "Engine actioning user Command::ClosePositions");
                ActionOutput::ClosePositions(self.close_positions(filter))
            }
            Command::CancelOrders(filter) => {
                info!(?filter, "Engine actioning user Command::CancelOrders");
                ActionOutput::CancelOrders(self.cancel_orders(filter))
            }
        }
    }

    /// Update the `Engine` [`TradingState`].
    ///
    /// If the `TradingState` transitions to `TradingState::Disabled`, the `Engine` will call
    /// the configured [`OnTradingDisabled`] strategy logic.
    pub fn update_from_trading_state_update(
        &mut self,
        update: TradingState,
    ) -> Option<Strategy::OnTradingDisabled>
    where
        Strategy:
            OnTradingDisabled<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>,
    {
        self.state
            .trading
            .update(update)
            .transitioned_to_disabled()
            .then(|| Strategy::on_trading_disabled(self))
    }

    /// Update the [`Engine`] from an [`AccountStreamEvent`].
    ///
    /// If the input `AccountStreamEvent` indicates the exchange execution link has disconnected,
    /// the `Engine` will call the configured [`OnDisconnectStrategy`] strategy logic.
    pub fn update_from_account_stream(
        &mut self,
        event: &AccountStreamEvent,
    ) -> UpdateFromAccountOutput<Strategy::OnDisconnect>
    where
        InstrumentData: for<'a> Processor<&'a AccountEvent>,
        GlobalData: for<'a> Processor<&'a AccountEvent>,
        Strategy: OnDisconnectStrategy<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>,
    {
        match event {
            AccountStreamEvent::Reconnecting(exchange) => {
                self.state
                    .connectivity
                    .update_from_account_reconnecting(exchange);

                UpdateFromAccountOutput::OnDisconnect(Strategy::on_disconnect(self, *exchange))
            }
            AccountStreamEvent::Item(event) => self
                .state
                .update_from_account(event)
                .map(UpdateFromAccountOutput::PositionExit)
                .unwrap_or(UpdateFromAccountOutput::None),
        }
    }

    /// Update the [`Engine`] from a [`MarketStreamEvent`].
    ///
    /// If the input `MarketStreamEvent` indicates the exchange market data link has disconnected,
    /// the `Engine` will call the configured [`OnDisconnectStrategy`] strategy logic.
    pub fn update_from_market_stream(
        &mut self,
        event: &MarketStreamEvent<InstrumentIndex, InstrumentData::MarketEventKind>,
    ) -> UpdateFromMarketOutput<Strategy::OnDisconnect>
    where
        InstrumentData: InstrumentDataState,
        GlobalData:
            for<'a> Processor<&'a MarketEvent<InstrumentIndex, InstrumentData::MarketEventKind>>,
        Strategy: OnDisconnectStrategy<Clock, EngineState<GlobalData, InstrumentData>, ExecutionTxs, Risk>,
    {
        match event {
            MarketStreamEvent::Reconnecting(exchange) => {
                self.state
                    .connectivity
                    .update_from_market_reconnecting(exchange);

                UpdateFromMarketOutput::OnDisconnect(Strategy::on_disconnect(self, *exchange))
            }
            MarketStreamEvent::Item(event) => {
                self.state.update_from_market(event);
                UpdateFromMarketOutput::None
            }
        }
    }

    /// Returns a [`TradingSummaryGenerator`] for the current trading session.
    pub fn trading_summary_generator(&self, risk_free_return: Decimal) -> TradingSummaryGenerator
    where
        Clock: EngineClock,
    {
        TradingSummaryGenerator::init(
            risk_free_return,
            self.meta.time_start,
            self.time(),
            &self.state.instruments,
            &self.state.assets,
        )
    }

    /// Processes a `ContractExpiry` event for the given `InstrumentIndex`.
    ///
    /// # Algorithm
    /// 1. Guards on `expiration_processed` (idempotent).
    /// 2. Cancels all open orders for the instrument by sending `ExecutionRequest::Cancel` for each.
    /// 3. Derives settlement price from instrument data and contract specification:
    ///    - OTM: settlement price = 0
    ///    - ITM call: settlement = spot - strike (per-contract intrinsic value)
    ///    - ITM put: settlement = strike - spot (per-contract intrinsic value)
    /// 4. Synthesises a closing `Trade` at the settlement price and routes it through
    ///    `instrument_state.update_from_trade`.
    /// 5. Sets `instrument_state.expiration_processed = true`.
    ///
    /// If no position is open for the instrument, steps 3–4 are skipped.
    /// If no market price is available, settlement cannot be computed and the method
    /// logs a warning and returns without synthesising a fill. The `expiration_processed`
    /// flag is **not** set in this case, making the event **retryable** — re-inject
    /// `ContractExpiry` once the underlying spot instrument has received market data.
    ///
    /// # Not modelled (deferred)
    ///
    /// - **Assignment for short writers:** short positions at expiry are closed at intrinsic
    ///   value identical to long positions (OTM at 0, ITM at `|spot − strike|`). True
    ///   assignment — where the short writer is obligated to deliver the underlying — is not
    ///   modelled. A future enhancement would detect net-short positions and open a synthetic
    ///   underlying position instead of (or in addition to) closing the option position.
    ///
    /// - **Physical settlement:** all settlements are cash-equivalent (a synthetic fill at the
    ///   settlement price adjusts PnL). No separate "deliver/receive underlying" position is
    ///   opened. Physically-settled contracts (e.g. some futures-style options) are out of scope
    ///   until this is revisited.
    pub fn process_contract_expiry(
        &mut self,
        key: &InstrumentIndex,
    ) -> Vec<PositionExited<AssetIndex, InstrumentIndex>>
    where
        Clock: EngineClock,
        InstrumentData: InstrumentDataState + InFlightRequestRecorder,
        ExecutionTxs: ExecutionTxMap,
    {
        let instrument_state = self.state.instruments.instrument_index_mut(key);

        // Guard: idempotent — ignore duplicates after first processing.
        if instrument_state.expiration_processed {
            return vec![];
        }

        // Step 2: Cancel all open orders for this instrument.
        let cancel_requests: Vec<_> = instrument_state
            .orders
            .orders()
            .filter_map(Order::to_request_cancel)
            .collect();
        let cancels = self.send_requests(cancel_requests);
        self.state.record_in_flight_cancels(&cancels.sent);

        // Re-borrow after send_requests (which takes &self for execution_txs).
        let instrument_state = self.state.instruments.instrument_index_mut(key);

        // Step 3–4: Synthesise settlement fills only if positions are open.
        if instrument_state.position.positions.is_empty() {
            instrument_state.expiration_processed = true;
            instrument_state.orders.clear();
            instrument_state.exchange_id_to_cid.clear();
            instrument_state.position_ids.clear();
            // Clear pending_fills even when no positions exist — a fill-before-ack race
            // that was in progress at expiry should not accumulate orphaned fills.
            instrument_state.pending_fills.clear();
            return vec![];
        }

        // Derive settlement price from the underlying spot price and contract spec.
        // For options, ITM/OTM determination requires the *underlying's* price, not the
        // option's own market price (which includes premium and would give wrong results).
        // We find the underlying spot instrument by matching the option's underlying.base.
        use rustrade_instrument::instrument::kind::InstrumentKind;
        // Capture both the underlying base key and the exchange so we can filter
        // the spot scan to the same exchange. Without the exchange filter, a
        // multi-exchange setup (e.g. BTC/USD on both Binance and Alpaca) would
        // silently use the wrong exchange's price.
        let option_spec = match &instrument_state.instrument.kind {
            InstrumentKind::Option(_) => Some((
                instrument_state.instrument.underlying.base,
                instrument_state.instrument.underlying.quote,
                instrument_state.instrument.exchange,
            )),
            _ => None,
        };

        let spot_price = match option_spec {
            Some((base_key, quote_key, exchange)) => {
                // Find the spot instrument on the same exchange whose underlying matches
                // the option's underlying base AND quote. Both are required: without the
                // quote filter, BTC/USDT and BTC/USDC options on the same exchange would
                // silently share the same spot price (M3).
                // Single-pass: collect all matching spot instruments so we can both
                // warn on ambiguity (visible in production) and use the first match,
                // without scanning the instrument list twice.
                let spot_matches: Vec<_> = self
                    .state
                    .instruments
                    .0
                    .values()
                    .filter(|s| {
                        matches!(&s.instrument.kind, InstrumentKind::Spot)
                            && s.instrument.underlying.base == base_key
                            && s.instrument.underlying.quote == quote_key
                            && s.instrument.exchange == exchange
                    })
                    .collect();
                if spot_matches.len() > 1 {
                    warn!(
                        count = spot_matches.len(),
                        "process_contract_expiry: multiple Spot instruments match the option \
                         underlying — using the first. Deduplicate your instrument config."
                    );
                }
                spot_matches.into_iter().next().and_then(|s| s.data.price())
            }
            // Non-option instruments: use the instrument's own last price.
            None => self.state.instruments.instrument_index(key).data.price(),
        };

        // Re-borrow mutably after the immutable scan above.
        let instrument_state = self.state.instruments.instrument_index_mut(key);

        let Some(spot_price) = spot_price else {
            warn!(
                instrument = ?key,
                "ContractExpiry: underlying price unavailable — cannot compute settlement. \
                 Ensure the underlying spot instrument is subscribed. \
                 Re-inject ContractExpiry once market data arrives."
            );
            // Do NOT set expiration_processed — the event is retryable once data is available.
            return vec![];
        };

        let settlement_price = match &instrument_state.instrument.kind {
            InstrumentKind::Option(contract) => {
                match contract.kind {
                    OptionKind::Call => {
                        // ITM call: intrinsic = underlying_spot - strike (per-share)
                        // ATM (spot == strike): intrinsic = 0 by cash-settlement convention.
                        if spot_price > contract.strike {
                            spot_price - contract.strike
                        } else {
                            Decimal::ZERO
                        }
                    }
                    OptionKind::Put => {
                        // ITM put: intrinsic = strike - underlying_spot (per-share)
                        // ATM (spot == strike): intrinsic = 0 by cash-settlement convention.
                        if contract.strike > spot_price {
                            contract.strike - spot_price
                        } else {
                            Decimal::ZERO
                        }
                    }
                }
            }
            // Non-option instruments: settlement at current market price.
            _ => spot_price,
        };

        // Collect all position IDs before iterating so we can re-borrow instrument_state
        // mutably inside the loop without conflicting with the keys() borrow.
        let position_ids: Vec<PositionId> = instrument_state
            .position
            .positions
            .keys()
            .cloned()
            .collect();

        // Engine clock time for all synthetic trades in this expiry batch.
        // Using self.time() (not Utc::now()) keeps backtests deterministic.
        let engine_time = self.time();

        let mut exited = Vec::with_capacity(position_ids.len());

        for pos_id in position_ids {
            let instrument_state = self.state.instruments.instrument_index_mut(key);

            let Some(open_position) = instrument_state.position.positions.get(&pos_id) else {
                continue;
            };

            let closing_side = match open_position.side {
                Side::Buy => Side::Sell,
                Side::Sell => Side::Buy,
            };
            let closing_quantity = open_position.quantity_abs;

            // Each synthetic trade gets a unique ID derived from its position ID and
            // the engine clock timestamp. The timestamp component prevents dedup key
            // collisions across engine restarts where expiration_processed was not
            // persisted (Netting mode always uses the same pos_id = "netting").
            // Always heap-allocates (>22 chars): use String directly rather than SmolStr.
            let trade_tag = format!(
                "expiry-settlement-{}-{}",
                pos_id,
                engine_time.timestamp_micros()
            );
            // Use the instrument's quote asset for fee tracking (amount is zero)
            let quote_asset = instrument_state.instrument.underlying.quote;
            let settlement_trade = Trade {
                id: TradeId::new(&trade_tag),
                order_id: rustrade_execution::order::id::OrderId::new(&trade_tag),
                instrument: *key,
                strategy: rustrade_execution::order::id::StrategyId::ENGINE_EXPIRY,
                time_exchange: engine_time,
                side: closing_side,
                price: settlement_price,
                quantity: closing_quantity,
                fees: AssetFees {
                    asset: quote_asset,
                    fees: Decimal::ZERO,
                    fees_quote: Some(Decimal::ZERO),
                },
            };

            // Route settlement directly to the correct position by ID.
            // We bypass InstrumentState::update_from_trade (which calls update_from_trade
            // without a PositionId) because in Hedging mode that would derive the ID from
            // trade.order_id, opening a spurious new position instead of closing the real one.
            //
            // Fee model bypass: settlement_trade.fees is always Decimal::ZERO (set above).
            // The fee model is intentionally not applied — exchange settlement commission,
            // if any, must be accounted for separately by the caller. Callers that configure
            // FeeModelConfig::PerContract for options should note this invariant.
            debug_assert_eq!(
                settlement_trade.fees.fees,
                Decimal::ZERO,
                "settlement trade must carry zero fees before update_from_trade_with_id"
            );
            let contract_size = instrument_state.instrument.kind.contract_size();
            if let Some(exit) = instrument_state.position.update_from_trade_with_id(
                &settlement_trade,
                &pos_id,
                contract_size,
            ) {
                instrument_state.tear_sheet.update_from_position(&exit);
                exited.push(exit);
            }
        }

        // Step 5: Mark as processed and clear all routing tables.
        // No fills will arrive for this instrument post-expiry. Cancel-ack messages
        // for the orders cancelled in step 2 may never arrive (exchanges silently void
        // them), so cleanup_routing_tables() cannot remove the CancelInFlight entries —
        // they would accumulate indefinitely across expiry cycles in Hedging mode.
        // Clear orders, position_ids, exchange_id_to_cid, and pending_fills explicitly,
        // matching the replica's eager-clear logic in StateReplicaManager::update_from_event.
        let instrument_state = self.state.instruments.instrument_index_mut(key);
        instrument_state.expiration_processed = true;
        instrument_state.orders.clear();
        instrument_state.exchange_id_to_cid.clear();
        instrument_state.position_ids.clear();
        // Clear any fills buffered in a fill-before-ack race that was in progress at
        // expiry. Without this, orphaned pending_fills accumulate across expiry cycles
        // in a long-running engine with persisted state.
        instrument_state.pending_fills.clear();

        exited
    }
}

impl<Clock, State, ExecutionTxs, Strategy, Risk> Engine<Clock, State, ExecutionTxs, Strategy, Risk>
where
    Clock: EngineClock,
{
    /// Construct a new `Engine`.
    ///
    /// An initial [`EngineMeta`] is constructed form the provided `clock` and `Sequence(0)`.
    pub fn new(
        clock: Clock,
        state: State,
        execution_txs: ExecutionTxs,
        strategy: Strategy,
        risk: Risk,
    ) -> Self {
        Self {
            meta: EngineMeta {
                time_start: clock.time(),
                sequence: Sequence(0),
            },
            clock,
            state,
            execution_txs,
            strategy,
            risk,
        }
    }

    /// Return `Engine` clock time.
    pub fn time(&self) -> DateTime<Utc> {
        self.clock.time()
    }

    /// Reset the internal `EngineMeta` to the `clock` time and `Sequence(0)`.
    pub fn reset_metadata(&mut self) {
        self.meta.time_start = self.clock.time();
        self.meta.sequence = Sequence(0);
    }
}

/// Output produced by [`Engine`] operations, used to construct an `Engine` [`EngineAudit`].
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub enum EngineOutput<
    OnTradingDisabled,
    OnDisconnect,
    ExchangeKey = ExchangeIndex,
    InstrumentKey = InstrumentIndex,
> {
    Commanded(ActionOutput<ExchangeKey, InstrumentKey>),
    OnTradingDisabled(OnTradingDisabled),
    AccountDisconnect(OnDisconnect),
    PositionExit(PositionExited<AssetIndex, InstrumentKey>),
    MarketDisconnect(OnDisconnect),
    AlgoOrders(GenerateAlgoOrdersOutput<ExchangeKey, InstrumentKey>),
}

/// Output produced by the [`Engine`] updating from an [`TradingState`], used to construct
/// an `Engine` [`EngineAudit`].
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub enum UpdateTradingStateOutput<OnTradingDisabled> {
    None,
    OnTradingDisabled(OnTradingDisabled),
}

/// Output produced by the [`Engine`] updating from an [`AccountStreamEvent`], used to construct
/// an `Engine` [`EngineAudit`].
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
#[allow(clippy::large_enum_variant)] // PositionExit is rare; avoiding Box keeps API simple
pub enum UpdateFromAccountOutput<OnDisconnect, InstrumentKey = InstrumentIndex> {
    None,
    OnDisconnect(OnDisconnect),
    PositionExit(PositionExited<AssetIndex, InstrumentKey>),
}

/// Output produced by the [`Engine`] updating from an [`MarketStreamEvent`], used to construct
/// an `Engine` [`EngineAudit`].
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub enum UpdateFromMarketOutput<OnDisconnect> {
    None,
    OnDisconnect(OnDisconnect),
}

impl<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
    From<ActionOutput<ExchangeKey, InstrumentKey>>
    for EngineOutput<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
{
    fn from(value: ActionOutput<ExchangeKey, InstrumentKey>) -> Self {
        Self::Commanded(value)
    }
}

impl<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
    From<PositionExited<AssetIndex, InstrumentKey>>
    for EngineOutput<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
{
    fn from(value: PositionExited<AssetIndex, InstrumentKey>) -> Self {
        Self::PositionExit(value)
    }
}

impl<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
    From<GenerateAlgoOrdersOutput<ExchangeKey, InstrumentKey>>
    for EngineOutput<OnTradingDisabled, OnDisconnect, ExchangeKey, InstrumentKey>
{
    fn from(value: GenerateAlgoOrdersOutput<ExchangeKey, InstrumentKey>) -> Self {
        Self::AlgoOrders(value)
    }
}