1use std::collections::HashMap;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, Ordering};
22
23use async_trait::async_trait;
24use chrono::Utc;
25use rustrade_core::{
26 Brain, Capability, Decision, ExchangeClient, MarketDataBus, MarketDataEvent, Order, OrderKind,
27 Position, Price, Side, Signal, SignalBus, SignalType, SizeHint, StopAttachment, Symbol, Volume,
28};
29use rustrade_risk::{PortfolioState, PositionSizer, SizingConfig};
30use rustrade_supervisor::{RestartPolicy, TradingService};
31use tokio::sync::broadcast::error::RecvError;
32use tokio_util::sync::CancellationToken;
33
34use crate::risk_state::{PortfolioRiskState, PositionCache, RiskStateMap};
35
36#[derive(Clone)]
41pub(crate) struct ExecutionContext {
42 pub exchange: Arc<dyn ExchangeClient>,
43 pub bus: MarketDataBus,
44 pub signals: SignalBus,
45 pub positions: PositionCache,
46 pub risk: RiskStateMap,
47 pub portfolio: PortfolioRiskState,
50 pub sizing: Arc<SymbolSizing>,
53 pub order_tracker: Option<crate::order_tracker::OrderTracker>,
57 pub oco: Option<crate::order_tracker::OcoRegistry>,
63}
64
65pub(crate) struct SymbolSizing {
68 default: SizingConfig,
69 per_symbol: HashMap<Symbol, SizingConfig>,
70}
71
72impl SymbolSizing {
73 pub(crate) fn new(default: SizingConfig, per_symbol: HashMap<Symbol, SizingConfig>) -> Self {
74 Self {
75 default,
76 per_symbol,
77 }
78 }
79
80 pub(crate) fn for_symbol(&self, symbol: &Symbol) -> &SizingConfig {
81 self.per_symbol.get(symbol).unwrap_or(&self.default)
82 }
83}
84
85pub struct ExecutionService {
87 name: String,
88 brain: Arc<dyn Brain>,
89 ctx: ExecutionContext,
90 owned: Option<std::collections::HashSet<Symbol>>,
94 events_processed: AtomicU64,
95 events_dropped: AtomicU64,
96 orders_placed: AtomicU64,
97 orders_blocked: AtomicU64,
98}
99
100impl ExecutionService {
101 pub(crate) fn new(brain: Arc<dyn Brain>, ctx: ExecutionContext) -> Self {
102 let name = format!("execution[{}]", brain.name());
103 let owned = brain
104 .owned_symbols()
105 .map(|syms| syms.into_iter().collect::<std::collections::HashSet<_>>());
106 Self {
107 name,
108 brain,
109 ctx,
110 owned,
111 events_processed: AtomicU64::new(0),
112 events_dropped: AtomicU64::new(0),
113 orders_placed: AtomicU64::new(0),
114 orders_blocked: AtomicU64::new(0),
115 }
116 }
117
118 pub fn events_processed(&self) -> u64 {
120 self.events_processed.load(Ordering::Relaxed)
121 }
122 pub fn events_dropped(&self) -> u64 {
124 self.events_dropped.load(Ordering::Relaxed)
125 }
126 pub fn orders_placed(&self) -> u64 {
128 self.orders_placed.load(Ordering::Relaxed)
129 }
130 pub fn orders_blocked(&self) -> u64 {
132 self.orders_blocked.load(Ordering::Relaxed)
133 }
134
135 async fn position_for(&self, symbol: &Symbol) -> Position {
136 self.ctx
137 .positions
138 .read()
139 .await
140 .get(symbol)
141 .copied()
142 .unwrap_or(Position::FLAT)
143 }
144
145 async fn handle_event(&self, event: &MarketDataEvent) -> anyhow::Result<()> {
146 let symbol = event.symbol().clone();
147
148 if let Some(owned) = &self.owned
152 && !owned.contains(&symbol)
153 {
154 return Ok(());
155 }
156
157 let position = self.position_for(&symbol).await;
158
159 let decision = self.brain.on_event(event, &position).await?;
160 self.events_processed.fetch_add(1, Ordering::Relaxed);
161
162 let signal = decision.signal;
163 if matches!(signal, SignalType::Hold) {
164 return Ok(());
165 }
166
167 let published = self.ctx.signals.publish(Signal {
171 id: format!("{}-{}", self.brain.name(), self.events_processed()),
172 symbol: symbol.as_str().to_string(),
173 kind: signal,
174 confidence: decision.confidence,
175 timestamp: Utc::now(),
176 source: self.brain.name().to_string(),
177 metadata: decision.metadata.clone(),
178 });
179 let _ = published;
180
181 if let Some(risk) = self.ctx.risk.read().await.get(&symbol)
183 && risk.session_pnl.is_session_halted()
184 {
185 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
186 tracing::warn!(
187 service = %self.name,
188 symbol = %symbol,
189 signal = %signal,
190 "decision blocked: session PnL halted"
191 );
192 return Ok(());
193 }
194
195 if let Some(risk) = self.ctx.risk.read().await.get(&symbol)
197 && risk.circuit_breaker.is_tripped()
198 {
199 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
200 tracing::warn!(
201 service = %self.name,
202 symbol = %symbol,
203 signal = %signal,
204 cooldown_secs = ?risk.circuit_breaker.cooldown_remaining(),
205 "decision blocked: circuit breaker tripped"
206 );
207 return Ok(());
208 }
209
210 let order = match self.build_order(event, &symbol, &position, &decision).await {
212 Some(o) => o,
213 None => return Ok(()),
214 };
215
216 match self.ctx.exchange.place_order(&order).await {
218 Ok(id) => {
219 self.orders_placed.fetch_add(1, Ordering::Relaxed);
220 if let Some(tracker) = &self.ctx.order_tracker {
223 tracker.record(id.clone(), &order).await;
224 }
225 tracing::info!(
226 service = %self.name,
227 symbol = %symbol,
228 side = ?order.side,
229 size = %order.size,
230 reduce_only = order.reduce_only,
231 order_id = %id,
232 "order placed"
233 );
234 if self.is_bracket(&decision, order.kind)
237 && let (Some(sl), Some(tp)) = (decision.stop_price, decision.take_profit_price)
238 {
239 self.place_brackets(&symbol, order.side, order.size, sl, tp)
240 .await;
241 }
242 }
243 Err(e) => {
244 tracing::error!(
245 service = %self.name,
246 symbol = %symbol,
247 error = %e,
248 "exchange rejected order — risk state unchanged"
249 );
250 }
251 }
252 Ok(())
253 }
254
255 async fn portfolio_state(&self, symbol: &Symbol, new_notional: f64) -> PortfolioState {
261 let (open_positions, gross_exposure, symbol_already_open) = {
262 let positions = self.ctx.positions.read().await;
263 let mut open = 0u32;
264 let mut gross = 0.0;
265 for (sym, p) in positions.iter() {
266 if p.is_flat() {
267 continue;
268 }
269 open += 1;
270 let px = p.entry_price.unwrap_or(0.0);
271 gross += p.qty.abs() * px * self.ctx.exchange.contract_value(sym);
272 }
273 let already = positions.get(symbol).is_some_and(|p| !p.is_flat());
274 (open, gross, already)
275 };
276 let account_net_pnl = {
277 let risk = self.ctx.risk.read().await;
278 risk.values().map(|sr| sr.session_pnl.net_pnl()).sum()
279 };
280 PortfolioState {
281 open_positions,
282 gross_exposure,
283 new_notional,
284 symbol_already_open,
285 account_net_pnl,
286 }
287 }
288
289 async fn build_order(
290 &self,
291 event: &MarketDataEvent,
292 symbol: &Symbol,
293 position: &Position,
294 decision: &Decision,
295 ) -> Option<Order> {
296 match decision.signal {
297 SignalType::Hold => None,
298 SignalType::Close => {
299 let Some(close_side) = position.close_side() else {
301 tracing::debug!(
302 service = %self.name,
303 symbol = %symbol,
304 "decision=Close but position is flat — nothing to do"
305 );
306 return None;
307 };
308 let size = Volume(position.qty.abs());
309 Some(Order::market(symbol.clone(), close_side, size).with_reduce_only(true))
310 }
311 SignalType::Buy | SignalType::Sell => {
312 let side = if matches!(decision.signal, SignalType::Buy) {
313 Side::Buy
314 } else {
315 Side::Sell
316 };
317 let price = price_from_event(event)?;
318 let spec = self.ctx.exchange.instrument_spec(symbol);
321 let contract_value = spec.contract_value;
322 let contracts = size_decision(
323 self.ctx.sizing.for_symbol(symbol),
324 decision.size_hint,
325 price,
326 contract_value,
327 );
328
329 if contracts == 0 {
330 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
331 tracing::warn!(
332 service = %self.name,
333 symbol = %symbol,
334 signal = %decision.signal,
335 price = price.value(),
336 contract_value,
337 "decision blocked: sizer returned 0 contracts"
338 );
339 return None;
340 }
341 let size = Volume(contracts as f64);
342
343 let new_notional = f64::from(contracts) * price.value() * contract_value;
346 let pf_state = self.portfolio_state(symbol, new_notional).await;
347 if let Err(block) = self.ctx.portfolio.read().await.check_entry(pf_state) {
348 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
349 tracing::warn!(
350 service = %self.name,
351 symbol = %symbol,
352 signal = %decision.signal,
353 reason = %block,
354 "decision blocked: portfolio risk"
355 );
356 return None;
357 }
358
359 if !spec.meets_min_notional(new_notional) {
363 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
364 tracing::warn!(
365 service = %self.name,
366 symbol = %symbol,
367 signal = %decision.signal,
368 notional = new_notional,
369 min_notional = spec.min_notional,
370 "decision blocked: order below instrument min notional"
371 );
372 return None;
373 }
374
375 let kind = decision.order_kind;
380 if let Some(cap) = capability_for_kind(kind)
381 && !self.ctx.exchange.supports(cap)
382 {
383 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
384 tracing::warn!(
385 service = %self.name,
386 symbol = %symbol,
387 signal = %decision.signal,
388 ?kind,
389 required = ?cap,
390 "decision blocked: adapter does not support requested order kind"
391 );
392 return None;
393 }
394
395 let order = match kind {
397 OrderKind::Market => Order::market(symbol.clone(), side, size),
398 OrderKind::Limit | OrderKind::PostOnly | OrderKind::Ioc | OrderKind::Fok => {
399 let limit = decision.limit_price.unwrap_or_else(|| {
400 tracing::warn!(
401 service = %self.name,
402 symbol = %symbol,
403 ?kind,
404 fallback = price.value(),
405 "non-market order kind without limit_price; \
406 falling back to event price"
407 );
408 price
409 });
410 let limit = Price(spec.round_price(limit.value()));
412 let mut o = Order::limit(symbol.clone(), side, size, limit);
413 o.kind = kind;
414 o
415 }
416 };
417
418 if self.is_bracket(decision, order.kind) {
425 Some(order)
426 } else {
427 Some(self.attach_protection(order, symbol, decision))
428 }
429 }
430 }
431 }
432
433 fn is_bracket(&self, decision: &Decision, kind: OrderKind) -> bool {
438 self.ctx.oco.is_some()
439 && matches!(kind, OrderKind::Market)
440 && decision.stop_price.is_some()
441 && decision.take_profit_price.is_some()
442 }
443
444 async fn place_brackets(
449 &self,
450 symbol: &Symbol,
451 entry_side: Side,
452 size: Volume,
453 sl: Price,
454 tp: Price,
455 ) {
456 let Some(oco) = &self.ctx.oco else { return };
457 let close_side = entry_side.opposite();
458 let sl_order = Order::market(symbol.clone(), close_side, size)
459 .with_reduce_only(true)
460 .with_stop(StopAttachment::stop_market(sl));
461 let tp_order = Order::market(symbol.clone(), close_side, size)
462 .with_reduce_only(true)
463 .with_stop(StopAttachment::take_profit(tp));
464
465 let sl_id = match self.ctx.exchange.place_order(&sl_order).await {
466 Ok(id) => id,
467 Err(e) => {
468 tracing::error!(service = %self.name, symbol = %symbol, error = %e, "bracket: stop-loss leg failed to place; entry is UNPROTECTED");
469 return;
470 }
471 };
472 let tp_id = match self.ctx.exchange.place_order(&tp_order).await {
473 Ok(id) => id,
474 Err(e) => {
475 tracing::error!(service = %self.name, symbol = %symbol, error = %e, "bracket: take-profit leg failed; cancelling the stop-loss leg to avoid an orphan");
476 let _ = self.ctx.exchange.cancel_order(symbol, &sl_id).await;
477 return;
478 }
479 };
480
481 oco.register(symbol.clone(), sl_id.clone(), tp_id.clone())
482 .await;
483 if let Some(tracker) = &self.ctx.order_tracker {
484 tracker.record(sl_id.clone(), &sl_order).await;
485 tracker.record(tp_id.clone(), &tp_order).await;
486 }
487 self.orders_placed.fetch_add(2, Ordering::Relaxed);
488 tracing::info!(
489 service = %self.name,
490 symbol = %symbol,
491 close_side = ?close_side,
492 stop = sl.value(),
493 take_profit = tp.value(),
494 sl_id = %sl_id,
495 tp_id = %tp_id,
496 "bracket placed (SL + TP, OCO-linked)"
497 );
498 }
499
500 fn attach_protection(&self, order: Order, symbol: &Symbol, decision: &Decision) -> Order {
514 let stop = match (decision.stop_price, decision.take_profit_price) {
515 (Some(sl), Some(_tp)) => {
516 tracing::warn!(
517 service = %self.name,
518 symbol = %symbol,
519 "both stop_price and take_profit_price set but brackets inactive \
520 (needs StopOrders + OrderTracking + a fill source); attaching \
521 stop-loss only"
522 );
523 StopAttachment::stop_market(sl)
524 }
525 (Some(sl), None) => StopAttachment::stop_market(sl),
526 (None, Some(tp)) => StopAttachment::take_profit(tp),
527 (None, None) => return order,
528 };
529
530 if self.ctx.exchange.supports(Capability::StopOrders) {
531 order.with_stop(stop)
532 } else {
533 tracing::warn!(
534 service = %self.name,
535 symbol = %symbol,
536 "protective stop / take-profit requested but adapter lacks \
537 Capability::StopOrders; placing order WITHOUT protection"
538 );
539 order
540 }
541 }
542}
543
544fn capability_for_kind(kind: OrderKind) -> Option<Capability> {
547 match kind {
548 OrderKind::Market | OrderKind::Limit => None,
549 OrderKind::PostOnly => Some(Capability::PostOnly),
550 OrderKind::Ioc => Some(Capability::Ioc),
551 OrderKind::Fok => Some(Capability::Fok),
552 }
553}
554
555#[async_trait]
556impl TradingService for ExecutionService {
557 fn name(&self) -> &str {
558 &self.name
559 }
560
561 fn restart_policy(&self) -> RestartPolicy {
562 RestartPolicy::OnFailure
563 }
564
565 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
566 let mut rx = self.ctx.bus.subscribe();
567 tracing::info!(service = %self.name, "execution service subscribed");
568
569 loop {
570 tokio::select! {
571 _ = cancel.cancelled() => {
572 tracing::info!(
573 service = %self.name,
574 events = self.events_processed(),
575 dropped = self.events_dropped(),
576 placed = self.orders_placed(),
577 blocked = self.orders_blocked(),
578 "execution service shutting down"
579 );
580 return Ok(());
581 }
582 next = rx.recv() => match next {
583 Ok(event) => {
584 if let Err(e) = self.handle_event(&event).await {
585 tracing::error!(
586 service = %self.name,
587 error = %e,
588 "brain returned error from on_event — service continuing"
589 );
590 }
591 }
592 Err(RecvError::Lagged(skipped)) => {
593 self.events_dropped.fetch_add(skipped, Ordering::Relaxed);
594 tracing::warn!(
595 service = %self.name,
596 skipped,
597 "market data bus lagged — events dropped"
598 );
599 }
600 Err(RecvError::Closed) => {
601 tracing::info!(service = %self.name, "market data bus closed");
602 return Ok(());
603 }
604 },
605 }
606 }
607 }
608}
609
610fn price_from_event(event: &MarketDataEvent) -> Option<Price> {
613 match event {
614 MarketDataEvent::Candle { candle, .. } => Some(Price(candle.close)),
615 MarketDataEvent::Ticker { tick, .. } => Some(tick.mid_price()),
616 MarketDataEvent::Trade { price, .. } => Some(Price(*price)),
617 }
618}
619
620fn size_decision(sizing: &SizingConfig, hint: SizeHint, price: Price, contract_value: f64) -> u32 {
622 let sizer = PositionSizer::new(sizing.clone());
623 match hint {
624 SizeHint::Default => sizer.contracts(price.value(), contract_value),
625 SizeHint::MarginFraction(f) => {
626 let f = f.clamp(0.0, 1.0);
628 let margin = sizing.margin_per_trade * f;
629 sizer.contracts_with_margin(margin, price.value(), contract_value)
630 }
631 SizeHint::NotionalUsd(n) => {
632 let leverage = sizing.leverage.max(1);
634 let margin = n / f64::from(leverage);
635 sizer.contracts_with_margin(margin, price.value(), contract_value)
636 }
637 SizeHint::Quantity(q) => {
638 let raw = q.value().max(0.0).floor() as u32;
640 raw.min(sizing.max_contracts)
641 }
642 }
643}