rustrade_core/exchange.rs
1//! Trait contracts for exchange integrations.
2//!
3//! Concrete exchange clients (KuCoin, Binance, …) implement [`ExchangeClient`]
4//! so the bot framework can stay exchange-agnostic. A client crate like
5//! `exchange-apiws` already provides most of this — these traits are the
6//! framework-side view.
7
8use std::time::Duration;
9
10use async_trait::async_trait;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13
14use crate::error::Result;
15use crate::instrument::InstrumentSpec;
16use crate::market::{MarketDataEvent, Side, Symbol};
17use crate::types::{Candle, Fill, Order, OrderKind, Position, Price, Volume};
18
19/// Optional adapter capabilities, queried via [`ExchangeClient::supports`].
20///
21/// The framework consults this to degrade gracefully when an adapter
22/// doesn't implement a feature an [`Order`] or strategy requests — e.g.
23/// rejecting an order with `Order.stop = Some(...)` against an adapter
24/// that returns `false` for [`Capability::StopOrders`] rather than
25/// silently dropping the attachment.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
27#[non_exhaustive]
28pub enum Capability {
29 /// Adapter accepts `Order.stop` and translates to native stop orders.
30 StopOrders,
31 /// Adapter rejects post-only orders that would cross the book as taker.
32 PostOnly,
33 /// Adapter honours `Order.reduce_only`.
34 ReduceOnly,
35 /// Adapter supports `OrderKind::Ioc`.
36 Ioc,
37 /// Adapter supports `OrderKind::Fok`.
38 Fok,
39 /// Adapter can stream a public market-data feed alongside trading.
40 /// Most do; spot-only HTTP adapters may not.
41 PublicFeed,
42 /// Adapter pushes fill / order-update events on a private feed.
43 PrivateFeed,
44 /// Adapter implements [`ExchangeClient::get_open_orders`] and
45 /// [`ExchangeClient::cancel_order`] — required for resting-order
46 /// tracking, TTL cancellation, and reconnect reconciliation.
47 OrderTracking,
48}
49
50/// Status of an order as reported by the exchange.
51#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
52#[serde(rename_all = "snake_case")]
53pub enum OrderStatus {
54 /// Submitted, not yet on the book.
55 Pending,
56 /// Resting on the book or in the matching engine.
57 Open,
58 /// Partially filled; still resting for the remainder.
59 PartiallyFilled,
60 /// Fully filled.
61 Filled,
62 /// Cancelled before full fill.
63 Cancelled,
64 /// Rejected by the exchange.
65 Rejected,
66}
67
68impl OrderStatus {
69 /// `true` while the order may still rest on the book and consume margin
70 /// — i.e. it is not in a terminal state. The order-tracking layer keeps
71 /// watching these and drops the rest.
72 pub fn is_live(self) -> bool {
73 matches!(
74 self,
75 OrderStatus::Pending | OrderStatus::Open | OrderStatus::PartiallyFilled
76 )
77 }
78}
79
80/// A resting (not-yet-terminal) order as reported by the exchange, returned
81/// by [`ExchangeClient::get_open_orders`].
82///
83/// This is the exchange's view of an order the bot previously placed — used
84/// by the framework's order-tracking layer to age out stale resting orders
85/// and to reconcile tracked state after a reconnect.
86#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
87pub struct OpenOrder {
88 /// Exchange-assigned order id (matches the id returned by
89 /// [`ExchangeClient::place_order`]).
90 pub order_id: String,
91 /// Client-supplied id echoed back by the exchange, if any.
92 pub client_id: Option<String>,
93 /// Symbol the order is for.
94 pub symbol: Symbol,
95 /// Side of the resting order.
96 pub side: Side,
97 /// Order kind (limit / post-only / …). Market orders never rest, so a
98 /// well-behaved adapter won't report them here.
99 pub kind: OrderKind,
100 /// Limit price, if the order kind carries one.
101 pub limit_price: Option<Price>,
102 /// Original order quantity.
103 pub size: Volume,
104 /// Quantity filled so far (`0` for an untouched resting order).
105 pub filled: Volume,
106 /// Current status as reported by the exchange.
107 pub status: OrderStatus,
108 /// When the order was created at the exchange, if known. Used by the
109 /// tracker's TTL logic; `None` falls back to the time the bot first
110 /// observed the order.
111 pub created_at: Option<DateTime<Utc>>,
112}
113
114/// What the bot framework needs from an exchange to trade.
115///
116/// This trait is intentionally narrow — the full surface of a real exchange
117/// client (ws token management, stop orders, funding history, account tiers)
118/// belongs in the concrete adapter crate, not here. The framework only
119/// needs to: place orders, close positions, read balance and position state.
120///
121/// # Async + object-safe
122///
123/// `async_trait` is used so `Arc<dyn ExchangeClient>` works — downstream
124/// code can swap concrete exchanges at runtime without generics propagating
125/// through the whole system.
126///
127/// # Example
128///
129/// A stub adapter useful for examples and tests. Real adapters connect
130/// to a network and report actual state.
131///
132/// ```
133/// use async_trait::async_trait;
134/// use rustrade_core::{Capability, ExchangeClient, Order, Position, Result, Symbol};
135///
136/// struct StubExchange;
137///
138/// #[async_trait]
139/// impl ExchangeClient for StubExchange {
140/// fn name(&self) -> &str { "stub" }
141/// async fn place_order(&self, _order: &Order) -> Result<String> {
142/// Ok("order-1".into())
143/// }
144/// async fn cancel_all(&self, _symbol: &Symbol) -> Result<usize> { Ok(0) }
145/// async fn close_position(&self, _symbol: &Symbol, _p: &Position) -> Result<String> {
146/// Ok("close-1".into())
147/// }
148/// async fn get_position(&self, _symbol: &Symbol) -> Result<Position> {
149/// Ok(Position::FLAT)
150/// }
151/// async fn get_balance(&self, _currency: &str) -> Result<f64> { Ok(0.0) }
152/// fn supports(&self, c: Capability) -> bool {
153/// matches!(c, Capability::ReduceOnly)
154/// }
155/// }
156/// ```
157#[async_trait]
158pub trait ExchangeClient: Send + Sync + 'static {
159 /// Short, lowercase exchange identifier — e.g. `"kucoin"`.
160 fn name(&self) -> &str;
161
162 /// Place an order. Returns the exchange-assigned order id.
163 async fn place_order(&self, order: &Order) -> Result<String>;
164
165 /// Cancel all open orders for a symbol. Returns the count cancelled.
166 async fn cancel_all(&self, symbol: &Symbol) -> Result<usize>;
167
168 /// Close the given position with a market order. Returns the exchange
169 /// order id of the close.
170 async fn close_position(&self, symbol: &Symbol, position: &Position) -> Result<String>;
171
172 /// Fetch the current position for a symbol (or `Position::FLAT` if flat).
173 async fn get_position(&self, symbol: &Symbol) -> Result<Position>;
174
175 /// Fetch the current balance in the given currency.
176 async fn get_balance(&self, currency: &str) -> Result<f64>;
177
178 /// Does this adapter support the given optional capability?
179 ///
180 /// The default returns `false` for every variant — adapters opt in by
181 /// overriding. This is intentionally conservative: a new adapter that
182 /// forgets to override won't quietly accept orders it can't execute.
183 fn supports(&self, _capability: Capability) -> bool {
184 false
185 }
186
187 /// Base-asset units per one contract for the given symbol.
188 ///
189 /// For spot exchanges this is `1.0` for every symbol — one unit traded
190 /// equals one unit of the base asset. For futures, it's the contract
191 /// multiplier (e.g. `0.001` for KuCoin XBTUSDTM where one contract is
192 /// 0.001 BTC). The risk layer's
193 /// [`PositionSizer`](https://docs.rs/rustrade-risk/latest/rustrade_risk/sizing/struct.PositionSizer.html)
194 /// uses this to convert margin × leverage into a contract count.
195 ///
196 /// The default returns `1.0` — appropriate for spot adapters. Futures
197 /// adapters override.
198 fn contract_value(&self, _symbol: &Symbol) -> f64 {
199 1.0
200 }
201
202 /// Full instrument metadata for `symbol`: contract size, price tick,
203 /// quantity lot, minimum notional, and [`AssetClass`](crate::AssetClass).
204 ///
205 /// The framework uses this to round orders to the venue's increments,
206 /// enforce a minimum order notional, and apply class-aware risk rules.
207 /// The default derives an [`InstrumentSpec`] from [`Self::contract_value`]
208 /// with no other constraints, so adapters that only override
209 /// `contract_value` keep working unchanged; adapters with real venue
210 /// metadata (tick/lot/min-notional) override this and should make
211 /// `contract_value` agree with `instrument_spec(symbol).contract_value`.
212 fn instrument_spec(&self, symbol: &Symbol) -> InstrumentSpec {
213 InstrumentSpec::from_contract_value(self.contract_value(symbol))
214 }
215
216 /// List the currently-resting (non-terminal) orders for a symbol.
217 ///
218 /// Used by the framework's order-tracking layer to age out stale limit
219 /// orders and to reconcile after a reconnect. The default returns an
220 /// empty list — an adapter that doesn't advertise
221 /// [`Capability::OrderTracking`] is assumed to have no resting orders
222 /// the framework should manage (e.g. a market-only or fire-and-forget
223 /// adapter). Adapters that support resting orders **must** override this
224 /// and advertise the capability.
225 async fn get_open_orders(&self, _symbol: &Symbol) -> Result<Vec<OpenOrder>> {
226 Ok(Vec::new())
227 }
228
229 /// Cancel a single resting order by its exchange-assigned id.
230 ///
231 /// Returns `Ok(true)` if the order was cancelled, `Ok(false)` if it was
232 /// already gone (filled or cancelled) — a benign no-op the caller can
233 /// ignore. The default errors, so the tracking layer never believes it
234 /// cancelled an order against an adapter that can't actually do so;
235 /// adapters advertising [`Capability::OrderTracking`] override it.
236 async fn cancel_order(&self, _symbol: &Symbol, _order_id: &str) -> Result<bool> {
237 Err(crate::Error::exchange(
238 "cancel_order not supported by this adapter (no Capability::OrderTracking)",
239 ))
240 }
241}
242
243/// A source of live market data (WebSocket feed, backtest replay, simulator).
244///
245/// Implementors push events into the bot via the
246/// [`MarketDataBus`](crate::bus::MarketDataBus) that the supervisor creates.
247/// `MarketSource` is intended to be wrapped by a `TradingService` in
248/// `rustrade-supervisor` so it inherits lifecycle management and
249/// auto-restart; this trait just documents the contract on the data side.
250///
251/// # Example
252///
253/// A loopback source that publishes a single tick and exits. Production
254/// sources hold the `MarketDataBus` sender they were constructed with
255/// and publish to it from `run`.
256///
257/// ```
258/// use async_trait::async_trait;
259/// use rustrade_core::{MarketSource, Result};
260///
261/// struct OneShotSource {
262/// name: String,
263/// }
264///
265/// #[async_trait]
266/// impl MarketSource for OneShotSource {
267/// fn name(&self) -> &str { &self.name }
268/// async fn run(&self) -> Result<()> {
269/// // In a real impl: connect to feed, loop publishing events to
270/// // the bus, return Ok(()) on clean shutdown.
271/// Ok(())
272/// }
273/// fn is_live(&self) -> bool { false }
274/// }
275/// ```
276///
277/// # Cancellation contract
278///
279/// `run` does **not** take a `CancellationToken` directly. Cancellation is
280/// expected to flow through the wrapping `TradingService` — when the
281/// supervisor cancels that service's token, it drops the
282/// `MarketSource::run` future at its next `.await`.
283///
284/// Implementors must therefore be **drop-safe**: any open resources
285/// (WebSocket connections, HTTP sessions, file handles) must release
286/// cleanly when their containing future is dropped. In practice this
287/// means:
288///
289/// - Use `tokio::select!` against external events only inside your own
290/// loop, not against an externally-owned cancel signal here.
291/// - Don't hold a `MutexGuard` across an `.await` that could be dropped
292/// mid-flight — dropping a guard is fine, but holding one while the
293/// future is destructured can deadlock the lock.
294/// - If you need explicit teardown, perform it in a `Drop` impl on the
295/// implementing type rather than at the end of `run`.
296#[async_trait]
297pub trait MarketSource: Send + Sync + 'static {
298 /// Short identifier for logging — typically the exchange name.
299 fn name(&self) -> &str;
300
301 /// Begin streaming events. Should run until the feed terminates or
302 /// the caller drops the returned future (see the cancellation contract
303 /// in the trait docs).
304 async fn run(&self) -> Result<()>;
305
306 /// Is the feed currently receiving data?
307 fn is_live(&self) -> bool;
308}
309
310/// Received fill events from the exchange's private feed.
311///
312/// Adapters implement this to route fills into the bot. Most exchanges push
313/// both order updates and fill events; this trait abstracts the "fill" part.
314///
315/// # Example
316///
317/// An in-memory fill source backed by a [`tokio::sync::mpsc`] channel —
318/// useful for tests and replay drivers.
319///
320/// ```
321/// use async_trait::async_trait;
322/// use rustrade_core::{Fill, FillSource};
323/// use tokio::sync::mpsc;
324/// use tokio::sync::Mutex;
325///
326/// struct ChannelFills {
327/// rx: Mutex<mpsc::UnboundedReceiver<Fill>>,
328/// }
329///
330/// #[async_trait]
331/// impl FillSource for ChannelFills {
332/// async fn next_fill(&self) -> Option<Fill> {
333/// self.rx.lock().await.recv().await
334/// }
335/// }
336/// ```
337#[async_trait]
338pub trait FillSource: Send + Sync + 'static {
339 /// Await the next fill. Returns `None` when the stream ends.
340 async fn next_fill(&self) -> Option<Fill>;
341}
342
343/// Received order-book / market-data events from the exchange's public feed.
344///
345/// # Example
346///
347/// A simple channel-backed event source — typical for tests that push
348/// scripted ticks/candles into the bot.
349///
350/// ```
351/// use async_trait::async_trait;
352/// use rustrade_core::{EventSource, MarketDataEvent};
353/// use tokio::sync::mpsc;
354/// use tokio::sync::Mutex;
355///
356/// struct ChannelEvents {
357/// rx: Mutex<mpsc::UnboundedReceiver<MarketDataEvent>>,
358/// }
359///
360/// #[async_trait]
361/// impl EventSource for ChannelEvents {
362/// async fn next_event(&self) -> Option<MarketDataEvent> {
363/// self.rx.lock().await.recv().await
364/// }
365/// }
366/// ```
367#[async_trait]
368pub trait EventSource: Send + Sync + 'static {
369 /// Await the next event. Returns `None` when the stream ends.
370 async fn next_event(&self) -> Option<MarketDataEvent>;
371}
372
373/// Periodic candle source — separate from [`MarketSource`] because
374/// candle polling has a fundamentally different shape (pull, paced)
375/// than streaming events (push, unbounded).
376///
377/// Spot-only adapters don't need to implement this; the framework will
378/// only spawn a candle poller when one is wired via
379/// `Bot::with_candle_poller`. Futures adapters with native candle
380/// endpoints (KuCoin, Binance, Bybit, …) implement it directly.
381///
382/// # Example
383///
384/// A fixed-series source useful for backtests and replays. The
385/// framework's poller will dedupe by `Candle::time`, so repeated polls
386/// returning the same head are safe.
387///
388/// ```
389/// use std::time::Duration;
390/// use async_trait::async_trait;
391/// use rustrade_core::{Candle, CandleSource, Result, Symbol};
392///
393/// struct FixedCandles {
394/// candles: Vec<Candle>,
395/// }
396///
397/// #[async_trait]
398/// impl CandleSource for FixedCandles {
399/// fn name(&self) -> &str { "fixed" }
400/// async fn poll(
401/// &self,
402/// _symbol: &Symbol,
403/// _interval: Duration,
404/// limit: usize,
405/// ) -> Result<Vec<Candle>> {
406/// Ok(self.candles.iter().rev().take(limit).rev().copied().collect())
407/// }
408/// }
409/// ```
410#[async_trait]
411pub trait CandleSource: Send + Sync + 'static {
412 /// Short identifier for logging — typically the exchange name.
413 fn name(&self) -> &str;
414
415 /// Fetch up to `limit` of the most recent completed candles for
416 /// `symbol` at the given interval. Implementors return them in
417 /// chronological order (oldest first). If the exchange's native
418 /// endpoint returns newest-first, sort before returning.
419 async fn poll(&self, symbol: &Symbol, interval: Duration, limit: usize) -> Result<Vec<Candle>>;
420}
421
422#[cfg(test)]
423mod tests {
424 use super::*;
425 use crate::types::{Price, Volume};
426
427 #[test]
428 fn order_status_is_live_only_for_non_terminal() {
429 assert!(OrderStatus::Pending.is_live());
430 assert!(OrderStatus::Open.is_live());
431 assert!(OrderStatus::PartiallyFilled.is_live());
432 assert!(!OrderStatus::Filled.is_live());
433 assert!(!OrderStatus::Cancelled.is_live());
434 assert!(!OrderStatus::Rejected.is_live());
435 }
436
437 #[test]
438 fn open_order_serde_roundtrip() {
439 let o = OpenOrder {
440 order_id: "ex-1".into(),
441 client_id: Some("cli-1".into()),
442 symbol: Symbol::from("BTCUSDT"),
443 side: Side::Buy,
444 kind: OrderKind::Limit,
445 limit_price: Some(Price(100.0)),
446 size: Volume(2.0),
447 filled: Volume(0.5),
448 status: OrderStatus::PartiallyFilled,
449 created_at: None,
450 };
451 let json = serde_json::to_string(&o).unwrap();
452 let back: OpenOrder = serde_json::from_str(&json).unwrap();
453 assert_eq!(back, o);
454 }
455
456 #[test]
457 fn order_status_serde_is_snake_case() {
458 let json = serde_json::to_string(&OrderStatus::PartiallyFilled).unwrap();
459 assert_eq!(json, "\"partially_filled\"");
460 }
461
462 // A default adapter (no OrderTracking) reports no open orders and
463 // refuses to cancel — proving the conservative defaults.
464 struct DefaultAdapter;
465 #[async_trait]
466 impl ExchangeClient for DefaultAdapter {
467 fn name(&self) -> &str {
468 "default"
469 }
470 async fn place_order(&self, _o: &Order) -> Result<String> {
471 Ok("id".into())
472 }
473 async fn cancel_all(&self, _s: &Symbol) -> Result<usize> {
474 Ok(0)
475 }
476 async fn close_position(&self, _s: &Symbol, _p: &Position) -> Result<String> {
477 Ok("c".into())
478 }
479 async fn get_position(&self, _s: &Symbol) -> Result<Position> {
480 Ok(Position::FLAT)
481 }
482 async fn get_balance(&self, _c: &str) -> Result<f64> {
483 Ok(0.0)
484 }
485 }
486
487 #[tokio::test]
488 async fn default_get_open_orders_is_empty() {
489 let a = DefaultAdapter;
490 assert!(
491 a.get_open_orders(&Symbol::from("X"))
492 .await
493 .unwrap()
494 .is_empty()
495 );
496 assert!(!a.supports(Capability::OrderTracking));
497 }
498
499 #[tokio::test]
500 async fn default_cancel_order_errors() {
501 let a = DefaultAdapter;
502 assert!(a.cancel_order(&Symbol::from("X"), "id").await.is_err());
503 }
504}