Skip to main content

rustrade/
services.rs

//! Optional framework-side services wired in via builder methods on
//! [`Bot`](crate::Bot):
//!
//! - [`MarketFeedService`] — `Bot::with_market_source(...)`. Drives a
//!   [`MarketSource`] under supervisor control; the source publishes
//!   events to the in-process `MarketDataBus` (obtaining the bus reference
//!   is the source implementor's responsibility — typically via
//!   `bot.market_data_bus().clone()` before construction).
//! - [`FillRoutingService`] — `Bot::with_fill_source(...)`. Polls a
//!   [`FillSource`], calls [`Brain::on_fill`] on each brain, refreshes
//!   the per-symbol position cache from the exchange, and auto-feeds
//!   realised PnL into the risk gates using weighted-average entry
//!   accounting.
//! - [`CandlePollerService`] — `Bot::with_candle_poller(...)`. Periodically
//!   polls a [`CandleSource`] for one `(symbol, interval)` pair and
//!   publishes every candle newer than the last one it published to the
//!   market-data bus.
17
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::time::Duration;
21
22use async_trait::async_trait;
23use rustrade_core::{
24    Brain, CandleSource, Exchange, ExchangeClient, Fill, FillSource, MarketDataBus,
25    MarketDataEvent, MarketSource, MetricsSink, Side, Symbol,
26};
27use rustrade_supervisor::{RestartPolicy, TradingService};
28use tokio_util::sync::CancellationToken;
29
30use crate::pending::PendingEntryLedger;
31use crate::risk_state::{PositionCache, RiskPersister, RiskStateMap};
32
33// ───────────────────────────────────────────────────────────────────────
34// MarketFeedService
35// ───────────────────────────────────────────────────────────────────────
36
37/// Drives a [`MarketSource`] under supervisor control.
38///
39/// The wrapper does not interact with the bus directly — the source's
40/// `run` method is expected to publish events to whatever bus it was
41/// constructed with. This service just makes the source restartable and
42/// drop-safe under the supervisor's cancellation contract.
43pub struct MarketFeedService {
44    name: String,
45    source: Arc<dyn MarketSource>,
46}
47
48impl MarketFeedService {
49    /// Wrap a [`MarketSource`] into a [`TradingService`].
50    pub fn new(source: Arc<dyn MarketSource>) -> Self {
51        let name = format!("market-feed[{}]", source.name());
52        Self { name, source }
53    }
54}
55
56#[async_trait]
57impl TradingService for MarketFeedService {
58    fn name(&self) -> &str {
59        &self.name
60    }
61
62    fn restart_policy(&self) -> RestartPolicy {
63        RestartPolicy::OnFailure
64    }
65
66    async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
67        tracing::info!(service = %self.name, "market feed starting");
68        tokio::select! {
69            _ = cancel.cancelled() => {
70                tracing::info!(service = %self.name, "market feed cancelled");
71                Ok(())
72            }
73            r = self.source.run() => {
74                match &r {
75                    Ok(()) => tracing::info!(service = %self.name, "market feed exited cleanly"),
76                    Err(e) => tracing::warn!(service = %self.name, error = %e, "market feed exited with error"),
77                }
78                r.map_err(|e| anyhow::anyhow!("market source error: {e}"))
79            }
80        }
81    }
82}
83
84// ───────────────────────────────────────────────────────────────────────
85// FillRoutingService
86// ───────────────────────────────────────────────────────────────────────
87
88/// Routes fills from a [`FillSource`] to every brain, refreshes the
89/// position cache, and auto-feeds realised PnL into the risk state.
90///
91/// # PnL accounting
92///
93/// The service uses a **weighted-average entry** model (the same model
94/// the backtest engine uses). It reads the cached `Position` *before*
95/// refreshing it from the exchange, so the `entry_price` available is
96/// the pre-fill average. From that:
97///
98/// - A fill in the same direction as the open position **adds** to it.
99///   No realised PnL emitted; the post-refresh average from
100///   `exchange.get_position` becomes the new entry.
101/// - A fill in the opposite direction **reduces** the position. Gross
102///   PnL = `(fill_price - entry) * closed_qty * direction`. The
103///   service calls `BotHandle::record_trade_outcome` on the closed
104///   portion to feed `SessionPnl` + `CircuitBreaker`.
105/// - A fill that **flips** the position emits realised PnL for the
106///   closed portion only; the opening leg is left for the next
107///   reducing fill.
108///
109/// Fees come from `Fill.fee`. Hosts that need a different accounting
110/// model (FIFO, LIFO, tax-lot) should compute PnL themselves and call
111/// `BotHandle::record_trade_outcome` directly — but cannot also wire a
112/// `FillRoutingService`, since the two would double-count.
113pub struct FillRoutingService {
114    source: Arc<dyn FillSource>,
115    brains: Arc<Vec<Arc<dyn Brain>>>,
116    exchange: Arc<dyn ExchangeClient>,
117    positions: PositionCache,
118    risk: RiskStateMap,
119    metrics: Arc<dyn MetricsSink>,
120    persister: Option<RiskPersister>,
121    oco: Option<crate::order_tracker::OcoRegistry>,
122    pending: PendingEntryLedger,
123    fills_routed: AtomicU64,
124    refresh_errors: AtomicU64,
125    trades_recorded: AtomicU64,
126    oco_cancels: AtomicU64,
127}
128
129impl FillRoutingService {
130    #[allow(clippy::too_many_arguments)]
131    pub(crate) fn new(
132        source: Arc<dyn FillSource>,
133        brains: Arc<Vec<Arc<dyn Brain>>>,
134        exchange: Arc<dyn ExchangeClient>,
135        positions: PositionCache,
136        risk: RiskStateMap,
137        metrics: Arc<dyn MetricsSink>,
138        persister: Option<RiskPersister>,
139        oco: Option<crate::order_tracker::OcoRegistry>,
140        pending: PendingEntryLedger,
141    ) -> Self {
142        Self {
143            source,
144            brains,
145            exchange,
146            positions,
147            risk,
148            metrics,
149            persister,
150            oco,
151            pending,
152            fills_routed: AtomicU64::new(0),
153            refresh_errors: AtomicU64::new(0),
154            trades_recorded: AtomicU64::new(0),
155            oco_cancels: AtomicU64::new(0),
156        }
157    }
158
159    /// Total OCO siblings cancelled in response to a bracket leg filling.
160    pub fn oco_cancels(&self) -> u64 {
161        self.oco_cancels.load(Ordering::Relaxed)
162    }
163
164    /// Total fills delivered to brains since service start.
165    pub fn fills_routed(&self) -> u64 {
166        self.fills_routed.load(Ordering::Relaxed)
167    }
168
169    /// Total `exchange.get_position` failures during cache refresh.
170    pub fn refresh_errors(&self) -> u64 {
171        self.refresh_errors.load(Ordering::Relaxed)
172    }
173
174    /// Total realised-PnL closures fed into the risk state.
175    pub fn trades_recorded(&self) -> u64 {
176        self.trades_recorded.load(Ordering::Relaxed)
177    }
178
179    /// Compute realised PnL from a reducing fill and feed the risk state.
180    /// Returns the gross PnL portion attributable to this fill.
181    async fn maybe_record_pnl(&self, fill: &Fill, prior_qty: f64, prior_entry: Option<f64>) {
182        // Only reducing or flipping fills produce realised PnL.
183        let signed_fill_qty = match fill.side {
184            Side::Buy => fill.size.value(),
185            Side::Sell => -fill.size.value(),
186        };
187        if prior_qty == 0.0 || prior_qty.signum() == signed_fill_qty.signum() {
188            return;
189        }
190        let Some(entry) = prior_entry else {
191            // Reducing fill but no entry price recorded — can't compute
192            // PnL. Log and skip.
193            tracing::debug!(
194                symbol = %fill.symbol,
195                "reducing fill but cached position has no entry price; skipping auto-PnL"
196            );
197            return;
198        };
199        let closed_qty = prior_qty.abs().min(fill.size.value());
200        if closed_qty <= 0.0 {
201            return;
202        }
203        let direction = prior_qty.signum();
204        let gross = (fill.price.value() - entry) * direction * closed_qty;
205        // Apportion fee by closing fraction so a flip fill charges
206        // fees pro-rata to the closing portion.
207        let fee_share = if fill.size.value() > 0.0 {
208            fill.fee * (closed_qty / fill.size.value())
209        } else {
210            0.0
211        };
212
213        // The fill itself is validated at ingestion, but `entry` comes from
214        // the position cache (ultimately `exchange.get_position`) and could
215        // still be non-finite. A NaN fed into `record_close` would disable
216        // the loss-limit gate, so refuse it here.
217        if !gross.is_finite() || !fee_share.is_finite() {
218            tracing::error!(
219                symbol = %fill.symbol,
220                gross,
221                fee_share,
222                entry,
223                "auto-PnL: computed non-finite realised PnL — NOT recorded \
224                 (risk gates unchanged)"
225            );
226            return;
227        }
228
229        // Update the per-symbol risk state directly.
230        let recorded = {
231            let mut map = self.risk.write().await;
232            if let Some(risk) = map.get_mut(&fill.symbol) {
233                risk.session_pnl.record_close(gross, fee_share);
234                let net = gross - fee_share;
235                if net > 0.0 {
236                    risk.circuit_breaker.record_win();
237                } else if net < 0.0 {
238                    risk.circuit_breaker.record_loss();
239                }
240                self.trades_recorded.fetch_add(1, Ordering::Relaxed);
241                self.metrics.histogram(
242                    "rustrade_realised_pnl_quote",
243                    &[("symbol", fill.symbol.as_str())],
244                    net,
245                );
246                true
247            } else {
248                // A fill the risk layer never sees: its losses won't count
249                // toward the session PnL halt or the circuit breaker. Loud
250                // by design — this usually means a symbol was traded that
251                // isn't in `BotConfig.symbols`.
252                self.metrics.counter(
253                    "rustrade_unrecorded_fills_total",
254                    &[("symbol", fill.symbol.as_str())],
255                    1,
256                );
257                tracing::warn!(
258                    symbol = %fill.symbol,
259                    "auto-PnL: fill for a symbol not in the risk-state map — \
260                     realised PnL NOT recorded by any risk gate \
261                     (is it missing from BotConfig.symbols?)"
262                );
263                false
264            }
265        };
266
267        // Persist the updated risk state (lock released) if a store is wired.
268        if recorded && let Some(persister) = &self.persister {
269            persister.persist_symbol(&self.risk, &fill.symbol).await;
270        }
271    }
272}
273
274#[async_trait]
275impl TradingService for FillRoutingService {
276    fn name(&self) -> &str {
277        "fill-routing"
278    }
279
280    fn restart_policy(&self) -> RestartPolicy {
281        RestartPolicy::OnFailure
282    }
283
284    async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
285        tracing::info!("fill-routing service starting");
286        loop {
287            tokio::select! {
288                _ = cancel.cancelled() => {
289                    tracing::info!(
290                        routed = self.fills_routed(),
291                        refresh_errors = self.refresh_errors(),
292                        trades_recorded = self.trades_recorded(),
293                        "fill-routing service shutting down"
294                    );
295                    return Ok(());
296                }
297                next = self.source.next_fill() => {
298                    let Some(fill) = next else {
299                        tracing::info!("fill source closed; exiting");
300                        return Ok(());
301                    };
302
303                    // Ingestion-boundary validation: a non-finite price /
304                    // size / fee would poison the weighted-average PnL and
305                    // — because every NaN comparison is false — silently
306                    // disable the session-PnL halt. Drop the fill entirely
307                    // (not routed, not recorded) and say so loudly.
308                    if !fill_is_finite(&fill) {
309                        self.metrics.counter(
310                            "rustrade_invalid_fills_total",
311                            &[("symbol", fill.symbol.as_str())],
312                            1,
313                        );
314                        tracing::error!(
315                            symbol = %fill.symbol,
316                            order_id = %fill.order_id,
317                            price = fill.price.value(),
318                            size = fill.size.value(),
319                            fee = fill.fee,
320                            "fill source produced a non-finite fill — dropped \
321                             (not routed to brains, not recorded in risk state)"
322                        );
323                        continue;
324                    }
325
326                    let symbol = fill.symbol.clone();
327
328                    // OCO: if this fill belongs to a bracket leg, cancel its
329                    // sibling so the position isn't closed twice.
330                    if let Some(oco) = &self.oco
331                        && let Some((sym, sibling)) = oco.take_sibling(&fill.order_id).await
332                    {
333                        match self.exchange.cancel_order(&sym, &sibling).await {
334                            Ok(_) => {
335                                self.oco_cancels.fetch_add(1, Ordering::Relaxed);
336                                self.metrics.inc("rustrade_oco_cancels_total");
337                                tracing::info!(symbol = %sym, filled = %fill.order_id, cancelled = %sibling, "OCO: cancelled sibling after bracket leg filled");
338                            }
339                            Err(e) => tracing::warn!(symbol = %sym, sibling = %sibling, error = %e, "OCO: failed to cancel sibling (it may already be gone)"),
340                        }
341                    }
342
343                    // Snapshot the pre-fill position so we can compute
344                    // realised PnL before the exchange refreshes the
345                    // entry price.
346                    let (prior_qty, prior_entry) = {
347                        let map = self.positions.read().await;
348                        let p = map.get(&symbol).copied().unwrap_or(rustrade_core::Position::FLAT);
349                        (p.qty, p.entry_price)
350                    };
351
352                    // Route to every brain. Errors are logged but don't
353                    // stop the service — the brain's on_fill is
354                    // informational by contract.
355                    for brain in self.brains.iter() {
356                        if let Err(e) = brain.on_fill(&fill).await {
357                            tracing::warn!(
358                                brain = brain.name(),
359                                error = %e,
360                                "brain on_fill returned error"
361                            );
362                        }
363                    }
364
365                    self.maybe_record_pnl(&fill, prior_qty, prior_entry).await;
366
367                    // Refresh position cache from the exchange.
368                    match self.exchange.get_position(&symbol).await {
369                        Ok(p) if p.qty.is_finite()
370                            && p.entry_price.is_none_or(f64::is_finite) =>
371                        {
372                            self.positions.write().await.insert(symbol.clone(), p);
373                            // The position is visible in the cache now, so
374                            // the portfolio gate no longer needs the
375                            // pending-entry reservation for this symbol.
376                            self.pending.release(&symbol).await;
377                            tracing::debug!(symbol = %symbol, qty = p.qty, "refreshed position");
378                        }
379                        Ok(p) => {
380                            // A non-finite qty/entry would poison every
381                            // PnL computed from the cache — keep the old
382                            // snapshot instead.
383                            self.refresh_errors.fetch_add(1, Ordering::Relaxed);
384                            self.metrics.inc("rustrade_position_refresh_errors_total");
385                            tracing::error!(
386                                symbol = %symbol,
387                                qty = p.qty,
388                                entry = ?p.entry_price,
389                                "exchange returned a non-finite position — cache NOT updated"
390                            );
391                        }
392                        Err(e) => {
393                            self.refresh_errors.fetch_add(1, Ordering::Relaxed);
394                            self.metrics.inc("rustrade_position_refresh_errors_total");
395                            tracing::warn!(
396                                symbol = %symbol,
397                                error = %e,
398                                "failed to refresh position after fill"
399                            );
400                        }
401                    }
402
403                    self.fills_routed.fetch_add(1, Ordering::Relaxed);
404                    self.metrics.counter(
405                        "rustrade_fills_routed_total",
406                        &[("symbol", symbol.as_str())],
407                        1,
408                    );
409                }
410            }
411        }
412    }
413}
414
415/// Every numeric field a [`Fill`] carries must be finite (and the size
416/// non-negative) before the framework will route or record it.
417fn fill_is_finite(f: &Fill) -> bool {
418    f.price.value().is_finite()
419        && f.size.value().is_finite()
420        && f.size.value() >= 0.0
421        && f.fee.is_finite()
422}
423
424// ───────────────────────────────────────────────────────────────────────
425// CandlePollerService
426// ───────────────────────────────────────────────────────────────────────
427
428/// Periodic poll of a [`CandleSource`] for a single `(symbol, interval)`
429/// pair. Publishes each newly-closed candle to the
430/// [`MarketDataBus`].
431///
432/// Per-symbol cadences are achieved by spawning multiple services —
433/// `Bot::with_candle_poller(...)` accepts repeated calls and spawns one
434/// service per registered tuple.
435///
436/// # Deduplication
437///
438/// The service tracks the highest `Candle::time` it has already
439/// published; only candles with a strictly greater timestamp are
440/// re-published. This is robust against exchanges that return overlapping
441/// windows on consecutive polls.
442pub struct CandlePollerService {
443    name: String,
444    source: Arc<dyn CandleSource>,
445    symbol: Symbol,
446    interval: Duration,
447    poll_cadence: Duration,
448    limit: usize,
449    bus: MarketDataBus,
450    metrics: Arc<dyn MetricsSink>,
451    last_time: std::sync::Mutex<i64>,
452    polled: AtomicU64,
453    poll_errors: AtomicU64,
454    published: AtomicU64,
455}
456
457impl CandlePollerService {
458    pub(crate) fn new(
459        source: Arc<dyn CandleSource>,
460        symbol: Symbol,
461        interval: Duration,
462        poll_cadence: Duration,
463        limit: usize,
464        bus: MarketDataBus,
465        metrics: Arc<dyn MetricsSink>,
466    ) -> Self {
467        let name = format!("candle-poller[{}@{}s]", symbol.as_str(), interval.as_secs());
468        Self {
469            name,
470            source,
471            symbol,
472            interval,
473            poll_cadence,
474            limit,
475            bus,
476            metrics,
477            last_time: std::sync::Mutex::new(i64::MIN),
478            polled: AtomicU64::new(0),
479            poll_errors: AtomicU64::new(0),
480            published: AtomicU64::new(0),
481        }
482    }
483
484    /// Total successful polls.
485    pub fn polled(&self) -> u64 {
486        self.polled.load(Ordering::Relaxed)
487    }
488    /// Total failed polls.
489    pub fn poll_errors(&self) -> u64 {
490        self.poll_errors.load(Ordering::Relaxed)
491    }
492    /// Total candles published (deduplicated).
493    pub fn published(&self) -> u64 {
494        self.published.load(Ordering::Relaxed)
495    }
496}
497
498#[async_trait]
499impl TradingService for CandlePollerService {
500    fn name(&self) -> &str {
501        &self.name
502    }
503
504    fn restart_policy(&self) -> RestartPolicy {
505        RestartPolicy::OnFailure
506    }
507
508    async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
509        tracing::info!(service = %self.name, "candle poller starting");
510        let exchange = Exchange::from(self.source.name());
511
512        loop {
513            tokio::select! {
514                _ = cancel.cancelled() => {
515                    tracing::info!(
516                        service = %self.name,
517                        polled = self.polled(),
518                        published = self.published(),
519                        errors = self.poll_errors(),
520                        "candle poller shutting down"
521                    );
522                    return Ok(());
523                }
524                _ = tokio::time::sleep(self.poll_cadence) => {
525                    match self.source.poll(&self.symbol, self.interval, self.limit).await {
526                        Ok(candles) => {
527                            self.polled.fetch_add(1, Ordering::Relaxed);
528                            let mut last = self.last_time.lock().expect("last_time poisoned");
529                            let mut new_high = *last;
530                            for candle in candles {
531                                if candle.time <= *last {
532                                    continue;
533                                }
534                                new_high = new_high.max(candle.time);
535                                self.bus.publish(MarketDataEvent::Candle {
536                                    exchange: exchange.clone(),
537                                    symbol: self.symbol.clone(),
538                                    candle,
539                                });
540                                self.published.fetch_add(1, Ordering::Relaxed);
541                                self.metrics.counter(
542                                    "rustrade_candles_published_total",
543                                    &[("symbol", self.symbol.as_str())],
544                                    1,
545                                );
546                            }
547                            *last = new_high;
548                        }
549                        Err(e) => {
550                            self.poll_errors.fetch_add(1, Ordering::Relaxed);
551                            self.metrics.inc("rustrade_candle_poll_errors_total");
552                            tracing::warn!(
553                                service = %self.name,
554                                error = %e,
555                                "candle poll failed"
556                            );
557                        }
558                    }
559                }
560            }
561        }
562    }
563}