Skip to main content

rustrade/
order_tracker.rs

1//! Order lifecycle tracking (0.2c).
2//!
3//! [`ExecutionService`](crate::execution::ExecutionService) returns an
4//! exchange order id from `place_order` and then forgets it. For market
5//! orders that's fine — they fill or reject immediately. But once a brain
6//! can place resting limit / post-only orders (0.2b), an order can sit on
7//! the book indefinitely: a stale limit ties up margin and may fill long
8//! after the signal that produced it is meaningless.
9//!
10//! The [`OrderTracker`] records every resting order the framework places.
11//! The [`OrderReaperService`] periodically:
12//!
13//! 1. **Reconciles** tracked state against the exchange's actual open
14//!    orders (so fills/cancels that happened out-of-band drop out of the
15//!    tracker), and
16//! 2. **Ages out** any tracked order older than a configured TTL by calling
17//!    [`ExchangeClient::cancel_order`].
18//!
19//! Both require [`Capability::OrderTracking`](rustrade_core::Capability);
20//! without it the reaper is never spawned (see `Bot::with_order_tracking`).
21
22use std::collections::HashMap;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::time::Duration;
26
27use async_trait::async_trait;
28use chrono::{DateTime, Utc};
29use rustrade_core::{ExchangeClient, MetricsSink, Order, OrderKind, Symbol};
30use rustrade_supervisor::{RestartPolicy, TradingService};
31use tokio::sync::RwLock;
32use tokio_util::sync::CancellationToken;
33
34/// One order the framework is tracking on the book.
35#[derive(Debug, Clone, PartialEq)]
36pub struct TrackedOrder {
37    /// Exchange-assigned order id.
38    pub order_id: String,
39    /// Symbol the order is for.
40    pub symbol: Symbol,
41    /// When the framework first recorded the order (its own clock).
42    pub placed_at: DateTime<Utc>,
43}
44
45/// Shared, cheaply-cloneable record of resting orders the framework placed.
46///
47/// Keyed by exchange order id. Only orders that can *rest* are tracked —
48/// market orders fill or reject immediately and are never recorded.
49#[derive(Clone, Default)]
50pub struct OrderTracker {
51    inner: Arc<RwLock<HashMap<String, TrackedOrder>>>,
52}
53
54impl OrderTracker {
55    /// Create an empty tracker.
56    pub fn new() -> Self {
57        Self::default()
58    }
59
60    /// Record a freshly-placed order — unless it's a market order (those
61    /// never rest, so tracking them would only create churn). Called by the
62    /// [`ExecutionService`](crate::execution::ExecutionService) right after
63    /// a successful `place_order`.
64    pub(crate) async fn record(&self, order_id: String, order: &Order) {
65        if matches!(order.kind, OrderKind::Market) {
66            return;
67        }
68        self.inner.write().await.insert(
69            order_id.clone(),
70            TrackedOrder {
71                order_id,
72                symbol: order.symbol.clone(),
73                placed_at: Utc::now(),
74            },
75        );
76    }
77
78    /// Stop tracking an order (filled, cancelled, or reconciled away).
79    pub(crate) async fn forget(&self, order_id: &str) {
80        self.inner.write().await.remove(order_id);
81    }
82
83    /// Number of orders currently tracked.
84    pub async fn len(&self) -> usize {
85        self.inner.read().await.len()
86    }
87
88    /// `true` when no orders are tracked.
89    pub async fn is_empty(&self) -> bool {
90        self.inner.read().await.is_empty()
91    }
92
93    /// Snapshot of all tracked orders (for `BotHealth` / debugging).
94    pub async fn snapshot(&self) -> Vec<TrackedOrder> {
95        self.inner.read().await.values().cloned().collect()
96    }
97}
98
99/// One-cancels-other registry for bracket (SL + TP) protective pairs.
100///
101/// When a brain emits both `stop_price` and `take_profit_price`, the
102/// [`ExecutionService`](crate::execution::ExecutionService) places two
103/// reduce-only protective orders and registers them here. When either one
104/// fills, the [`FillRoutingService`](crate::FillRoutingService) cancels the
105/// sibling so the position is never closed twice.
106///
107/// Cheaply cloneable; both directions of each pair are stored so a fill on
108/// either leg finds its sibling.
109#[derive(Clone, Default)]
110pub struct OcoRegistry {
111    inner: Arc<RwLock<HashMap<String, OcoEntry>>>,
112}
113
114#[derive(Clone)]
115struct OcoEntry {
116    sibling: String,
117    symbol: Symbol,
118}
119
120impl OcoRegistry {
121    /// Create an empty registry.
122    pub fn new() -> Self {
123        Self::default()
124    }
125
126    /// Register a protective pair `(a, b)` for `symbol`. A fill on either
127    /// id will then yield the other via [`Self::take_sibling`].
128    pub(crate) async fn register(&self, symbol: Symbol, a: String, b: String) {
129        let mut map = self.inner.write().await;
130        map.insert(
131            a.clone(),
132            OcoEntry {
133                sibling: b.clone(),
134                symbol: symbol.clone(),
135            },
136        );
137        map.insert(b, OcoEntry { sibling: a, symbol });
138    }
139
140    /// If `order_id` is part of a registered pair, remove **both** legs and
141    /// return the sibling's `(symbol, order_id)` to cancel. Idempotent: a
142    /// second call for either leg returns `None`.
143    pub(crate) async fn take_sibling(&self, order_id: &str) -> Option<(Symbol, String)> {
144        let mut map = self.inner.write().await;
145        let entry = map.remove(order_id)?;
146        // Drop the reverse mapping too so the sibling's own fill is a no-op.
147        map.remove(&entry.sibling);
148        Some((entry.symbol, entry.sibling))
149    }
150
151    /// Number of individual legs currently registered (2 per live pair).
152    pub async fn len(&self) -> usize {
153        self.inner.read().await.len()
154    }
155
156    /// `true` when no pairs are registered.
157    pub async fn is_empty(&self) -> bool {
158        self.inner.read().await.is_empty()
159    }
160}
161
162/// Supervised service that reconciles tracked orders against the exchange
163/// and cancels any that outlive the configured TTL.
164///
165/// Spawned by `Bot::run_until_shutdown` only when order tracking is wired
166/// **and** the adapter advertises
167/// [`Capability::OrderTracking`](rustrade_core::Capability).
168pub struct OrderReaperService {
169    exchange: Arc<dyn ExchangeClient>,
170    tracker: OrderTracker,
171    symbols: Vec<Symbol>,
172    ttl: Duration,
173    poll_cadence: Duration,
174    metrics: Arc<dyn MetricsSink>,
175    cancelled: AtomicU64,
176    reconciled: AtomicU64,
177    sweeps: AtomicU64,
178}
179
180impl OrderReaperService {
181    pub(crate) fn new(
182        exchange: Arc<dyn ExchangeClient>,
183        tracker: OrderTracker,
184        symbols: Vec<Symbol>,
185        ttl: Duration,
186        poll_cadence: Duration,
187        metrics: Arc<dyn MetricsSink>,
188    ) -> Self {
189        Self {
190            exchange,
191            tracker,
192            symbols,
193            ttl,
194            poll_cadence,
195            metrics,
196            cancelled: AtomicU64::new(0),
197            reconciled: AtomicU64::new(0),
198            sweeps: AtomicU64::new(0),
199        }
200    }
201
202    /// Total resting orders cancelled for exceeding the TTL.
203    pub fn cancelled(&self) -> u64 {
204        self.cancelled.load(Ordering::Relaxed)
205    }
206    /// Total tracked orders dropped because the exchange no longer lists
207    /// them (filled or cancelled out-of-band).
208    pub fn reconciled(&self) -> u64 {
209        self.reconciled.load(Ordering::Relaxed)
210    }
211    /// Total reconcile/reap sweeps performed.
212    pub fn sweeps(&self) -> u64 {
213        self.sweeps.load(Ordering::Relaxed)
214    }
215
216    /// One reconcile + reap pass across all symbols. Factored out so it can
217    /// be unit-tested without the service loop.
218    pub(crate) async fn sweep_once(&self) {
219        self.sweeps.fetch_add(1, Ordering::Relaxed);
220        let now = Utc::now();
221
222        for symbol in &self.symbols {
223            let open = match self.exchange.get_open_orders(symbol).await {
224                Ok(o) => o,
225                Err(e) => {
226                    tracing::warn!(symbol = %symbol, error = %e, "get_open_orders failed; skipping sweep for symbol");
227                    continue;
228                }
229            };
230            // Index the exchange's live orders by id for O(1) membership.
231            let live: HashMap<&str, &rustrade_core::OpenOrder> =
232                open.iter().map(|o| (o.order_id.as_str(), o)).collect();
233
234            // Snapshot tracked orders for this symbol, then decide per order
235            // without holding the lock across the await for cancellation.
236            let tracked: Vec<TrackedOrder> = self
237                .tracker
238                .snapshot()
239                .await
240                .into_iter()
241                .filter(|t| &t.symbol == symbol)
242                .collect();
243
244            for t in tracked {
245                match live.get(t.order_id.as_str()) {
246                    None => {
247                        // Exchange no longer lists it → filled or cancelled
248                        // elsewhere. Drop from the tracker.
249                        self.tracker.forget(&t.order_id).await;
250                        self.reconciled.fetch_add(1, Ordering::Relaxed);
251                        tracing::debug!(symbol = %symbol, order_id = %t.order_id, "reconciled away (no longer open)");
252                    }
253                    Some(oo) => {
254                        // Still resting. Age it out if past TTL. Prefer the
255                        // exchange's created_at; fall back to when we first
256                        // saw it.
257                        let age_from = oo.created_at.unwrap_or(t.placed_at);
258                        let age = now.signed_duration_since(age_from);
259                        if age.num_milliseconds().max(0) as u128 >= self.ttl.as_millis() {
260                            match self.exchange.cancel_order(symbol, &t.order_id).await {
261                                Ok(_) => {
262                                    self.tracker.forget(&t.order_id).await;
263                                    self.cancelled.fetch_add(1, Ordering::Relaxed);
264                                    self.metrics.counter(
265                                        "rustrade_orders_cancelled_ttl_total",
266                                        &[("symbol", symbol.as_str())],
267                                        1,
268                                    );
269                                    tracing::info!(symbol = %symbol, order_id = %t.order_id, ttl_secs = self.ttl.as_secs(), "cancelled stale resting order (TTL)");
270                                }
271                                Err(e) => {
272                                    tracing::warn!(symbol = %symbol, order_id = %t.order_id, error = %e, "TTL cancel failed; will retry next sweep")
273                                }
274                            }
275                        }
276                    }
277                }
278            }
279        }
280    }
281}
282
283#[async_trait]
284impl TradingService for OrderReaperService {
285    fn name(&self) -> &str {
286        "order-reaper"
287    }
288
289    fn restart_policy(&self) -> RestartPolicy {
290        RestartPolicy::OnFailure
291    }
292
293    async fn run(&self, cancel: CancellationToken) -> anyhow::Result<()> {
294        tracing::info!(
295            ttl_secs = self.ttl.as_secs(),
296            cadence_secs = self.poll_cadence.as_secs(),
297            symbols = self.symbols.len(),
298            "order-reaper starting"
299        );
300        loop {
301            tokio::select! {
302                _ = cancel.cancelled() => {
303                    tracing::info!(
304                        sweeps = self.sweeps(),
305                        cancelled = self.cancelled(),
306                        reconciled = self.reconciled(),
307                        "order-reaper shutting down"
308                    );
309                    return Ok(());
310                }
311                _ = tokio::time::sleep(self.poll_cadence) => {
312                    self.sweep_once().await;
313                }
314            }
315        }
316    }
317}
318
319#[cfg(test)]
320mod tests {
321    use super::*;
322    use rustrade_core::{Capability, NoopSink, Position, Price, Result, Side, Volume};
323
324    fn limit(symbol: &str) -> Order {
325        Order::limit(symbol, Side::Buy, Volume(1.0), Price(100.0))
326    }
327
328    #[tokio::test]
329    async fn tracker_ignores_market_orders() {
330        let t = OrderTracker::new();
331        t.record(
332            "m1".into(),
333            &Order::market("BTCUSDT", Side::Buy, Volume(1.0)),
334        )
335        .await;
336        assert!(t.is_empty().await, "market orders must not be tracked");
337
338        t.record("l1".into(), &limit("BTCUSDT")).await;
339        assert_eq!(t.len().await, 1);
340    }
341
342    #[tokio::test]
343    async fn tracker_forget_removes() {
344        let t = OrderTracker::new();
345        t.record("l1".into(), &limit("BTCUSDT")).await;
346        t.forget("l1").await;
347        assert!(t.is_empty().await);
348    }
349
350    // Mock exchange: configurable open orders + records cancel calls.
351    struct MockEx {
352        open: std::sync::Mutex<Vec<rustrade_core::OpenOrder>>,
353        cancels: std::sync::Mutex<Vec<String>>,
354    }
355    impl MockEx {
356        fn new(open: Vec<rustrade_core::OpenOrder>) -> Arc<Self> {
357            Arc::new(Self {
358                open: std::sync::Mutex::new(open),
359                cancels: std::sync::Mutex::new(Vec::new()),
360            })
361        }
362    }
363    #[async_trait]
364    impl ExchangeClient for MockEx {
365        fn name(&self) -> &str {
366            "mock"
367        }
368        async fn place_order(&self, _o: &Order) -> Result<String> {
369            Ok("x".into())
370        }
371        async fn cancel_all(&self, _s: &Symbol) -> Result<usize> {
372            Ok(0)
373        }
374        async fn close_position(&self, _s: &Symbol, _p: &Position) -> Result<String> {
375            Ok("c".into())
376        }
377        async fn get_position(&self, _s: &Symbol) -> Result<Position> {
378            Ok(Position::FLAT)
379        }
380        async fn get_balance(&self, _c: &str) -> Result<f64> {
381            Ok(0.0)
382        }
383        fn supports(&self, c: Capability) -> bool {
384            matches!(c, Capability::OrderTracking)
385        }
386        async fn get_open_orders(&self, _s: &Symbol) -> Result<Vec<rustrade_core::OpenOrder>> {
387            Ok(self.open.lock().unwrap().clone())
388        }
389        async fn cancel_order(&self, _s: &Symbol, order_id: &str) -> Result<bool> {
390            self.cancels.lock().unwrap().push(order_id.to_string());
391            Ok(true)
392        }
393    }
394
395    fn open_order(id: &str, created_at: Option<DateTime<Utc>>) -> rustrade_core::OpenOrder {
396        rustrade_core::OpenOrder {
397            order_id: id.into(),
398            client_id: None,
399            symbol: Symbol::from("BTCUSDT"),
400            side: Side::Buy,
401            kind: OrderKind::Limit,
402            limit_price: Some(Price(100.0)),
403            size: Volume(1.0),
404            filled: Volume(0.0),
405            status: rustrade_core::OrderStatus::Open,
406            created_at,
407        }
408    }
409
410    fn reaper(ex: Arc<MockEx>, tracker: OrderTracker, ttl: Duration) -> OrderReaperService {
411        OrderReaperService::new(
412            ex,
413            tracker,
414            vec![Symbol::from("BTCUSDT")],
415            ttl,
416            Duration::from_secs(60),
417            Arc::new(NoopSink),
418        )
419    }
420
421    #[tokio::test]
422    async fn sweep_reconciles_away_vanished_order() {
423        // Tracked but the exchange reports no open orders → forgotten.
424        let tracker = OrderTracker::new();
425        tracker.record("gone".into(), &limit("BTCUSDT")).await;
426        let ex = MockEx::new(vec![]);
427        let svc = reaper(ex.clone(), tracker.clone(), Duration::from_secs(3600));
428
429        svc.sweep_once().await;
430        assert!(
431            tracker.is_empty().await,
432            "vanished order should be reconciled away"
433        );
434        assert_eq!(svc.reconciled(), 1);
435        assert_eq!(svc.cancelled(), 0);
436        assert!(ex.cancels.lock().unwrap().is_empty());
437    }
438
439    #[tokio::test]
440    async fn sweep_keeps_fresh_resting_order() {
441        // Resting and well within TTL → left alone.
442        let tracker = OrderTracker::new();
443        tracker.record("fresh".into(), &limit("BTCUSDT")).await;
444        let ex = MockEx::new(vec![open_order("fresh", Some(Utc::now()))]);
445        let svc = reaper(ex.clone(), tracker.clone(), Duration::from_secs(3600));
446
447        svc.sweep_once().await;
448        assert_eq!(tracker.len().await, 1, "fresh order should remain tracked");
449        assert_eq!(svc.cancelled(), 0);
450        assert!(ex.cancels.lock().unwrap().is_empty());
451    }
452
453    #[tokio::test]
454    async fn sweep_cancels_order_past_ttl() {
455        // Resting, created an hour ago, TTL 1s → cancelled + forgotten.
456        let tracker = OrderTracker::new();
457        tracker.record("stale".into(), &limit("BTCUSDT")).await;
458        let created = Utc::now() - chrono::Duration::hours(1);
459        let ex = MockEx::new(vec![open_order("stale", Some(created))]);
460        let svc = reaper(ex.clone(), tracker.clone(), Duration::from_secs(1));
461
462        svc.sweep_once().await;
463        assert_eq!(svc.cancelled(), 1, "stale order should be cancelled");
464        assert!(
465            tracker.is_empty().await,
466            "cancelled order should be forgotten"
467        );
468        assert_eq!(
469            ex.cancels.lock().unwrap().as_slice(),
470            &["stale".to_string()]
471        );
472    }
473
474    #[tokio::test]
475    async fn oco_register_and_take_sibling_is_symmetric() {
476        let oco = OcoRegistry::new();
477        let sym = Symbol::from("BTCUSDT");
478        oco.register(sym.clone(), "sl".into(), "tp".into()).await;
479        assert_eq!(oco.len().await, 2);
480
481        // A fill on the SL leg yields the TP sibling and clears both legs.
482        let sib = oco.take_sibling("sl").await;
483        assert_eq!(sib, Some((sym, "tp".to_string())));
484        assert!(oco.is_empty().await, "both legs cleared after one fills");
485
486        // The sibling's own (later) fill is now a no-op.
487        assert!(oco.take_sibling("tp").await.is_none());
488    }
489
490    #[tokio::test]
491    async fn oco_take_sibling_from_either_leg() {
492        let oco = OcoRegistry::new();
493        let sym = Symbol::from("ETHUSDT");
494        oco.register(sym.clone(), "a".into(), "b".into()).await;
495        // Filling the TP leg ("b") yields the SL leg ("a").
496        assert_eq!(oco.take_sibling("b").await, Some((sym, "a".to_string())));
497        assert!(oco.is_empty().await);
498    }
499
500    #[tokio::test]
501    async fn oco_unknown_id_is_none() {
502        let oco = OcoRegistry::new();
503        assert!(oco.take_sibling("nope").await.is_none());
504    }
505}