1use std::collections::HashMap;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU64, Ordering};
22
23use async_trait::async_trait;
24use chrono::Utc;
25use rustrade_core::{
26 Brain, Capability, Decision, ExchangeClient, MarketDataBus, MarketDataEvent, Order, OrderKind,
27 Position, Price, Side, Signal, SignalBus, SignalType, SizeHint, StopAttachment, Symbol, Volume,
28};
29use rustrade_risk::{PortfolioState, PositionSizer, SizingConfig};
30use rustrade_supervisor::{RestartPolicy, TradingService};
31use tokio::sync::broadcast::error::RecvError;
32use tokio_util::sync::CancellationToken;
33
34use crate::pending::PendingEntryLedger;
35use crate::risk_state::{PortfolioRiskState, PositionCache, RiskStateMap};
36
/// Shared handles an `ExecutionService` uses to turn brain decisions into
/// exchange orders. Derives `Clone`.
#[derive(Clone)]
pub(crate) struct ExecutionContext {
    /// Exchange adapter: orders are placed through it, and it is queried for
    /// instrument specs, contract values and supported capabilities.
    pub exchange: Arc<dyn ExchangeClient>,
    /// Market-data bus the service subscribes to in `run`.
    pub bus: MarketDataBus,
    /// Bus on which every non-`Hold` decision is published as a `Signal`.
    pub signals: SignalBus,
    /// Position cache looked up by symbol; a missing entry is treated as flat.
    pub positions: PositionCache,
    /// Per-symbol risk state (session-PnL halt and circuit breaker).
    pub risk: RiskStateMap,
    /// Portfolio-level risk state; its `check_entry` gates every new entry.
    pub portfolio: PortfolioRiskState,
    /// Default and per-symbol sizing configuration.
    pub sizing: Arc<SymbolSizing>,
    /// Optional order tracker; when present, placed orders are recorded in it.
    pub order_tracker: Option<crate::order_tracker::OrderTracker>,
    /// Optional OCO registry. When present, a market entry that carries both
    /// a stop price and a take-profit price gets two separate bracket legs
    /// whose ids are registered here.
    pub oco: Option<crate::order_tracker::OcoRegistry>,
    /// Policy applied when the stop-loss leg of a bracket cannot be placed.
    pub bracket_failure_policy: crate::bot::BracketFailurePolicy,
    /// Ledger of entry reservations. A reservation is made before an entry
    /// order is sent and released again if the exchange rejects that order.
    pub pending: PendingEntryLedger,
}
73
/// Sizing configuration with optional per-symbol overrides.
pub(crate) struct SymbolSizing {
    /// Configuration used for any symbol without an override.
    default: SizingConfig,
    /// Overrides keyed by symbol.
    per_symbol: HashMap<Symbol, SizingConfig>,
}
80
81impl SymbolSizing {
82 pub(crate) fn new(default: SizingConfig, per_symbol: HashMap<Symbol, SizingConfig>) -> Self {
83 Self {
84 default,
85 per_symbol,
86 }
87 }
88
89 pub(crate) fn for_symbol(&self, symbol: &Symbol) -> &SizingConfig {
90 self.per_symbol.get(symbol).unwrap_or(&self.default)
91 }
92}
93
/// Supervised service that feeds market-data events to one brain and turns
/// its decisions into exchange orders.
pub struct ExecutionService {
    /// Service name, formatted as `execution[<brain name>]`.
    name: String,
    /// Decision source; `on_event` is called for every handled event.
    brain: Arc<dyn Brain>,
    /// Shared handles (exchange, buses, risk state, sizing, ...).
    ctx: ExecutionContext,
    /// Symbols the brain declared as owned. `None` means events for every
    /// symbol are handled.
    owned: Option<std::collections::HashSet<Symbol>>,
    /// Number of events for which the brain returned a decision.
    events_processed: AtomicU64,
    /// Number of events reported as skipped when the bus receiver lagged.
    events_dropped: AtomicU64,
    /// Number of orders the exchange accepted (entries, bracket legs and
    /// emergency closes).
    orders_placed: AtomicU64,
    /// Number of decisions that were blocked before reaching the exchange.
    orders_blocked: AtomicU64,
}
108
109impl ExecutionService {
110 pub(crate) fn new(brain: Arc<dyn Brain>, ctx: ExecutionContext) -> Self {
111 let name = format!("execution[{}]", brain.name());
112 let owned = brain
113 .owned_symbols()
114 .map(|syms| syms.into_iter().collect::<std::collections::HashSet<_>>());
115 Self {
116 name,
117 brain,
118 ctx,
119 owned,
120 events_processed: AtomicU64::new(0),
121 events_dropped: AtomicU64::new(0),
122 orders_placed: AtomicU64::new(0),
123 orders_blocked: AtomicU64::new(0),
124 }
125 }
126
/// Number of events for which the brain returned a decision so far
/// (relaxed atomic load).
pub fn events_processed(&self) -> u64 {
    self.events_processed.load(Ordering::Relaxed)
}
/// Number of events skipped because the market-data receiver lagged
/// (relaxed atomic load).
pub fn events_dropped(&self) -> u64 {
    self.events_dropped.load(Ordering::Relaxed)
}
/// Number of orders accepted by the exchange so far, including bracket
/// legs and emergency closes (relaxed atomic load).
pub fn orders_placed(&self) -> u64 {
    self.orders_placed.load(Ordering::Relaxed)
}
/// Number of decisions blocked before an order was sent
/// (relaxed atomic load).
pub fn orders_blocked(&self) -> u64 {
    self.orders_blocked.load(Ordering::Relaxed)
}
143
144 async fn position_for(&self, symbol: &Symbol) -> Position {
145 self.ctx
146 .positions
147 .read()
148 .await
149 .get(symbol)
150 .copied()
151 .unwrap_or(Position::FLAT)
152 }
153
154 async fn handle_event(&self, event: &MarketDataEvent) -> anyhow::Result<()> {
155 let symbol = event.symbol().clone();
156
157 if let Some(owned) = &self.owned
161 && !owned.contains(&symbol)
162 {
163 return Ok(());
164 }
165
166 let position = self.position_for(&symbol).await;
167
168 let decision = self.brain.on_event(event, &position).await?;
169 self.events_processed.fetch_add(1, Ordering::Relaxed);
170
171 let signal = decision.signal;
172 if matches!(signal, SignalType::Hold) {
173 return Ok(());
174 }
175
176 let published = self.ctx.signals.publish(Signal {
180 id: format!("{}-{}", self.brain.name(), self.events_processed()),
181 symbol: symbol.as_str().to_string(),
182 kind: signal,
183 confidence: decision.confidence,
184 timestamp: Utc::now(),
185 source: self.brain.name().to_string(),
186 metadata: decision.metadata.clone(),
187 });
188 let _ = published;
189
190 if let Some(risk) = self.ctx.risk.read().await.get(&symbol)
192 && risk.session_pnl.is_session_halted()
193 {
194 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
195 tracing::warn!(
196 service = %self.name,
197 symbol = %symbol,
198 signal = %signal,
199 "decision blocked: session PnL halted"
200 );
201 return Ok(());
202 }
203
204 if let Some(risk) = self.ctx.risk.read().await.get(&symbol)
206 && risk.circuit_breaker.is_tripped()
207 {
208 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
209 tracing::warn!(
210 service = %self.name,
211 symbol = %symbol,
212 signal = %signal,
213 cooldown_secs = ?risk.circuit_breaker.cooldown_remaining(),
214 "decision blocked: circuit breaker tripped"
215 );
216 return Ok(());
217 }
218
219 let order = match self.build_order(event, &symbol, &position, &decision).await {
221 Some(o) => o,
222 None => return Ok(()),
223 };
224
225 match self.ctx.exchange.place_order(&order).await {
227 Ok(id) => {
228 self.orders_placed.fetch_add(1, Ordering::Relaxed);
229 if let Some(tracker) = &self.ctx.order_tracker {
232 tracker.record(id.clone(), &order).await;
233 }
234 tracing::info!(
235 service = %self.name,
236 symbol = %symbol,
237 side = ?order.side,
238 size = %order.size,
239 reduce_only = order.reduce_only,
240 order_id = %id,
241 "order placed"
242 );
243 if self.is_bracket(&decision, order.kind)
246 && let (Some(sl), Some(tp)) = (decision.stop_price, decision.take_profit_price)
247 {
248 self.place_brackets(&symbol, order.side, order.size, sl, tp)
249 .await;
250 }
251 }
252 Err(e) => {
253 if matches!(signal, SignalType::Buy | SignalType::Sell) {
256 self.ctx.pending.release(&symbol).await;
257 }
258 tracing::error!(
259 service = %self.name,
260 symbol = %symbol,
261 error = %e,
262 "exchange rejected order — risk state unchanged"
263 );
264 }
265 }
266 Ok(())
267 }
268
269 async fn check_and_reserve_entry(
283 &self,
284 symbol: &Symbol,
285 new_notional: f64,
286 ) -> Result<(), rustrade_risk::PortfolioBlock> {
287 let mut pending = self.ctx.pending.lock().await;
288
289 let (open_positions, gross_exposure, symbol_already_open) = {
290 let positions = self.ctx.positions.read().await;
291 let mut open = 0u32;
292 let mut gross = 0.0;
293 for (sym, p) in positions.iter() {
294 if p.is_flat() {
295 continue;
296 }
297 open += 1;
298 let px = p.entry_price.unwrap_or(0.0);
299 gross += p.qty.abs() * px * self.ctx.exchange.contract_value(sym);
300 }
301 let cache_open = |s: &Symbol| positions.get(s).is_some_and(|p| !p.is_flat());
302 let open = open + pending.new_slots(cache_open);
306 let already = cache_open(symbol) || pending.contains(symbol);
307 (open, gross + pending.gross_notional(), already)
308 };
309 let account_net_pnl = {
310 let risk = self.ctx.risk.read().await;
311 risk.values().map(|sr| sr.session_pnl.net_pnl()).sum()
312 };
313
314 self.ctx
315 .portfolio
316 .read()
317 .await
318 .check_entry(PortfolioState {
319 open_positions,
320 gross_exposure,
321 new_notional,
322 symbol_already_open,
323 account_net_pnl,
324 })?;
325
326 pending.reserve(symbol.clone(), new_notional);
327 Ok(())
328 }
329
330 async fn build_order(
331 &self,
332 event: &MarketDataEvent,
333 symbol: &Symbol,
334 position: &Position,
335 decision: &Decision,
336 ) -> Option<Order> {
337 match decision.signal {
338 SignalType::Hold => None,
339 SignalType::Close => {
340 let Some(close_side) = position.close_side() else {
342 tracing::debug!(
343 service = %self.name,
344 symbol = %symbol,
345 "decision=Close but position is flat — nothing to do"
346 );
347 return None;
348 };
349 let size = Volume(position.qty.abs());
350 Some(Order::market(symbol.clone(), close_side, size).with_reduce_only(true))
351 }
352 SignalType::Buy | SignalType::Sell => {
353 let side = if matches!(decision.signal, SignalType::Buy) {
354 Side::Buy
355 } else {
356 Side::Sell
357 };
358 let price = price_from_event(event)?;
359 let spec = self.ctx.exchange.instrument_spec(symbol);
362 let contract_value = spec.contract_value;
363 let contracts = size_decision(
364 self.ctx.sizing.for_symbol(symbol),
365 decision.size_hint,
366 price,
367 contract_value,
368 );
369
370 if contracts == 0 {
371 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
372 tracing::warn!(
373 service = %self.name,
374 symbol = %symbol,
375 signal = %decision.signal,
376 price = price.value(),
377 contract_value,
378 "decision blocked: sizer returned 0 contracts"
379 );
380 return None;
381 }
382 let size = Volume(contracts as f64);
383
384 let new_notional = f64::from(contracts) * price.value() * contract_value;
385
386 if !spec.meets_min_notional(new_notional) {
390 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
391 tracing::warn!(
392 service = %self.name,
393 symbol = %symbol,
394 signal = %decision.signal,
395 notional = new_notional,
396 min_notional = spec.min_notional,
397 "decision blocked: order below instrument min notional"
398 );
399 return None;
400 }
401
402 let kind = decision.order_kind;
407 if let Some(cap) = capability_for_kind(kind)
408 && !self.ctx.exchange.supports(cap)
409 {
410 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
411 tracing::warn!(
412 service = %self.name,
413 symbol = %symbol,
414 signal = %decision.signal,
415 ?kind,
416 required = ?cap,
417 "decision blocked: adapter does not support requested order kind"
418 );
419 return None;
420 }
421
422 if let Err(block) = self.check_and_reserve_entry(symbol, new_notional).await {
429 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
430 tracing::warn!(
431 service = %self.name,
432 symbol = %symbol,
433 signal = %decision.signal,
434 reason = %block,
435 "decision blocked: portfolio risk"
436 );
437 return None;
438 }
439
440 let order = match kind {
442 OrderKind::Market => Order::market(symbol.clone(), side, size),
443 OrderKind::Limit | OrderKind::PostOnly | OrderKind::Ioc | OrderKind::Fok => {
444 let limit = decision.limit_price.unwrap_or_else(|| {
445 tracing::warn!(
446 service = %self.name,
447 symbol = %symbol,
448 ?kind,
449 fallback = price.value(),
450 "non-market order kind without limit_price; \
451 falling back to event price"
452 );
453 price
454 });
455 let limit = Price(spec.round_price(limit.value()));
457 let mut o = Order::limit(symbol.clone(), side, size, limit);
458 o.kind = kind;
459 o
460 }
461 };
462
463 if self.is_bracket(decision, order.kind) {
470 Some(order)
471 } else {
472 Some(self.attach_protection(order, symbol, decision))
473 }
474 }
475 }
476 }
477
478 fn is_bracket(&self, decision: &Decision, kind: OrderKind) -> bool {
483 self.ctx.oco.is_some()
484 && matches!(kind, OrderKind::Market)
485 && decision.stop_price.is_some()
486 && decision.take_profit_price.is_some()
487 }
488
/// Places the stop-loss and take-profit legs for an entry that was just
/// accepted, and links them in the OCO registry.
///
/// Both legs are reduce-only market orders on the side opposite to
/// `entry_side`, for the same `size`. Does nothing when no OCO registry is
/// configured.
///
/// Failure handling is asymmetric:
/// * stop-loss leg fails: the entry is unprotected, so the configured
///   bracket-failure policy is applied and the take-profit leg is not sent;
/// * take-profit leg fails: the stop-loss leg is kept and recorded on its
///   own, with no OCO registration.
async fn place_brackets(
    &self,
    symbol: &Symbol,
    entry_side: Side,
    size: Volume,
    sl: Price,
    tp: Price,
) {
    let Some(oco) = &self.ctx.oco else { return };
    let close_side = entry_side.opposite();
    let sl_order = Order::market(symbol.clone(), close_side, size)
        .with_reduce_only(true)
        .with_stop(StopAttachment::stop_market(sl));
    let tp_order = Order::market(symbol.clone(), close_side, size)
        .with_reduce_only(true)
        .with_stop(StopAttachment::take_profit(tp));

    // The stop-loss goes first: it is the leg the entry cannot do without.
    let sl_id = match self.ctx.exchange.place_order(&sl_order).await {
        Ok(id) => id,
        Err(e) => {
            tracing::error!(
                service = %self.name,
                symbol = %symbol,
                error = %e,
                policy = ?self.ctx.bracket_failure_policy,
                "bracket: stop-loss leg failed to place — entry has no protection"
            );
            self.handle_unprotected_entry(symbol, entry_side, size)
                .await;
            return;
        }
    };
    let tp_id = match self.ctx.exchange.place_order(&tp_order).await {
        Ok(id) => id,
        Err(e) => {
            tracing::warn!(
                service = %self.name,
                symbol = %symbol,
                error = %e,
                sl_id = %sl_id,
                "bracket: take-profit leg failed; keeping the stop-loss \
                 (degraded to stop-only protection)"
            );
            // Only the stop-loss exists, so only it is tracked and counted.
            if let Some(tracker) = &self.ctx.order_tracker {
                tracker.record(sl_id.clone(), &sl_order).await;
            }
            self.orders_placed.fetch_add(1, Ordering::Relaxed);
            return;
        }
    };

    // Both legs are live: link them, then track and count both.
    oco.register(symbol.clone(), sl_id.clone(), tp_id.clone())
        .await;
    if let Some(tracker) = &self.ctx.order_tracker {
        tracker.record(sl_id.clone(), &sl_order).await;
        tracker.record(tp_id.clone(), &tp_order).await;
    }
    self.orders_placed.fetch_add(2, Ordering::Relaxed);
    tracing::info!(
        service = %self.name,
        symbol = %symbol,
        close_side = ?close_side,
        stop = sl.value(),
        take_profit = tp.value(),
        sl_id = %sl_id,
        tp_id = %tp_id,
        "bracket placed (SL + TP, OCO-linked)"
    );
}
575
576 async fn handle_unprotected_entry(&self, symbol: &Symbol, entry_side: Side, size: Volume) {
579 use crate::bot::BracketFailurePolicy;
580 match self.ctx.bracket_failure_policy {
581 BracketFailurePolicy::CloseEntry => {
582 let close = Order::market(symbol.clone(), entry_side.opposite(), size)
583 .with_reduce_only(true);
584 match self.ctx.exchange.place_order(&close).await {
585 Ok(id) => {
586 self.orders_placed.fetch_add(1, Ordering::Relaxed);
587 tracing::warn!(
588 service = %self.name,
589 symbol = %symbol,
590 close_id = %id,
591 "bracket: closed the unprotected entry (BracketFailurePolicy::CloseEntry)"
592 );
593 }
594 Err(e) => {
595 tracing::error!(
596 service = %self.name,
597 symbol = %symbol,
598 error = %e,
599 "bracket: FAILED to close the unprotected entry — \
600 an unprotected position is resting on the exchange; \
601 manual intervention required"
602 );
603 }
604 }
605 }
606 BracketFailurePolicy::KeepUnprotected => {
607 tracing::error!(
608 service = %self.name,
609 symbol = %symbol,
610 "bracket: entry left open WITHOUT protection \
611 (BracketFailurePolicy::KeepUnprotected)"
612 );
613 }
614 }
615 }
616
617 fn attach_protection(&self, order: Order, symbol: &Symbol, decision: &Decision) -> Order {
631 let stop = match (decision.stop_price, decision.take_profit_price) {
632 (Some(sl), Some(_tp)) => {
633 tracing::warn!(
634 service = %self.name,
635 symbol = %symbol,
636 "both stop_price and take_profit_price set but brackets inactive \
637 (needs StopOrders + OrderTracking + a fill source); attaching \
638 stop-loss only"
639 );
640 StopAttachment::stop_market(sl)
641 }
642 (Some(sl), None) => StopAttachment::stop_market(sl),
643 (None, Some(tp)) => StopAttachment::take_profit(tp),
644 (None, None) => return order,
645 };
646
647 if self.ctx.exchange.supports(Capability::StopOrders) {
648 order.with_stop(stop)
649 } else {
650 tracing::warn!(
651 service = %self.name,
652 symbol = %symbol,
653 "protective stop / take-profit requested but adapter lacks \
654 Capability::StopOrders; placing order WITHOUT protection"
655 );
656 order
657 }
658 }
659}
660
661fn capability_for_kind(kind: OrderKind) -> Option<Capability> {
664 match kind {
665 OrderKind::Market | OrderKind::Limit => None,
666 OrderKind::PostOnly => Some(Capability::PostOnly),
667 OrderKind::Ioc => Some(Capability::Ioc),
668 OrderKind::Fok => Some(Capability::Fok),
669 }
670}
671
672#[async_trait]
673impl TradingService for ExecutionService {
674 fn name(&self) -> &str {
675 &self.name
676 }
677
678 fn restart_policy(&self) -> RestartPolicy {
679 RestartPolicy::OnFailure
680 }
681
682 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
683 let mut rx = self.ctx.bus.subscribe();
684 tracing::info!(service = %self.name, "execution service subscribed");
685
686 loop {
687 tokio::select! {
688 _ = cancel.cancelled() => {
689 tracing::info!(
690 service = %self.name,
691 events = self.events_processed(),
692 dropped = self.events_dropped(),
693 placed = self.orders_placed(),
694 blocked = self.orders_blocked(),
695 "execution service shutting down"
696 );
697 return Ok(());
698 }
699 next = rx.recv() => match next {
700 Ok(event) => {
701 if let Err(e) = self.handle_event(&event).await {
702 tracing::error!(
703 service = %self.name,
704 error = %e,
705 "brain returned error from on_event — service continuing"
706 );
707 }
708 }
709 Err(RecvError::Lagged(skipped)) => {
710 self.events_dropped.fetch_add(skipped, Ordering::Relaxed);
711 tracing::warn!(
712 service = %self.name,
713 skipped,
714 "market data bus lagged — events dropped"
715 );
716 }
717 Err(RecvError::Closed) => {
718 tracing::info!(service = %self.name, "market data bus closed");
719 return Ok(());
720 }
721 },
722 }
723 }
724 }
725}
726
727fn price_from_event(event: &MarketDataEvent) -> Option<Price> {
730 match event {
731 MarketDataEvent::Candle { candle, .. } => Some(Price(candle.close)),
732 MarketDataEvent::Ticker { tick, .. } => Some(tick.mid_price()),
733 MarketDataEvent::Trade { price, .. } => Some(Price(*price)),
734 }
735}
736
737fn size_decision(sizing: &SizingConfig, hint: SizeHint, price: Price, contract_value: f64) -> u32 {
739 let sizer = PositionSizer::new(sizing.clone());
740 match hint {
741 SizeHint::Default => sizer.contracts(price.value(), contract_value),
742 SizeHint::MarginFraction(f) => {
743 let f = f.clamp(0.0, 1.0);
745 let margin = sizing.margin_per_trade * f;
746 sizer.contracts_with_margin(margin, price.value(), contract_value)
747 }
748 SizeHint::NotionalUsd(n) => {
749 let leverage = sizing.leverage.max(1);
751 let margin = n / f64::from(leverage);
752 sizer.contracts_with_margin(margin, price.value(), contract_value)
753 }
754 SizeHint::Quantity(q) => {
755 let raw = q.value().max(0.0).floor() as u32;
757 raw.min(sizing.max_contracts)
758 }
759 }
760}