Skip to main content

rustrade/
execution.rs

1//! Framework-side execution: subscribes to the [`MarketDataBus`] and
2//! routes each event through the risk gates to the [`ExchangeClient`].
3//!
4//! # Pre-trade gate sequence
5//!
6//! For every non-`Hold` decision the brain emits, the gates run in this
7//! exact order. Each one that blocks emits a structured `tracing` event
8//! and the order is not placed.
9//!
10//! 1. `SessionPnl::is_session_halted(symbol)` — daily drawdown cap.
11//! 2. `CircuitBreaker::is_tripped(symbol)` — rolling-window loss breaker.
12//! 3. `PositionSizer::contracts(price, contract_value)` — `0` means the
13//!    sized order would be too small to send.
14//!
15//! The first three are *risk-layer* concerns and never raise an error
16//! that aborts the service. A `Decision::Close` for a flat position is
17//! also a silent skip (logged at debug level).
18
19use std::sync::Arc;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22use async_trait::async_trait;
23use chrono::Utc;
24use rustrade_core::{
25    Brain, Capability, Decision, ExchangeClient, MarketDataBus, MarketDataEvent, Order, OrderKind,
26    Position, Price, Side, Signal, SignalBus, SignalType, SizeHint, StopAttachment, Symbol, Volume,
27};
28use rustrade_risk::{PositionSizer, SizingConfig};
29use rustrade_supervisor::{RestartPolicy, TradingService};
30use tokio::sync::broadcast::error::RecvError;
31use tokio_util::sync::CancellationToken;
32
33use crate::risk_state::{PositionCache, RiskStateMap};
34
35/// Shared inputs every [`ExecutionService`] needs.
36///
37/// Cheaply cloneable. Constructed once by [`Bot`](crate::Bot) and
38/// shared across all per-brain execution services.
39#[derive(Clone)]
40pub(crate) struct ExecutionContext {
41    pub exchange: Arc<dyn ExchangeClient>,
42    pub bus: MarketDataBus,
43    pub signals: SignalBus,
44    pub positions: PositionCache,
45    pub risk: RiskStateMap,
46    pub sizing: Arc<SizingConfig>,
47    /// Set when order tracking is wired (`Bot::with_order_tracking`) and the
48    /// adapter advertises `Capability::OrderTracking`. Resting orders the
49    /// service places are recorded here so the reaper can age them out.
50    pub order_tracker: Option<crate::order_tracker::OrderTracker>,
51}
52
53/// Per-brain execution loop with full risk gating + order placement.
54pub struct ExecutionService {
55    name: String,
56    brain: Arc<dyn Brain>,
57    ctx: ExecutionContext,
58    events_processed: AtomicU64,
59    events_dropped: AtomicU64,
60    orders_placed: AtomicU64,
61    orders_blocked: AtomicU64,
62}
63
64impl ExecutionService {
65    pub(crate) fn new(brain: Arc<dyn Brain>, ctx: ExecutionContext) -> Self {
66        let name = format!("execution[{}]", brain.name());
67        Self {
68            name,
69            brain,
70            ctx,
71            events_processed: AtomicU64::new(0),
72            events_dropped: AtomicU64::new(0),
73            orders_placed: AtomicU64::new(0),
74            orders_blocked: AtomicU64::new(0),
75        }
76    }
77
78    /// Total events the brain has been called with.
79    pub fn events_processed(&self) -> u64 {
80        self.events_processed.load(Ordering::Relaxed)
81    }
82    /// Total events dropped by the broadcast bus due to slow consumption.
83    pub fn events_dropped(&self) -> u64 {
84        self.events_dropped.load(Ordering::Relaxed)
85    }
86    /// Total orders successfully passed to the exchange.
87    pub fn orders_placed(&self) -> u64 {
88        self.orders_placed.load(Ordering::Relaxed)
89    }
90    /// Total decisions blocked by a risk gate or by the sizer returning 0.
91    pub fn orders_blocked(&self) -> u64 {
92        self.orders_blocked.load(Ordering::Relaxed)
93    }
94
95    async fn position_for(&self, symbol: &Symbol) -> Position {
96        self.ctx
97            .positions
98            .read()
99            .await
100            .get(symbol)
101            .copied()
102            .unwrap_or(Position::FLAT)
103    }
104
105    async fn handle_event(&self, event: &MarketDataEvent) -> anyhow::Result<()> {
106        let symbol = event.symbol().clone();
107        let position = self.position_for(&symbol).await;
108
109        let decision = self.brain.on_event(event, &position).await?;
110        self.events_processed.fetch_add(1, Ordering::Relaxed);
111
112        let signal = decision.signal;
113        if matches!(signal, SignalType::Hold) {
114            return Ok(());
115        }
116
117        // Publish the brain's intent *before* gates run. Subscribers see
118        // the full signal stream; whether each was acted on is visible
119        // from order placement metrics.
120        let published = self.ctx.signals.publish(Signal {
121            id: format!("{}-{}", self.brain.name(), self.events_processed()),
122            symbol: symbol.as_str().to_string(),
123            kind: signal,
124            confidence: decision.confidence,
125            timestamp: Utc::now(),
126            source: self.brain.name().to_string(),
127            metadata: decision.metadata.clone(),
128        });
129        let _ = published;
130
131        // ── Gate 1: session PnL halt ──────────────────────────────────
132        if let Some(risk) = self.ctx.risk.read().await.get(&symbol)
133            && risk.session_pnl.is_session_halted()
134        {
135            self.orders_blocked.fetch_add(1, Ordering::Relaxed);
136            tracing::warn!(
137                service = %self.name,
138                symbol = %symbol,
139                signal = %signal,
140                "decision blocked: session PnL halted"
141            );
142            return Ok(());
143        }
144
145        // ── Gate 2: circuit breaker ───────────────────────────────────
146        if let Some(risk) = self.ctx.risk.read().await.get(&symbol)
147            && risk.circuit_breaker.is_tripped()
148        {
149            self.orders_blocked.fetch_add(1, Ordering::Relaxed);
150            tracing::warn!(
151                service = %self.name,
152                symbol = %symbol,
153                signal = %signal,
154                cooldown_secs = ?risk.circuit_breaker.cooldown_remaining(),
155                "decision blocked: circuit breaker tripped"
156            );
157            return Ok(());
158        }
159
160        // ── Build the order ───────────────────────────────────────────
161        let order = match self.build_order(event, &symbol, &position, &decision).await {
162            Some(o) => o,
163            None => return Ok(()),
164        };
165
166        // ── Place ────────────────────────────────────────────────────
167        match self.ctx.exchange.place_order(&order).await {
168            Ok(id) => {
169                self.orders_placed.fetch_add(1, Ordering::Relaxed);
170                // Track resting orders so the reaper can age them out. The
171                // tracker itself ignores market orders (they never rest).
172                if let Some(tracker) = &self.ctx.order_tracker {
173                    tracker.record(id.clone(), &order).await;
174                }
175                tracing::info!(
176                    service = %self.name,
177                    symbol = %symbol,
178                    side = ?order.side,
179                    size = %order.size,
180                    reduce_only = order.reduce_only,
181                    order_id = %id,
182                    "order placed"
183                );
184            }
185            Err(e) => {
186                tracing::error!(
187                    service = %self.name,
188                    symbol = %symbol,
189                    error = %e,
190                    "exchange rejected order — risk state unchanged"
191                );
192            }
193        }
194        Ok(())
195    }
196
197    async fn build_order(
198        &self,
199        event: &MarketDataEvent,
200        symbol: &Symbol,
201        position: &Position,
202        decision: &Decision,
203    ) -> Option<Order> {
204        match decision.signal {
205            SignalType::Hold => None,
206            SignalType::Close => {
207                // Use the actual position size; size hint is ignored.
208                let Some(close_side) = position.close_side() else {
209                    tracing::debug!(
210                        service = %self.name,
211                        symbol = %symbol,
212                        "decision=Close but position is flat — nothing to do"
213                    );
214                    return None;
215                };
216                let size = Volume(position.qty.abs());
217                Some(Order::market(symbol.clone(), close_side, size).with_reduce_only(true))
218            }
219            SignalType::Buy | SignalType::Sell => {
220                let side = if matches!(decision.signal, SignalType::Buy) {
221                    Side::Buy
222                } else {
223                    Side::Sell
224                };
225                let price = price_from_event(event)?;
226                let contract_value = self.ctx.exchange.contract_value(symbol);
227                let contracts =
228                    size_decision(&self.ctx.sizing, decision.size_hint, price, contract_value);
229
230                if contracts == 0 {
231                    self.orders_blocked.fetch_add(1, Ordering::Relaxed);
232                    tracing::warn!(
233                        service = %self.name,
234                        symbol = %symbol,
235                        signal = %decision.signal,
236                        price = price.value(),
237                        contract_value,
238                        "decision blocked: sizer returned 0 contracts"
239                    );
240                    return None;
241                }
242                let size = Volume(contracts as f64);
243
244                // ── Order-kind capability gate ────────────────────────
245                // Block (don't silently downgrade) a kind the adapter
246                // can't honour — downgrading post-only / IOC / FOK would
247                // change the fill and fee semantics the brain relied on.
248                let kind = decision.order_kind;
249                if let Some(cap) = capability_for_kind(kind)
250                    && !self.ctx.exchange.supports(cap)
251                {
252                    self.orders_blocked.fetch_add(1, Ordering::Relaxed);
253                    tracing::warn!(
254                        service = %self.name,
255                        symbol = %symbol,
256                        signal = %decision.signal,
257                        ?kind,
258                        required = ?cap,
259                        "decision blocked: adapter does not support requested order kind"
260                    );
261                    return None;
262                }
263
264                // ── Build the base order for the requested kind ───────
265                let order = match kind {
266                    OrderKind::Market => Order::market(symbol.clone(), side, size),
267                    OrderKind::Limit | OrderKind::PostOnly | OrderKind::Ioc | OrderKind::Fok => {
268                        let limit = decision.limit_price.unwrap_or_else(|| {
269                            tracing::warn!(
270                                service = %self.name,
271                                symbol = %symbol,
272                                ?kind,
273                                fallback = price.value(),
274                                "non-market order kind without limit_price; \
275                                 falling back to event price"
276                            );
277                            price
278                        });
279                        let mut o = Order::limit(symbol.clone(), side, size, limit);
280                        o.kind = kind;
281                        o
282                    }
283                };
284
285                // ── Attach protective stop / take-profit if requested ──
286                Some(self.attach_protection(order, symbol, decision))
287            }
288        }
289    }
290
291    /// Attach a protective [`StopAttachment`] derived from
292    /// `decision.stop_price` / `take_profit_price`, gated on
293    /// [`Capability::StopOrders`].
294    ///
295    /// The current [`Order`] model carries a single attachment, so a full
296    /// bracket (stop-loss **and** take-profit on one order) can't be
297    /// expressed yet — stop-loss takes priority and the take-profit is
298    /// logged as dropped. Simultaneous SL+TP (OCO / bracket) is a follow-up
299    /// once the order tracker (0.2c) can manage paired orders. When the
300    /// adapter lacks `StopOrders`, the order is placed **without**
301    /// protection and a warning is emitted (never silently dropped) — a
302    /// brain that requires stops can introspect `supports` itself.
303    fn attach_protection(&self, order: Order, symbol: &Symbol, decision: &Decision) -> Order {
304        let stop = match (decision.stop_price, decision.take_profit_price) {
305            (Some(sl), Some(_tp)) => {
306                tracing::warn!(
307                    service = %self.name,
308                    symbol = %symbol,
309                    "both stop_price and take_profit_price set; attaching stop-loss only \
310                     (bracket / OCO awaits the order tracker)"
311                );
312                StopAttachment::stop_market(sl)
313            }
314            (Some(sl), None) => StopAttachment::stop_market(sl),
315            (None, Some(tp)) => StopAttachment::take_profit(tp),
316            (None, None) => return order,
317        };
318
319        if self.ctx.exchange.supports(Capability::StopOrders) {
320            order.with_stop(stop)
321        } else {
322            tracing::warn!(
323                service = %self.name,
324                symbol = %symbol,
325                "protective stop / take-profit requested but adapter lacks \
326                 Capability::StopOrders; placing order WITHOUT protection"
327            );
328            order
329        }
330    }
331}
332
333/// The adapter [`Capability`] a given [`OrderKind`] requires, if any.
334/// `Market` and `Limit` are assumed universally supported.
335fn capability_for_kind(kind: OrderKind) -> Option<Capability> {
336    match kind {
337        OrderKind::Market | OrderKind::Limit => None,
338        OrderKind::PostOnly => Some(Capability::PostOnly),
339        OrderKind::Ioc => Some(Capability::Ioc),
340        OrderKind::Fok => Some(Capability::Fok),
341    }
342}
343
344#[async_trait]
345impl TradingService for ExecutionService {
346    fn name(&self) -> &str {
347        &self.name
348    }
349
350    fn restart_policy(&self) -> RestartPolicy {
351        RestartPolicy::OnFailure
352    }
353
354    async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
355        let mut rx = self.ctx.bus.subscribe();
356        tracing::info!(service = %self.name, "execution service subscribed");
357
358        loop {
359            tokio::select! {
360                _ = cancel.cancelled() => {
361                    tracing::info!(
362                        service = %self.name,
363                        events = self.events_processed(),
364                        dropped = self.events_dropped(),
365                        placed = self.orders_placed(),
366                        blocked = self.orders_blocked(),
367                        "execution service shutting down"
368                    );
369                    return Ok(());
370                }
371                next = rx.recv() => match next {
372                    Ok(event) => {
373                        if let Err(e) = self.handle_event(&event).await {
374                            tracing::error!(
375                                service = %self.name,
376                                error = %e,
377                                "brain returned error from on_event — service continuing"
378                            );
379                        }
380                    }
381                    Err(RecvError::Lagged(skipped)) => {
382                        self.events_dropped.fetch_add(skipped, Ordering::Relaxed);
383                        tracing::warn!(
384                            service = %self.name,
385                            skipped,
386                            "market data bus lagged — events dropped"
387                        );
388                    }
389                    Err(RecvError::Closed) => {
390                        tracing::info!(service = %self.name, "market data bus closed");
391                        return Ok(());
392                    }
393                },
394            }
395        }
396    }
397}
398
399// ── Helpers ─────────────────────────────────────────────────────────────
400
401fn price_from_event(event: &MarketDataEvent) -> Option<Price> {
402    match event {
403        MarketDataEvent::Candle { candle, .. } => Some(Price(candle.close)),
404        MarketDataEvent::Ticker { tick, .. } => Some(tick.mid_price()),
405        MarketDataEvent::Trade { price, .. } => Some(Price(*price)),
406    }
407}
408
409/// Translate the brain's `SizeHint` into a contract count via the sizer.
410fn size_decision(sizing: &SizingConfig, hint: SizeHint, price: Price, contract_value: f64) -> u32 {
411    let sizer = PositionSizer::new(sizing.clone());
412    match hint {
413        SizeHint::Default => sizer.contracts(price.value(), contract_value),
414        SizeHint::MarginFraction(f) => {
415            // Fraction of the configured default margin — clamped to [0, 1].
416            let f = f.clamp(0.0, 1.0);
417            let margin = sizing.margin_per_trade * f;
418            sizer.contracts_with_margin(margin, price.value(), contract_value)
419        }
420        SizeHint::NotionalUsd(n) => {
421            // notional = margin × leverage  →  margin = notional / leverage
422            let leverage = sizing.leverage.max(1);
423            let margin = n / f64::from(leverage);
424            sizer.contracts_with_margin(margin, price.value(), contract_value)
425        }
426        SizeHint::Quantity(q) => {
427            // The brain knows what it wants — clamp to max and floor.
428            let raw = q.value().max(0.0).floor() as u32;
429            raw.min(sizing.max_contracts)
430        }
431    }
432}