// tesser_test_utils/state.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use anyhow::{anyhow, Result};
5use chrono::{DateTime, Utc};
6use rust_decimal::Decimal;
7use tokio::sync::{mpsc, Mutex};
8
9use tesser_core::{
10    AccountBalance, Candle, Fill, Order, OrderId, OrderStatus, Position, Price, Quantity, Side,
11    Symbol, Tick,
12};
13
14use crate::scenario::ScenarioManager;
15
/// Maximum number of fills kept in an account's execution history; `AccountState::apply_fill`
/// drops the oldest fill once the history grows past this length.
const EXEC_HISTORY_LIMIT: usize = 1024;
/// Currency of the balance that `AccountState::update_balances` adjusts by each fill's notional.
const DEFAULT_QUOTE_CURRENCY: &str = "USDT";

/// Key type under which accounts are stored in the mock exchange.
pub type ApiKey = String;

/// Message pushed onto the private WebSocket stream.
pub type PrivateMessage = serde_json::Value;
23
/// Shared state for the in-memory mock exchange.
///
/// Clones share the same [`Inner`] through the `Arc`; `scenarios` is cloned through
/// `ScenarioManager`'s own `Clone` implementation.
#[derive(Clone)]
pub struct MockExchangeState {
    /// Accounts, queued market data, the private stream sender and the order id counter,
    /// guarded by an async mutex.
    inner: Arc<Mutex<Inner>>,
    /// Scenario manager taken from the configuration; handed out (cloned) by `scenarios()`.
    scenarios: ScenarioManager,
}
30
/// Mutable exchange data stored behind the mutex in [`MockExchangeState`].
// NOTE(review): the reason for `allow(dead_code)` is not visible in this file — presumably
// some fields are only read from other modules; confirm and document.
#[allow(dead_code)]
pub(crate) struct Inner {
    /// Per-account state keyed by API key.
    pub accounts: HashMap<ApiKey, AccountState>,
    /// Queued candles and ticks, consumed front-first.
    pub market_data: MarketDataQueues,
    /// Sender for private stream messages; `None` when no sender is attached.
    pub private_ws_sender: Option<mpsc::UnboundedSender<PrivateMessage>>,
    /// Next value used by `MockExchangeState::next_order_id`; initialised to 1.
    pub order_seq: u64,
}
38
/// Mutable state of a single account on the mock exchange.
#[derive(Clone)]
pub struct AccountState {
    /// Secret paired with the account's API key.
    pub api_secret: String,
    /// Balances keyed by currency.
    pub balances: HashMap<String, AccountBalance>,
    /// Positions keyed by symbol.
    pub positions: HashMap<Symbol, Position>,
    /// Recorded fills, oldest first; trimmed by `apply_fill` once it exceeds
    /// `EXEC_HISTORY_LIMIT` entries.
    pub executions: VecDeque<Fill>,
    /// Orders keyed by order id.
    pub orders: HashMap<OrderId, Order>,
}
47
48impl AccountState {
49    fn from_config(config: AccountConfig) -> Self {
50        Self {
51            api_secret: config.api_secret,
52            balances: config
53                .balances
54                .into_iter()
55                .map(|balance| (balance.currency.clone(), balance))
56                .collect(),
57            positions: config
58                .positions
59                .into_iter()
60                .map(|position| (position.symbol.clone(), position))
61                .collect(),
62            executions: VecDeque::new(),
63            orders: HashMap::new(),
64        }
65    }
66
67    pub fn insert_order(&mut self, order: Order) {
68        self.orders.insert(order.id.clone(), order);
69    }
70
71    pub fn update_order<F>(&mut self, order_id: &OrderId, mut update: F) -> Result<Order>
72    where
73        F: FnMut(&mut Order) -> Result<()>,
74    {
75        let order = self
76            .orders
77            .get_mut(order_id)
78            .ok_or_else(|| anyhow!("unknown order id {order_id}"))?;
79        update(order)?;
80        Ok(order.clone())
81    }
82
83    pub fn apply_fill(&mut self, fill: &Fill) {
84        self.executions.push_back(fill.clone());
85        if self.executions.len() > EXEC_HISTORY_LIMIT {
86            self.executions.pop_front();
87        }
88        self.update_positions(fill);
89        self.update_balances(fill);
90    }
91
92    fn update_positions(&mut self, fill: &Fill) {
93        let entry = self
94            .positions
95            .entry(fill.symbol.clone())
96            .or_insert_with(|| Position {
97                symbol: fill.symbol.clone(),
98                side: None,
99                quantity: Decimal::ZERO,
100                entry_price: None,
101                unrealized_pnl: Decimal::ZERO,
102                updated_at: Utc::now(),
103            });
104        entry.updated_at = Utc::now();
105
106        match (entry.side, fill.side) {
107            (None, Side::Buy) => {
108                entry.side = Some(Side::Buy);
109                entry.quantity = fill.fill_quantity;
110                entry.entry_price = Some(fill.fill_price);
111            }
112            (None, Side::Sell) => {
113                entry.side = Some(Side::Sell);
114                entry.quantity = fill.fill_quantity;
115                entry.entry_price = Some(fill.fill_price);
116            }
117            (Some(Side::Buy), Side::Buy) => {
118                let total_qty = entry.quantity + fill.fill_quantity;
119                let existing_value = entry.entry_price.unwrap_or(Decimal::ZERO) * entry.quantity;
120                let fill_value = fill.fill_price * fill.fill_quantity;
121                entry.quantity = total_qty;
122                entry.entry_price = Some((existing_value + fill_value) / total_qty);
123            }
124            (Some(Side::Sell), Side::Sell) => {
125                let total_qty = entry.quantity + fill.fill_quantity;
126                let existing_value = entry.entry_price.unwrap_or(Decimal::ZERO) * entry.quantity;
127                let fill_value = fill.fill_price * fill.fill_quantity;
128                entry.quantity = total_qty;
129                entry.entry_price = Some((existing_value + fill_value) / total_qty);
130            }
131            (Some(Side::Buy), Side::Sell) => {
132                if fill.fill_quantity < entry.quantity {
133                    entry.quantity -= fill.fill_quantity;
134                } else if fill.fill_quantity == entry.quantity {
135                    entry.quantity = Decimal::ZERO;
136                    entry.side = None;
137                    entry.entry_price = None;
138                } else {
139                    entry.side = Some(Side::Sell);
140                    entry.quantity = fill.fill_quantity - entry.quantity;
141                    entry.entry_price = Some(fill.fill_price);
142                }
143            }
144            (Some(Side::Sell), Side::Buy) => {
145                if fill.fill_quantity < entry.quantity {
146                    entry.quantity -= fill.fill_quantity;
147                } else if fill.fill_quantity == entry.quantity {
148                    entry.quantity = Decimal::ZERO;
149                    entry.side = None;
150                    entry.entry_price = None;
151                } else {
152                    entry.side = Some(Side::Buy);
153                    entry.quantity = fill.fill_quantity - entry.quantity;
154                    entry.entry_price = Some(fill.fill_price);
155                }
156            }
157        }
158    }
159
160    fn update_balances(&mut self, fill: &Fill) {
161        let quote = self
162            .balances
163            .entry(DEFAULT_QUOTE_CURRENCY.to_string())
164            .or_insert(AccountBalance {
165                currency: DEFAULT_QUOTE_CURRENCY.into(),
166                total: Decimal::ZERO,
167                available: Decimal::ZERO,
168                updated_at: Utc::now(),
169            });
170        let notional = fill.fill_price * fill.fill_quantity;
171        match fill.side {
172            Side::Buy => {
173                quote.total -= notional;
174                quote.available = quote.total;
175            }
176            Side::Sell => {
177                quote.total += notional;
178                quote.available = quote.total;
179            }
180        }
181        quote.updated_at = Utc::now();
182    }
183
184    pub fn order_by_link_id(&self, client_id: &str) -> Option<OrderId> {
185        self.orders
186            .values()
187            .find(|order| order.request.client_order_id.as_deref() == Some(client_id))
188            .map(|order| order.id.clone())
189    }
190
191    pub fn order(&self, order_id: &OrderId) -> Option<Order> {
192        self.orders.get(order_id).cloned()
193    }
194
195    pub fn balances_snapshot(&self) -> Vec<AccountBalance> {
196        self.balances.values().cloned().collect()
197    }
198
199    pub fn positions_snapshot(&self) -> Vec<Position> {
200        self.positions.values().cloned().collect()
201    }
202
203    pub fn open_orders_snapshot(&self, symbol: Option<&str>) -> Vec<Order> {
204        self.orders
205            .values()
206            .filter(|order| {
207                let active = !matches!(
208                    order.status,
209                    OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
210                );
211                let symbol_matches = symbol
212                    .map(|value| order.request.symbol == value)
213                    .unwrap_or(true);
214                active && symbol_matches
215            })
216            .cloned()
217            .collect()
218    }
219
220    pub fn executions_in_range(
221        &self,
222        start: DateTime<Utc>,
223        end: Option<DateTime<Utc>>,
224    ) -> Vec<Fill> {
225        self.executions
226            .iter()
227            .filter(|fill| {
228                fill.timestamp >= start && end.map(|limit| fill.timestamp <= limit).unwrap_or(true)
229            })
230            .cloned()
231            .collect()
232    }
233}
234
/// FIFO queues of market data served by the mock exchange.
#[derive(Default)]
pub struct MarketDataQueues {
    /// Pending candles; `next_candle` pops from the front.
    pub candles: VecDeque<Candle>,
    /// Pending ticks; `next_tick` pops from the front.
    pub ticks: VecDeque<Tick>,
}
240
241impl MarketDataQueues {
242    pub fn push_candle(&mut self, candle: Candle) {
243        self.candles.push_back(candle);
244    }
245
246    pub fn push_tick(&mut self, tick: Tick) {
247        self.ticks.push_back(tick);
248    }
249
250    pub fn next_candle(&mut self) -> Option<Candle> {
251        self.candles.pop_front()
252    }
253
254    pub fn next_tick(&mut self) -> Option<Tick> {
255        self.ticks.pop_front()
256    }
257}
258
/// Declarative account bootstrap configuration.
#[derive(Clone)]
pub struct AccountConfig {
    /// API key under which the account is registered in the exchange state.
    pub api_key: String,
    /// Secret paired with `api_key`.
    pub api_secret: String,
    /// Initial balances; indexed by currency when the account state is built.
    pub balances: Vec<AccountBalance>,
    /// Initial positions; indexed by symbol when the account state is built.
    pub positions: Vec<Position>,
}
267
268impl AccountConfig {
269    pub fn new(api_key: impl Into<String>, api_secret: impl Into<String>) -> Self {
270        Self {
271            api_key: api_key.into(),
272            api_secret: api_secret.into(),
273            balances: Vec::new(),
274            positions: Vec::new(),
275        }
276    }
277
278    pub fn with_balance(mut self, balance: AccountBalance) -> Self {
279        self.balances.push(balance);
280        self
281    }
282
283    pub fn with_position(mut self, position: Position) -> Self {
284        self.positions.push(position);
285        self
286    }
287}
288
/// Configuration object passed into [`MockExchangeState::new`].
#[derive(Clone)]
pub struct MockExchangeConfig {
    /// Accounts to register, each keyed by its `api_key`.
    pub accounts: Vec<AccountConfig>,
    /// Candles to queue, served in the order listed.
    pub candles: Vec<Candle>,
    /// Ticks to queue, served in the order listed.
    pub ticks: Vec<Tick>,
    /// Scenario manager handed to the exchange state.
    pub scenarios: ScenarioManager,
}
297
298impl MockExchangeConfig {
299    pub fn new() -> Self {
300        Self::default()
301    }
302
303    pub fn with_account(mut self, account: AccountConfig) -> Self {
304        self.accounts.push(account);
305        self
306    }
307
308    pub fn with_candles(mut self, candles: impl IntoIterator<Item = Candle>) -> Self {
309        self.candles.extend(candles);
310        self
311    }
312
313    pub fn with_ticks(mut self, ticks: impl IntoIterator<Item = Tick>) -> Self {
314        self.ticks.extend(ticks);
315        self
316    }
317
318    pub fn with_scenarios(mut self, scenarios: ScenarioManager) -> Self {
319        self.scenarios = scenarios;
320        self
321    }
322}
323
324impl Default for MockExchangeConfig {
325    fn default() -> Self {
326        Self {
327            accounts: Vec::new(),
328            candles: Vec::new(),
329            ticks: Vec::new(),
330            scenarios: ScenarioManager::new(),
331        }
332    }
333}
334
335impl MockExchangeState {
336    pub fn new(config: MockExchangeConfig) -> Self {
337        let market_data = MarketDataQueues {
338            candles: config.candles.into_iter().collect(),
339            ticks: config.ticks.into_iter().collect(),
340        };
341        let accounts = config
342            .accounts
343            .into_iter()
344            .map(|account| {
345                let api_key = account.api_key.clone();
346                (api_key, AccountState::from_config(account))
347            })
348            .collect();
349        let inner = Inner {
350            accounts,
351            market_data,
352            private_ws_sender: None,
353            order_seq: 1,
354        };
355        Self {
356            inner: Arc::new(Mutex::new(inner)),
357            scenarios: config.scenarios,
358        }
359    }
360
361    pub fn scenarios(&self) -> ScenarioManager {
362        self.scenarios.clone()
363    }
364
365    #[allow(dead_code)]
366    pub(crate) fn inner(&self) -> &Arc<Mutex<Inner>> {
367        &self.inner
368    }
369
370    pub async fn set_private_ws_sender(&self, sender: mpsc::UnboundedSender<PrivateMessage>) {
371        let mut guard = self.inner.lock().await;
372        guard.private_ws_sender = Some(sender);
373    }
374
375    pub async fn clear_private_ws_sender(&self) {
376        let mut guard = self.inner.lock().await;
377        guard.private_ws_sender = None;
378    }
379
380    pub async fn emit_private_message(&self, payload: PrivateMessage) -> Result<()> {
381        let sender = {
382            let guard = self.inner.lock().await;
383            guard.private_ws_sender.clone()
384        };
385        if let Some(tx) = sender {
386            tx.send(payload)
387                .map_err(|err| anyhow!("failed to deliver private stream message: {err}"))
388        } else {
389            Ok(())
390        }
391    }
392
393    pub async fn account_secret(&self, api_key: &str) -> Option<String> {
394        let guard = self.inner.lock().await;
395        guard
396            .accounts
397            .get(api_key)
398            .map(|account| account.api_secret.clone())
399    }
400
401    pub async fn with_account_mut<F, T>(&self, api_key: &str, f: F) -> Result<T>
402    where
403        F: FnOnce(&mut AccountState) -> Result<T>,
404    {
405        let mut guard = self.inner.lock().await;
406        let account = guard
407            .accounts
408            .get_mut(api_key)
409            .ok_or_else(|| anyhow!("unknown API key {api_key}"))?;
410        f(account)
411    }
412
413    pub async fn with_account<F, T>(&self, api_key: &str, f: F) -> Result<T>
414    where
415        F: FnOnce(&AccountState) -> Result<T>,
416    {
417        let guard = self.inner.lock().await;
418        let account = guard
419            .accounts
420            .get(api_key)
421            .ok_or_else(|| anyhow!("unknown API key {api_key}"))?;
422        f(account)
423    }
424
425    pub async fn next_order_id(&self) -> OrderId {
426        let mut guard = self.inner.lock().await;
427        let id = guard.order_seq;
428        guard.order_seq += 1;
429        format!("MOCK-ORDER-{id}")
430    }
431
432    pub async fn register_order(&self, api_key: &str, order: Order) -> Result<Order> {
433        self.with_account_mut(api_key, |account| {
434            account.insert_order(order.clone());
435            Ok(order)
436        })
437        .await
438    }
439
440    pub async fn get_order(&self, api_key: &str, order_id: &OrderId) -> Result<Order> {
441        self.with_account(api_key, |account| {
442            account
443                .order(order_id)
444                .ok_or_else(|| anyhow!("unknown order id {order_id}"))
445        })
446        .await
447    }
448
449    pub async fn find_order_id(
450        &self,
451        api_key: &str,
452        order_id: Option<&str>,
453        order_link_id: Option<&str>,
454    ) -> Result<OrderId> {
455        if let Some(id) = order_id {
456            return Ok(id.to_string());
457        }
458        if let Some(link) = order_link_id {
459            return self
460                .with_account(api_key, |account| {
461                    account
462                        .order_by_link_id(link)
463                        .ok_or_else(|| anyhow!("unknown order link id {link}"))
464                })
465                .await;
466        }
467        Err(anyhow!(
468            "request must provide either orderId or orderLinkId"
469        ))
470    }
471
472    pub async fn cancel_order(&self, api_key: &str, order_id: &OrderId) -> Result<Order> {
473        self.with_account_mut(api_key, |account| {
474            account.update_order(order_id, |order| {
475                order.status = OrderStatus::Canceled;
476                order.updated_at = Utc::now();
477                Ok(())
478            })
479        })
480        .await
481    }
482
483    pub async fn fill_order(
484        &self,
485        api_key: &str,
486        order_id: &OrderId,
487        quantity: Quantity,
488        price: Price,
489    ) -> Result<(Order, Fill)> {
490        let mut guard = self.inner.lock().await;
491        let account = guard
492            .accounts
493            .get_mut(api_key)
494            .ok_or_else(|| anyhow!("unknown API key {api_key}"))?;
495        let (fill, order_snapshot) = {
496            let order = account
497                .orders
498                .get_mut(order_id)
499                .ok_or_else(|| anyhow!("unknown order id {order_id}"))?;
500            let remaining = (order.request.quantity - order.filled_quantity).max(Decimal::ZERO);
501            if remaining.is_zero() {
502                return Err(anyhow!("order already fully filled"));
503            }
504            let exec_quantity = quantity.min(remaining);
505            if exec_quantity.is_zero() {
506                return Err(anyhow!("fill quantity resolved to zero"));
507            }
508            let filled_before = order.filled_quantity;
509            let new_filled = filled_before + exec_quantity;
510            let avg_price = if filled_before.is_zero() {
511                price
512            } else {
513                let previous_total = order.avg_fill_price.unwrap_or(price) * filled_before;
514                (previous_total + price * exec_quantity) / new_filled
515            };
516            order.filled_quantity = new_filled;
517            order.avg_fill_price = Some(avg_price);
518            order.status = if new_filled >= order.request.quantity {
519                OrderStatus::Filled
520            } else {
521                OrderStatus::PartiallyFilled
522            };
523            order.updated_at = Utc::now();
524            let fill = Fill {
525                order_id: order.id.clone(),
526                symbol: order.request.symbol.clone(),
527                side: order.request.side,
528                fill_price: price,
529                fill_quantity: exec_quantity,
530                fee: None,
531                timestamp: Utc::now(),
532            };
533            let order_snapshot = order.clone();
534            (fill, order_snapshot)
535        };
536        account.apply_fill(&fill);
537        Ok((order_snapshot, fill))
538    }
539
540    pub async fn account_balances(&self, api_key: &str) -> Result<Vec<AccountBalance>> {
541        self.with_account(api_key, |account| Ok(account.balances_snapshot()))
542            .await
543    }
544
545    pub async fn account_positions(&self, api_key: &str) -> Result<Vec<Position>> {
546        self.with_account(api_key, |account| Ok(account.positions_snapshot()))
547            .await
548    }
549
550    pub async fn open_orders(&self, api_key: &str, symbol: Option<&str>) -> Result<Vec<Order>> {
551        self.with_account(api_key, |account| Ok(account.open_orders_snapshot(symbol)))
552            .await
553    }
554
555    pub async fn executions_between(
556        &self,
557        api_key: &str,
558        start: DateTime<Utc>,
559        end: Option<DateTime<Utc>>,
560    ) -> Result<Vec<Fill>> {
561        self.with_account(api_key, |account| {
562            Ok(account.executions_in_range(start, end))
563        })
564        .await
565    }
566
567    pub async fn next_candle(&self) -> Option<Candle> {
568        let mut guard = self.inner.lock().await;
569        guard.market_data.next_candle()
570    }
571
572    pub async fn next_tick(&self) -> Option<Tick> {
573        let mut guard = self.inner.lock().await;
574        guard.market_data.next_tick()
575    }
576}