Skip to main content

rustrade_core/
exchange.rs

1//! Trait contracts for exchange integrations.
2//!
3//! Concrete exchange clients (KuCoin, Binance, …) implement [`ExchangeClient`]
4//! so the bot framework can stay exchange-agnostic. A client crate like
5//! `exchange-apiws` already provides most of this — these traits are the
6//! framework-side view.
7
8use std::time::Duration;
9
10use async_trait::async_trait;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13
14use crate::error::Result;
15use crate::market::{MarketDataEvent, Side, Symbol};
16use crate::types::{Candle, Fill, Order, OrderKind, Position, Price, Volume};
17
18/// Optional adapter capabilities, queried via [`ExchangeClient::supports`].
19///
20/// The framework consults this to degrade gracefully when an adapter
21/// doesn't implement a feature an [`Order`] or strategy requests — e.g.
22/// rejecting an order with `Order.stop = Some(...)` against an adapter
23/// that returns `false` for [`Capability::StopOrders`] rather than
24/// silently dropping the attachment.
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
26#[non_exhaustive]
27pub enum Capability {
28    /// Adapter accepts `Order.stop` and translates to native stop orders.
29    StopOrders,
30    /// Adapter rejects post-only orders that would cross the book as taker.
31    PostOnly,
32    /// Adapter honours `Order.reduce_only`.
33    ReduceOnly,
34    /// Adapter supports `OrderKind::Ioc`.
35    Ioc,
36    /// Adapter supports `OrderKind::Fok`.
37    Fok,
38    /// Adapter can stream a public market-data feed alongside trading.
39    /// Most do; spot-only HTTP adapters may not.
40    PublicFeed,
41    /// Adapter pushes fill / order-update events on a private feed.
42    PrivateFeed,
43    /// Adapter implements [`ExchangeClient::get_open_orders`] and
44    /// [`ExchangeClient::cancel_order`] — required for resting-order
45    /// tracking, TTL cancellation, and reconnect reconciliation.
46    OrderTracking,
47}
48
49/// Status of an order as reported by the exchange.
50#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
51#[serde(rename_all = "snake_case")]
52pub enum OrderStatus {
53    /// Submitted, not yet on the book.
54    Pending,
55    /// Resting on the book or in the matching engine.
56    Open,
57    /// Partially filled; still resting for the remainder.
58    PartiallyFilled,
59    /// Fully filled.
60    Filled,
61    /// Cancelled before full fill.
62    Cancelled,
63    /// Rejected by the exchange.
64    Rejected,
65}
66
67impl OrderStatus {
68    /// `true` while the order may still rest on the book and consume margin
69    /// — i.e. it is not in a terminal state. The order-tracking layer keeps
70    /// watching these and drops the rest.
71    pub fn is_live(self) -> bool {
72        matches!(
73            self,
74            OrderStatus::Pending | OrderStatus::Open | OrderStatus::PartiallyFilled
75        )
76    }
77}
78
79/// A resting (not-yet-terminal) order as reported by the exchange, returned
80/// by [`ExchangeClient::get_open_orders`].
81///
82/// This is the exchange's view of an order the bot previously placed — used
83/// by the framework's order-tracking layer to age out stale resting orders
84/// and to reconcile tracked state after a reconnect.
85#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
86pub struct OpenOrder {
87    /// Exchange-assigned order id (matches the id returned by
88    /// [`ExchangeClient::place_order`]).
89    pub order_id: String,
90    /// Client-supplied id echoed back by the exchange, if any.
91    pub client_id: Option<String>,
92    /// Symbol the order is for.
93    pub symbol: Symbol,
94    /// Side of the resting order.
95    pub side: Side,
96    /// Order kind (limit / post-only / …). Market orders never rest, so a
97    /// well-behaved adapter won't report them here.
98    pub kind: OrderKind,
99    /// Limit price, if the order kind carries one.
100    pub limit_price: Option<Price>,
101    /// Original order quantity.
102    pub size: Volume,
103    /// Quantity filled so far (`0` for an untouched resting order).
104    pub filled: Volume,
105    /// Current status as reported by the exchange.
106    pub status: OrderStatus,
107    /// When the order was created at the exchange, if known. Used by the
108    /// tracker's TTL logic; `None` falls back to the time the bot first
109    /// observed the order.
110    pub created_at: Option<DateTime<Utc>>,
111}
112
113/// What the bot framework needs from an exchange to trade.
114///
115/// This trait is intentionally narrow — the full surface of a real exchange
116/// client (ws token management, stop orders, funding history, account tiers)
117/// belongs in the concrete adapter crate, not here. The framework only
118/// needs to: place orders, close positions, read balance and position state.
119///
120/// # Async + object-safe
121///
122/// `async_trait` is used so `Arc<dyn ExchangeClient>` works — downstream
123/// code can swap concrete exchanges at runtime without generics propagating
124/// through the whole system.
125///
126/// # Example
127///
128/// A stub adapter useful for examples and tests. Real adapters connect
129/// to a network and report actual state.
130///
131/// ```
132/// use async_trait::async_trait;
133/// use rustrade_core::{Capability, ExchangeClient, Order, Position, Result, Symbol};
134///
135/// struct StubExchange;
136///
137/// #[async_trait]
138/// impl ExchangeClient for StubExchange {
139///     fn name(&self) -> &str { "stub" }
140///     async fn place_order(&self, _order: &Order) -> Result<String> {
141///         Ok("order-1".into())
142///     }
143///     async fn cancel_all(&self, _symbol: &Symbol) -> Result<usize> { Ok(0) }
144///     async fn close_position(&self, _symbol: &Symbol, _p: &Position) -> Result<String> {
145///         Ok("close-1".into())
146///     }
147///     async fn get_position(&self, _symbol: &Symbol) -> Result<Position> {
148///         Ok(Position::FLAT)
149///     }
150///     async fn get_balance(&self, _currency: &str) -> Result<f64> { Ok(0.0) }
151///     fn supports(&self, c: Capability) -> bool {
152///         matches!(c, Capability::ReduceOnly)
153///     }
154/// }
155/// ```
156#[async_trait]
157pub trait ExchangeClient: Send + Sync + 'static {
158    /// Short, lowercase exchange identifier — e.g. `"kucoin"`.
159    fn name(&self) -> &str;
160
161    /// Place an order. Returns the exchange-assigned order id.
162    async fn place_order(&self, order: &Order) -> Result<String>;
163
164    /// Cancel all open orders for a symbol. Returns the count cancelled.
165    async fn cancel_all(&self, symbol: &Symbol) -> Result<usize>;
166
167    /// Close the given position with a market order. Returns the exchange
168    /// order id of the close.
169    async fn close_position(&self, symbol: &Symbol, position: &Position) -> Result<String>;
170
171    /// Fetch the current position for a symbol (or `Position::FLAT` if flat).
172    async fn get_position(&self, symbol: &Symbol) -> Result<Position>;
173
174    /// Fetch the current balance in the given currency.
175    async fn get_balance(&self, currency: &str) -> Result<f64>;
176
177    /// Does this adapter support the given optional capability?
178    ///
179    /// The default returns `false` for every variant — adapters opt in by
180    /// overriding. This is intentionally conservative: a new adapter that
181    /// forgets to override won't quietly accept orders it can't execute.
182    fn supports(&self, _capability: Capability) -> bool {
183        false
184    }
185
186    /// Base-asset units per one contract for the given symbol.
187    ///
188    /// For spot exchanges this is `1.0` for every symbol — one unit traded
189    /// equals one unit of the base asset. For futures, it's the contract
190    /// multiplier (e.g. `0.001` for KuCoin XBTUSDTM where one contract is
191    /// 0.001 BTC). The risk layer's
192    /// [`PositionSizer`](https://docs.rs/rustrade-risk/latest/rustrade_risk/sizing/struct.PositionSizer.html)
193    /// uses this to convert margin × leverage into a contract count.
194    ///
195    /// The default returns `1.0` — appropriate for spot adapters. Futures
196    /// adapters override.
197    fn contract_value(&self, _symbol: &Symbol) -> f64 {
198        1.0
199    }
200
201    /// List the currently-resting (non-terminal) orders for a symbol.
202    ///
203    /// Used by the framework's order-tracking layer to age out stale limit
204    /// orders and to reconcile after a reconnect. The default returns an
205    /// empty list — an adapter that doesn't advertise
206    /// [`Capability::OrderTracking`] is assumed to have no resting orders
207    /// the framework should manage (e.g. a market-only or fire-and-forget
208    /// adapter). Adapters that support resting orders **must** override this
209    /// and advertise the capability.
210    async fn get_open_orders(&self, _symbol: &Symbol) -> Result<Vec<OpenOrder>> {
211        Ok(Vec::new())
212    }
213
214    /// Cancel a single resting order by its exchange-assigned id.
215    ///
216    /// Returns `Ok(true)` if the order was cancelled, `Ok(false)` if it was
217    /// already gone (filled or cancelled) — a benign no-op the caller can
218    /// ignore. The default errors, so the tracking layer never believes it
219    /// cancelled an order against an adapter that can't actually do so;
220    /// adapters advertising [`Capability::OrderTracking`] override it.
221    async fn cancel_order(&self, _symbol: &Symbol, _order_id: &str) -> Result<bool> {
222        Err(crate::Error::exchange(
223            "cancel_order not supported by this adapter (no Capability::OrderTracking)",
224        ))
225    }
226}
227
228/// A source of live market data (WebSocket feed, backtest replay, simulator).
229///
230/// Implementors push events into the bot via the
231/// [`MarketDataBus`](crate::bus::MarketDataBus) that the supervisor creates.
232/// `MarketSource` is intended to be wrapped by a `TradingService` in
233/// `rustrade-supervisor` so it inherits lifecycle management and
234/// auto-restart; this trait just documents the contract on the data side.
235///
236/// # Example
237///
238/// A loopback source that publishes a single tick and exits. Production
239/// sources hold the `MarketDataBus` sender they were constructed with
240/// and publish to it from `run`.
241///
242/// ```
243/// use async_trait::async_trait;
244/// use rustrade_core::{MarketSource, Result};
245///
246/// struct OneShotSource {
247///     name: String,
248/// }
249///
250/// #[async_trait]
251/// impl MarketSource for OneShotSource {
252///     fn name(&self) -> &str { &self.name }
253///     async fn run(&self) -> Result<()> {
254///         // In a real impl: connect to feed, loop publishing events to
255///         // the bus, return Ok(()) on clean shutdown.
256///         Ok(())
257///     }
258///     fn is_live(&self) -> bool { false }
259/// }
260/// ```
261///
262/// # Cancellation contract
263///
264/// `run` does **not** take a `CancellationToken` directly. Cancellation is
265/// expected to flow through the wrapping `TradingService` — when the
266/// supervisor cancels that service's token, it drops the
267/// `MarketSource::run` future at its next `.await`.
268///
269/// Implementors must therefore be **drop-safe**: any open resources
270/// (WebSocket connections, HTTP sessions, file handles) must release
271/// cleanly when their containing future is dropped. In practice this
272/// means:
273///
274/// - Use `tokio::select!` against external events only inside your own
275///   loop, not against an externally-owned cancel signal here.
276/// - Don't hold a `MutexGuard` across an `.await` that could be dropped
277///   mid-flight — dropping a guard is fine, but holding one while the
278///   future is destructured can deadlock the lock.
279/// - If you need explicit teardown, perform it in a `Drop` impl on the
280///   implementing type rather than at the end of `run`.
281#[async_trait]
282pub trait MarketSource: Send + Sync + 'static {
283    /// Short identifier for logging — typically the exchange name.
284    fn name(&self) -> &str;
285
286    /// Begin streaming events. Should run until the feed terminates or
287    /// the caller drops the returned future (see the cancellation contract
288    /// in the trait docs).
289    async fn run(&self) -> Result<()>;
290
291    /// Is the feed currently receiving data?
292    fn is_live(&self) -> bool;
293}
294
295/// Received fill events from the exchange's private feed.
296///
297/// Adapters implement this to route fills into the bot. Most exchanges push
298/// both order updates and fill events; this trait abstracts the "fill" part.
299///
300/// # Example
301///
302/// An in-memory fill source backed by a [`tokio::sync::mpsc`] channel —
303/// useful for tests and replay drivers.
304///
305/// ```
306/// use async_trait::async_trait;
307/// use rustrade_core::{Fill, FillSource};
308/// use tokio::sync::mpsc;
309/// use tokio::sync::Mutex;
310///
311/// struct ChannelFills {
312///     rx: Mutex<mpsc::UnboundedReceiver<Fill>>,
313/// }
314///
315/// #[async_trait]
316/// impl FillSource for ChannelFills {
317///     async fn next_fill(&self) -> Option<Fill> {
318///         self.rx.lock().await.recv().await
319///     }
320/// }
321/// ```
322#[async_trait]
323pub trait FillSource: Send + Sync + 'static {
324    /// Await the next fill. Returns `None` when the stream ends.
325    async fn next_fill(&self) -> Option<Fill>;
326}
327
328/// Received order-book / market-data events from the exchange's public feed.
329///
330/// # Example
331///
332/// A simple channel-backed event source — typical for tests that push
333/// scripted ticks/candles into the bot.
334///
335/// ```
336/// use async_trait::async_trait;
337/// use rustrade_core::{EventSource, MarketDataEvent};
338/// use tokio::sync::mpsc;
339/// use tokio::sync::Mutex;
340///
341/// struct ChannelEvents {
342///     rx: Mutex<mpsc::UnboundedReceiver<MarketDataEvent>>,
343/// }
344///
345/// #[async_trait]
346/// impl EventSource for ChannelEvents {
347///     async fn next_event(&self) -> Option<MarketDataEvent> {
348///         self.rx.lock().await.recv().await
349///     }
350/// }
351/// ```
352#[async_trait]
353pub trait EventSource: Send + Sync + 'static {
354    /// Await the next event. Returns `None` when the stream ends.
355    async fn next_event(&self) -> Option<MarketDataEvent>;
356}
357
358/// Periodic candle source — separate from [`MarketSource`] because
359/// candle polling has a fundamentally different shape (pull, paced)
360/// than streaming events (push, unbounded).
361///
362/// Spot-only adapters don't need to implement this; the framework will
363/// only spawn a candle poller when one is wired via
364/// `Bot::with_candle_poller`. Futures adapters with native candle
365/// endpoints (KuCoin, Binance, Bybit, …) implement it directly.
366///
367/// # Example
368///
369/// A fixed-series source useful for backtests and replays. The
370/// framework's poller will dedupe by `Candle::time`, so repeated polls
371/// returning the same head are safe.
372///
373/// ```
374/// use std::time::Duration;
375/// use async_trait::async_trait;
376/// use rustrade_core::{Candle, CandleSource, Result, Symbol};
377///
378/// struct FixedCandles {
379///     candles: Vec<Candle>,
380/// }
381///
382/// #[async_trait]
383/// impl CandleSource for FixedCandles {
384///     fn name(&self) -> &str { "fixed" }
385///     async fn poll(
386///         &self,
387///         _symbol: &Symbol,
388///         _interval: Duration,
389///         limit: usize,
390///     ) -> Result<Vec<Candle>> {
391///         Ok(self.candles.iter().rev().take(limit).rev().copied().collect())
392///     }
393/// }
394/// ```
395#[async_trait]
396pub trait CandleSource: Send + Sync + 'static {
397    /// Short identifier for logging — typically the exchange name.
398    fn name(&self) -> &str;
399
400    /// Fetch up to `limit` of the most recent completed candles for
401    /// `symbol` at the given interval. Implementors return them in
402    /// chronological order (oldest first). If the exchange's native
403    /// endpoint returns newest-first, sort before returning.
404    async fn poll(&self, symbol: &Symbol, interval: Duration, limit: usize) -> Result<Vec<Candle>>;
405}
406
407#[cfg(test)]
408mod tests {
409    use super::*;
410    use crate::types::{Price, Volume};
411
412    #[test]
413    fn order_status_is_live_only_for_non_terminal() {
414        assert!(OrderStatus::Pending.is_live());
415        assert!(OrderStatus::Open.is_live());
416        assert!(OrderStatus::PartiallyFilled.is_live());
417        assert!(!OrderStatus::Filled.is_live());
418        assert!(!OrderStatus::Cancelled.is_live());
419        assert!(!OrderStatus::Rejected.is_live());
420    }
421
422    #[test]
423    fn open_order_serde_roundtrip() {
424        let o = OpenOrder {
425            order_id: "ex-1".into(),
426            client_id: Some("cli-1".into()),
427            symbol: Symbol::from("BTCUSDT"),
428            side: Side::Buy,
429            kind: OrderKind::Limit,
430            limit_price: Some(Price(100.0)),
431            size: Volume(2.0),
432            filled: Volume(0.5),
433            status: OrderStatus::PartiallyFilled,
434            created_at: None,
435        };
436        let json = serde_json::to_string(&o).unwrap();
437        let back: OpenOrder = serde_json::from_str(&json).unwrap();
438        assert_eq!(back, o);
439    }
440
441    #[test]
442    fn order_status_serde_is_snake_case() {
443        let json = serde_json::to_string(&OrderStatus::PartiallyFilled).unwrap();
444        assert_eq!(json, "\"partially_filled\"");
445    }
446
447    // A default adapter (no OrderTracking) reports no open orders and
448    // refuses to cancel — proving the conservative defaults.
449    struct DefaultAdapter;
450    #[async_trait]
451    impl ExchangeClient for DefaultAdapter {
452        fn name(&self) -> &str {
453            "default"
454        }
455        async fn place_order(&self, _o: &Order) -> Result<String> {
456            Ok("id".into())
457        }
458        async fn cancel_all(&self, _s: &Symbol) -> Result<usize> {
459            Ok(0)
460        }
461        async fn close_position(&self, _s: &Symbol, _p: &Position) -> Result<String> {
462            Ok("c".into())
463        }
464        async fn get_position(&self, _s: &Symbol) -> Result<Position> {
465            Ok(Position::FLAT)
466        }
467        async fn get_balance(&self, _c: &str) -> Result<f64> {
468            Ok(0.0)
469        }
470    }
471
472    #[tokio::test]
473    async fn default_get_open_orders_is_empty() {
474        let a = DefaultAdapter;
475        assert!(
476            a.get_open_orders(&Symbol::from("X"))
477                .await
478                .unwrap()
479                .is_empty()
480        );
481        assert!(!a.supports(Capability::OrderTracking));
482    }
483
484    #[tokio::test]
485    async fn default_cancel_order_errors() {
486        let a = DefaultAdapter;
487        assert!(a.cancel_order(&Symbol::from("X"), "id").await.is_err());
488    }
489}