1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use anyhow::{anyhow, Result};
5use chrono::{DateTime, Utc};
6use rust_decimal::Decimal;
7use tokio::sync::{mpsc, Mutex};
8
9use tesser_core::{
10 AccountBalance, Candle, Fill, Order, OrderId, OrderStatus, Position, Price, Quantity, Side,
11 Symbol, Tick,
12};
13
14use crate::scenario::ScenarioManager;
15
/// Number of fills kept per account before the oldest one is evicted.
const EXEC_HISTORY_LIMIT: usize = 1024;
/// Currency whose balance is adjusted whenever a fill is applied.
const DEFAULT_QUOTE_CURRENCY: &str = "USDT";

/// Key under which an account is stored in the exchange state.
pub type ApiKey = String;

/// Payload type carried by the private stream channel.
pub type PrivateMessage = serde_json::Value;
23
/// Cloneable handle to the mock exchange's state.
///
/// The mutable part lives behind an `Arc<Mutex<_>>`, so clones of the handle operate on the
/// same `Inner` value.
#[derive(Clone)]
pub struct MockExchangeState {
    /// Accounts, market-data queues, private stream sender and order counter.
    inner: Arc<Mutex<Inner>>,
    /// Scenario manager taken from the configuration the state was built with.
    scenarios: ScenarioManager,
}
30
/// Mutable exchange state guarded by the mutex inside `MockExchangeState`.
#[allow(dead_code)]
pub(crate) struct Inner {
    /// Per-account state keyed by API key.
    pub accounts: HashMap<ApiKey, AccountState>,
    /// Candles and ticks queued for consumption.
    pub market_data: MarketDataQueues,
    /// Sender for private stream messages; `None` while no sender is registered.
    pub private_ws_sender: Option<mpsc::UnboundedSender<PrivateMessage>>,
    /// Sequence number that the next generated order id will use.
    pub order_seq: u64,
}
38
/// State tracked for a single account.
#[derive(Clone)]
pub struct AccountState {
    /// Secret stored alongside the account's API key.
    pub api_secret: String,
    /// Balances keyed by currency.
    pub balances: HashMap<String, AccountBalance>,
    /// Positions keyed by symbol.
    pub positions: HashMap<Symbol, Position>,
    /// Fill history in insertion order (oldest at the front).
    pub executions: VecDeque<Fill>,
    /// Orders keyed by order id.
    pub orders: HashMap<OrderId, Order>,
}
47
48impl AccountState {
49 fn from_config(config: AccountConfig) -> Self {
50 Self {
51 api_secret: config.api_secret,
52 balances: config
53 .balances
54 .into_iter()
55 .map(|balance| (balance.currency.clone(), balance))
56 .collect(),
57 positions: config
58 .positions
59 .into_iter()
60 .map(|position| (position.symbol.clone(), position))
61 .collect(),
62 executions: VecDeque::new(),
63 orders: HashMap::new(),
64 }
65 }
66
    /// Stores `order` under its own id, replacing any order already stored with that id.
    pub fn insert_order(&mut self, order: Order) {
        self.orders.insert(order.id.clone(), order);
    }
70
71 pub fn update_order<F>(&mut self, order_id: &OrderId, mut update: F) -> Result<Order>
72 where
73 F: FnMut(&mut Order) -> Result<()>,
74 {
75 let order = self
76 .orders
77 .get_mut(order_id)
78 .ok_or_else(|| anyhow!("unknown order id {order_id}"))?;
79 update(order)?;
80 Ok(order.clone())
81 }
82
83 pub fn apply_fill(&mut self, fill: &Fill) {
84 self.executions.push_back(fill.clone());
85 if self.executions.len() > EXEC_HISTORY_LIMIT {
86 self.executions.pop_front();
87 }
88 self.update_positions(fill);
89 self.update_balances(fill);
90 }
91
92 fn update_positions(&mut self, fill: &Fill) {
93 let entry = self
94 .positions
95 .entry(fill.symbol.clone())
96 .or_insert_with(|| Position {
97 symbol: fill.symbol.clone(),
98 side: None,
99 quantity: Decimal::ZERO,
100 entry_price: None,
101 unrealized_pnl: Decimal::ZERO,
102 updated_at: Utc::now(),
103 });
104 entry.updated_at = Utc::now();
105
106 match (entry.side, fill.side) {
107 (None, Side::Buy) => {
108 entry.side = Some(Side::Buy);
109 entry.quantity = fill.fill_quantity;
110 entry.entry_price = Some(fill.fill_price);
111 }
112 (None, Side::Sell) => {
113 entry.side = Some(Side::Sell);
114 entry.quantity = fill.fill_quantity;
115 entry.entry_price = Some(fill.fill_price);
116 }
117 (Some(Side::Buy), Side::Buy) => {
118 let total_qty = entry.quantity + fill.fill_quantity;
119 let existing_value = entry.entry_price.unwrap_or(Decimal::ZERO) * entry.quantity;
120 let fill_value = fill.fill_price * fill.fill_quantity;
121 entry.quantity = total_qty;
122 entry.entry_price = Some((existing_value + fill_value) / total_qty);
123 }
124 (Some(Side::Sell), Side::Sell) => {
125 let total_qty = entry.quantity + fill.fill_quantity;
126 let existing_value = entry.entry_price.unwrap_or(Decimal::ZERO) * entry.quantity;
127 let fill_value = fill.fill_price * fill.fill_quantity;
128 entry.quantity = total_qty;
129 entry.entry_price = Some((existing_value + fill_value) / total_qty);
130 }
131 (Some(Side::Buy), Side::Sell) => {
132 if fill.fill_quantity < entry.quantity {
133 entry.quantity -= fill.fill_quantity;
134 } else if fill.fill_quantity == entry.quantity {
135 entry.quantity = Decimal::ZERO;
136 entry.side = None;
137 entry.entry_price = None;
138 } else {
139 entry.side = Some(Side::Sell);
140 entry.quantity = fill.fill_quantity - entry.quantity;
141 entry.entry_price = Some(fill.fill_price);
142 }
143 }
144 (Some(Side::Sell), Side::Buy) => {
145 if fill.fill_quantity < entry.quantity {
146 entry.quantity -= fill.fill_quantity;
147 } else if fill.fill_quantity == entry.quantity {
148 entry.quantity = Decimal::ZERO;
149 entry.side = None;
150 entry.entry_price = None;
151 } else {
152 entry.side = Some(Side::Buy);
153 entry.quantity = fill.fill_quantity - entry.quantity;
154 entry.entry_price = Some(fill.fill_price);
155 }
156 }
157 }
158 }
159
160 fn update_balances(&mut self, fill: &Fill) {
161 let quote = self
162 .balances
163 .entry(DEFAULT_QUOTE_CURRENCY.to_string())
164 .or_insert(AccountBalance {
165 currency: DEFAULT_QUOTE_CURRENCY.into(),
166 total: Decimal::ZERO,
167 available: Decimal::ZERO,
168 updated_at: Utc::now(),
169 });
170 let notional = fill.fill_price * fill.fill_quantity;
171 match fill.side {
172 Side::Buy => {
173 quote.total -= notional;
174 quote.available = quote.total;
175 }
176 Side::Sell => {
177 quote.total += notional;
178 quote.available = quote.total;
179 }
180 }
181 quote.updated_at = Utc::now();
182 }
183
184 pub fn order_by_link_id(&self, client_id: &str) -> Option<OrderId> {
185 self.orders
186 .values()
187 .find(|order| order.request.client_order_id.as_deref() == Some(client_id))
188 .map(|order| order.id.clone())
189 }
190
    /// Returns a copy of the order stored under `order_id`, or `None` if there is none.
    pub fn order(&self, order_id: &OrderId) -> Option<Order> {
        self.orders.get(order_id).cloned()
    }
194
    /// Returns copies of all balances; the order follows the map's iteration order.
    pub fn balances_snapshot(&self) -> Vec<AccountBalance> {
        self.balances.values().cloned().collect()
    }
198
    /// Returns copies of all positions; the order follows the map's iteration order.
    pub fn positions_snapshot(&self) -> Vec<Position> {
        self.positions.values().cloned().collect()
    }
202
203 pub fn open_orders_snapshot(&self, symbol: Option<&str>) -> Vec<Order> {
204 self.orders
205 .values()
206 .filter(|order| {
207 let active = !matches!(
208 order.status,
209 OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Rejected
210 );
211 let symbol_matches = symbol
212 .map(|value| order.request.symbol == value)
213 .unwrap_or(true);
214 active && symbol_matches
215 })
216 .cloned()
217 .collect()
218 }
219
220 pub fn executions_in_range(
221 &self,
222 start: DateTime<Utc>,
223 end: Option<DateTime<Utc>>,
224 ) -> Vec<Fill> {
225 self.executions
226 .iter()
227 .filter(|fill| {
228 fill.timestamp >= start && end.map(|limit| fill.timestamp <= limit).unwrap_or(true)
229 })
230 .cloned()
231 .collect()
232 }
233}
234
/// FIFO queues of market data waiting to be consumed.
#[derive(Default)]
pub struct MarketDataQueues {
    /// Pending candles; new ones are pushed to the back and consumed from the front.
    pub candles: VecDeque<Candle>,
    /// Pending ticks; new ones are pushed to the back and consumed from the front.
    pub ticks: VecDeque<Tick>,
}
240
impl MarketDataQueues {
    /// Appends `candle` to the back of the candle queue.
    pub fn push_candle(&mut self, candle: Candle) {
        self.candles.push_back(candle);
    }

    /// Appends `tick` to the back of the tick queue.
    pub fn push_tick(&mut self, tick: Tick) {
        self.ticks.push_back(tick);
    }

    /// Removes and returns the candle at the front of the queue, or `None` if it is empty.
    pub fn next_candle(&mut self) -> Option<Candle> {
        self.candles.pop_front()
    }

    /// Removes and returns the tick at the front of the queue, or `None` if it is empty.
    pub fn next_tick(&mut self) -> Option<Tick> {
        self.ticks.pop_front()
    }
}
258
/// Initial settings for one account of the mock exchange.
#[derive(Clone)]
pub struct AccountConfig {
    /// Key under which the account is registered.
    pub api_key: String,
    /// Secret stored with the account.
    pub api_secret: String,
    /// Balances the account starts with.
    pub balances: Vec<AccountBalance>,
    /// Positions the account starts with.
    pub positions: Vec<Position>,
}
267
impl AccountConfig {
    /// Creates a configuration with the given credentials and no balances or positions.
    pub fn new(api_key: impl Into<String>, api_secret: impl Into<String>) -> Self {
        Self {
            api_key: api_key.into(),
            api_secret: api_secret.into(),
            balances: Vec::new(),
            positions: Vec::new(),
        }
    }

    /// Adds `balance` to the starting balances and returns the configuration for chaining.
    pub fn with_balance(mut self, balance: AccountBalance) -> Self {
        self.balances.push(balance);
        self
    }

    /// Adds `position` to the starting positions and returns the configuration for chaining.
    pub fn with_position(mut self, position: Position) -> Self {
        self.positions.push(position);
        self
    }
}
288
/// Everything needed to build a `MockExchangeState`.
#[derive(Clone)]
pub struct MockExchangeConfig {
    /// Accounts to register.
    pub accounts: Vec<AccountConfig>,
    /// Candles to preload into the market-data queue.
    pub candles: Vec<Candle>,
    /// Ticks to preload into the market-data queue.
    pub ticks: Vec<Tick>,
    /// Scenario manager handed to the exchange state.
    pub scenarios: ScenarioManager,
}
297
impl MockExchangeConfig {
    /// Creates the default configuration; equivalent to `Self::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `account` to the configured accounts and returns the configuration for chaining.
    pub fn with_account(mut self, account: AccountConfig) -> Self {
        self.accounts.push(account);
        self
    }

    /// Appends `candles` to the configured candles, keeping any already present.
    pub fn with_candles(mut self, candles: impl IntoIterator<Item = Candle>) -> Self {
        self.candles.extend(candles);
        self
    }

    /// Appends `ticks` to the configured ticks, keeping any already present.
    pub fn with_ticks(mut self, ticks: impl IntoIterator<Item = Tick>) -> Self {
        self.ticks.extend(ticks);
        self
    }

    /// Replaces the configured scenario manager with `scenarios`.
    pub fn with_scenarios(mut self, scenarios: ScenarioManager) -> Self {
        self.scenarios = scenarios;
        self
    }
}
323
impl Default for MockExchangeConfig {
    /// Returns a configuration with no accounts, no market data and a new `ScenarioManager`.
    fn default() -> Self {
        Self {
            accounts: Vec::new(),
            candles: Vec::new(),
            ticks: Vec::new(),
            scenarios: ScenarioManager::new(),
        }
    }
}
334
335impl MockExchangeState {
336 pub fn new(config: MockExchangeConfig) -> Self {
337 let market_data = MarketDataQueues {
338 candles: config.candles.into_iter().collect(),
339 ticks: config.ticks.into_iter().collect(),
340 };
341 let accounts = config
342 .accounts
343 .into_iter()
344 .map(|account| {
345 let api_key = account.api_key.clone();
346 (api_key, AccountState::from_config(account))
347 })
348 .collect();
349 let inner = Inner {
350 accounts,
351 market_data,
352 private_ws_sender: None,
353 order_seq: 1,
354 };
355 Self {
356 inner: Arc::new(Mutex::new(inner)),
357 scenarios: config.scenarios,
358 }
359 }
360
    /// Returns a clone of the scenario manager held by this state.
    pub fn scenarios(&self) -> ScenarioManager {
        self.scenarios.clone()
    }
364
    /// Gives crate-internal code direct access to the shared, mutex-guarded state.
    #[allow(dead_code)]
    pub(crate) fn inner(&self) -> &Arc<Mutex<Inner>> {
        &self.inner
    }
369
370 pub async fn set_private_ws_sender(&self, sender: mpsc::UnboundedSender<PrivateMessage>) {
371 let mut guard = self.inner.lock().await;
372 guard.private_ws_sender = Some(sender);
373 }
374
375 pub async fn clear_private_ws_sender(&self) {
376 let mut guard = self.inner.lock().await;
377 guard.private_ws_sender = None;
378 }
379
380 pub async fn emit_private_message(&self, payload: PrivateMessage) -> Result<()> {
381 let sender = {
382 let guard = self.inner.lock().await;
383 guard.private_ws_sender.clone()
384 };
385 if let Some(tx) = sender {
386 tx.send(payload)
387 .map_err(|err| anyhow!("failed to deliver private stream message: {err}"))
388 } else {
389 Ok(())
390 }
391 }
392
393 pub async fn account_secret(&self, api_key: &str) -> Option<String> {
394 let guard = self.inner.lock().await;
395 guard
396 .accounts
397 .get(api_key)
398 .map(|account| account.api_secret.clone())
399 }
400
401 pub async fn with_account_mut<F, T>(&self, api_key: &str, f: F) -> Result<T>
402 where
403 F: FnOnce(&mut AccountState) -> Result<T>,
404 {
405 let mut guard = self.inner.lock().await;
406 let account = guard
407 .accounts
408 .get_mut(api_key)
409 .ok_or_else(|| anyhow!("unknown API key {api_key}"))?;
410 f(account)
411 }
412
413 pub async fn with_account<F, T>(&self, api_key: &str, f: F) -> Result<T>
414 where
415 F: FnOnce(&AccountState) -> Result<T>,
416 {
417 let guard = self.inner.lock().await;
418 let account = guard
419 .accounts
420 .get(api_key)
421 .ok_or_else(|| anyhow!("unknown API key {api_key}"))?;
422 f(account)
423 }
424
425 pub async fn next_order_id(&self) -> OrderId {
426 let mut guard = self.inner.lock().await;
427 let id = guard.order_seq;
428 guard.order_seq += 1;
429 format!("MOCK-ORDER-{id}")
430 }
431
432 pub async fn register_order(&self, api_key: &str, order: Order) -> Result<Order> {
433 self.with_account_mut(api_key, |account| {
434 account.insert_order(order.clone());
435 Ok(order)
436 })
437 .await
438 }
439
440 pub async fn get_order(&self, api_key: &str, order_id: &OrderId) -> Result<Order> {
441 self.with_account(api_key, |account| {
442 account
443 .order(order_id)
444 .ok_or_else(|| anyhow!("unknown order id {order_id}"))
445 })
446 .await
447 }
448
449 pub async fn find_order_id(
450 &self,
451 api_key: &str,
452 order_id: Option<&str>,
453 order_link_id: Option<&str>,
454 ) -> Result<OrderId> {
455 if let Some(id) = order_id {
456 return Ok(id.to_string());
457 }
458 if let Some(link) = order_link_id {
459 return self
460 .with_account(api_key, |account| {
461 account
462 .order_by_link_id(link)
463 .ok_or_else(|| anyhow!("unknown order link id {link}"))
464 })
465 .await;
466 }
467 Err(anyhow!(
468 "request must provide either orderId or orderLinkId"
469 ))
470 }
471
472 pub async fn cancel_order(&self, api_key: &str, order_id: &OrderId) -> Result<Order> {
473 self.with_account_mut(api_key, |account| {
474 account.update_order(order_id, |order| {
475 order.status = OrderStatus::Canceled;
476 order.updated_at = Utc::now();
477 Ok(())
478 })
479 })
480 .await
481 }
482
483 pub async fn fill_order(
484 &self,
485 api_key: &str,
486 order_id: &OrderId,
487 quantity: Quantity,
488 price: Price,
489 ) -> Result<(Order, Fill)> {
490 let mut guard = self.inner.lock().await;
491 let account = guard
492 .accounts
493 .get_mut(api_key)
494 .ok_or_else(|| anyhow!("unknown API key {api_key}"))?;
495 let (fill, order_snapshot) = {
496 let order = account
497 .orders
498 .get_mut(order_id)
499 .ok_or_else(|| anyhow!("unknown order id {order_id}"))?;
500 let remaining = (order.request.quantity - order.filled_quantity).max(Decimal::ZERO);
501 if remaining.is_zero() {
502 return Err(anyhow!("order already fully filled"));
503 }
504 let exec_quantity = quantity.min(remaining);
505 if exec_quantity.is_zero() {
506 return Err(anyhow!("fill quantity resolved to zero"));
507 }
508 let filled_before = order.filled_quantity;
509 let new_filled = filled_before + exec_quantity;
510 let avg_price = if filled_before.is_zero() {
511 price
512 } else {
513 let previous_total = order.avg_fill_price.unwrap_or(price) * filled_before;
514 (previous_total + price * exec_quantity) / new_filled
515 };
516 order.filled_quantity = new_filled;
517 order.avg_fill_price = Some(avg_price);
518 order.status = if new_filled >= order.request.quantity {
519 OrderStatus::Filled
520 } else {
521 OrderStatus::PartiallyFilled
522 };
523 order.updated_at = Utc::now();
524 let fill = Fill {
525 order_id: order.id.clone(),
526 symbol: order.request.symbol.clone(),
527 side: order.request.side,
528 fill_price: price,
529 fill_quantity: exec_quantity,
530 fee: None,
531 timestamp: Utc::now(),
532 };
533 let order_snapshot = order.clone();
534 (fill, order_snapshot)
535 };
536 account.apply_fill(&fill);
537 Ok((order_snapshot, fill))
538 }
539
    /// Returns copies of all balances of the account under `api_key`.
    ///
    /// # Errors
    /// Returns an error when the API key is unknown.
    pub async fn account_balances(&self, api_key: &str) -> Result<Vec<AccountBalance>> {
        self.with_account(api_key, |account| Ok(account.balances_snapshot()))
            .await
    }
544
    /// Returns copies of all positions of the account under `api_key`.
    ///
    /// # Errors
    /// Returns an error when the API key is unknown.
    pub async fn account_positions(&self, api_key: &str) -> Result<Vec<Position>> {
        self.with_account(api_key, |account| Ok(account.positions_snapshot()))
            .await
    }
549
    /// Returns copies of the still-active orders of the account under `api_key`, optionally
    /// restricted to `symbol`.
    ///
    /// # Errors
    /// Returns an error when the API key is unknown.
    pub async fn open_orders(&self, api_key: &str, symbol: Option<&str>) -> Result<Vec<Order>> {
        self.with_account(api_key, |account| Ok(account.open_orders_snapshot(symbol)))
            .await
    }
554
    /// Returns copies of the fills of the account under `api_key` whose timestamp is at or
    /// after `start` and, when `end` is given, at or before `end`.
    ///
    /// # Errors
    /// Returns an error when the API key is unknown.
    pub async fn executions_between(
        &self,
        api_key: &str,
        start: DateTime<Utc>,
        end: Option<DateTime<Utc>>,
    ) -> Result<Vec<Fill>> {
        self.with_account(api_key, |account| {
            Ok(account.executions_in_range(start, end))
        })
        .await
    }
566
567 pub async fn next_candle(&self) -> Option<Candle> {
568 let mut guard = self.inner.lock().await;
569 guard.market_data.next_candle()
570 }
571
572 pub async fn next_tick(&self) -> Option<Tick> {
573 let mut guard = self.inner.lock().await;
574 guard.market_data.next_tick()
575 }
576}