Skip to main content

rustrade/
execution.rs

1//! Framework-side execution: subscribes to the [`MarketDataBus`] and
2//! routes each event through the risk gates to the [`ExchangeClient`].
3//!
4//! # Pre-trade gate sequence
5//!
6//! For every non-`Hold` decision the brain emits, the gates run in this
7//! exact order. Each one that blocks emits a structured `tracing` event
8//! and the order is not placed.
9//!
10//! 1. `SessionPnl::is_session_halted(symbol)` — daily drawdown cap.
11//! 2. `CircuitBreaker::is_tripped(symbol)` — rolling-window loss breaker.
12//! 3. `PositionSizer::contracts(price, contract_value)` — `0` means the
13//!    sized order would be too small to send.
14//!
15//! The first three are *risk-layer* concerns and never raise an error
16//! that aborts the service. A `Decision::Close` for a flat position is
17//! also a silent skip (logged at debug level).
18
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, Ordering};
22
23use async_trait::async_trait;
24use chrono::Utc;
25use rustrade_core::{
26    Brain, Capability, Decision, ExchangeClient, MarketDataBus, MarketDataEvent, Order, OrderKind,
27    Position, Price, Side, Signal, SignalBus, SignalType, SizeHint, StopAttachment, Symbol, Volume,
28};
29use rustrade_risk::{PortfolioState, PositionSizer, SizingConfig};
30use rustrade_supervisor::{RestartPolicy, TradingService};
31use tokio::sync::broadcast::error::RecvError;
32use tokio_util::sync::CancellationToken;
33
34use crate::pending::PendingEntryLedger;
35use crate::risk_state::{PortfolioRiskState, PositionCache, RiskStateMap};
36
37/// Shared inputs every [`ExecutionService`] needs.
38///
39/// Cheaply cloneable. Constructed once by [`Bot`](crate::Bot) and
40/// shared across all per-brain execution services.
41#[derive(Clone)]
42pub(crate) struct ExecutionContext {
43    pub exchange: Arc<dyn ExchangeClient>,
44    pub bus: MarketDataBus,
45    pub signals: SignalBus,
46    pub positions: PositionCache,
47    pub risk: RiskStateMap,
48    /// Account-wide portfolio risk: the pre-trade gate reads it (entries only);
49    /// the risk sweep maintains its daily-loss latch.
50    pub portfolio: PortfolioRiskState,
51    /// Per-symbol sizing resolver (default + overrides). The execution
52    /// service sizes each order with the config for the event's symbol.
53    pub sizing: Arc<SymbolSizing>,
54    /// Set when order tracking is wired (`Bot::with_order_tracking`) and the
55    /// adapter advertises `Capability::OrderTracking`. Resting orders the
56    /// service places are recorded here so the reaper can age them out.
57    pub order_tracker: Option<crate::order_tracker::OrderTracker>,
58    /// Set when bracket (SL+TP / OCO) orders are active — the adapter
59    /// supports `StopOrders` + `OrderTracking` and a fill source is wired.
60    /// A market entry carrying both `stop_price` and `take_profit_price`
61    /// then gets two reduce-only protective orders registered as an OCO
62    /// pair here; the `FillRoutingService` cancels the sibling on fill.
63    pub oco: Option<crate::order_tracker::OcoRegistry>,
64    /// What to do when a bracket entry fills but its stop-loss leg fails
65    /// to place (see [`BracketFailurePolicy`](crate::BracketFailurePolicy)).
66    pub bracket_failure_policy: crate::bot::BracketFailurePolicy,
67    /// Outstanding entry reservations: orders that passed the portfolio
68    /// gate but whose fills aren't visible in the position cache yet.
69    /// Makes the gate check-and-reserve, so concurrent brains can't both
70    /// slip through `max_concurrent_positions` / `max_gross_exposure`.
71    pub pending: PendingEntryLedger,
72}
73
74/// Resolves the [`SizingConfig`] to use for a given symbol: a per-symbol
75/// override if one exists, otherwise the bot-wide default.
76pub(crate) struct SymbolSizing {
77    default: SizingConfig,
78    per_symbol: HashMap<Symbol, SizingConfig>,
79}
80
81impl SymbolSizing {
82    pub(crate) fn new(default: SizingConfig, per_symbol: HashMap<Symbol, SizingConfig>) -> Self {
83        Self {
84            default,
85            per_symbol,
86        }
87    }
88
89    pub(crate) fn for_symbol(&self, symbol: &Symbol) -> &SizingConfig {
90        self.per_symbol.get(symbol).unwrap_or(&self.default)
91    }
92}
93
94/// Per-brain execution loop with full risk gating + order placement.
95pub struct ExecutionService {
96    name: String,
97    brain: Arc<dyn Brain>,
98    ctx: ExecutionContext,
99    /// Symbols this brain owns (`Brain::owned_symbols`), cached at
100    /// construction. `None` ⇒ the brain sees every symbol; `Some` ⇒ events
101    /// for other symbols are skipped before `on_event`.
102    owned: Option<std::collections::HashSet<Symbol>>,
103    events_processed: AtomicU64,
104    events_dropped: AtomicU64,
105    orders_placed: AtomicU64,
106    orders_blocked: AtomicU64,
107}
108
109impl ExecutionService {
110    pub(crate) fn new(brain: Arc<dyn Brain>, ctx: ExecutionContext) -> Self {
111        let name = format!("execution[{}]", brain.name());
112        let owned = brain
113            .owned_symbols()
114            .map(|syms| syms.into_iter().collect::<std::collections::HashSet<_>>());
115        Self {
116            name,
117            brain,
118            ctx,
119            owned,
120            events_processed: AtomicU64::new(0),
121            events_dropped: AtomicU64::new(0),
122            orders_placed: AtomicU64::new(0),
123            orders_blocked: AtomicU64::new(0),
124        }
125    }
126
127    /// Total events the brain has been called with.
128    pub fn events_processed(&self) -> u64 {
129        self.events_processed.load(Ordering::Relaxed)
130    }
131    /// Total events dropped by the broadcast bus due to slow consumption.
132    pub fn events_dropped(&self) -> u64 {
133        self.events_dropped.load(Ordering::Relaxed)
134    }
135    /// Total orders successfully passed to the exchange.
136    pub fn orders_placed(&self) -> u64 {
137        self.orders_placed.load(Ordering::Relaxed)
138    }
139    /// Total decisions blocked by a risk gate or by the sizer returning 0.
140    pub fn orders_blocked(&self) -> u64 {
141        self.orders_blocked.load(Ordering::Relaxed)
142    }
143
144    async fn position_for(&self, symbol: &Symbol) -> Position {
145        self.ctx
146            .positions
147            .read()
148            .await
149            .get(symbol)
150            .copied()
151            .unwrap_or(Position::FLAT)
152    }
153
154    async fn handle_event(&self, event: &MarketDataEvent) -> anyhow::Result<()> {
155        let symbol = event.symbol().clone();
156
157        // Ownership filter: a brain that declared `owned_symbols` only ever
158        // sees its own symbols — others are dropped before `on_event`, so
159        // two brains can't both act on one symbol (also enforced at startup).
160        if let Some(owned) = &self.owned
161            && !owned.contains(&symbol)
162        {
163            return Ok(());
164        }
165
166        let position = self.position_for(&symbol).await;
167
168        let decision = self.brain.on_event(event, &position).await?;
169        self.events_processed.fetch_add(1, Ordering::Relaxed);
170
171        let signal = decision.signal;
172        if matches!(signal, SignalType::Hold) {
173            return Ok(());
174        }
175
176        // Publish the brain's intent *before* gates run. Subscribers see
177        // the full signal stream; whether each was acted on is visible
178        // from order placement metrics.
179        let published = self.ctx.signals.publish(Signal {
180            id: format!("{}-{}", self.brain.name(), self.events_processed()),
181            symbol: symbol.as_str().to_string(),
182            kind: signal,
183            confidence: decision.confidence,
184            timestamp: Utc::now(),
185            source: self.brain.name().to_string(),
186            metadata: decision.metadata.clone(),
187        });
188        let _ = published;
189
190        // ── Gate 1: session PnL halt ──────────────────────────────────
191        if let Some(risk) = self.ctx.risk.read().await.get(&symbol)
192            && risk.session_pnl.is_session_halted()
193        {
194            self.orders_blocked.fetch_add(1, Ordering::Relaxed);
195            tracing::warn!(
196                service = %self.name,
197                symbol = %symbol,
198                signal = %signal,
199                "decision blocked: session PnL halted"
200            );
201            return Ok(());
202        }
203
204        // ── Gate 2: circuit breaker ───────────────────────────────────
205        if let Some(risk) = self.ctx.risk.read().await.get(&symbol)
206            && risk.circuit_breaker.is_tripped()
207        {
208            self.orders_blocked.fetch_add(1, Ordering::Relaxed);
209            tracing::warn!(
210                service = %self.name,
211                symbol = %symbol,
212                signal = %signal,
213                cooldown_secs = ?risk.circuit_breaker.cooldown_remaining(),
214                "decision blocked: circuit breaker tripped"
215            );
216            return Ok(());
217        }
218
219        // ── Build the order ───────────────────────────────────────────
220        let order = match self.build_order(event, &symbol, &position, &decision).await {
221            Some(o) => o,
222            None => return Ok(()),
223        };
224
225        // ── Place ────────────────────────────────────────────────────
226        match self.ctx.exchange.place_order(&order).await {
227            Ok(id) => {
228                self.orders_placed.fetch_add(1, Ordering::Relaxed);
229                // Track resting orders so the reaper can age them out. The
230                // tracker itself ignores market orders (they never rest).
231                if let Some(tracker) = &self.ctx.order_tracker {
232                    tracker.record(id.clone(), &order).await;
233                }
234                tracing::info!(
235                    service = %self.name,
236                    symbol = %symbol,
237                    side = ?order.side,
238                    size = %order.size,
239                    reduce_only = order.reduce_only,
240                    order_id = %id,
241                    "order placed"
242                );
243                // Bracket entry: place the SL + TP protective legs now that
244                // the market entry is accepted.
245                if self.is_bracket(&decision, order.kind)
246                    && let (Some(sl), Some(tp)) = (decision.stop_price, decision.take_profit_price)
247                {
248                    self.place_brackets(&symbol, order.side, order.size, sl, tp)
249                        .await;
250                }
251            }
252            Err(e) => {
253                // An entry's portfolio reservation must not outlive the
254                // rejected order (exits never reserve).
255                if matches!(signal, SignalType::Buy | SignalType::Sell) {
256                    self.ctx.pending.release(&symbol).await;
257                }
258                tracing::error!(
259                    service = %self.name,
260                    symbol = %symbol,
261                    error = %e,
262                    "exchange rejected order — risk state unchanged"
263                );
264            }
265        }
266        Ok(())
267    }
268
269    /// Atomic portfolio gate: assemble the aggregate account state —
270    /// open-position count + gross exposure (Σ `|qty|·entry·contract_value`)
271    /// from the position cache **plus outstanding pending-entry
272    /// reservations**, and the account net PnL (Σ per-symbol session net)
273    /// from the risk map — run [`check_entry`](rustrade_risk::PortfolioRisk::check_entry),
274    /// and on success record this entry's reservation, all under the
275    /// ledger lock. Concurrent brains serialise here, so two entries
276    /// can't both slip through `max_concurrent_positions` or the
277    /// gross-exposure cap before either fill is visible.
278    ///
279    /// The caller owns the reservation's lifecycle: it is released on
280    /// exchange rejection (here, in `handle_event`), on the fill-driven
281    /// position-cache refresh (`FillRoutingService`), or by TTL.
282    async fn check_and_reserve_entry(
283        &self,
284        symbol: &Symbol,
285        new_notional: f64,
286    ) -> Result<(), rustrade_risk::PortfolioBlock> {
287        let mut pending = self.ctx.pending.lock().await;
288
289        let (open_positions, gross_exposure, symbol_already_open) = {
290            let positions = self.ctx.positions.read().await;
291            let mut open = 0u32;
292            let mut gross = 0.0;
293            for (sym, p) in positions.iter() {
294                if p.is_flat() {
295                    continue;
296                }
297                open += 1;
298                let px = p.entry_price.unwrap_or(0.0);
299                gross += p.qty.abs() * px * self.ctx.exchange.contract_value(sym);
300            }
301            let cache_open = |s: &Symbol| positions.get(s).is_some_and(|p| !p.is_flat());
302            // Reservations whose symbol is already open in the cache add
303            // exposure but no new concurrency slot — same rule the gate
304            // applies to the entry under consideration.
305            let open = open + pending.new_slots(cache_open);
306            let already = cache_open(symbol) || pending.contains(symbol);
307            (open, gross + pending.gross_notional(), already)
308        };
309        let account_net_pnl = {
310            let risk = self.ctx.risk.read().await;
311            risk.values().map(|sr| sr.session_pnl.net_pnl()).sum()
312        };
313
314        self.ctx
315            .portfolio
316            .read()
317            .await
318            .check_entry(PortfolioState {
319                open_positions,
320                gross_exposure,
321                new_notional,
322                symbol_already_open,
323                account_net_pnl,
324            })?;
325
326        pending.reserve(symbol.clone(), new_notional);
327        Ok(())
328    }
329
330    async fn build_order(
331        &self,
332        event: &MarketDataEvent,
333        symbol: &Symbol,
334        position: &Position,
335        decision: &Decision,
336    ) -> Option<Order> {
337        match decision.signal {
338            SignalType::Hold => None,
339            SignalType::Close => {
340                // Use the actual position size; size hint is ignored.
341                let Some(close_side) = position.close_side() else {
342                    tracing::debug!(
343                        service = %self.name,
344                        symbol = %symbol,
345                        "decision=Close but position is flat — nothing to do"
346                    );
347                    return None;
348                };
349                let size = Volume(position.qty.abs());
350                Some(Order::market(symbol.clone(), close_side, size).with_reduce_only(true))
351            }
352            SignalType::Buy | SignalType::Sell => {
353                let side = if matches!(decision.signal, SignalType::Buy) {
354                    Side::Buy
355                } else {
356                    Side::Sell
357                };
358                let price = price_from_event(event)?;
359                // Instrument metadata: contract size for sizing, plus tick /
360                // min-notional used below. `contract_value` mirrors the spec.
361                let spec = self.ctx.exchange.instrument_spec(symbol);
362                let contract_value = spec.contract_value;
363                let contracts = size_decision(
364                    self.ctx.sizing.for_symbol(symbol),
365                    decision.size_hint,
366                    price,
367                    contract_value,
368                );
369
370                if contracts == 0 {
371                    self.orders_blocked.fetch_add(1, Ordering::Relaxed);
372                    tracing::warn!(
373                        service = %self.name,
374                        symbol = %symbol,
375                        signal = %decision.signal,
376                        price = price.value(),
377                        contract_value,
378                        "decision blocked: sizer returned 0 contracts"
379                    );
380                    return None;
381                }
382                let size = Volume(contracts as f64);
383
384                let new_notional = f64::from(contracts) * price.value() * contract_value;
385
386                // ── Instrument min-notional gate ──────────────────────
387                // Skip a dust order the venue would reject. No-op when the
388                // adapter reports no minimum (the default).
389                if !spec.meets_min_notional(new_notional) {
390                    self.orders_blocked.fetch_add(1, Ordering::Relaxed);
391                    tracing::warn!(
392                        service = %self.name,
393                        symbol = %symbol,
394                        signal = %decision.signal,
395                        notional = new_notional,
396                        min_notional = spec.min_notional,
397                        "decision blocked: order below instrument min notional"
398                    );
399                    return None;
400                }
401
402                // ── Order-kind capability gate ────────────────────────
403                // Block (don't silently downgrade) a kind the adapter
404                // can't honour — downgrading post-only / IOC / FOK would
405                // change the fill and fee semantics the brain relied on.
406                let kind = decision.order_kind;
407                if let Some(cap) = capability_for_kind(kind)
408                    && !self.ctx.exchange.supports(cap)
409                {
410                    self.orders_blocked.fetch_add(1, Ordering::Relaxed);
411                    tracing::warn!(
412                        service = %self.name,
413                        symbol = %symbol,
414                        signal = %decision.signal,
415                        ?kind,
416                        required = ?cap,
417                        "decision blocked: adapter does not support requested order kind"
418                    );
419                    return None;
420                }
421
422                // ── Gate 3: account-wide portfolio risk (entries only) ─
423                // Exits (the Close arm) are never gated here — only new
424                // risk. Runs LAST so the reservation it records is only
425                // taken when the order is actually about to be placed;
426                // the reservation is released on rejection, on the
427                // fill-driven cache refresh, or by TTL.
428                if let Err(block) = self.check_and_reserve_entry(symbol, new_notional).await {
429                    self.orders_blocked.fetch_add(1, Ordering::Relaxed);
430                    tracing::warn!(
431                        service = %self.name,
432                        symbol = %symbol,
433                        signal = %decision.signal,
434                        reason = %block,
435                        "decision blocked: portfolio risk"
436                    );
437                    return None;
438                }
439
440                // ── Build the base order for the requested kind ───────
441                let order = match kind {
442                    OrderKind::Market => Order::market(symbol.clone(), side, size),
443                    OrderKind::Limit | OrderKind::PostOnly | OrderKind::Ioc | OrderKind::Fok => {
444                        let limit = decision.limit_price.unwrap_or_else(|| {
445                            tracing::warn!(
446                                service = %self.name,
447                                symbol = %symbol,
448                                ?kind,
449                                fallback = price.value(),
450                                "non-market order kind without limit_price; \
451                                 falling back to event price"
452                            );
453                            price
454                        });
455                        // Snap to the venue's price tick (no-op when unknown).
456                        let limit = Price(spec.round_price(limit.value()));
457                        let mut o = Order::limit(symbol.clone(), side, size, limit);
458                        o.kind = kind;
459                        o
460                    }
461                };
462
463                // ── Protective handling ───────────────────────────────
464                // When a full bracket applies (both SL+TP, market entry,
465                // OCO active), the entry stays clean — the two protective
466                // orders are placed separately after the entry fills (see
467                // `place_brackets`). Otherwise attach a single stop/TP (or
468                // fall back / warn) as in 0.2b.
469                if self.is_bracket(decision, order.kind) {
470                    Some(order)
471                } else {
472                    Some(self.attach_protection(order, symbol, decision))
473                }
474            }
475        }
476    }
477
478    /// Does this decision warrant a full SL+TP bracket placed as two
479    /// separate OCO orders? Requires both prices, a market entry, and an
480    /// active OCO registry (which `Bot` only sets when the adapter supports
481    /// `StopOrders` + `OrderTracking` and a fill source is wired).
482    fn is_bracket(&self, decision: &Decision, kind: OrderKind) -> bool {
483        self.ctx.oco.is_some()
484            && matches!(kind, OrderKind::Market)
485            && decision.stop_price.is_some()
486            && decision.take_profit_price.is_some()
487    }
488
489    /// Place the two reduce-only protective legs for a bracket entry and
490    /// register them as an OCO pair. Called after the market entry is
491    /// accepted.
492    ///
493    /// # Failure handling
494    ///
495    /// - **Stop-loss leg fails:** the entry has no protection at all, so
496    ///   the configured [`BracketFailurePolicy`](crate::BracketFailurePolicy)
497    ///   applies — by default the entry is closed with a reduce-only
498    ///   market order.
499    /// - **Take-profit leg fails:** the stop-loss is **kept** (the
500    ///   position stays loss-protected) and the bracket degrades to
501    ///   stop-only, mirroring [`Self::attach_protection`]'s fallback.
502    async fn place_brackets(
503        &self,
504        symbol: &Symbol,
505        entry_side: Side,
506        size: Volume,
507        sl: Price,
508        tp: Price,
509    ) {
510        let Some(oco) = &self.ctx.oco else { return };
511        let close_side = entry_side.opposite();
512        let sl_order = Order::market(symbol.clone(), close_side, size)
513            .with_reduce_only(true)
514            .with_stop(StopAttachment::stop_market(sl));
515        let tp_order = Order::market(symbol.clone(), close_side, size)
516            .with_reduce_only(true)
517            .with_stop(StopAttachment::take_profit(tp));
518
519        let sl_id = match self.ctx.exchange.place_order(&sl_order).await {
520            Ok(id) => id,
521            Err(e) => {
522                tracing::error!(
523                    service = %self.name,
524                    symbol = %symbol,
525                    error = %e,
526                    policy = ?self.ctx.bracket_failure_policy,
527                    "bracket: stop-loss leg failed to place — entry has no protection"
528                );
529                self.handle_unprotected_entry(symbol, entry_side, size)
530                    .await;
531                return;
532            }
533        };
534        let tp_id = match self.ctx.exchange.place_order(&tp_order).await {
535            Ok(id) => id,
536            Err(e) => {
537                // The stop-loss is live, so the position is still
538                // loss-protected — keep it and degrade to stop-only
539                // rather than cancelling the SL and leaving the entry
540                // with nothing.
541                tracing::warn!(
542                    service = %self.name,
543                    symbol = %symbol,
544                    error = %e,
545                    sl_id = %sl_id,
546                    "bracket: take-profit leg failed; keeping the stop-loss \
547                     (degraded to stop-only protection)"
548                );
549                if let Some(tracker) = &self.ctx.order_tracker {
550                    tracker.record(sl_id.clone(), &sl_order).await;
551                }
552                self.orders_placed.fetch_add(1, Ordering::Relaxed);
553                return;
554            }
555        };
556
557        oco.register(symbol.clone(), sl_id.clone(), tp_id.clone())
558            .await;
559        if let Some(tracker) = &self.ctx.order_tracker {
560            tracker.record(sl_id.clone(), &sl_order).await;
561            tracker.record(tp_id.clone(), &tp_order).await;
562        }
563        self.orders_placed.fetch_add(2, Ordering::Relaxed);
564        tracing::info!(
565            service = %self.name,
566            symbol = %symbol,
567            close_side = ?close_side,
568            stop = sl.value(),
569            take_profit = tp.value(),
570            sl_id = %sl_id,
571            tp_id = %tp_id,
572            "bracket placed (SL + TP, OCO-linked)"
573        );
574    }
575
576    /// Apply the configured [`BracketFailurePolicy`](crate::BracketFailurePolicy)
577    /// to a bracket entry whose stop-loss leg could not be placed.
578    async fn handle_unprotected_entry(&self, symbol: &Symbol, entry_side: Side, size: Volume) {
579        use crate::bot::BracketFailurePolicy;
580        match self.ctx.bracket_failure_policy {
581            BracketFailurePolicy::CloseEntry => {
582                let close = Order::market(symbol.clone(), entry_side.opposite(), size)
583                    .with_reduce_only(true);
584                match self.ctx.exchange.place_order(&close).await {
585                    Ok(id) => {
586                        self.orders_placed.fetch_add(1, Ordering::Relaxed);
587                        tracing::warn!(
588                            service = %self.name,
589                            symbol = %symbol,
590                            close_id = %id,
591                            "bracket: closed the unprotected entry (BracketFailurePolicy::CloseEntry)"
592                        );
593                    }
594                    Err(e) => {
595                        tracing::error!(
596                            service = %self.name,
597                            symbol = %symbol,
598                            error = %e,
599                            "bracket: FAILED to close the unprotected entry — \
600                             an unprotected position is resting on the exchange; \
601                             manual intervention required"
602                        );
603                    }
604                }
605            }
606            BracketFailurePolicy::KeepUnprotected => {
607                tracing::error!(
608                    service = %self.name,
609                    symbol = %symbol,
610                    "bracket: entry left open WITHOUT protection \
611                     (BracketFailurePolicy::KeepUnprotected)"
612                );
613            }
614        }
615    }
616
617    /// Attach a **single** protective [`StopAttachment`] to the entry order,
618    /// gated on [`Capability::StopOrders`]. This is the fallback path used
619    /// when a full bracket doesn't apply — i.e. only one of SL/TP is set, the
620    /// entry is a limit order, or bracket prerequisites aren't met (no fill
621    /// source, or the adapter lacks `StopOrders`/`OrderTracking`).
622    ///
623    /// When *both* SL and TP are set but brackets are inactive, the single
624    /// `Order.stop` field can only carry one, so the protective **stop-loss**
625    /// takes priority and the take-profit is logged as dropped — full SL+TP
626    /// is handled by [`Self::place_brackets`] when brackets are active. When
627    /// the adapter lacks `StopOrders`, the order is placed **without**
628    /// protection and a warning is emitted (never silently dropped) — a brain
629    /// that requires stops can introspect `supports` itself.
630    fn attach_protection(&self, order: Order, symbol: &Symbol, decision: &Decision) -> Order {
631        let stop = match (decision.stop_price, decision.take_profit_price) {
632            (Some(sl), Some(_tp)) => {
633                tracing::warn!(
634                    service = %self.name,
635                    symbol = %symbol,
636                    "both stop_price and take_profit_price set but brackets inactive \
637                     (needs StopOrders + OrderTracking + a fill source); attaching \
638                     stop-loss only"
639                );
640                StopAttachment::stop_market(sl)
641            }
642            (Some(sl), None) => StopAttachment::stop_market(sl),
643            (None, Some(tp)) => StopAttachment::take_profit(tp),
644            (None, None) => return order,
645        };
646
647        if self.ctx.exchange.supports(Capability::StopOrders) {
648            order.with_stop(stop)
649        } else {
650            tracing::warn!(
651                service = %self.name,
652                symbol = %symbol,
653                "protective stop / take-profit requested but adapter lacks \
654                 Capability::StopOrders; placing order WITHOUT protection"
655            );
656            order
657        }
658    }
659}
660
661/// The adapter [`Capability`] a given [`OrderKind`] requires, if any.
662/// `Market` and `Limit` are assumed universally supported.
663fn capability_for_kind(kind: OrderKind) -> Option<Capability> {
664    match kind {
665        OrderKind::Market | OrderKind::Limit => None,
666        OrderKind::PostOnly => Some(Capability::PostOnly),
667        OrderKind::Ioc => Some(Capability::Ioc),
668        OrderKind::Fok => Some(Capability::Fok),
669    }
670}
671
672#[async_trait]
673impl TradingService for ExecutionService {
674    fn name(&self) -> &str {
675        &self.name
676    }
677
678    fn restart_policy(&self) -> RestartPolicy {
679        RestartPolicy::OnFailure
680    }
681
682    async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
683        let mut rx = self.ctx.bus.subscribe();
684        tracing::info!(service = %self.name, "execution service subscribed");
685
686        loop {
687            tokio::select! {
688                _ = cancel.cancelled() => {
689                    tracing::info!(
690                        service = %self.name,
691                        events = self.events_processed(),
692                        dropped = self.events_dropped(),
693                        placed = self.orders_placed(),
694                        blocked = self.orders_blocked(),
695                        "execution service shutting down"
696                    );
697                    return Ok(());
698                }
699                next = rx.recv() => match next {
700                    Ok(event) => {
701                        if let Err(e) = self.handle_event(&event).await {
702                            tracing::error!(
703                                service = %self.name,
704                                error = %e,
705                                "brain returned error from on_event — service continuing"
706                            );
707                        }
708                    }
709                    Err(RecvError::Lagged(skipped)) => {
710                        self.events_dropped.fetch_add(skipped, Ordering::Relaxed);
711                        tracing::warn!(
712                            service = %self.name,
713                            skipped,
714                            "market data bus lagged — events dropped"
715                        );
716                    }
717                    Err(RecvError::Closed) => {
718                        tracing::info!(service = %self.name, "market data bus closed");
719                        return Ok(());
720                    }
721                },
722            }
723        }
724    }
725}
726
727// ── Helpers ─────────────────────────────────────────────────────────────
728
729fn price_from_event(event: &MarketDataEvent) -> Option<Price> {
730    match event {
731        MarketDataEvent::Candle { candle, .. } => Some(Price(candle.close)),
732        MarketDataEvent::Ticker { tick, .. } => Some(tick.mid_price()),
733        MarketDataEvent::Trade { price, .. } => Some(Price(*price)),
734    }
735}
736
737/// Translate the brain's `SizeHint` into a contract count via the sizer.
738fn size_decision(sizing: &SizingConfig, hint: SizeHint, price: Price, contract_value: f64) -> u32 {
739    let sizer = PositionSizer::new(sizing.clone());
740    match hint {
741        SizeHint::Default => sizer.contracts(price.value(), contract_value),
742        SizeHint::MarginFraction(f) => {
743            // Fraction of the configured default margin — clamped to [0, 1].
744            let f = f.clamp(0.0, 1.0);
745            let margin = sizing.margin_per_trade * f;
746            sizer.contracts_with_margin(margin, price.value(), contract_value)
747        }
748        SizeHint::NotionalUsd(n) => {
749            // notional = margin × leverage  →  margin = notional / leverage
750            let leverage = sizing.leverage.max(1);
751            let margin = n / f64::from(leverage);
752            sizer.contracts_with_margin(margin, price.value(), contract_value)
753        }
754        SizeHint::Quantity(q) => {
755            // The brain knows what it wants — clamp to max and floor.
756            let raw = q.value().max(0.0).floor() as u32;
757            raw.min(sizing.max_contracts)
758        }
759    }
760}