1use std::sync::Arc;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22use async_trait::async_trait;
23use chrono::Utc;
24use rustrade_core::{
25 Brain, Capability, Decision, ExchangeClient, MarketDataBus, MarketDataEvent, Order, OrderKind,
26 Position, Price, Side, Signal, SignalBus, SignalType, SizeHint, StopAttachment, Symbol, Volume,
27};
28use rustrade_risk::{PositionSizer, SizingConfig};
29use rustrade_supervisor::{RestartPolicy, TradingService};
30use tokio::sync::broadcast::error::RecvError;
31use tokio_util::sync::CancellationToken;
32
33use crate::risk_state::{PositionCache, RiskStateMap};
34
35#[derive(Clone)]
40pub(crate) struct ExecutionContext {
41 pub exchange: Arc<dyn ExchangeClient>,
42 pub bus: MarketDataBus,
43 pub signals: SignalBus,
44 pub positions: PositionCache,
45 pub risk: RiskStateMap,
46 pub sizing: Arc<SizingConfig>,
47 pub order_tracker: Option<crate::order_tracker::OrderTracker>,
51}
52
53pub struct ExecutionService {
55 name: String,
56 brain: Arc<dyn Brain>,
57 ctx: ExecutionContext,
58 events_processed: AtomicU64,
59 events_dropped: AtomicU64,
60 orders_placed: AtomicU64,
61 orders_blocked: AtomicU64,
62}
63
64impl ExecutionService {
65 pub(crate) fn new(brain: Arc<dyn Brain>, ctx: ExecutionContext) -> Self {
66 let name = format!("execution[{}]", brain.name());
67 Self {
68 name,
69 brain,
70 ctx,
71 events_processed: AtomicU64::new(0),
72 events_dropped: AtomicU64::new(0),
73 orders_placed: AtomicU64::new(0),
74 orders_blocked: AtomicU64::new(0),
75 }
76 }
77
78 pub fn events_processed(&self) -> u64 {
80 self.events_processed.load(Ordering::Relaxed)
81 }
82 pub fn events_dropped(&self) -> u64 {
84 self.events_dropped.load(Ordering::Relaxed)
85 }
86 pub fn orders_placed(&self) -> u64 {
88 self.orders_placed.load(Ordering::Relaxed)
89 }
90 pub fn orders_blocked(&self) -> u64 {
92 self.orders_blocked.load(Ordering::Relaxed)
93 }
94
95 async fn position_for(&self, symbol: &Symbol) -> Position {
96 self.ctx
97 .positions
98 .read()
99 .await
100 .get(symbol)
101 .copied()
102 .unwrap_or(Position::FLAT)
103 }
104
105 async fn handle_event(&self, event: &MarketDataEvent) -> anyhow::Result<()> {
106 let symbol = event.symbol().clone();
107 let position = self.position_for(&symbol).await;
108
109 let decision = self.brain.on_event(event, &position).await?;
110 self.events_processed.fetch_add(1, Ordering::Relaxed);
111
112 let signal = decision.signal;
113 if matches!(signal, SignalType::Hold) {
114 return Ok(());
115 }
116
117 let published = self.ctx.signals.publish(Signal {
121 id: format!("{}-{}", self.brain.name(), self.events_processed()),
122 symbol: symbol.as_str().to_string(),
123 kind: signal,
124 confidence: decision.confidence,
125 timestamp: Utc::now(),
126 source: self.brain.name().to_string(),
127 metadata: decision.metadata.clone(),
128 });
129 let _ = published;
130
131 if let Some(risk) = self.ctx.risk.read().await.get(&symbol)
133 && risk.session_pnl.is_session_halted()
134 {
135 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
136 tracing::warn!(
137 service = %self.name,
138 symbol = %symbol,
139 signal = %signal,
140 "decision blocked: session PnL halted"
141 );
142 return Ok(());
143 }
144
145 if let Some(risk) = self.ctx.risk.read().await.get(&symbol)
147 && risk.circuit_breaker.is_tripped()
148 {
149 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
150 tracing::warn!(
151 service = %self.name,
152 symbol = %symbol,
153 signal = %signal,
154 cooldown_secs = ?risk.circuit_breaker.cooldown_remaining(),
155 "decision blocked: circuit breaker tripped"
156 );
157 return Ok(());
158 }
159
160 let order = match self.build_order(event, &symbol, &position, &decision).await {
162 Some(o) => o,
163 None => return Ok(()),
164 };
165
166 match self.ctx.exchange.place_order(&order).await {
168 Ok(id) => {
169 self.orders_placed.fetch_add(1, Ordering::Relaxed);
170 if let Some(tracker) = &self.ctx.order_tracker {
173 tracker.record(id.clone(), &order).await;
174 }
175 tracing::info!(
176 service = %self.name,
177 symbol = %symbol,
178 side = ?order.side,
179 size = %order.size,
180 reduce_only = order.reduce_only,
181 order_id = %id,
182 "order placed"
183 );
184 }
185 Err(e) => {
186 tracing::error!(
187 service = %self.name,
188 symbol = %symbol,
189 error = %e,
190 "exchange rejected order — risk state unchanged"
191 );
192 }
193 }
194 Ok(())
195 }
196
197 async fn build_order(
198 &self,
199 event: &MarketDataEvent,
200 symbol: &Symbol,
201 position: &Position,
202 decision: &Decision,
203 ) -> Option<Order> {
204 match decision.signal {
205 SignalType::Hold => None,
206 SignalType::Close => {
207 let Some(close_side) = position.close_side() else {
209 tracing::debug!(
210 service = %self.name,
211 symbol = %symbol,
212 "decision=Close but position is flat — nothing to do"
213 );
214 return None;
215 };
216 let size = Volume(position.qty.abs());
217 Some(Order::market(symbol.clone(), close_side, size).with_reduce_only(true))
218 }
219 SignalType::Buy | SignalType::Sell => {
220 let side = if matches!(decision.signal, SignalType::Buy) {
221 Side::Buy
222 } else {
223 Side::Sell
224 };
225 let price = price_from_event(event)?;
226 let contract_value = self.ctx.exchange.contract_value(symbol);
227 let contracts =
228 size_decision(&self.ctx.sizing, decision.size_hint, price, contract_value);
229
230 if contracts == 0 {
231 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
232 tracing::warn!(
233 service = %self.name,
234 symbol = %symbol,
235 signal = %decision.signal,
236 price = price.value(),
237 contract_value,
238 "decision blocked: sizer returned 0 contracts"
239 );
240 return None;
241 }
242 let size = Volume(contracts as f64);
243
244 let kind = decision.order_kind;
249 if let Some(cap) = capability_for_kind(kind)
250 && !self.ctx.exchange.supports(cap)
251 {
252 self.orders_blocked.fetch_add(1, Ordering::Relaxed);
253 tracing::warn!(
254 service = %self.name,
255 symbol = %symbol,
256 signal = %decision.signal,
257 ?kind,
258 required = ?cap,
259 "decision blocked: adapter does not support requested order kind"
260 );
261 return None;
262 }
263
264 let order = match kind {
266 OrderKind::Market => Order::market(symbol.clone(), side, size),
267 OrderKind::Limit | OrderKind::PostOnly | OrderKind::Ioc | OrderKind::Fok => {
268 let limit = decision.limit_price.unwrap_or_else(|| {
269 tracing::warn!(
270 service = %self.name,
271 symbol = %symbol,
272 ?kind,
273 fallback = price.value(),
274 "non-market order kind without limit_price; \
275 falling back to event price"
276 );
277 price
278 });
279 let mut o = Order::limit(symbol.clone(), side, size, limit);
280 o.kind = kind;
281 o
282 }
283 };
284
285 Some(self.attach_protection(order, symbol, decision))
287 }
288 }
289 }
290
291 fn attach_protection(&self, order: Order, symbol: &Symbol, decision: &Decision) -> Order {
304 let stop = match (decision.stop_price, decision.take_profit_price) {
305 (Some(sl), Some(_tp)) => {
306 tracing::warn!(
307 service = %self.name,
308 symbol = %symbol,
309 "both stop_price and take_profit_price set; attaching stop-loss only \
310 (bracket / OCO awaits the order tracker)"
311 );
312 StopAttachment::stop_market(sl)
313 }
314 (Some(sl), None) => StopAttachment::stop_market(sl),
315 (None, Some(tp)) => StopAttachment::take_profit(tp),
316 (None, None) => return order,
317 };
318
319 if self.ctx.exchange.supports(Capability::StopOrders) {
320 order.with_stop(stop)
321 } else {
322 tracing::warn!(
323 service = %self.name,
324 symbol = %symbol,
325 "protective stop / take-profit requested but adapter lacks \
326 Capability::StopOrders; placing order WITHOUT protection"
327 );
328 order
329 }
330 }
331}
332
333fn capability_for_kind(kind: OrderKind) -> Option<Capability> {
336 match kind {
337 OrderKind::Market | OrderKind::Limit => None,
338 OrderKind::PostOnly => Some(Capability::PostOnly),
339 OrderKind::Ioc => Some(Capability::Ioc),
340 OrderKind::Fok => Some(Capability::Fok),
341 }
342}
343
344#[async_trait]
345impl TradingService for ExecutionService {
346 fn name(&self) -> &str {
347 &self.name
348 }
349
350 fn restart_policy(&self) -> RestartPolicy {
351 RestartPolicy::OnFailure
352 }
353
354 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
355 let mut rx = self.ctx.bus.subscribe();
356 tracing::info!(service = %self.name, "execution service subscribed");
357
358 loop {
359 tokio::select! {
360 _ = cancel.cancelled() => {
361 tracing::info!(
362 service = %self.name,
363 events = self.events_processed(),
364 dropped = self.events_dropped(),
365 placed = self.orders_placed(),
366 blocked = self.orders_blocked(),
367 "execution service shutting down"
368 );
369 return Ok(());
370 }
371 next = rx.recv() => match next {
372 Ok(event) => {
373 if let Err(e) = self.handle_event(&event).await {
374 tracing::error!(
375 service = %self.name,
376 error = %e,
377 "brain returned error from on_event — service continuing"
378 );
379 }
380 }
381 Err(RecvError::Lagged(skipped)) => {
382 self.events_dropped.fetch_add(skipped, Ordering::Relaxed);
383 tracing::warn!(
384 service = %self.name,
385 skipped,
386 "market data bus lagged — events dropped"
387 );
388 }
389 Err(RecvError::Closed) => {
390 tracing::info!(service = %self.name, "market data bus closed");
391 return Ok(());
392 }
393 },
394 }
395 }
396 }
397}
398
399fn price_from_event(event: &MarketDataEvent) -> Option<Price> {
402 match event {
403 MarketDataEvent::Candle { candle, .. } => Some(Price(candle.close)),
404 MarketDataEvent::Ticker { tick, .. } => Some(tick.mid_price()),
405 MarketDataEvent::Trade { price, .. } => Some(Price(*price)),
406 }
407}
408
409fn size_decision(sizing: &SizingConfig, hint: SizeHint, price: Price, contract_value: f64) -> u32 {
411 let sizer = PositionSizer::new(sizing.clone());
412 match hint {
413 SizeHint::Default => sizer.contracts(price.value(), contract_value),
414 SizeHint::MarginFraction(f) => {
415 let f = f.clamp(0.0, 1.0);
417 let margin = sizing.margin_per_trade * f;
418 sizer.contracts_with_margin(margin, price.value(), contract_value)
419 }
420 SizeHint::NotionalUsd(n) => {
421 let leverage = sizing.leverage.max(1);
423 let margin = n / f64::from(leverage);
424 sizer.contracts_with_margin(margin, price.value(), contract_value)
425 }
426 SizeHint::Quantity(q) => {
427 let raw = q.value().max(0.0).floor() as u32;
429 raw.min(sizing.max_contracts)
430 }
431 }
432}