Skip to main content

rustrade/
execution.rs

1//! Framework-side execution: subscribes to the [`MarketDataBus`] and
2//! routes each event through the risk gates to the [`ExchangeClient`].
3//!
4//! # Pre-trade gate sequence
5//!
6//! For every non-`Hold` decision the brain emits, the gates run in this
7//! exact order. Each one that blocks emits a structured `tracing` event
8//! and the order is not placed.
9//!
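//! 1. `SessionPnl::is_session_halted()` on the symbol's risk state — daily
//!    drawdown cap.
//! 2. `CircuitBreaker::is_tripped()` on the symbol's risk state —
//!    rolling-window loss breaker.
//! 3. `PositionSizer::contracts(price, contract_value)` — `0` means the
//!    sized order would be too small to send.
//! 4. Account-wide portfolio risk (`check_entry`) — entries only.
//! 5. Instrument min-notional — skips a dust order the venue would reject.
//! 6. Order-kind capability — blocks a kind the adapter cannot honour.
//!
//! Gates 3–6 apply to `Buy` / `Sell` entries only. All of them are
//! *risk-layer* concerns and never raise an error that aborts the
//! service. A `Decision::Close` for a flat position is also a silent
//! skip (logged at debug level).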
18
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, Ordering};
22
23use async_trait::async_trait;
24use chrono::Utc;
25use rustrade_core::{
26    Brain, Capability, Decision, ExchangeClient, MarketDataBus, MarketDataEvent, Order, OrderKind,
27    Position, Price, Side, Signal, SignalBus, SignalType, SizeHint, StopAttachment, Symbol, Volume,
28};
29use rustrade_risk::{PortfolioState, PositionSizer, SizingConfig};
30use rustrade_supervisor::{RestartPolicy, TradingService};
31use tokio::sync::broadcast::error::RecvError;
32use tokio_util::sync::CancellationToken;
33
34use crate::risk_state::{PortfolioRiskState, PositionCache, RiskStateMap};
35
36/// Shared inputs every [`ExecutionService`] needs.
37///
38/// Cheaply cloneable. Constructed once by [`Bot`](crate::Bot) and
39/// shared across all per-brain execution services.
40#[derive(Clone)]
41pub(crate) struct ExecutionContext {
42    pub exchange: Arc<dyn ExchangeClient>,
43    pub bus: MarketDataBus,
44    pub signals: SignalBus,
45    pub positions: PositionCache,
46    pub risk: RiskStateMap,
47    /// Account-wide portfolio risk: the pre-trade gate reads it (entries only);
48    /// the risk sweep maintains its daily-loss latch.
49    pub portfolio: PortfolioRiskState,
50    /// Per-symbol sizing resolver (default + overrides). The execution
51    /// service sizes each order with the config for the event's symbol.
52    pub sizing: Arc<SymbolSizing>,
53    /// Set when order tracking is wired (`Bot::with_order_tracking`) and the
54    /// adapter advertises `Capability::OrderTracking`. Resting orders the
55    /// service places are recorded here so the reaper can age them out.
56    pub order_tracker: Option<crate::order_tracker::OrderTracker>,
57    /// Set when bracket (SL+TP / OCO) orders are active — the adapter
58    /// supports `StopOrders` + `OrderTracking` and a fill source is wired.
59    /// A market entry carrying both `stop_price` and `take_profit_price`
60    /// then gets two reduce-only protective orders registered as an OCO
61    /// pair here; the `FillRoutingService` cancels the sibling on fill.
62    pub oco: Option<crate::order_tracker::OcoRegistry>,
63}
64
65/// Resolves the [`SizingConfig`] to use for a given symbol: a per-symbol
66/// override if one exists, otherwise the bot-wide default.
67pub(crate) struct SymbolSizing {
68    default: SizingConfig,
69    per_symbol: HashMap<Symbol, SizingConfig>,
70}
71
72impl SymbolSizing {
73    pub(crate) fn new(default: SizingConfig, per_symbol: HashMap<Symbol, SizingConfig>) -> Self {
74        Self {
75            default,
76            per_symbol,
77        }
78    }
79
80    pub(crate) fn for_symbol(&self, symbol: &Symbol) -> &SizingConfig {
81        self.per_symbol.get(symbol).unwrap_or(&self.default)
82    }
83}
84
85/// Per-brain execution loop with full risk gating + order placement.
86pub struct ExecutionService {
87    name: String,
88    brain: Arc<dyn Brain>,
89    ctx: ExecutionContext,
90    /// Symbols this brain owns (`Brain::owned_symbols`), cached at
91    /// construction. `None` ⇒ the brain sees every symbol; `Some` ⇒ events
92    /// for other symbols are skipped before `on_event`.
93    owned: Option<std::collections::HashSet<Symbol>>,
94    events_processed: AtomicU64,
95    events_dropped: AtomicU64,
96    orders_placed: AtomicU64,
97    orders_blocked: AtomicU64,
98}
99
100impl ExecutionService {
101    pub(crate) fn new(brain: Arc<dyn Brain>, ctx: ExecutionContext) -> Self {
102        let name = format!("execution[{}]", brain.name());
103        let owned = brain
104            .owned_symbols()
105            .map(|syms| syms.into_iter().collect::<std::collections::HashSet<_>>());
106        Self {
107            name,
108            brain,
109            ctx,
110            owned,
111            events_processed: AtomicU64::new(0),
112            events_dropped: AtomicU64::new(0),
113            orders_placed: AtomicU64::new(0),
114            orders_blocked: AtomicU64::new(0),
115        }
116    }
117
118    /// Total events the brain has been called with.
119    pub fn events_processed(&self) -> u64 {
120        self.events_processed.load(Ordering::Relaxed)
121    }
122    /// Total events dropped by the broadcast bus due to slow consumption.
123    pub fn events_dropped(&self) -> u64 {
124        self.events_dropped.load(Ordering::Relaxed)
125    }
126    /// Total orders successfully passed to the exchange.
127    pub fn orders_placed(&self) -> u64 {
128        self.orders_placed.load(Ordering::Relaxed)
129    }
130    /// Total decisions blocked by a risk gate or by the sizer returning 0.
131    pub fn orders_blocked(&self) -> u64 {
132        self.orders_blocked.load(Ordering::Relaxed)
133    }
134
135    async fn position_for(&self, symbol: &Symbol) -> Position {
136        self.ctx
137            .positions
138            .read()
139            .await
140            .get(symbol)
141            .copied()
142            .unwrap_or(Position::FLAT)
143    }
144
145    async fn handle_event(&self, event: &MarketDataEvent) -> anyhow::Result<()> {
146        let symbol = event.symbol().clone();
147
148        // Ownership filter: a brain that declared `owned_symbols` only ever
149        // sees its own symbols — others are dropped before `on_event`, so
150        // two brains can't both act on one symbol (also enforced at startup).
151        if let Some(owned) = &self.owned
152            && !owned.contains(&symbol)
153        {
154            return Ok(());
155        }
156
157        let position = self.position_for(&symbol).await;
158
159        let decision = self.brain.on_event(event, &position).await?;
160        self.events_processed.fetch_add(1, Ordering::Relaxed);
161
162        let signal = decision.signal;
163        if matches!(signal, SignalType::Hold) {
164            return Ok(());
165        }
166
167        // Publish the brain's intent *before* gates run. Subscribers see
168        // the full signal stream; whether each was acted on is visible
169        // from order placement metrics.
170        let published = self.ctx.signals.publish(Signal {
171            id: format!("{}-{}", self.brain.name(), self.events_processed()),
172            symbol: symbol.as_str().to_string(),
173            kind: signal,
174            confidence: decision.confidence,
175            timestamp: Utc::now(),
176            source: self.brain.name().to_string(),
177            metadata: decision.metadata.clone(),
178        });
179        let _ = published;
180
181        // ── Gate 1: session PnL halt ──────────────────────────────────
182        if let Some(risk) = self.ctx.risk.read().await.get(&symbol)
183            && risk.session_pnl.is_session_halted()
184        {
185            self.orders_blocked.fetch_add(1, Ordering::Relaxed);
186            tracing::warn!(
187                service = %self.name,
188                symbol = %symbol,
189                signal = %signal,
190                "decision blocked: session PnL halted"
191            );
192            return Ok(());
193        }
194
195        // ── Gate 2: circuit breaker ───────────────────────────────────
196        if let Some(risk) = self.ctx.risk.read().await.get(&symbol)
197            && risk.circuit_breaker.is_tripped()
198        {
199            self.orders_blocked.fetch_add(1, Ordering::Relaxed);
200            tracing::warn!(
201                service = %self.name,
202                symbol = %symbol,
203                signal = %signal,
204                cooldown_secs = ?risk.circuit_breaker.cooldown_remaining(),
205                "decision blocked: circuit breaker tripped"
206            );
207            return Ok(());
208        }
209
210        // ── Build the order ───────────────────────────────────────────
211        let order = match self.build_order(event, &symbol, &position, &decision).await {
212            Some(o) => o,
213            None => return Ok(()),
214        };
215
216        // ── Place ────────────────────────────────────────────────────
217        match self.ctx.exchange.place_order(&order).await {
218            Ok(id) => {
219                self.orders_placed.fetch_add(1, Ordering::Relaxed);
220                // Track resting orders so the reaper can age them out. The
221                // tracker itself ignores market orders (they never rest).
222                if let Some(tracker) = &self.ctx.order_tracker {
223                    tracker.record(id.clone(), &order).await;
224                }
225                tracing::info!(
226                    service = %self.name,
227                    symbol = %symbol,
228                    side = ?order.side,
229                    size = %order.size,
230                    reduce_only = order.reduce_only,
231                    order_id = %id,
232                    "order placed"
233                );
234                // Bracket entry: place the SL + TP protective legs now that
235                // the market entry is accepted.
236                if self.is_bracket(&decision, order.kind)
237                    && let (Some(sl), Some(tp)) = (decision.stop_price, decision.take_profit_price)
238                {
239                    self.place_brackets(&symbol, order.side, order.size, sl, tp)
240                        .await;
241                }
242            }
243            Err(e) => {
244                tracing::error!(
245                    service = %self.name,
246                    symbol = %symbol,
247                    error = %e,
248                    "exchange rejected order — risk state unchanged"
249                );
250            }
251        }
252        Ok(())
253    }
254
255    /// Assemble the aggregate account state for the portfolio gate: open-position
256    /// count + gross exposure (Σ `|qty|·entry·contract_value`) and whether
257    /// `symbol` is already open, from the position cache; plus the account net
258    /// PnL (Σ per-symbol session net) from the risk map. `new_notional` is the
259    /// quote-currency size of the entry under consideration.
260    async fn portfolio_state(&self, symbol: &Symbol, new_notional: f64) -> PortfolioState {
261        let (open_positions, gross_exposure, symbol_already_open) = {
262            let positions = self.ctx.positions.read().await;
263            let mut open = 0u32;
264            let mut gross = 0.0;
265            for (sym, p) in positions.iter() {
266                if p.is_flat() {
267                    continue;
268                }
269                open += 1;
270                let px = p.entry_price.unwrap_or(0.0);
271                gross += p.qty.abs() * px * self.ctx.exchange.contract_value(sym);
272            }
273            let already = positions.get(symbol).is_some_and(|p| !p.is_flat());
274            (open, gross, already)
275        };
276        let account_net_pnl = {
277            let risk = self.ctx.risk.read().await;
278            risk.values().map(|sr| sr.session_pnl.net_pnl()).sum()
279        };
280        PortfolioState {
281            open_positions,
282            gross_exposure,
283            new_notional,
284            symbol_already_open,
285            account_net_pnl,
286        }
287    }
288
289    async fn build_order(
290        &self,
291        event: &MarketDataEvent,
292        symbol: &Symbol,
293        position: &Position,
294        decision: &Decision,
295    ) -> Option<Order> {
296        match decision.signal {
297            SignalType::Hold => None,
298            SignalType::Close => {
299                // Use the actual position size; size hint is ignored.
300                let Some(close_side) = position.close_side() else {
301                    tracing::debug!(
302                        service = %self.name,
303                        symbol = %symbol,
304                        "decision=Close but position is flat — nothing to do"
305                    );
306                    return None;
307                };
308                let size = Volume(position.qty.abs());
309                Some(Order::market(symbol.clone(), close_side, size).with_reduce_only(true))
310            }
311            SignalType::Buy | SignalType::Sell => {
312                let side = if matches!(decision.signal, SignalType::Buy) {
313                    Side::Buy
314                } else {
315                    Side::Sell
316                };
317                let price = price_from_event(event)?;
318                // Instrument metadata: contract size for sizing, plus tick /
319                // min-notional used below. `contract_value` mirrors the spec.
320                let spec = self.ctx.exchange.instrument_spec(symbol);
321                let contract_value = spec.contract_value;
322                let contracts = size_decision(
323                    self.ctx.sizing.for_symbol(symbol),
324                    decision.size_hint,
325                    price,
326                    contract_value,
327                );
328
329                if contracts == 0 {
330                    self.orders_blocked.fetch_add(1, Ordering::Relaxed);
331                    tracing::warn!(
332                        service = %self.name,
333                        symbol = %symbol,
334                        signal = %decision.signal,
335                        price = price.value(),
336                        contract_value,
337                        "decision blocked: sizer returned 0 contracts"
338                    );
339                    return None;
340                }
341                let size = Volume(contracts as f64);
342
343                // ── Gate 3: account-wide portfolio risk (entries only) ─
344                // Exits (the Close arm) are never gated here — only new risk.
345                let new_notional = f64::from(contracts) * price.value() * contract_value;
346                let pf_state = self.portfolio_state(symbol, new_notional).await;
347                if let Err(block) = self.ctx.portfolio.read().await.check_entry(pf_state) {
348                    self.orders_blocked.fetch_add(1, Ordering::Relaxed);
349                    tracing::warn!(
350                        service = %self.name,
351                        symbol = %symbol,
352                        signal = %decision.signal,
353                        reason = %block,
354                        "decision blocked: portfolio risk"
355                    );
356                    return None;
357                }
358
359                // ── Instrument min-notional gate ──────────────────────
360                // Skip a dust order the venue would reject. No-op when the
361                // adapter reports no minimum (the default).
362                if !spec.meets_min_notional(new_notional) {
363                    self.orders_blocked.fetch_add(1, Ordering::Relaxed);
364                    tracing::warn!(
365                        service = %self.name,
366                        symbol = %symbol,
367                        signal = %decision.signal,
368                        notional = new_notional,
369                        min_notional = spec.min_notional,
370                        "decision blocked: order below instrument min notional"
371                    );
372                    return None;
373                }
374
375                // ── Order-kind capability gate ────────────────────────
376                // Block (don't silently downgrade) a kind the adapter
377                // can't honour — downgrading post-only / IOC / FOK would
378                // change the fill and fee semantics the brain relied on.
379                let kind = decision.order_kind;
380                if let Some(cap) = capability_for_kind(kind)
381                    && !self.ctx.exchange.supports(cap)
382                {
383                    self.orders_blocked.fetch_add(1, Ordering::Relaxed);
384                    tracing::warn!(
385                        service = %self.name,
386                        symbol = %symbol,
387                        signal = %decision.signal,
388                        ?kind,
389                        required = ?cap,
390                        "decision blocked: adapter does not support requested order kind"
391                    );
392                    return None;
393                }
394
395                // ── Build the base order for the requested kind ───────
396                let order = match kind {
397                    OrderKind::Market => Order::market(symbol.clone(), side, size),
398                    OrderKind::Limit | OrderKind::PostOnly | OrderKind::Ioc | OrderKind::Fok => {
399                        let limit = decision.limit_price.unwrap_or_else(|| {
400                            tracing::warn!(
401                                service = %self.name,
402                                symbol = %symbol,
403                                ?kind,
404                                fallback = price.value(),
405                                "non-market order kind without limit_price; \
406                                 falling back to event price"
407                            );
408                            price
409                        });
410                        // Snap to the venue's price tick (no-op when unknown).
411                        let limit = Price(spec.round_price(limit.value()));
412                        let mut o = Order::limit(symbol.clone(), side, size, limit);
413                        o.kind = kind;
414                        o
415                    }
416                };
417
418                // ── Protective handling ───────────────────────────────
419                // When a full bracket applies (both SL+TP, market entry,
420                // OCO active), the entry stays clean — the two protective
421                // orders are placed separately after the entry fills (see
422                // `place_brackets`). Otherwise attach a single stop/TP (or
423                // fall back / warn) as in 0.2b.
424                if self.is_bracket(decision, order.kind) {
425                    Some(order)
426                } else {
427                    Some(self.attach_protection(order, symbol, decision))
428                }
429            }
430        }
431    }
432
433    /// Does this decision warrant a full SL+TP bracket placed as two
434    /// separate OCO orders? Requires both prices, a market entry, and an
435    /// active OCO registry (which `Bot` only sets when the adapter supports
436    /// `StopOrders` + `OrderTracking` and a fill source is wired).
437    fn is_bracket(&self, decision: &Decision, kind: OrderKind) -> bool {
438        self.ctx.oco.is_some()
439            && matches!(kind, OrderKind::Market)
440            && decision.stop_price.is_some()
441            && decision.take_profit_price.is_some()
442    }
443
444    /// Place the two reduce-only protective legs for a bracket entry and
445    /// register them as an OCO pair. Called after the market entry is
446    /// accepted. Best-effort: if the second leg fails to place, the first is
447    /// cancelled so no orphaned protective order is left resting.
448    async fn place_brackets(
449        &self,
450        symbol: &Symbol,
451        entry_side: Side,
452        size: Volume,
453        sl: Price,
454        tp: Price,
455    ) {
456        let Some(oco) = &self.ctx.oco else { return };
457        let close_side = entry_side.opposite();
458        let sl_order = Order::market(symbol.clone(), close_side, size)
459            .with_reduce_only(true)
460            .with_stop(StopAttachment::stop_market(sl));
461        let tp_order = Order::market(symbol.clone(), close_side, size)
462            .with_reduce_only(true)
463            .with_stop(StopAttachment::take_profit(tp));
464
465        let sl_id = match self.ctx.exchange.place_order(&sl_order).await {
466            Ok(id) => id,
467            Err(e) => {
468                tracing::error!(service = %self.name, symbol = %symbol, error = %e, "bracket: stop-loss leg failed to place; entry is UNPROTECTED");
469                return;
470            }
471        };
472        let tp_id = match self.ctx.exchange.place_order(&tp_order).await {
473            Ok(id) => id,
474            Err(e) => {
475                tracing::error!(service = %self.name, symbol = %symbol, error = %e, "bracket: take-profit leg failed; cancelling the stop-loss leg to avoid an orphan");
476                let _ = self.ctx.exchange.cancel_order(symbol, &sl_id).await;
477                return;
478            }
479        };
480
481        oco.register(symbol.clone(), sl_id.clone(), tp_id.clone())
482            .await;
483        if let Some(tracker) = &self.ctx.order_tracker {
484            tracker.record(sl_id.clone(), &sl_order).await;
485            tracker.record(tp_id.clone(), &tp_order).await;
486        }
487        self.orders_placed.fetch_add(2, Ordering::Relaxed);
488        tracing::info!(
489            service = %self.name,
490            symbol = %symbol,
491            close_side = ?close_side,
492            stop = sl.value(),
493            take_profit = tp.value(),
494            sl_id = %sl_id,
495            tp_id = %tp_id,
496            "bracket placed (SL + TP, OCO-linked)"
497        );
498    }
499
500    /// Attach a **single** protective [`StopAttachment`] to the entry order,
501    /// gated on [`Capability::StopOrders`]. This is the fallback path used
502    /// when a full bracket doesn't apply — i.e. only one of SL/TP is set, the
503    /// entry is a limit order, or bracket prerequisites aren't met (no fill
504    /// source, or the adapter lacks `StopOrders`/`OrderTracking`).
505    ///
506    /// When *both* SL and TP are set but brackets are inactive, the single
507    /// `Order.stop` field can only carry one, so the protective **stop-loss**
508    /// takes priority and the take-profit is logged as dropped — full SL+TP
509    /// is handled by [`Self::place_brackets`] when brackets are active. When
510    /// the adapter lacks `StopOrders`, the order is placed **without**
511    /// protection and a warning is emitted (never silently dropped) — a brain
512    /// that requires stops can introspect `supports` itself.
513    fn attach_protection(&self, order: Order, symbol: &Symbol, decision: &Decision) -> Order {
514        let stop = match (decision.stop_price, decision.take_profit_price) {
515            (Some(sl), Some(_tp)) => {
516                tracing::warn!(
517                    service = %self.name,
518                    symbol = %symbol,
519                    "both stop_price and take_profit_price set but brackets inactive \
520                     (needs StopOrders + OrderTracking + a fill source); attaching \
521                     stop-loss only"
522                );
523                StopAttachment::stop_market(sl)
524            }
525            (Some(sl), None) => StopAttachment::stop_market(sl),
526            (None, Some(tp)) => StopAttachment::take_profit(tp),
527            (None, None) => return order,
528        };
529
530        if self.ctx.exchange.supports(Capability::StopOrders) {
531            order.with_stop(stop)
532        } else {
533            tracing::warn!(
534                service = %self.name,
535                symbol = %symbol,
536                "protective stop / take-profit requested but adapter lacks \
537                 Capability::StopOrders; placing order WITHOUT protection"
538            );
539            order
540        }
541    }
542}
543
544/// The adapter [`Capability`] a given [`OrderKind`] requires, if any.
545/// `Market` and `Limit` are assumed universally supported.
546fn capability_for_kind(kind: OrderKind) -> Option<Capability> {
547    match kind {
548        OrderKind::Market | OrderKind::Limit => None,
549        OrderKind::PostOnly => Some(Capability::PostOnly),
550        OrderKind::Ioc => Some(Capability::Ioc),
551        OrderKind::Fok => Some(Capability::Fok),
552    }
553}
554
555#[async_trait]
556impl TradingService for ExecutionService {
557    fn name(&self) -> &str {
558        &self.name
559    }
560
561    fn restart_policy(&self) -> RestartPolicy {
562        RestartPolicy::OnFailure
563    }
564
565    async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
566        let mut rx = self.ctx.bus.subscribe();
567        tracing::info!(service = %self.name, "execution service subscribed");
568
569        loop {
570            tokio::select! {
571                _ = cancel.cancelled() => {
572                    tracing::info!(
573                        service = %self.name,
574                        events = self.events_processed(),
575                        dropped = self.events_dropped(),
576                        placed = self.orders_placed(),
577                        blocked = self.orders_blocked(),
578                        "execution service shutting down"
579                    );
580                    return Ok(());
581                }
582                next = rx.recv() => match next {
583                    Ok(event) => {
584                        if let Err(e) = self.handle_event(&event).await {
585                            tracing::error!(
586                                service = %self.name,
587                                error = %e,
588                                "brain returned error from on_event — service continuing"
589                            );
590                        }
591                    }
592                    Err(RecvError::Lagged(skipped)) => {
593                        self.events_dropped.fetch_add(skipped, Ordering::Relaxed);
594                        tracing::warn!(
595                            service = %self.name,
596                            skipped,
597                            "market data bus lagged — events dropped"
598                        );
599                    }
600                    Err(RecvError::Closed) => {
601                        tracing::info!(service = %self.name, "market data bus closed");
602                        return Ok(());
603                    }
604                },
605            }
606        }
607    }
608}
609
610// ── Helpers ─────────────────────────────────────────────────────────────
611
612fn price_from_event(event: &MarketDataEvent) -> Option<Price> {
613    match event {
614        MarketDataEvent::Candle { candle, .. } => Some(Price(candle.close)),
615        MarketDataEvent::Ticker { tick, .. } => Some(tick.mid_price()),
616        MarketDataEvent::Trade { price, .. } => Some(Price(*price)),
617    }
618}
619
620/// Translate the brain's `SizeHint` into a contract count via the sizer.
621fn size_decision(sizing: &SizingConfig, hint: SizeHint, price: Price, contract_value: f64) -> u32 {
622    let sizer = PositionSizer::new(sizing.clone());
623    match hint {
624        SizeHint::Default => sizer.contracts(price.value(), contract_value),
625        SizeHint::MarginFraction(f) => {
626            // Fraction of the configured default margin — clamped to [0, 1].
627            let f = f.clamp(0.0, 1.0);
628            let margin = sizing.margin_per_trade * f;
629            sizer.contracts_with_margin(margin, price.value(), contract_value)
630        }
631        SizeHint::NotionalUsd(n) => {
632            // notional = margin × leverage  →  margin = notional / leverage
633            let leverage = sizing.leverage.max(1);
634            let margin = n / f64::from(leverage);
635            sizer.contracts_with_margin(margin, price.value(), contract_value)
636        }
637        SizeHint::Quantity(q) => {
638            // The brain knows what it wants — clamp to max and floor.
639            let raw = q.value().max(0.0).floor() as u32;
640            raw.min(sizing.max_contracts)
641        }
642    }
643}