1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{anyhow, Result};
6use chrono::{DateTime, Utc};
7use rust_decimal::Decimal;
8use tokio::sync::{mpsc, Mutex};
9
10use tesser_core::{
11 AccountBalance, AssetId, Candle, ExchangeId, Fill, Order, OrderId, OrderStatus, Position,
12 Price, Quantity, Side, Symbol, Tick,
13};
14
15use crate::scenario::ScenarioManager;
16
17const EXEC_HISTORY_LIMIT: usize = 1024;
18const DEFAULT_QUOTE_CURRENCY: &str = "USDT";
19
20pub type ApiKey = String;
21
22pub type PrivateMessage = serde_json::Value;
24
25#[derive(Clone)]
27pub struct MockExchangeState {
28 inner: Arc<Mutex<Inner>>,
29 scenarios: ScenarioManager,
30 auto_fill: Option<AutoFillConfig>,
31}
32
33#[allow(dead_code)]
34pub(crate) struct Inner {
35 pub accounts: HashMap<ApiKey, AccountState>,
36 pub market_data: MarketDataQueues,
37 pub private_ws_sender: Option<mpsc::UnboundedSender<PrivateMessage>>,
38 pub pending_private_messages: VecDeque<PrivateMessage>,
39 pub order_seq: u64,
40 pub exchange: ExchangeId,
41}
42
43#[derive(Clone)]
44pub struct AccountState {
45 pub api_secret: String,
46 pub balances: HashMap<AssetId, AccountBalance>,
47 pub positions: HashMap<Symbol, Position>,
48 pub executions: VecDeque<Fill>,
49 pub orders: HashMap<OrderId, Order>,
50}
51
52impl AccountState {
53 fn from_config(config: AccountConfig) -> Self {
54 Self {
55 api_secret: config.api_secret,
56 balances: config
57 .balances
58 .into_iter()
59 .map(|balance| (balance.asset, balance))
60 .collect(),
61 positions: config
62 .positions
63 .into_iter()
64 .map(|position| (position.symbol, position))
65 .collect(),
66 executions: VecDeque::new(),
67 orders: HashMap::new(),
68 }
69 }
70
71 pub fn insert_order(&mut self, order: Order) {
72 self.orders.insert(order.id.clone(), order);
73 }
74
75 pub fn update_order<F>(&mut self, order_id: &OrderId, mut update: F) -> Result<Order>
76 where
77 F: FnMut(&mut Order) -> Result<()>,
78 {
79 let order = self
80 .orders
81 .get_mut(order_id)
82 .ok_or_else(|| anyhow!("unknown order id {order_id}"))?;
83 update(order)?;
84 Ok(order.clone())
85 }
86
87 pub fn apply_fill(&mut self, fill: &Fill) {
88 self.executions.push_back(fill.clone());
89 if self.executions.len() > EXEC_HISTORY_LIMIT {
90 self.executions.pop_front();
91 }
92 self.update_positions(fill);
93 self.update_balances(fill);
94 }
95
96 fn update_positions(&mut self, fill: &Fill) {
97 let entry = self
98 .positions
99 .entry(fill.symbol)
100 .or_insert_with(|| Position {
101 symbol: fill.symbol,
102 side: None,
103 quantity: Decimal::ZERO,
104 entry_price: None,
105 unrealized_pnl: Decimal::ZERO,
106 updated_at: Utc::now(),
107 });
108 entry.updated_at = Utc::now();
109
110 match (entry.side, fill.side) {
111 (None, Side::Buy) => {
112 entry.side = Some(Side::Buy);
113 entry.quantity = fill.fill_quantity;
114 entry.entry_price = Some(fill.fill_price);
115 }
116 (None, Side::Sell) => {
117 entry.side = Some(Side::Sell);
118 entry.quantity = fill.fill_quantity;
119 entry.entry_price = Some(fill.fill_price);
120 }
121 (Some(Side::Buy), Side::Buy) => {
122 let total_qty = entry.quantity + fill.fill_quantity;
123 let existing_value = entry.entry_price.unwrap_or(Decimal::ZERO) * entry.quantity;
124 let fill_value = fill.fill_price * fill.fill_quantity;
125 entry.quantity = total_qty;
126 entry.entry_price = Some((existing_value + fill_value) / total_qty);
127 }
128 (Some(Side::Sell), Side::Sell) => {
129 let total_qty = entry.quantity + fill.fill_quantity;
130 let existing_value = entry.entry_price.unwrap_or(Decimal::ZERO) * entry.quantity;
131 let fill_value = fill.fill_price * fill.fill_quantity;
132 entry.quantity = total_qty;
133 entry.entry_price = Some((existing_value + fill_value) / total_qty);
134 }
135 (Some(Side::Buy), Side::Sell) => {
136 if fill.fill_quantity < entry.quantity {
137 entry.quantity -= fill.fill_quantity;
138 } else if fill.fill_quantity == entry.quantity {
139 entry.quantity = Decimal::ZERO;
140 entry.side = None;
141 entry.entry_price = None;
142 } else {
143 entry.side = Some(Side::Sell);
144 entry.quantity = fill.fill_quantity - entry.quantity;
145 entry.entry_price = Some(fill.fill_price);
146 }
147 }
148 (Some(Side::Sell), Side::Buy) => {
149 if fill.fill_quantity < entry.quantity {
150 entry.quantity -= fill.fill_quantity;
151 } else if fill.fill_quantity == entry.quantity {
152 entry.quantity = Decimal::ZERO;
153 entry.side = None;
154 entry.entry_price = None;
155 } else {
156 entry.side = Some(Side::Buy);
157 entry.quantity = fill.fill_quantity - entry.quantity;
158 entry.entry_price = Some(fill.fill_price);
159 }
160 }
161 }
162 }
163
164 fn update_balances(&mut self, fill: &Fill) {
165 let quote_asset = AssetId::from_code(fill.symbol.exchange, DEFAULT_QUOTE_CURRENCY);
166 let quote = self.balances.entry(quote_asset).or_insert(AccountBalance {
167 exchange: quote_asset.exchange,
168 asset: quote_asset,
169 total: Decimal::ZERO,
170 available: Decimal::ZERO,
171 updated_at: Utc::now(),
172 });
173 let notional = fill.fill_price * fill.fill_quantity;
174 match fill.side {
175 Side::Buy => {
176 quote.total -= notional;
177 quote.available = quote.total;
178 }
179 Side::Sell => {
180 quote.total += notional;
181 quote.available = quote.total;
182 }
183 }
184 quote.updated_at = Utc::now();
185 }
186
187 pub fn order_by_link_id(&self, client_id: &str) -> Option<OrderId> {
188 self.orders
189 .values()
190 .find(|order| order.request.client_order_id.as_deref() == Some(client_id))
191 .map(|order| order.id.clone())
192 }
193
194 pub fn order(&self, order_id: &OrderId) -> Option<Order> {
195 self.orders.get(order_id).cloned()
196 }
197
198 pub fn balances_snapshot(&self) -> Vec<AccountBalance> {
199 self.balances.values().cloned().collect()
200 }
201
202 pub fn positions_snapshot(&self) -> Vec<Position> {
203 self.positions.values().cloned().collect()
204 }
205
206 pub fn open_orders_snapshot(&self, symbol: Option<Symbol>) -> Vec<Order> {
207 self.orders
208 .values()
209 .filter(|order| {
210 let active = !matches!(
211 order.status,
212 OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
213 );
214 let symbol_matches = symbol
215 .map(|value| order.request.symbol == value)
216 .unwrap_or(true);
217 active && symbol_matches
218 })
219 .cloned()
220 .collect()
221 }
222
223 pub fn executions_in_range(
224 &self,
225 start: DateTime<Utc>,
226 end: Option<DateTime<Utc>>,
227 ) -> Vec<Fill> {
228 self.executions
229 .iter()
230 .filter(|fill| {
231 fill.timestamp >= start && end.map(|limit| fill.timestamp <= limit).unwrap_or(true)
232 })
233 .cloned()
234 .collect()
235 }
236}
237
238#[derive(Default)]
239pub struct MarketDataQueues {
240 pub candles: VecDeque<Candle>,
241 pub ticks: VecDeque<Tick>,
242}
243
244impl MarketDataQueues {
245 pub fn push_candle(&mut self, candle: Candle) {
246 self.candles.push_back(candle);
247 }
248
249 pub fn push_tick(&mut self, tick: Tick) {
250 self.ticks.push_back(tick);
251 }
252
253 pub fn next_candle(&mut self) -> Option<Candle> {
254 self.candles.pop_front()
255 }
256
257 pub fn next_tick(&mut self) -> Option<Tick> {
258 self.ticks.pop_front()
259 }
260}
261
262#[derive(Clone)]
264pub struct AccountConfig {
265 pub api_key: String,
266 pub api_secret: String,
267 pub balances: Vec<AccountBalance>,
268 pub positions: Vec<Position>,
269}
270
271impl AccountConfig {
272 pub fn new(api_key: impl Into<String>, api_secret: impl Into<String>) -> Self {
273 Self {
274 api_key: api_key.into(),
275 api_secret: api_secret.into(),
276 balances: Vec::new(),
277 positions: Vec::new(),
278 }
279 }
280
281 pub fn with_balance(mut self, balance: AccountBalance) -> Self {
282 self.balances.push(balance);
283 self
284 }
285
286 pub fn with_position(mut self, position: Position) -> Self {
287 self.positions.push(position);
288 self
289 }
290}
291
292#[derive(Clone)]
294pub struct MockExchangeConfig {
295 pub accounts: Vec<AccountConfig>,
296 pub candles: Vec<Candle>,
297 pub ticks: Vec<Tick>,
298 pub scenarios: ScenarioManager,
299 pub exchange: ExchangeId,
300 pub auto_fill: Option<AutoFillConfig>,
301}
302
303impl MockExchangeConfig {
304 pub fn new() -> Self {
305 Self::default()
306 }
307
308 pub fn with_account(mut self, account: AccountConfig) -> Self {
309 self.accounts.push(account);
310 self
311 }
312
313 pub fn with_candles(mut self, candles: impl IntoIterator<Item = Candle>) -> Self {
314 self.candles.extend(candles);
315 self
316 }
317
318 pub fn with_ticks(mut self, ticks: impl IntoIterator<Item = Tick>) -> Self {
319 self.ticks.extend(ticks);
320 self
321 }
322
323 pub fn with_scenarios(mut self, scenarios: ScenarioManager) -> Self {
324 self.scenarios = scenarios;
325 self
326 }
327
328 pub fn with_auto_fill(mut self, config: AutoFillConfig) -> Self {
329 self.auto_fill = Some(config);
330 self
331 }
332
333 pub fn with_exchange(mut self, exchange: ExchangeId) -> Self {
334 self.exchange = exchange;
335 self
336 }
337}
338
339impl Default for MockExchangeConfig {
340 fn default() -> Self {
341 Self {
342 accounts: Vec::new(),
343 candles: Vec::new(),
344 ticks: Vec::new(),
345 scenarios: ScenarioManager::new(),
346 exchange: ExchangeId::UNSPECIFIED,
347 auto_fill: None,
348 }
349 }
350}
351
352#[derive(Clone)]
353pub struct AutoFillConfig {
354 pub delay: Duration,
355 pub price: Option<Decimal>,
356}
357
358impl MockExchangeState {
359 pub fn new(config: MockExchangeConfig) -> Self {
360 let market_data = MarketDataQueues {
361 candles: config.candles.into_iter().collect(),
362 ticks: config.ticks.into_iter().collect(),
363 };
364 let accounts = config
365 .accounts
366 .into_iter()
367 .map(|account| {
368 let api_key = account.api_key.clone();
369 (api_key, AccountState::from_config(account))
370 })
371 .collect();
372 let inner = Inner {
373 accounts,
374 market_data,
375 private_ws_sender: None,
376 pending_private_messages: VecDeque::new(),
377 order_seq: 1,
378 exchange: config.exchange,
379 };
380 Self {
381 inner: Arc::new(Mutex::new(inner)),
382 scenarios: config.scenarios,
383 auto_fill: config.auto_fill,
384 }
385 }
386
387 pub fn scenarios(&self) -> ScenarioManager {
388 self.scenarios.clone()
389 }
390
391 pub fn auto_fill_config(&self) -> Option<AutoFillConfig> {
392 self.auto_fill.clone()
393 }
394
395 pub async fn exchange(&self) -> ExchangeId {
396 self.inner.lock().await.exchange
397 }
398
399 #[allow(dead_code)]
400 pub(crate) fn inner(&self) -> &Arc<Mutex<Inner>> {
401 &self.inner
402 }
403
404 pub async fn set_private_ws_sender(&self, sender: mpsc::UnboundedSender<PrivateMessage>) {
405 let mut guard = self.inner.lock().await;
406 guard.private_ws_sender = Some(sender.clone());
407 while let Some(payload) = guard.pending_private_messages.pop_front() {
408 if sender.send(payload).is_err() {
409 break;
410 }
411 }
412 }
413
414 pub async fn clear_private_ws_sender(&self) {
415 let mut guard = self.inner.lock().await;
416 guard.private_ws_sender = None;
417 }
418
419 pub async fn emit_private_message(&self, payload: PrivateMessage) -> Result<()> {
420 let mut guard = self.inner.lock().await;
421 if let Some(tx) = guard.private_ws_sender.clone() {
422 tx.send(payload)
423 .map_err(|err| anyhow!("failed to deliver private stream message: {err}"))
424 } else {
425 guard.pending_private_messages.push_back(payload);
426 Ok(())
427 }
428 }
429
430 pub async fn account_secret(&self, api_key: &str) -> Option<String> {
431 let guard = self.inner.lock().await;
432 guard
433 .accounts
434 .get(api_key)
435 .map(|account| account.api_secret.clone())
436 }
437
438 pub async fn with_account_mut<F, T>(&self, api_key: &str, f: F) -> Result<T>
439 where
440 F: FnOnce(&mut AccountState) -> Result<T>,
441 {
442 let mut guard = self.inner.lock().await;
443 let account = guard
444 .accounts
445 .get_mut(api_key)
446 .ok_or_else(|| anyhow!("unknown API key {api_key}"))?;
447 f(account)
448 }
449
450 pub async fn with_account<F, T>(&self, api_key: &str, f: F) -> Result<T>
451 where
452 F: FnOnce(&AccountState) -> Result<T>,
453 {
454 let guard = self.inner.lock().await;
455 let account = guard
456 .accounts
457 .get(api_key)
458 .ok_or_else(|| anyhow!("unknown API key {api_key}"))?;
459 f(account)
460 }
461
462 pub async fn next_order_id(&self) -> OrderId {
463 let mut guard = self.inner.lock().await;
464 let id = guard.order_seq;
465 guard.order_seq += 1;
466 format!("MOCK-ORDER-{id}")
467 }
468
469 pub async fn register_order(&self, api_key: &str, order: Order) -> Result<Order> {
470 self.with_account_mut(api_key, |account| {
471 account.insert_order(order.clone());
472 Ok(order)
473 })
474 .await
475 }
476
477 pub async fn get_order(&self, api_key: &str, order_id: &OrderId) -> Result<Order> {
478 self.with_account(api_key, |account| {
479 account
480 .order(order_id)
481 .ok_or_else(|| anyhow!("unknown order id {order_id}"))
482 })
483 .await
484 }
485
486 pub async fn find_order_id(
487 &self,
488 api_key: &str,
489 order_id: Option<&str>,
490 order_link_id: Option<&str>,
491 ) -> Result<OrderId> {
492 if let Some(id) = order_id {
493 return Ok(id.to_string());
494 }
495 if let Some(link) = order_link_id {
496 return self
497 .with_account(api_key, |account| {
498 account
499 .order_by_link_id(link)
500 .ok_or_else(|| anyhow!("unknown order link id {link}"))
501 })
502 .await;
503 }
504 Err(anyhow!(
505 "request must provide either orderId or orderLinkId"
506 ))
507 }
508
509 pub async fn cancel_order(&self, api_key: &str, order_id: &OrderId) -> Result<Order> {
510 self.with_account_mut(api_key, |account| {
511 account.update_order(order_id, |order| {
512 order.status = OrderStatus::Canceled;
513 order.updated_at = Utc::now();
514 Ok(())
515 })
516 })
517 .await
518 }
519
520 pub async fn fill_order(
521 &self,
522 api_key: &str,
523 order_id: &OrderId,
524 quantity: Quantity,
525 price: Price,
526 ) -> Result<(Order, Fill)> {
527 let mut guard = self.inner.lock().await;
528 let account = guard
529 .accounts
530 .get_mut(api_key)
531 .ok_or_else(|| anyhow!("unknown API key {api_key}"))?;
532 let (fill, order_snapshot) = {
533 let order = account
534 .orders
535 .get_mut(order_id)
536 .ok_or_else(|| anyhow!("unknown order id {order_id}"))?;
537 let remaining = (order.request.quantity - order.filled_quantity).max(Decimal::ZERO);
538 if remaining.is_zero() {
539 return Err(anyhow!("order already fully filled"));
540 }
541 let exec_quantity = quantity.min(remaining);
542 if exec_quantity.is_zero() {
543 return Err(anyhow!("fill quantity resolved to zero"));
544 }
545 let filled_before = order.filled_quantity;
546 let new_filled = filled_before + exec_quantity;
547 let avg_price = if filled_before.is_zero() {
548 price
549 } else {
550 let previous_total = order.avg_fill_price.unwrap_or(price) * filled_before;
551 (previous_total + price * exec_quantity) / new_filled
552 };
553 order.filled_quantity = new_filled;
554 order.avg_fill_price = Some(avg_price);
555 order.status = if new_filled >= order.request.quantity {
556 OrderStatus::Filled
557 } else {
558 OrderStatus::PartiallyFilled
559 };
560 order.updated_at = Utc::now();
561 let fill = Fill {
562 order_id: order.id.clone(),
563 symbol: order.request.symbol,
564 side: order.request.side,
565 fill_price: price,
566 fill_quantity: exec_quantity,
567 fee: None,
568 fee_asset: None,
569 timestamp: Utc::now(),
570 };
571 let order_snapshot = order.clone();
572 (fill, order_snapshot)
573 };
574 account.apply_fill(&fill);
575 Ok((order_snapshot, fill))
576 }
577
578 pub async fn account_balances(&self, api_key: &str) -> Result<Vec<AccountBalance>> {
579 self.with_account(api_key, |account| Ok(account.balances_snapshot()))
580 .await
581 }
582
583 pub async fn account_positions(&self, api_key: &str) -> Result<Vec<Position>> {
584 self.with_account(api_key, |account| Ok(account.positions_snapshot()))
585 .await
586 }
587
588 pub async fn open_orders(&self, api_key: &str, symbol: Option<&str>) -> Result<Vec<Order>> {
589 let exchange = self.inner.lock().await.exchange;
590 let symbol = symbol.map(|code| Symbol::from_code(exchange, code));
591 self.with_account(api_key, |account| Ok(account.open_orders_snapshot(symbol)))
592 .await
593 }
594
595 pub async fn executions_between(
596 &self,
597 api_key: &str,
598 start: DateTime<Utc>,
599 end: Option<DateTime<Utc>>,
600 ) -> Result<Vec<Fill>> {
601 self.with_account(api_key, |account| {
602 Ok(account.executions_in_range(start, end))
603 })
604 .await
605 }
606
607 pub async fn next_candle(&self) -> Option<Candle> {
608 let mut guard = self.inner.lock().await;
609 guard.market_data.next_candle()
610 }
611
612 pub async fn next_tick(&self) -> Option<Tick> {
613 let mut guard = self.inner.lock().await;
614 guard.market_data.next_tick()
615 }
616}