rustrade/services.rs
1//! Optional framework-side services wired in via builder methods on
2//! [`Bot`](crate::Bot):
3//!
4//! - [`MarketFeedService`] — `Bot::with_market_source(...)`. Drives a
5//! [`MarketSource`] under supervisor control; the source publishes
6//! events to the in-process `MarketDataBus` (the bus reference is the
7//! source implementor's responsibility — typically obtained via
8//! `bot.market_data_bus().clone()` before construction).
9//! - [`FillRoutingService`] — `Bot::with_fill_source(...)`. Polls a
10//! [`FillSource`], calls [`Brain::on_fill`] on each brain, refreshes
11//! the per-symbol position cache from the exchange, and auto-feeds
12//! realised PnL into the risk gates using weighted-average entry
13//! accounting.
//! - [`CandlePollerService`] — `Bot::with_candle_poller(...)`. Periodically
//! polls a [`CandleSource`] for one `(symbol, interval)` pair and publishes
//! every candle newer than the last one it published to the market-data bus.
17
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::time::Duration;
21
22use async_trait::async_trait;
23use rustrade_core::{
24 Brain, CandleSource, Exchange, ExchangeClient, Fill, FillSource, MarketDataBus,
25 MarketDataEvent, MarketSource, MetricsSink, Side, Symbol,
26};
27use rustrade_supervisor::{RestartPolicy, TradingService};
28use tokio_util::sync::CancellationToken;
29
30use crate::pending::PendingEntryLedger;
31use crate::risk_state::{PositionCache, RiskPersister, RiskStateMap};
32
33// ───────────────────────────────────────────────────────────────────────
34// MarketFeedService
35// ───────────────────────────────────────────────────────────────────────
36
37/// Drives a [`MarketSource`] under supervisor control.
38///
39/// The wrapper does not interact with the bus directly — the source's
40/// `run` method is expected to publish events to whatever bus it was
41/// constructed with. This service just makes the source restartable and
42/// drop-safe under the supervisor's cancellation contract.
43pub struct MarketFeedService {
44 name: String,
45 source: Arc<dyn MarketSource>,
46}
47
48impl MarketFeedService {
49 /// Wrap a [`MarketSource`] into a [`TradingService`].
50 pub fn new(source: Arc<dyn MarketSource>) -> Self {
51 let name = format!("market-feed[{}]", source.name());
52 Self { name, source }
53 }
54}
55
56#[async_trait]
57impl TradingService for MarketFeedService {
58 fn name(&self) -> &str {
59 &self.name
60 }
61
62 fn restart_policy(&self) -> RestartPolicy {
63 RestartPolicy::OnFailure
64 }
65
66 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
67 tracing::info!(service = %self.name, "market feed starting");
68 tokio::select! {
69 _ = cancel.cancelled() => {
70 tracing::info!(service = %self.name, "market feed cancelled");
71 Ok(())
72 }
73 r = self.source.run() => {
74 match &r {
75 Ok(()) => tracing::info!(service = %self.name, "market feed exited cleanly"),
76 Err(e) => tracing::warn!(service = %self.name, error = %e, "market feed exited with error"),
77 }
78 r.map_err(|e| anyhow::anyhow!("market source error: {e}"))
79 }
80 }
81 }
82}
83
84// ───────────────────────────────────────────────────────────────────────
85// FillRoutingService
86// ───────────────────────────────────────────────────────────────────────
87
88/// Routes fills from a [`FillSource`] to every brain, refreshes the
89/// position cache, and auto-feeds realised PnL into the risk state.
90///
91/// # PnL accounting
92///
93/// The service uses a **weighted-average entry** model (the same model
94/// the backtest engine uses). It reads the cached `Position` *before*
95/// refreshing it from the exchange, so the `entry_price` available is
96/// the pre-fill average. From that:
97///
98/// - A fill in the same direction as the open position **adds** to it.
99/// No realised PnL emitted; the post-refresh average from
100/// `exchange.get_position` becomes the new entry.
101/// - A fill in the opposite direction **reduces** the position. Gross
102/// PnL = `(fill_price - entry) * closed_qty * direction`. The
103/// service calls `BotHandle::record_trade_outcome` on the closed
104/// portion to feed `SessionPnl` + `CircuitBreaker`.
105/// - A fill that **flips** the position emits realised PnL for the
106/// closed portion only; the opening leg is left for the next
107/// reducing fill.
108///
109/// Fees come from `Fill.fee`. Hosts that need a different accounting
110/// model (FIFO, LIFO, tax-lot) should compute PnL themselves and call
111/// `BotHandle::record_trade_outcome` directly — but cannot also wire a
112/// `FillRoutingService`, since the two would double-count.
113pub struct FillRoutingService {
114 source: Arc<dyn FillSource>,
115 brains: Arc<Vec<Arc<dyn Brain>>>,
116 exchange: Arc<dyn ExchangeClient>,
117 positions: PositionCache,
118 risk: RiskStateMap,
119 metrics: Arc<dyn MetricsSink>,
120 persister: Option<RiskPersister>,
121 oco: Option<crate::order_tracker::OcoRegistry>,
122 pending: PendingEntryLedger,
123 fills_routed: AtomicU64,
124 refresh_errors: AtomicU64,
125 trades_recorded: AtomicU64,
126 oco_cancels: AtomicU64,
127}
128
129impl FillRoutingService {
130 #[allow(clippy::too_many_arguments)]
131 pub(crate) fn new(
132 source: Arc<dyn FillSource>,
133 brains: Arc<Vec<Arc<dyn Brain>>>,
134 exchange: Arc<dyn ExchangeClient>,
135 positions: PositionCache,
136 risk: RiskStateMap,
137 metrics: Arc<dyn MetricsSink>,
138 persister: Option<RiskPersister>,
139 oco: Option<crate::order_tracker::OcoRegistry>,
140 pending: PendingEntryLedger,
141 ) -> Self {
142 Self {
143 source,
144 brains,
145 exchange,
146 positions,
147 risk,
148 metrics,
149 persister,
150 oco,
151 pending,
152 fills_routed: AtomicU64::new(0),
153 refresh_errors: AtomicU64::new(0),
154 trades_recorded: AtomicU64::new(0),
155 oco_cancels: AtomicU64::new(0),
156 }
157 }
158
159 /// Total OCO siblings cancelled in response to a bracket leg filling.
160 pub fn oco_cancels(&self) -> u64 {
161 self.oco_cancels.load(Ordering::Relaxed)
162 }
163
164 /// Total fills delivered to brains since service start.
165 pub fn fills_routed(&self) -> u64 {
166 self.fills_routed.load(Ordering::Relaxed)
167 }
168
169 /// Total `exchange.get_position` failures during cache refresh.
170 pub fn refresh_errors(&self) -> u64 {
171 self.refresh_errors.load(Ordering::Relaxed)
172 }
173
174 /// Total realised-PnL closures fed into the risk state.
175 pub fn trades_recorded(&self) -> u64 {
176 self.trades_recorded.load(Ordering::Relaxed)
177 }
178
179 /// Compute realised PnL from a reducing fill and feed the risk state.
180 /// Returns the gross PnL portion attributable to this fill.
181 async fn maybe_record_pnl(&self, fill: &Fill, prior_qty: f64, prior_entry: Option<f64>) {
182 // Only reducing or flipping fills produce realised PnL.
183 let signed_fill_qty = match fill.side {
184 Side::Buy => fill.size.value(),
185 Side::Sell => -fill.size.value(),
186 };
187 if prior_qty == 0.0 || prior_qty.signum() == signed_fill_qty.signum() {
188 return;
189 }
190 let Some(entry) = prior_entry else {
191 // Reducing fill but no entry price recorded — can't compute
192 // PnL. Log and skip.
193 tracing::debug!(
194 symbol = %fill.symbol,
195 "reducing fill but cached position has no entry price; skipping auto-PnL"
196 );
197 return;
198 };
199 let closed_qty = prior_qty.abs().min(fill.size.value());
200 if closed_qty <= 0.0 {
201 return;
202 }
203 let direction = prior_qty.signum();
204 let gross = (fill.price.value() - entry) * direction * closed_qty;
205 // Apportion fee by closing fraction so a flip fill charges
206 // fees pro-rata to the closing portion.
207 let fee_share = if fill.size.value() > 0.0 {
208 fill.fee * (closed_qty / fill.size.value())
209 } else {
210 0.0
211 };
212
213 // The fill itself is validated at ingestion, but `entry` comes from
214 // the position cache (ultimately `exchange.get_position`) and could
215 // still be non-finite. A NaN fed into `record_close` would disable
216 // the loss-limit gate, so refuse it here.
217 if !gross.is_finite() || !fee_share.is_finite() {
218 tracing::error!(
219 symbol = %fill.symbol,
220 gross,
221 fee_share,
222 entry,
223 "auto-PnL: computed non-finite realised PnL — NOT recorded \
224 (risk gates unchanged)"
225 );
226 return;
227 }
228
229 // Update the per-symbol risk state directly.
230 let recorded = {
231 let mut map = self.risk.write().await;
232 if let Some(risk) = map.get_mut(&fill.symbol) {
233 risk.session_pnl.record_close(gross, fee_share);
234 let net = gross - fee_share;
235 if net > 0.0 {
236 risk.circuit_breaker.record_win();
237 } else if net < 0.0 {
238 risk.circuit_breaker.record_loss();
239 }
240 self.trades_recorded.fetch_add(1, Ordering::Relaxed);
241 self.metrics.histogram(
242 "rustrade_realised_pnl_quote",
243 &[("symbol", fill.symbol.as_str())],
244 net,
245 );
246 true
247 } else {
248 // A fill the risk layer never sees: its losses won't count
249 // toward the session PnL halt or the circuit breaker. Loud
250 // by design — this usually means a symbol was traded that
251 // isn't in `BotConfig.symbols`.
252 self.metrics.counter(
253 "rustrade_unrecorded_fills_total",
254 &[("symbol", fill.symbol.as_str())],
255 1,
256 );
257 tracing::warn!(
258 symbol = %fill.symbol,
259 "auto-PnL: fill for a symbol not in the risk-state map — \
260 realised PnL NOT recorded by any risk gate \
261 (is it missing from BotConfig.symbols?)"
262 );
263 false
264 }
265 };
266
267 // Persist the updated risk state (lock released) if a store is wired.
268 if recorded && let Some(persister) = &self.persister {
269 persister.persist_symbol(&self.risk, &fill.symbol).await;
270 }
271 }
272}
273
274#[async_trait]
275impl TradingService for FillRoutingService {
276 fn name(&self) -> &str {
277 "fill-routing"
278 }
279
280 fn restart_policy(&self) -> RestartPolicy {
281 RestartPolicy::OnFailure
282 }
283
284 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
285 tracing::info!("fill-routing service starting");
286 loop {
287 tokio::select! {
288 _ = cancel.cancelled() => {
289 tracing::info!(
290 routed = self.fills_routed(),
291 refresh_errors = self.refresh_errors(),
292 trades_recorded = self.trades_recorded(),
293 "fill-routing service shutting down"
294 );
295 return Ok(());
296 }
297 next = self.source.next_fill() => {
298 let Some(fill) = next else {
299 tracing::info!("fill source closed; exiting");
300 return Ok(());
301 };
302
303 // Ingestion-boundary validation: a non-finite price /
304 // size / fee would poison the weighted-average PnL and
305 // — because every NaN comparison is false — silently
306 // disable the session-PnL halt. Drop the fill entirely
307 // (not routed, not recorded) and say so loudly.
308 if !fill_is_finite(&fill) {
309 self.metrics.counter(
310 "rustrade_invalid_fills_total",
311 &[("symbol", fill.symbol.as_str())],
312 1,
313 );
314 tracing::error!(
315 symbol = %fill.symbol,
316 order_id = %fill.order_id,
317 price = fill.price.value(),
318 size = fill.size.value(),
319 fee = fill.fee,
320 "fill source produced a non-finite fill — dropped \
321 (not routed to brains, not recorded in risk state)"
322 );
323 continue;
324 }
325
326 let symbol = fill.symbol.clone();
327
328 // OCO: if this fill belongs to a bracket leg, cancel its
329 // sibling so the position isn't closed twice.
330 if let Some(oco) = &self.oco
331 && let Some((sym, sibling)) = oco.take_sibling(&fill.order_id).await
332 {
333 match self.exchange.cancel_order(&sym, &sibling).await {
334 Ok(_) => {
335 self.oco_cancels.fetch_add(1, Ordering::Relaxed);
336 self.metrics.inc("rustrade_oco_cancels_total");
337 tracing::info!(symbol = %sym, filled = %fill.order_id, cancelled = %sibling, "OCO: cancelled sibling after bracket leg filled");
338 }
339 Err(e) => tracing::warn!(symbol = %sym, sibling = %sibling, error = %e, "OCO: failed to cancel sibling (it may already be gone)"),
340 }
341 }
342
343 // Snapshot the pre-fill position so we can compute
344 // realised PnL before the exchange refreshes the
345 // entry price.
346 let (prior_qty, prior_entry) = {
347 let map = self.positions.read().await;
348 let p = map.get(&symbol).copied().unwrap_or(rustrade_core::Position::FLAT);
349 (p.qty, p.entry_price)
350 };
351
352 // Route to every brain. Errors are logged but don't
353 // stop the service — the brain's on_fill is
354 // informational by contract.
355 for brain in self.brains.iter() {
356 if let Err(e) = brain.on_fill(&fill).await {
357 tracing::warn!(
358 brain = brain.name(),
359 error = %e,
360 "brain on_fill returned error"
361 );
362 }
363 }
364
365 self.maybe_record_pnl(&fill, prior_qty, prior_entry).await;
366
367 // Refresh position cache from the exchange.
368 match self.exchange.get_position(&symbol).await {
369 Ok(p) if p.qty.is_finite()
370 && p.entry_price.is_none_or(f64::is_finite) =>
371 {
372 self.positions.write().await.insert(symbol.clone(), p);
373 // The position is visible in the cache now, so
374 // the portfolio gate no longer needs the
375 // pending-entry reservation for this symbol.
376 self.pending.release(&symbol).await;
377 tracing::debug!(symbol = %symbol, qty = p.qty, "refreshed position");
378 }
379 Ok(p) => {
380 // A non-finite qty/entry would poison every
381 // PnL computed from the cache — keep the old
382 // snapshot instead.
383 self.refresh_errors.fetch_add(1, Ordering::Relaxed);
384 self.metrics.inc("rustrade_position_refresh_errors_total");
385 tracing::error!(
386 symbol = %symbol,
387 qty = p.qty,
388 entry = ?p.entry_price,
389 "exchange returned a non-finite position — cache NOT updated"
390 );
391 }
392 Err(e) => {
393 self.refresh_errors.fetch_add(1, Ordering::Relaxed);
394 self.metrics.inc("rustrade_position_refresh_errors_total");
395 tracing::warn!(
396 symbol = %symbol,
397 error = %e,
398 "failed to refresh position after fill"
399 );
400 }
401 }
402
403 self.fills_routed.fetch_add(1, Ordering::Relaxed);
404 self.metrics.counter(
405 "rustrade_fills_routed_total",
406 &[("symbol", symbol.as_str())],
407 1,
408 );
409 }
410 }
411 }
412 }
413}
414
415/// Every numeric field a [`Fill`] carries must be finite (and the size
416/// non-negative) before the framework will route or record it.
417fn fill_is_finite(f: &Fill) -> bool {
418 f.price.value().is_finite()
419 && f.size.value().is_finite()
420 && f.size.value() >= 0.0
421 && f.fee.is_finite()
422}
423
424// ───────────────────────────────────────────────────────────────────────
425// CandlePollerService
426// ───────────────────────────────────────────────────────────────────────
427
428/// Periodic poll of a [`CandleSource`] for a single `(symbol, interval)`
429/// pair. Publishes each newly-closed candle to the
430/// [`MarketDataBus`].
431///
432/// Per-symbol cadences are achieved by spawning multiple services —
433/// `Bot::with_candle_poller(...)` accepts repeated calls and spawns one
434/// service per registered tuple.
435///
436/// # Deduplication
437///
438/// The service tracks the highest `Candle::time` it has already
439/// published; only candles with a strictly greater timestamp are
440/// re-published. This is robust against exchanges that return overlapping
441/// windows on consecutive polls.
442pub struct CandlePollerService {
443 name: String,
444 source: Arc<dyn CandleSource>,
445 symbol: Symbol,
446 interval: Duration,
447 poll_cadence: Duration,
448 limit: usize,
449 bus: MarketDataBus,
450 metrics: Arc<dyn MetricsSink>,
451 last_time: std::sync::Mutex<i64>,
452 polled: AtomicU64,
453 poll_errors: AtomicU64,
454 published: AtomicU64,
455}
456
457impl CandlePollerService {
458 pub(crate) fn new(
459 source: Arc<dyn CandleSource>,
460 symbol: Symbol,
461 interval: Duration,
462 poll_cadence: Duration,
463 limit: usize,
464 bus: MarketDataBus,
465 metrics: Arc<dyn MetricsSink>,
466 ) -> Self {
467 let name = format!("candle-poller[{}@{}s]", symbol.as_str(), interval.as_secs());
468 Self {
469 name,
470 source,
471 symbol,
472 interval,
473 poll_cadence,
474 limit,
475 bus,
476 metrics,
477 last_time: std::sync::Mutex::new(i64::MIN),
478 polled: AtomicU64::new(0),
479 poll_errors: AtomicU64::new(0),
480 published: AtomicU64::new(0),
481 }
482 }
483
484 /// Total successful polls.
485 pub fn polled(&self) -> u64 {
486 self.polled.load(Ordering::Relaxed)
487 }
488 /// Total failed polls.
489 pub fn poll_errors(&self) -> u64 {
490 self.poll_errors.load(Ordering::Relaxed)
491 }
492 /// Total candles published (deduplicated).
493 pub fn published(&self) -> u64 {
494 self.published.load(Ordering::Relaxed)
495 }
496}
497
498#[async_trait]
499impl TradingService for CandlePollerService {
500 fn name(&self) -> &str {
501 &self.name
502 }
503
504 fn restart_policy(&self) -> RestartPolicy {
505 RestartPolicy::OnFailure
506 }
507
508 async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
509 tracing::info!(service = %self.name, "candle poller starting");
510 let exchange = Exchange::from(self.source.name());
511
512 loop {
513 tokio::select! {
514 _ = cancel.cancelled() => {
515 tracing::info!(
516 service = %self.name,
517 polled = self.polled(),
518 published = self.published(),
519 errors = self.poll_errors(),
520 "candle poller shutting down"
521 );
522 return Ok(());
523 }
524 _ = tokio::time::sleep(self.poll_cadence) => {
525 match self.source.poll(&self.symbol, self.interval, self.limit).await {
526 Ok(candles) => {
527 self.polled.fetch_add(1, Ordering::Relaxed);
528 let mut last = self.last_time.lock().expect("last_time poisoned");
529 let mut new_high = *last;
530 for candle in candles {
531 if candle.time <= *last {
532 continue;
533 }
534 new_high = new_high.max(candle.time);
535 self.bus.publish(MarketDataEvent::Candle {
536 exchange: exchange.clone(),
537 symbol: self.symbol.clone(),
538 candle,
539 });
540 self.published.fetch_add(1, Ordering::Relaxed);
541 self.metrics.counter(
542 "rustrade_candles_published_total",
543 &[("symbol", self.symbol.as_str())],
544 1,
545 );
546 }
547 *last = new_high;
548 }
549 Err(e) => {
550 self.poll_errors.fetch_add(1, Ordering::Relaxed);
551 self.metrics.inc("rustrade_candle_poll_errors_total");
552 tracing::warn!(
553 service = %self.name,
554 error = %e,
555 "candle poll failed"
556 );
557 }
558 }
559 }
560 }
561 }
562 }
563}