tesser_test_utils/
state.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{anyhow, Result};
6use chrono::{DateTime, Utc};
7use rust_decimal::Decimal;
8use tokio::sync::{mpsc, Mutex};
9
10use tesser_core::{
11    AccountBalance, AssetId, Candle, ExchangeId, Fill, Order, OrderId, OrderStatus, Position,
12    Price, Quantity, Side, Symbol, Tick,
13};
14
15use crate::scenario::ScenarioManager;
16
17const EXEC_HISTORY_LIMIT: usize = 1024;
18const DEFAULT_QUOTE_CURRENCY: &str = "USDT";
19
20pub type ApiKey = String;
21
22/// Message pushed onto the private WebSocket stream.
23pub type PrivateMessage = serde_json::Value;
24
25/// Shared state for the in-memory mock exchange.
26#[derive(Clone)]
27pub struct MockExchangeState {
28    inner: Arc<Mutex<Inner>>,
29    scenarios: ScenarioManager,
30    auto_fill: Option<AutoFillConfig>,
31}
32
33#[allow(dead_code)]
34pub(crate) struct Inner {
35    pub accounts: HashMap<ApiKey, AccountState>,
36    pub market_data: MarketDataQueues,
37    pub private_ws_sender: Option<mpsc::UnboundedSender<PrivateMessage>>,
38    pub pending_private_messages: VecDeque<PrivateMessage>,
39    pub order_seq: u64,
40    pub exchange: ExchangeId,
41}
42
43#[derive(Clone)]
44pub struct AccountState {
45    pub api_secret: String,
46    pub balances: HashMap<AssetId, AccountBalance>,
47    pub positions: HashMap<Symbol, Position>,
48    pub executions: VecDeque<Fill>,
49    pub orders: HashMap<OrderId, Order>,
50}
51
52impl AccountState {
53    fn from_config(config: AccountConfig) -> Self {
54        Self {
55            api_secret: config.api_secret,
56            balances: config
57                .balances
58                .into_iter()
59                .map(|balance| (balance.asset, balance))
60                .collect(),
61            positions: config
62                .positions
63                .into_iter()
64                .map(|position| (position.symbol, position))
65                .collect(),
66            executions: VecDeque::new(),
67            orders: HashMap::new(),
68        }
69    }
70
71    pub fn insert_order(&mut self, order: Order) {
72        self.orders.insert(order.id.clone(), order);
73    }
74
75    pub fn update_order<F>(&mut self, order_id: &OrderId, mut update: F) -> Result<Order>
76    where
77        F: FnMut(&mut Order) -> Result<()>,
78    {
79        let order = self
80            .orders
81            .get_mut(order_id)
82            .ok_or_else(|| anyhow!("unknown order id {order_id}"))?;
83        update(order)?;
84        Ok(order.clone())
85    }
86
87    pub fn apply_fill(&mut self, fill: &Fill) {
88        self.executions.push_back(fill.clone());
89        if self.executions.len() > EXEC_HISTORY_LIMIT {
90            self.executions.pop_front();
91        }
92        self.update_positions(fill);
93        self.update_balances(fill);
94    }
95
96    fn update_positions(&mut self, fill: &Fill) {
97        let entry = self
98            .positions
99            .entry(fill.symbol)
100            .or_insert_with(|| Position {
101                symbol: fill.symbol,
102                side: None,
103                quantity: Decimal::ZERO,
104                entry_price: None,
105                unrealized_pnl: Decimal::ZERO,
106                updated_at: Utc::now(),
107            });
108        entry.updated_at = Utc::now();
109
110        match (entry.side, fill.side) {
111            (None, Side::Buy) => {
112                entry.side = Some(Side::Buy);
113                entry.quantity = fill.fill_quantity;
114                entry.entry_price = Some(fill.fill_price);
115            }
116            (None, Side::Sell) => {
117                entry.side = Some(Side::Sell);
118                entry.quantity = fill.fill_quantity;
119                entry.entry_price = Some(fill.fill_price);
120            }
121            (Some(Side::Buy), Side::Buy) => {
122                let total_qty = entry.quantity + fill.fill_quantity;
123                let existing_value = entry.entry_price.unwrap_or(Decimal::ZERO) * entry.quantity;
124                let fill_value = fill.fill_price * fill.fill_quantity;
125                entry.quantity = total_qty;
126                entry.entry_price = Some((existing_value + fill_value) / total_qty);
127            }
128            (Some(Side::Sell), Side::Sell) => {
129                let total_qty = entry.quantity + fill.fill_quantity;
130                let existing_value = entry.entry_price.unwrap_or(Decimal::ZERO) * entry.quantity;
131                let fill_value = fill.fill_price * fill.fill_quantity;
132                entry.quantity = total_qty;
133                entry.entry_price = Some((existing_value + fill_value) / total_qty);
134            }
135            (Some(Side::Buy), Side::Sell) => {
136                if fill.fill_quantity < entry.quantity {
137                    entry.quantity -= fill.fill_quantity;
138                } else if fill.fill_quantity == entry.quantity {
139                    entry.quantity = Decimal::ZERO;
140                    entry.side = None;
141                    entry.entry_price = None;
142                } else {
143                    entry.side = Some(Side::Sell);
144                    entry.quantity = fill.fill_quantity - entry.quantity;
145                    entry.entry_price = Some(fill.fill_price);
146                }
147            }
148            (Some(Side::Sell), Side::Buy) => {
149                if fill.fill_quantity < entry.quantity {
150                    entry.quantity -= fill.fill_quantity;
151                } else if fill.fill_quantity == entry.quantity {
152                    entry.quantity = Decimal::ZERO;
153                    entry.side = None;
154                    entry.entry_price = None;
155                } else {
156                    entry.side = Some(Side::Buy);
157                    entry.quantity = fill.fill_quantity - entry.quantity;
158                    entry.entry_price = Some(fill.fill_price);
159                }
160            }
161        }
162    }
163
164    fn update_balances(&mut self, fill: &Fill) {
165        let quote_asset = AssetId::from_code(fill.symbol.exchange, DEFAULT_QUOTE_CURRENCY);
166        let quote = self.balances.entry(quote_asset).or_insert(AccountBalance {
167            exchange: quote_asset.exchange,
168            asset: quote_asset,
169            total: Decimal::ZERO,
170            available: Decimal::ZERO,
171            updated_at: Utc::now(),
172        });
173        let notional = fill.fill_price * fill.fill_quantity;
174        match fill.side {
175            Side::Buy => {
176                quote.total -= notional;
177                quote.available = quote.total;
178            }
179            Side::Sell => {
180                quote.total += notional;
181                quote.available = quote.total;
182            }
183        }
184        quote.updated_at = Utc::now();
185    }
186
187    pub fn order_by_link_id(&self, client_id: &str) -> Option<OrderId> {
188        self.orders
189            .values()
190            .find(|order| order.request.client_order_id.as_deref() == Some(client_id))
191            .map(|order| order.id.clone())
192    }
193
194    pub fn order(&self, order_id: &OrderId) -> Option<Order> {
195        self.orders.get(order_id).cloned()
196    }
197
198    pub fn balances_snapshot(&self) -> Vec<AccountBalance> {
199        self.balances.values().cloned().collect()
200    }
201
202    pub fn positions_snapshot(&self) -> Vec<Position> {
203        self.positions.values().cloned().collect()
204    }
205
206    pub fn open_orders_snapshot(&self, symbol: Option<Symbol>) -> Vec<Order> {
207        self.orders
208            .values()
209            .filter(|order| {
210                let active = !matches!(
211                    order.status,
212                    OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
213                );
214                let symbol_matches = symbol
215                    .map(|value| order.request.symbol == value)
216                    .unwrap_or(true);
217                active && symbol_matches
218            })
219            .cloned()
220            .collect()
221    }
222
223    pub fn executions_in_range(
224        &self,
225        start: DateTime<Utc>,
226        end: Option<DateTime<Utc>>,
227    ) -> Vec<Fill> {
228        self.executions
229            .iter()
230            .filter(|fill| {
231                fill.timestamp >= start && end.map(|limit| fill.timestamp <= limit).unwrap_or(true)
232            })
233            .cloned()
234            .collect()
235    }
236}
237
238#[derive(Default)]
239pub struct MarketDataQueues {
240    pub candles: VecDeque<Candle>,
241    pub ticks: VecDeque<Tick>,
242}
243
244impl MarketDataQueues {
245    pub fn push_candle(&mut self, candle: Candle) {
246        self.candles.push_back(candle);
247    }
248
249    pub fn push_tick(&mut self, tick: Tick) {
250        self.ticks.push_back(tick);
251    }
252
253    pub fn next_candle(&mut self) -> Option<Candle> {
254        self.candles.pop_front()
255    }
256
257    pub fn next_tick(&mut self) -> Option<Tick> {
258        self.ticks.pop_front()
259    }
260}
261
262/// Declarative account bootstrap configuration.
263#[derive(Clone)]
264pub struct AccountConfig {
265    pub api_key: String,
266    pub api_secret: String,
267    pub balances: Vec<AccountBalance>,
268    pub positions: Vec<Position>,
269}
270
271impl AccountConfig {
272    pub fn new(api_key: impl Into<String>, api_secret: impl Into<String>) -> Self {
273        Self {
274            api_key: api_key.into(),
275            api_secret: api_secret.into(),
276            balances: Vec::new(),
277            positions: Vec::new(),
278        }
279    }
280
281    pub fn with_balance(mut self, balance: AccountBalance) -> Self {
282        self.balances.push(balance);
283        self
284    }
285
286    pub fn with_position(mut self, position: Position) -> Self {
287        self.positions.push(position);
288        self
289    }
290}
291
292/// Configuration object passed into [`MockExchangeState::new`].
293#[derive(Clone)]
294pub struct MockExchangeConfig {
295    pub accounts: Vec<AccountConfig>,
296    pub candles: Vec<Candle>,
297    pub ticks: Vec<Tick>,
298    pub scenarios: ScenarioManager,
299    pub exchange: ExchangeId,
300    pub auto_fill: Option<AutoFillConfig>,
301}
302
303impl MockExchangeConfig {
304    pub fn new() -> Self {
305        Self::default()
306    }
307
308    pub fn with_account(mut self, account: AccountConfig) -> Self {
309        self.accounts.push(account);
310        self
311    }
312
313    pub fn with_candles(mut self, candles: impl IntoIterator<Item = Candle>) -> Self {
314        self.candles.extend(candles);
315        self
316    }
317
318    pub fn with_ticks(mut self, ticks: impl IntoIterator<Item = Tick>) -> Self {
319        self.ticks.extend(ticks);
320        self
321    }
322
323    pub fn with_scenarios(mut self, scenarios: ScenarioManager) -> Self {
324        self.scenarios = scenarios;
325        self
326    }
327
328    pub fn with_auto_fill(mut self, config: AutoFillConfig) -> Self {
329        self.auto_fill = Some(config);
330        self
331    }
332
333    pub fn with_exchange(mut self, exchange: ExchangeId) -> Self {
334        self.exchange = exchange;
335        self
336    }
337}
338
339impl Default for MockExchangeConfig {
340    fn default() -> Self {
341        Self {
342            accounts: Vec::new(),
343            candles: Vec::new(),
344            ticks: Vec::new(),
345            scenarios: ScenarioManager::new(),
346            exchange: ExchangeId::UNSPECIFIED,
347            auto_fill: None,
348        }
349    }
350}
351
352#[derive(Clone)]
353pub struct AutoFillConfig {
354    pub delay: Duration,
355    pub price: Option<Decimal>,
356}
357
358impl MockExchangeState {
359    pub fn new(config: MockExchangeConfig) -> Self {
360        let market_data = MarketDataQueues {
361            candles: config.candles.into_iter().collect(),
362            ticks: config.ticks.into_iter().collect(),
363        };
364        let accounts = config
365            .accounts
366            .into_iter()
367            .map(|account| {
368                let api_key = account.api_key.clone();
369                (api_key, AccountState::from_config(account))
370            })
371            .collect();
372        let inner = Inner {
373            accounts,
374            market_data,
375            private_ws_sender: None,
376            pending_private_messages: VecDeque::new(),
377            order_seq: 1,
378            exchange: config.exchange,
379        };
380        Self {
381            inner: Arc::new(Mutex::new(inner)),
382            scenarios: config.scenarios,
383            auto_fill: config.auto_fill,
384        }
385    }
386
387    pub fn scenarios(&self) -> ScenarioManager {
388        self.scenarios.clone()
389    }
390
391    pub fn auto_fill_config(&self) -> Option<AutoFillConfig> {
392        self.auto_fill.clone()
393    }
394
395    pub async fn exchange(&self) -> ExchangeId {
396        self.inner.lock().await.exchange
397    }
398
399    #[allow(dead_code)]
400    pub(crate) fn inner(&self) -> &Arc<Mutex<Inner>> {
401        &self.inner
402    }
403
404    pub async fn set_private_ws_sender(&self, sender: mpsc::UnboundedSender<PrivateMessage>) {
405        let mut guard = self.inner.lock().await;
406        guard.private_ws_sender = Some(sender.clone());
407        while let Some(payload) = guard.pending_private_messages.pop_front() {
408            if sender.send(payload).is_err() {
409                break;
410            }
411        }
412    }
413
414    pub async fn clear_private_ws_sender(&self) {
415        let mut guard = self.inner.lock().await;
416        guard.private_ws_sender = None;
417    }
418
419    pub async fn emit_private_message(&self, payload: PrivateMessage) -> Result<()> {
420        let mut guard = self.inner.lock().await;
421        if let Some(tx) = guard.private_ws_sender.clone() {
422            tx.send(payload)
423                .map_err(|err| anyhow!("failed to deliver private stream message: {err}"))
424        } else {
425            guard.pending_private_messages.push_back(payload);
426            Ok(())
427        }
428    }
429
430    pub async fn account_secret(&self, api_key: &str) -> Option<String> {
431        let guard = self.inner.lock().await;
432        guard
433            .accounts
434            .get(api_key)
435            .map(|account| account.api_secret.clone())
436    }
437
438    pub async fn with_account_mut<F, T>(&self, api_key: &str, f: F) -> Result<T>
439    where
440        F: FnOnce(&mut AccountState) -> Result<T>,
441    {
442        let mut guard = self.inner.lock().await;
443        let account = guard
444            .accounts
445            .get_mut(api_key)
446            .ok_or_else(|| anyhow!("unknown API key {api_key}"))?;
447        f(account)
448    }
449
450    pub async fn with_account<F, T>(&self, api_key: &str, f: F) -> Result<T>
451    where
452        F: FnOnce(&AccountState) -> Result<T>,
453    {
454        let guard = self.inner.lock().await;
455        let account = guard
456            .accounts
457            .get(api_key)
458            .ok_or_else(|| anyhow!("unknown API key {api_key}"))?;
459        f(account)
460    }
461
462    pub async fn next_order_id(&self) -> OrderId {
463        let mut guard = self.inner.lock().await;
464        let id = guard.order_seq;
465        guard.order_seq += 1;
466        format!("MOCK-ORDER-{id}")
467    }
468
469    pub async fn register_order(&self, api_key: &str, order: Order) -> Result<Order> {
470        self.with_account_mut(api_key, |account| {
471            account.insert_order(order.clone());
472            Ok(order)
473        })
474        .await
475    }
476
477    pub async fn get_order(&self, api_key: &str, order_id: &OrderId) -> Result<Order> {
478        self.with_account(api_key, |account| {
479            account
480                .order(order_id)
481                .ok_or_else(|| anyhow!("unknown order id {order_id}"))
482        })
483        .await
484    }
485
486    pub async fn find_order_id(
487        &self,
488        api_key: &str,
489        order_id: Option<&str>,
490        order_link_id: Option<&str>,
491    ) -> Result<OrderId> {
492        if let Some(id) = order_id {
493            return Ok(id.to_string());
494        }
495        if let Some(link) = order_link_id {
496            return self
497                .with_account(api_key, |account| {
498                    account
499                        .order_by_link_id(link)
500                        .ok_or_else(|| anyhow!("unknown order link id {link}"))
501                })
502                .await;
503        }
504        Err(anyhow!(
505            "request must provide either orderId or orderLinkId"
506        ))
507    }
508
509    pub async fn cancel_order(&self, api_key: &str, order_id: &OrderId) -> Result<Order> {
510        self.with_account_mut(api_key, |account| {
511            account.update_order(order_id, |order| {
512                order.status = OrderStatus::Canceled;
513                order.updated_at = Utc::now();
514                Ok(())
515            })
516        })
517        .await
518    }
519
520    pub async fn fill_order(
521        &self,
522        api_key: &str,
523        order_id: &OrderId,
524        quantity: Quantity,
525        price: Price,
526    ) -> Result<(Order, Fill)> {
527        let mut guard = self.inner.lock().await;
528        let account = guard
529            .accounts
530            .get_mut(api_key)
531            .ok_or_else(|| anyhow!("unknown API key {api_key}"))?;
532        let (fill, order_snapshot) = {
533            let order = account
534                .orders
535                .get_mut(order_id)
536                .ok_or_else(|| anyhow!("unknown order id {order_id}"))?;
537            let remaining = (order.request.quantity - order.filled_quantity).max(Decimal::ZERO);
538            if remaining.is_zero() {
539                return Err(anyhow!("order already fully filled"));
540            }
541            let exec_quantity = quantity.min(remaining);
542            if exec_quantity.is_zero() {
543                return Err(anyhow!("fill quantity resolved to zero"));
544            }
545            let filled_before = order.filled_quantity;
546            let new_filled = filled_before + exec_quantity;
547            let avg_price = if filled_before.is_zero() {
548                price
549            } else {
550                let previous_total = order.avg_fill_price.unwrap_or(price) * filled_before;
551                (previous_total + price * exec_quantity) / new_filled
552            };
553            order.filled_quantity = new_filled;
554            order.avg_fill_price = Some(avg_price);
555            order.status = if new_filled >= order.request.quantity {
556                OrderStatus::Filled
557            } else {
558                OrderStatus::PartiallyFilled
559            };
560            order.updated_at = Utc::now();
561            let fill = Fill {
562                order_id: order.id.clone(),
563                symbol: order.request.symbol,
564                side: order.request.side,
565                fill_price: price,
566                fill_quantity: exec_quantity,
567                fee: None,
568                fee_asset: None,
569                timestamp: Utc::now(),
570            };
571            let order_snapshot = order.clone();
572            (fill, order_snapshot)
573        };
574        account.apply_fill(&fill);
575        Ok((order_snapshot, fill))
576    }
577
578    pub async fn account_balances(&self, api_key: &str) -> Result<Vec<AccountBalance>> {
579        self.with_account(api_key, |account| Ok(account.balances_snapshot()))
580            .await
581    }
582
583    pub async fn account_positions(&self, api_key: &str) -> Result<Vec<Position>> {
584        self.with_account(api_key, |account| Ok(account.positions_snapshot()))
585            .await
586    }
587
588    pub async fn open_orders(&self, api_key: &str, symbol: Option<&str>) -> Result<Vec<Order>> {
589        let exchange = self.inner.lock().await.exchange;
590        let symbol = symbol.map(|code| Symbol::from_code(exchange, code));
591        self.with_account(api_key, |account| Ok(account.open_orders_snapshot(symbol)))
592            .await
593    }
594
595    pub async fn executions_between(
596        &self,
597        api_key: &str,
598        start: DateTime<Utc>,
599        end: Option<DateTime<Utc>>,
600    ) -> Result<Vec<Fill>> {
601        self.with_account(api_key, |account| {
602            Ok(account.executions_in_range(start, end))
603        })
604        .await
605    }
606
607    pub async fn next_candle(&self) -> Option<Candle> {
608        let mut guard = self.inner.lock().await;
609        guard.market_data.next_candle()
610    }
611
612    pub async fn next_tick(&self) -> Option<Tick> {
613        let mut guard = self.inner.lock().await;
614        guard.market_data.next_tick()
615    }
616}