1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use anyhow::Result;
6use chrono::TimeZone;
7
8use crate::binance::rest::BinanceRestClient;
9use crate::binance::types::{BinanceMyTrade, BinanceOrderResponse};
10use crate::config::RiskConfig;
11use crate::model::order::{Fill, Order, OrderSide, OrderStatus, OrderType};
12use crate::model::position::Position;
13use crate::model::signal::Signal;
14use crate::order_store;
15use crate::risk_module::{
16 ApiEndpointGroup, EndpointRateLimits, OrderIntent, RateBudgetSnapshot, RejectionReasonCode,
17 RiskModule,
18};
19
20pub use crate::risk_module::MarketKind;
21
22#[derive(Debug, Clone)]
23pub enum OrderUpdate {
24 Submitted {
25 intent_id: String,
26 client_order_id: String,
27 server_order_id: u64,
28 },
29 Filled {
30 intent_id: String,
31 client_order_id: String,
32 side: OrderSide,
33 fills: Vec<Fill>,
34 avg_price: f64,
35 },
36 Rejected {
37 intent_id: String,
38 client_order_id: String,
39 reason_code: String,
40 reason: String,
41 },
42}
43
44#[derive(Debug, Clone, Default)]
45pub struct OrderHistoryStats {
46 pub trade_count: u32,
47 pub win_count: u32,
48 pub lose_count: u32,
49 pub realized_pnl: f64,
50}
51
52#[derive(Debug, Clone, Default)]
53pub struct OrderHistorySnapshot {
54 pub rows: Vec<String>,
55 pub stats: OrderHistoryStats,
56 pub strategy_stats: HashMap<String, OrderHistoryStats>,
57 pub fills: Vec<OrderHistoryFill>,
58 pub open_qty: f64,
59 pub open_entry_price: f64,
60 pub estimated_total_pnl_usdt: Option<f64>,
61 pub trade_data_complete: bool,
62 pub fetched_at_ms: u64,
63 pub fetch_latency_ms: u64,
64 pub latest_event_ms: Option<u64>,
65}
66
67#[derive(Debug, Clone)]
68pub struct OrderHistoryFill {
69 pub timestamp_ms: u64,
70 pub side: OrderSide,
71 pub price: f64,
72}
73
74pub struct OrderManager {
75 rest_client: Arc<BinanceRestClient>,
76 active_orders: HashMap<String, Order>,
77 position: Position,
78 symbol: String,
79 market: MarketKind,
80 order_amount_usdt: f64,
81 balances: HashMap<String, f64>,
82 last_price: f64,
83 risk_module: RiskModule,
84 default_strategy_cooldown_ms: u64,
85 default_strategy_max_active_orders: u32,
86 strategy_limits_by_tag: HashMap<String, StrategyExecutionLimit>,
87 last_strategy_submit_ms: HashMap<String, u64>,
88 default_symbol_max_exposure_usdt: f64,
89 symbol_exposure_limit_by_key: HashMap<String, f64>,
90}
91
92#[derive(Debug, Clone, Copy)]
93struct StrategyExecutionLimit {
94 cooldown_ms: u64,
95 max_active_orders: u32,
96}
97
98fn normalize_market_label(market: MarketKind) -> &'static str {
99 match market {
100 MarketKind::Spot => "spot",
101 MarketKind::Futures => "futures",
102 }
103}
104
105fn symbol_limit_key(symbol: &str, market: MarketKind) -> String {
106 format!(
107 "{}:{}",
108 symbol.trim().to_ascii_uppercase(),
109 normalize_market_label(market)
110 )
111}
112
113fn storage_symbol(symbol: &str, market: MarketKind) -> String {
114 match market {
115 MarketKind::Spot => symbol.to_string(),
116 MarketKind::Futures => format!("{}#FUT", symbol),
117 }
118}
119
120fn display_qty_for_history(status: &str, orig_qty: f64, executed_qty: f64) -> f64 {
121 match status {
122 "FILLED" | "PARTIALLY_FILLED" => executed_qty,
123 _ => orig_qty,
124 }
125}
126
127fn format_history_time(timestamp_ms: u64) -> String {
128 chrono::Utc
129 .timestamp_millis_opt(timestamp_ms as i64)
130 .single()
131 .map(|dt| {
132 dt.with_timezone(&chrono::Local)
133 .format("%H:%M:%S")
134 .to_string()
135 })
136 .unwrap_or_else(|| "--:--:--".to_string())
137}
138
139fn format_order_history_row(
140 timestamp_ms: u64,
141 status: &str,
142 side: &str,
143 qty: f64,
144 avg_price: f64,
145 client_order_id: &str,
146) -> String {
147 format!(
148 "{} {:<10} {:<4} {:.5} @ {:.2} {}",
149 format_history_time(timestamp_ms),
150 status,
151 side,
152 qty,
153 avg_price,
154 client_order_id
155 )
156}
157
158fn source_label_from_client_order_id(client_order_id: &str) -> String {
159 if client_order_id.contains("-mnl-") {
160 "MANUAL".to_string()
161 } else if client_order_id.contains("-cfg-") {
162 "MA(Config)".to_string()
163 } else if client_order_id.contains("-fst-") {
164 "MA(Fast 5/20)".to_string()
165 } else if client_order_id.contains("-slw-") {
166 "MA(Slow 20/60)".to_string()
167 } else if let Some(source_tag) = parse_source_tag_from_client_order_id(client_order_id) {
168 source_tag.to_ascii_lowercase()
169 } else {
170 "UNKNOWN".to_string()
171 }
172}
173
174fn parse_source_tag_from_client_order_id(client_order_id: &str) -> Option<&str> {
175 let body = client_order_id.strip_prefix("sq-")?;
176 let (source_tag, _) = body.split_once('-')?;
177 if source_tag.is_empty() {
178 None
179 } else {
180 Some(source_tag)
181 }
182}
183
184fn format_trade_history_row(t: &BinanceMyTrade, source: &str) -> String {
185 let side = if t.is_buyer { "BUY" } else { "SELL" };
186 format_order_history_row(
187 t.time,
188 "FILLED",
189 side,
190 t.qty,
191 t.price,
192 &format!("order#{}#T{} [{}]", t.order_id, t.id, source),
193 )
194}
195
196fn split_symbol_assets(symbol: &str) -> (String, String) {
197 const QUOTE_SUFFIXES: [&str; 10] = [
198 "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
199 ];
200 for q in QUOTE_SUFFIXES {
201 if let Some(base) = symbol.strip_suffix(q) {
202 if !base.is_empty() {
203 return (base.to_string(), q.to_string());
204 }
205 }
206 }
207 (symbol.to_string(), String::new())
208}
209
210#[derive(Clone, Copy, Default)]
211struct LongPos {
212 qty: f64,
213 cost_quote: f64,
214}
215
216fn apply_spot_trade_with_fee(
217 pos: &mut LongPos,
218 stats: &mut OrderHistoryStats,
219 t: &BinanceMyTrade,
220 base_asset: &str,
221 quote_asset: &str,
222) {
223 let qty = t.qty.max(0.0);
224 if qty <= f64::EPSILON {
225 return;
226 }
227 let fee_asset = t.commission_asset.as_str();
228 let fee_is_base = !base_asset.is_empty() && fee_asset.eq_ignore_ascii_case(base_asset);
229 let fee_is_quote = !quote_asset.is_empty() && fee_asset.eq_ignore_ascii_case(quote_asset);
230
231 if t.is_buyer {
232 let net_qty = (qty
233 - if fee_is_base {
234 t.commission.max(0.0)
235 } else {
236 0.0
237 })
238 .max(0.0);
239 if net_qty <= f64::EPSILON {
240 return;
241 }
242 let fee_quote = if fee_is_quote {
243 t.commission.max(0.0)
244 } else {
245 0.0
246 };
247 pos.qty += net_qty;
248 pos.cost_quote += qty * t.price + fee_quote;
249 return;
250 }
251
252 if pos.qty <= f64::EPSILON {
254 return;
255 }
256 let close_qty = qty.min(pos.qty);
257 if close_qty <= f64::EPSILON {
258 return;
259 }
260 let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
261 let fee_quote_total = if fee_is_quote {
262 t.commission.max(0.0)
263 } else if fee_is_base {
264 t.commission.max(0.0) * t.price
266 } else {
267 0.0
268 };
269 let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
270 let pnl_delta = (close_qty * t.price - fee_quote) - (avg_cost * close_qty);
271 if pnl_delta > 0.0 {
272 stats.win_count += 1;
273 stats.trade_count += 1;
274 } else if pnl_delta < 0.0 {
275 stats.lose_count += 1;
276 stats.trade_count += 1;
277 }
278 stats.realized_pnl += pnl_delta;
279
280 pos.qty -= close_qty;
281 pos.cost_quote -= avg_cost * close_qty;
282 if pos.qty <= f64::EPSILON {
283 pos.qty = 0.0;
284 pos.cost_quote = 0.0;
285 }
286}
287
288fn compute_trade_state(
289 mut trades: Vec<BinanceMyTrade>,
290 symbol: &str,
291) -> (OrderHistoryStats, LongPos) {
292 trades.sort_by_key(|t| (t.time, t.id));
293 let (base_asset, quote_asset) = split_symbol_assets(symbol);
294 let mut pos = LongPos::default();
295 let mut stats = OrderHistoryStats::default();
296 for t in trades {
297 apply_spot_trade_with_fee(&mut pos, &mut stats, &t, &base_asset, "e_asset);
298 }
299 (stats, pos)
300}
301
302fn compute_futures_open_state(mut trades: Vec<BinanceMyTrade>) -> LongPos {
303 trades.sort_by_key(|t| (t.time, t.id));
304 let mut pos = LongPos::default();
305 for t in trades {
306 let qty = t.qty.max(0.0);
307 if qty <= f64::EPSILON {
308 continue;
309 }
310 if t.is_buyer {
311 pos.qty += qty;
312 pos.cost_quote += qty * t.price;
313 continue;
314 }
315 if pos.qty <= f64::EPSILON {
316 continue;
317 }
318 let close_qty = qty.min(pos.qty);
319 let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
320 pos.qty -= close_qty;
321 pos.cost_quote -= avg_cost * close_qty;
322 if pos.qty <= f64::EPSILON {
323 pos.qty = 0.0;
324 pos.cost_quote = 0.0;
325 }
326 }
327 pos
328}
329
330fn compute_trade_stats_by_source(
331 mut trades: Vec<BinanceMyTrade>,
332 order_source_by_id: &HashMap<u64, String>,
333 symbol: &str,
334) -> HashMap<String, OrderHistoryStats> {
335 trades.sort_by_key(|t| (t.time, t.id));
336
337 if symbol.ends_with("#FUT") {
339 let mut stats_by_source: HashMap<String, OrderHistoryStats> = HashMap::new();
340 for t in trades {
341 let source = order_source_by_id
342 .get(&t.order_id)
343 .cloned()
344 .unwrap_or_else(|| "UNKNOWN".to_string());
345 let stats = stats_by_source.entry(source).or_default();
346 if t.realized_pnl > 0.0 {
347 stats.win_count += 1;
348 stats.trade_count += 1;
349 } else if t.realized_pnl < 0.0 {
350 stats.lose_count += 1;
351 stats.trade_count += 1;
352 }
353 stats.realized_pnl += t.realized_pnl;
354 }
355 return stats_by_source;
356 }
357
358 let (base_asset, quote_asset) = split_symbol_assets(symbol);
359 let mut pos_by_source: HashMap<String, LongPos> = HashMap::new();
360 let mut stats_by_source: HashMap<String, OrderHistoryStats> = HashMap::new();
361
362 for t in trades {
363 let source = order_source_by_id
364 .get(&t.order_id)
365 .cloned()
366 .unwrap_or_else(|| "UNKNOWN".to_string());
367 let pos = pos_by_source.entry(source.clone()).or_default();
368 let stats = stats_by_source.entry(source).or_default();
369 apply_spot_trade_with_fee(pos, stats, &t, &base_asset, "e_asset);
370 }
371
372 stats_by_source
373}
374
375fn to_persistable_stats_map(
376 strategy_stats: &HashMap<String, OrderHistoryStats>,
377) -> HashMap<String, order_store::StrategyScopedStats> {
378 strategy_stats
379 .iter()
380 .map(|(k, v)| {
381 (
382 k.clone(),
383 order_store::StrategyScopedStats {
384 trade_count: v.trade_count,
385 win_count: v.win_count,
386 lose_count: v.lose_count,
387 realized_pnl: v.realized_pnl,
388 },
389 )
390 })
391 .collect()
392}
393
394fn from_persisted_stats_map(
395 persisted: HashMap<String, order_store::StrategyScopedStats>,
396) -> HashMap<String, OrderHistoryStats> {
397 persisted
398 .into_iter()
399 .map(|(k, v)| {
400 (
401 k,
402 OrderHistoryStats {
403 trade_count: v.trade_count,
404 win_count: v.win_count,
405 lose_count: v.lose_count,
406 realized_pnl: v.realized_pnl,
407 },
408 )
409 })
410 .collect()
411}
412
413impl OrderManager {
414 pub fn new(
423 rest_client: Arc<BinanceRestClient>,
424 symbol: &str,
425 market: MarketKind,
426 order_amount_usdt: f64,
427 risk_config: &RiskConfig,
428 ) -> Self {
429 let mut strategy_limits_by_tag = HashMap::new();
430 let mut symbol_exposure_limit_by_key = HashMap::new();
431 let default_strategy_cooldown_ms = risk_config.default_strategy_cooldown_ms;
432 let default_strategy_max_active_orders =
433 risk_config.default_strategy_max_active_orders.max(1);
434 let default_symbol_max_exposure_usdt =
435 risk_config.default_symbol_max_exposure_usdt.max(0.0);
436 for profile in &risk_config.strategy_limits {
437 let source_tag = profile.source_tag.trim().to_ascii_lowercase();
438 if source_tag.is_empty() {
439 continue;
440 }
441 strategy_limits_by_tag.insert(
442 source_tag,
443 StrategyExecutionLimit {
444 cooldown_ms: profile.cooldown_ms.unwrap_or(default_strategy_cooldown_ms),
445 max_active_orders: profile
446 .max_active_orders
447 .unwrap_or(default_strategy_max_active_orders)
448 .max(1),
449 },
450 );
451 }
452 for limit in &risk_config.symbol_exposure_limits {
453 let symbol = limit.symbol.trim().to_ascii_uppercase();
454 if symbol.is_empty() {
455 continue;
456 }
457 let market = match limit
458 .market
459 .as_deref()
460 .unwrap_or("spot")
461 .trim()
462 .to_ascii_lowercase()
463 .as_str()
464 {
465 "spot" => MarketKind::Spot,
466 "futures" | "future" | "fut" => MarketKind::Futures,
467 _ => continue,
468 };
469 symbol_exposure_limit_by_key.insert(
470 symbol_limit_key(&symbol, market),
471 limit.max_exposure_usdt.max(0.0),
472 );
473 }
474 Self {
475 rest_client: rest_client.clone(),
476 active_orders: HashMap::new(),
477 position: Position::new(symbol.to_string()),
478 symbol: symbol.to_string(),
479 market,
480 order_amount_usdt,
481 balances: HashMap::new(),
482 last_price: 0.0,
483 risk_module: RiskModule::new(
484 rest_client.clone(),
485 risk_config.global_rate_limit_per_minute,
486 EndpointRateLimits {
487 orders_per_minute: risk_config.endpoint_rate_limits.orders_per_minute,
488 account_per_minute: risk_config.endpoint_rate_limits.account_per_minute,
489 market_data_per_minute: risk_config.endpoint_rate_limits.market_data_per_minute,
490 },
491 ),
492 default_strategy_cooldown_ms,
493 default_strategy_max_active_orders,
494 strategy_limits_by_tag,
495 last_strategy_submit_ms: HashMap::new(),
496 default_symbol_max_exposure_usdt,
497 symbol_exposure_limit_by_key,
498 }
499 }
500
501 pub fn position(&self) -> &Position {
506 &self.position
507 }
508
509 pub fn market_kind(&self) -> MarketKind {
510 self.market
511 }
512
513 pub fn balances(&self) -> &HashMap<String, f64> {
518 &self.balances
519 }
520
521 pub fn update_unrealized_pnl(&mut self, current_price: f64) {
527 self.last_price = current_price;
528 self.position.update_unrealized_pnl(current_price);
529 }
530
531 pub fn last_price(&self) -> Option<f64> {
532 (self.last_price > f64::EPSILON).then_some(self.last_price)
533 }
534
535 pub fn open_order_count(&self) -> usize {
536 self.active_orders.len()
537 }
538
539 pub fn reserved_cash_usdt(&self) -> f64 {
540 self.open_order_count() as f64 * self.order_amount_usdt.max(0.0)
541 }
542
543 pub fn rate_budget_snapshot(&self) -> RateBudgetSnapshot {
547 self.risk_module.rate_budget_snapshot()
548 }
549
550 pub fn orders_rate_budget_snapshot(&self) -> RateBudgetSnapshot {
551 self.risk_module
552 .endpoint_budget_snapshot(ApiEndpointGroup::Orders)
553 }
554
555 pub fn account_rate_budget_snapshot(&self) -> RateBudgetSnapshot {
556 self.risk_module
557 .endpoint_budget_snapshot(ApiEndpointGroup::Account)
558 }
559
560 pub fn market_data_rate_budget_snapshot(&self) -> RateBudgetSnapshot {
561 self.risk_module
562 .endpoint_budget_snapshot(ApiEndpointGroup::MarketData)
563 }
564
565 fn strategy_limits_for(&self, source_tag: &str) -> StrategyExecutionLimit {
566 self.strategy_limits_by_tag
567 .get(source_tag)
568 .copied()
569 .unwrap_or(StrategyExecutionLimit {
570 cooldown_ms: self.default_strategy_cooldown_ms,
571 max_active_orders: self.default_strategy_max_active_orders,
572 })
573 }
574
575 fn active_order_count_for_source(&self, source_tag: &str) -> u32 {
576 let prefix = format!("sq-{}-", source_tag);
577 self.active_orders
578 .values()
579 .filter(|o| !o.status.is_terminal() && o.client_order_id.starts_with(&prefix))
580 .count() as u32
581 }
582
583 fn evaluate_strategy_limits(
584 &self,
585 source_tag: &str,
586 created_at_ms: u64,
587 ) -> Option<(String, String)> {
588 let limits = self.strategy_limits_for(source_tag);
589 let active_count = self.active_order_count_for_source(source_tag);
590 if active_count >= limits.max_active_orders {
591 return Some((
592 RejectionReasonCode::RiskStrategyMaxActiveOrdersExceeded
593 .as_str()
594 .to_string(),
595 format!(
596 "Strategy '{}' active order limit exceeded (active {}, limit {})",
597 source_tag, active_count, limits.max_active_orders
598 ),
599 ));
600 }
601
602 if limits.cooldown_ms > 0 {
603 if let Some(last_submit_ms) = self.last_strategy_submit_ms.get(source_tag) {
604 let elapsed = created_at_ms.saturating_sub(*last_submit_ms);
605 if elapsed < limits.cooldown_ms {
606 let remaining = limits.cooldown_ms - elapsed;
607 return Some((
608 RejectionReasonCode::RiskStrategyCooldownActive
609 .as_str()
610 .to_string(),
611 format!(
612 "Strategy '{}' cooldown active ({}ms remaining)",
613 source_tag, remaining
614 ),
615 ));
616 }
617 }
618 }
619
620 None
621 }
622
623 fn mark_strategy_submit(&mut self, source_tag: &str, created_at_ms: u64) {
624 self.last_strategy_submit_ms
625 .insert(source_tag.to_string(), created_at_ms);
626 }
627
628 fn max_symbol_exposure_usdt(&self) -> f64 {
629 self.symbol_exposure_limit_by_key
630 .get(&symbol_limit_key(&self.symbol, self.market))
631 .copied()
632 .unwrap_or(self.default_symbol_max_exposure_usdt)
633 }
634
635 fn projected_notional_after_fill(&self, side: OrderSide, qty: f64) -> (f64, f64) {
636 let price = self.last_price.max(0.0);
637 if price <= f64::EPSILON {
638 return (0.0, 0.0);
639 }
640 let current_qty_signed = match self.position.side {
641 Some(OrderSide::Buy) => self.position.qty,
642 Some(OrderSide::Sell) => -self.position.qty,
643 None => 0.0,
644 };
645 let delta = match side {
646 OrderSide::Buy => qty,
647 OrderSide::Sell => -qty,
648 };
649 let projected_qty_signed = current_qty_signed + delta;
650 (
651 current_qty_signed.abs() * price,
652 projected_qty_signed.abs() * price,
653 )
654 }
655
656 fn evaluate_symbol_exposure_limit(
657 &self,
658 side: OrderSide,
659 qty: f64,
660 ) -> Option<(String, String)> {
661 let max_exposure = self.max_symbol_exposure_usdt();
662 if max_exposure <= f64::EPSILON {
663 return None;
664 }
665 let (current_notional, projected_notional) = self.projected_notional_after_fill(side, qty);
666 if projected_notional > max_exposure && projected_notional > current_notional + f64::EPSILON
667 {
668 return Some((
669 RejectionReasonCode::RiskSymbolExposureLimitExceeded
670 .as_str()
671 .to_string(),
672 format!(
673 "Symbol exposure limit exceeded for {} ({:?}): projected {:.2} USDT > limit {:.2} USDT",
674 self.symbol, self.market, projected_notional, max_exposure
675 ),
676 ));
677 }
678 None
679 }
680
681 pub fn would_exceed_symbol_exposure_limit(&self, side: OrderSide, qty: f64) -> bool {
685 self.evaluate_symbol_exposure_limit(side, qty).is_some()
686 }
687
688 pub async fn refresh_balances(&mut self) -> Result<HashMap<String, f64>> {
700 if !self
701 .risk_module
702 .reserve_endpoint_budget(ApiEndpointGroup::Account)
703 {
704 return Err(anyhow::anyhow!(
705 "Account endpoint budget exceeded; try again after reset"
706 ));
707 }
708 if self.market == MarketKind::Futures {
709 let account = self.rest_client.get_futures_account().await?;
710 self.balances.clear();
711 for a in &account.assets {
712 if a.wallet_balance.abs() > f64::EPSILON {
713 self.balances.insert(a.asset.clone(), a.available_balance);
714 }
715 }
716 return Ok(self.balances.clone());
717 }
718 let account = self.rest_client.get_account().await?;
719 self.balances.clear();
720 for b in &account.balances {
721 let total = b.free + b.locked;
722 if total > 0.0 {
723 self.balances.insert(b.asset.clone(), b.free);
724 }
725 }
726 tracing::info!(balances = ?self.balances, "Balances refreshed");
727 Ok(self.balances.clone())
728 }
729
730 pub async fn refresh_order_history(&mut self, limit: usize) -> Result<OrderHistorySnapshot> {
739 if !self
740 .risk_module
741 .reserve_endpoint_budget(ApiEndpointGroup::Orders)
742 {
743 return Err(anyhow::anyhow!(
744 "Orders endpoint budget exceeded; try again after reset"
745 ));
746 }
747 if self.market == MarketKind::Futures {
748 let fetch_started = Instant::now();
749 let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
750 let orders_result = self
751 .rest_client
752 .get_futures_all_orders(&self.symbol, limit)
753 .await;
754 let trades_result = self
755 .rest_client
756 .get_futures_my_trades_history(&self.symbol, limit.max(1))
757 .await;
758 let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
759
760 if orders_result.is_err() && trades_result.is_err() {
761 let oe = orders_result.err().unwrap();
762 let te = trades_result.err().unwrap();
763 return Err(anyhow::anyhow!(
764 "futures order history fetch failed: allOrders={} | userTrades={}",
765 oe,
766 te
767 ));
768 }
769
770 let mut orders = orders_result.unwrap_or_default();
771 let trades = trades_result.unwrap_or_default();
772 orders.sort_by_key(|o| o.update_time.max(o.time));
773
774 let storage_key = storage_symbol(&self.symbol, self.market);
775 if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &trades) {
776 tracing::warn!(error = %e, "Failed to persist futures order snapshot to sqlite");
777 }
778
779 let mut order_source_by_id = HashMap::new();
780 for o in &orders {
781 order_source_by_id.insert(
782 o.order_id,
783 source_label_from_client_order_id(&o.client_order_id),
784 );
785 }
786 let mut trades_for_stats = trades.clone();
787 match order_store::load_persisted_trades(&storage_key) {
788 Ok(saved) if !saved.is_empty() => {
789 trades_for_stats = saved.iter().map(|r| r.trade.clone()).collect();
790 for row in saved {
791 order_source_by_id
792 .entry(row.trade.order_id)
793 .or_insert(row.source);
794 }
795 }
796 Ok(_) => {}
797 Err(e) => {
798 tracing::warn!(
799 error = %e,
800 "Failed to load persisted futures trades; using API trades"
801 );
802 }
803 }
804
805 let mut history = Vec::new();
806 let mut fills = Vec::new();
807 for t in &trades {
808 let side = if t.is_buyer { "BUY" } else { "SELL" };
809 let source = order_source_by_id
810 .get(&t.order_id)
811 .cloned()
812 .unwrap_or_else(|| "UNKNOWN".to_string());
813 fills.push(OrderHistoryFill {
814 timestamp_ms: t.time,
815 side: if t.is_buyer {
816 OrderSide::Buy
817 } else {
818 OrderSide::Sell
819 },
820 price: t.price,
821 });
822 history.push(format_order_history_row(
823 t.time,
824 "FILLED",
825 side,
826 t.qty,
827 t.price,
828 &format!("order#{}#T{} [{}]", t.order_id, t.id, source),
829 ));
830 }
831 for o in &orders {
832 if o.executed_qty <= 0.0 {
833 history.push(format_order_history_row(
834 o.update_time.max(o.time),
835 &o.status,
836 &o.side,
837 display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
838 if o.executed_qty > 0.0 {
839 o.cummulative_quote_qty / o.executed_qty
840 } else {
841 o.price
842 },
843 &o.client_order_id,
844 ));
845 }
846 }
847
848 let mut stats = OrderHistoryStats::default();
849 for t in &trades {
850 if t.realized_pnl > 0.0 {
851 stats.win_count += 1;
852 stats.trade_count += 1;
853 } else if t.realized_pnl < 0.0 {
854 stats.lose_count += 1;
855 stats.trade_count += 1;
856 }
857 stats.realized_pnl += t.realized_pnl;
858 }
859 let open_pos = compute_futures_open_state(trades_for_stats.clone());
860 let open_entry_price = if open_pos.qty > f64::EPSILON {
861 open_pos.cost_quote / open_pos.qty
862 } else {
863 0.0
864 };
865 self.position.side = if open_pos.qty > f64::EPSILON {
866 Some(OrderSide::Buy)
867 } else {
868 None
869 };
870 self.position.qty = open_pos.qty;
871 self.position.entry_price = open_entry_price;
872 self.position.realized_pnl = stats.realized_pnl;
873 if self.last_price > 0.0 {
874 self.position.update_unrealized_pnl(self.last_price);
875 } else {
876 self.position.unrealized_pnl = 0.0;
877 }
878 let estimated_total_pnl_usdt = if self.last_price > 0.0 && open_pos.qty > f64::EPSILON {
879 Some(stats.realized_pnl + (self.last_price - open_entry_price) * open_pos.qty)
880 } else {
881 Some(stats.realized_pnl)
882 };
883 let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
884 let latest_trade_event = trades.iter().map(|t| t.time).max();
885 let mut strategy_stats =
886 compute_trade_stats_by_source(trades_for_stats, &order_source_by_id, &storage_key);
887 let persisted_stats = to_persistable_stats_map(&strategy_stats);
888 if let Err(e) =
889 order_store::persist_strategy_symbol_stats(&storage_key, &persisted_stats)
890 {
891 tracing::warn!(error = %e, "Failed to persist strategy stats (futures)");
892 }
893 if strategy_stats.is_empty() {
894 match order_store::load_strategy_symbol_stats(&storage_key) {
895 Ok(persisted) => {
896 strategy_stats = from_persisted_stats_map(persisted);
897 }
898 Err(e) => {
899 tracing::warn!(
900 error = %e,
901 "Failed to load persisted strategy stats (futures)"
902 );
903 }
904 }
905 }
906 return Ok(OrderHistorySnapshot {
907 rows: history,
908 stats,
909 strategy_stats,
910 fills,
911 open_qty: open_pos.qty,
912 open_entry_price,
913 estimated_total_pnl_usdt,
914 trade_data_complete: true,
915 fetched_at_ms,
916 fetch_latency_ms,
917 latest_event_ms: latest_order_event.max(latest_trade_event),
918 });
919 }
920
921 let fetch_started = Instant::now();
922 let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
923 let orders_result = self.rest_client.get_all_orders(&self.symbol, limit).await;
924 let storage_key = storage_symbol(&self.symbol, self.market);
925 let last_trade_id = order_store::load_last_trade_id(&storage_key).ok().flatten();
926 let persisted_trade_count = order_store::load_trade_count(&storage_key).unwrap_or(0);
927 let need_backfill = persisted_trade_count < limit;
928 let trades_result = match (need_backfill, last_trade_id) {
929 (true, _) => {
930 self.rest_client
931 .get_my_trades_history(&self.symbol, limit.max(1))
932 .await
933 }
934 (false, Some(last_id)) => {
935 self.rest_client
936 .get_my_trades_since(&self.symbol, last_id.saturating_add(1), 10)
937 .await
938 }
939 (false, None) => {
940 self.rest_client
941 .get_my_trades_history(&self.symbol, limit.max(1))
942 .await
943 }
944 };
945 let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
946 let trade_data_complete = trades_result.is_ok();
947
948 if orders_result.is_err() && trades_result.is_err() {
949 let oe = orders_result.err().unwrap();
950 let te = trades_result.err().unwrap();
951 return Err(anyhow::anyhow!(
952 "order history fetch failed: allOrders={} | myTrades={}",
953 oe,
954 te
955 ));
956 }
957
958 let mut orders = match orders_result {
959 Ok(v) => v,
960 Err(e) => {
961 tracing::warn!(error = %e, "Failed to fetch allOrders; falling back to trade-only history");
962 Vec::new()
963 }
964 };
965 let recent_trades = match trades_result {
966 Ok(t) => t,
967 Err(e) => {
968 tracing::warn!(error = %e, "Failed to fetch myTrades; falling back to order-only history");
969 Vec::new()
970 }
971 };
972 let mut trades = recent_trades.clone();
973 orders.sort_by_key(|o| o.update_time.max(o.time));
974
975 if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &recent_trades) {
976 tracing::warn!(error = %e, "Failed to persist order snapshot to sqlite");
977 }
978 let mut persisted_source_by_order_id: HashMap<u64, String> = HashMap::new();
979 match order_store::load_persisted_trades(&storage_key) {
980 Ok(saved) => {
981 if !saved.is_empty() {
982 trades = saved.iter().map(|r| r.trade.clone()).collect();
983 for row in saved {
984 persisted_source_by_order_id
985 .entry(row.trade.order_id)
986 .or_insert(row.source);
987 }
988 }
989 }
990 Err(e) => {
991 tracing::warn!(error = %e, "Failed to load persisted trades; using recent API trades");
992 }
993 }
994
995 let (stats, open_pos) = compute_trade_state(trades.clone(), &self.symbol);
996 self.position.side = if open_pos.qty > f64::EPSILON {
997 Some(OrderSide::Buy)
998 } else {
999 None
1000 };
1001 self.position.qty = open_pos.qty;
1002 self.position.entry_price = if open_pos.qty > f64::EPSILON {
1003 open_pos.cost_quote / open_pos.qty
1004 } else {
1005 0.0
1006 };
1007 self.position.realized_pnl = stats.realized_pnl;
1008 if self.last_price > 0.0 {
1009 self.position.update_unrealized_pnl(self.last_price);
1010 } else {
1011 self.position.unrealized_pnl = 0.0;
1012 }
1013 let estimated_total_pnl_usdt = if self.last_price > 0.0 {
1014 Some(stats.realized_pnl + (open_pos.qty * self.last_price - open_pos.cost_quote))
1015 } else {
1016 Some(stats.realized_pnl)
1017 };
1018 let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
1019 let latest_trade_event = trades.iter().map(|t| t.time).max();
1020 let latest_event_ms = latest_order_event.max(latest_trade_event);
1021
1022 let mut trades_by_order_id: HashMap<u64, Vec<BinanceMyTrade>> = HashMap::new();
1023 for trade in &trades {
1024 trades_by_order_id
1025 .entry(trade.order_id)
1026 .or_default()
1027 .push(trade.clone());
1028 }
1029 for bucket in trades_by_order_id.values_mut() {
1030 bucket.sort_by_key(|t| t.time);
1031 }
1032
1033 let mut order_source_by_id = HashMap::new();
1034 for o in &orders {
1035 order_source_by_id.insert(
1036 o.order_id,
1037 source_label_from_client_order_id(&o.client_order_id),
1038 );
1039 }
1040 for (order_id, source) in persisted_source_by_order_id {
1041 order_source_by_id.entry(order_id).or_insert(source);
1042 }
1043 let mut strategy_stats =
1044 compute_trade_stats_by_source(trades.clone(), &order_source_by_id, &self.symbol);
1045 let persisted_stats = to_persistable_stats_map(&strategy_stats);
1046 if let Err(e) = order_store::persist_strategy_symbol_stats(&storage_key, &persisted_stats) {
1047 tracing::warn!(error = %e, "Failed to persist strategy+symbol scoped stats");
1048 }
1049 if strategy_stats.is_empty() {
1050 match order_store::load_strategy_symbol_stats(&storage_key) {
1051 Ok(persisted) => {
1052 strategy_stats = from_persisted_stats_map(persisted);
1053 }
1054 Err(e) => {
1055 tracing::warn!(error = %e, "Failed to load persisted strategy+symbol stats");
1056 }
1057 }
1058 }
1059
1060 let mut history = Vec::new();
1061 let mut fills = Vec::new();
1062 let mut used_trade_ids = std::collections::HashSet::new();
1063
1064 if orders.is_empty() && !trades.is_empty() {
1065 let mut sorted = trades;
1066 sorted.sort_by_key(|t| (t.time, t.id));
1067 history.extend(sorted.iter().map(|t| {
1068 fills.push(OrderHistoryFill {
1069 timestamp_ms: t.time,
1070 side: if t.is_buyer {
1071 OrderSide::Buy
1072 } else {
1073 OrderSide::Sell
1074 },
1075 price: t.price,
1076 });
1077 format_trade_history_row(
1078 t,
1079 order_source_by_id
1080 .get(&t.order_id)
1081 .map(String::as_str)
1082 .unwrap_or("UNKNOWN"),
1083 )
1084 }));
1085 return Ok(OrderHistorySnapshot {
1086 rows: history,
1087 stats,
1088 strategy_stats,
1089 fills,
1090 open_qty: open_pos.qty,
1091 open_entry_price: if open_pos.qty > f64::EPSILON {
1092 open_pos.cost_quote / open_pos.qty
1093 } else {
1094 0.0
1095 },
1096 estimated_total_pnl_usdt,
1097 trade_data_complete,
1098 fetched_at_ms,
1099 fetch_latency_ms,
1100 latest_event_ms,
1101 });
1102 }
1103
1104 for o in orders {
1105 if o.executed_qty > 0.0 {
1106 if let Some(order_trades) = trades_by_order_id.get(&o.order_id) {
1107 for t in order_trades {
1108 used_trade_ids.insert(t.id);
1109 let side = if t.is_buyer { "BUY" } else { "SELL" };
1110 fills.push(OrderHistoryFill {
1111 timestamp_ms: t.time,
1112 side: if t.is_buyer {
1113 OrderSide::Buy
1114 } else {
1115 OrderSide::Sell
1116 },
1117 price: t.price,
1118 });
1119 history.push(format_order_history_row(
1120 t.time,
1121 "FILLED",
1122 side,
1123 t.qty,
1124 t.price,
1125 &format!(
1126 "{}#T{} [{}]",
1127 o.client_order_id,
1128 t.id,
1129 source_label_from_client_order_id(&o.client_order_id)
1130 ),
1131 ));
1132 }
1133 continue;
1134 }
1135 }
1136
1137 let avg_price = if o.executed_qty > 0.0 {
1138 o.cummulative_quote_qty / o.executed_qty
1139 } else {
1140 o.price
1141 };
1142 history.push(format_order_history_row(
1143 o.update_time.max(o.time),
1144 &o.status,
1145 &o.side,
1146 display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
1147 avg_price,
1148 &o.client_order_id,
1149 ));
1150 }
1151
1152 for bucket in trades_by_order_id.values() {
1154 for t in bucket {
1155 if !used_trade_ids.contains(&t.id) {
1156 fills.push(OrderHistoryFill {
1157 timestamp_ms: t.time,
1158 side: if t.is_buyer {
1159 OrderSide::Buy
1160 } else {
1161 OrderSide::Sell
1162 },
1163 price: t.price,
1164 });
1165 history.push(format_trade_history_row(
1166 t,
1167 order_source_by_id
1168 .get(&t.order_id)
1169 .map(String::as_str)
1170 .unwrap_or("UNKNOWN"),
1171 ));
1172 }
1173 }
1174 }
1175 Ok(OrderHistorySnapshot {
1176 rows: history,
1177 stats,
1178 strategy_stats,
1179 fills,
1180 open_qty: open_pos.qty,
1181 open_entry_price: if open_pos.qty > f64::EPSILON {
1182 open_pos.cost_quote / open_pos.qty
1183 } else {
1184 0.0
1185 },
1186 estimated_total_pnl_usdt,
1187 trade_data_complete,
1188 fetched_at_ms,
1189 fetch_latency_ms,
1190 latest_event_ms,
1191 })
1192 }
1193
1194 pub async fn submit_order(
1219 &mut self,
1220 signal: Signal,
1221 source_tag: &str,
1222 ) -> Result<Option<OrderUpdate>> {
1223 let side = match &signal {
1224 Signal::Buy => OrderSide::Buy,
1225 Signal::Sell => OrderSide::Sell,
1226 Signal::Hold => return Ok(None),
1227 };
1228 let source_tag = source_tag.to_ascii_lowercase();
1229 let intent = OrderIntent {
1230 intent_id: format!("intent-{}", &uuid::Uuid::new_v4().to_string()[..8]),
1231 source_tag: source_tag.clone(),
1232 symbol: self.symbol.clone(),
1233 market: self.market,
1234 side,
1235 order_amount_usdt: self.order_amount_usdt,
1236 last_price: self.last_price,
1237 created_at_ms: chrono::Utc::now().timestamp_millis() as u64,
1238 };
1239 if let Some((reason_code, reason)) =
1240 self.evaluate_strategy_limits(&intent.source_tag, intent.created_at_ms)
1241 {
1242 return Ok(Some(OrderUpdate::Rejected {
1243 intent_id: intent.intent_id.clone(),
1244 client_order_id: "n/a".to_string(),
1245 reason_code,
1246 reason,
1247 }));
1248 }
1249 let decision = self
1250 .risk_module
1251 .evaluate_intent(&intent, &self.balances)
1252 .await?;
1253 if !decision.approved {
1254 return Ok(Some(OrderUpdate::Rejected {
1255 intent_id: intent.intent_id.clone(),
1256 client_order_id: "n/a".to_string(),
1257 reason_code: decision
1258 .reason_code
1259 .unwrap_or_else(|| RejectionReasonCode::RiskUnknown.as_str().to_string()),
1260 reason: decision
1261 .reason
1262 .unwrap_or_else(|| "Rejected by RiskModule".to_string()),
1263 }));
1264 }
1265 if !self.risk_module.reserve_rate_budget() {
1266 return Ok(Some(OrderUpdate::Rejected {
1267 intent_id: intent.intent_id.clone(),
1268 client_order_id: "n/a".to_string(),
1269 reason_code: RejectionReasonCode::RateGlobalBudgetExceeded
1270 .as_str()
1271 .to_string(),
1272 reason: "Global rate budget exceeded; try again after reset".to_string(),
1273 }));
1274 }
1275 if !self
1276 .risk_module
1277 .reserve_endpoint_budget(ApiEndpointGroup::Orders)
1278 {
1279 return Ok(Some(OrderUpdate::Rejected {
1280 intent_id: intent.intent_id.clone(),
1281 client_order_id: "n/a".to_string(),
1282 reason_code: RejectionReasonCode::RateEndpointBudgetExceeded
1283 .as_str()
1284 .to_string(),
1285 reason: "Orders endpoint budget exceeded; try again after reset".to_string(),
1286 }));
1287 }
1288 let qty = decision.normalized_qty;
1289 if let Some((reason_code, reason)) = self.evaluate_symbol_exposure_limit(side, qty) {
1290 return Ok(Some(OrderUpdate::Rejected {
1291 intent_id: intent.intent_id.clone(),
1292 client_order_id: "n/a".to_string(),
1293 reason_code,
1294 reason,
1295 }));
1296 }
1297 self.mark_strategy_submit(&intent.source_tag, intent.created_at_ms);
1298
1299 let client_order_id = format!(
1300 "sq-{}-{}",
1301 intent.source_tag,
1302 &uuid::Uuid::new_v4().to_string()[..8]
1303 );
1304
1305 let order = Order {
1306 client_order_id: client_order_id.clone(),
1307 server_order_id: None,
1308 symbol: self.symbol.clone(),
1309 side,
1310 order_type: OrderType::Market,
1311 quantity: qty,
1312 price: None,
1313 status: OrderStatus::PendingSubmit,
1314 created_at: chrono::Utc::now(),
1315 updated_at: chrono::Utc::now(),
1316 fills: vec![],
1317 };
1318
1319 self.active_orders.insert(client_order_id.clone(), order);
1320
1321 tracing::info!(
1322 side = %side,
1323 qty,
1324 usdt_amount = intent.order_amount_usdt,
1325 price = intent.last_price,
1326 intent_id = %intent.intent_id,
1327 created_at_ms = intent.created_at_ms,
1328 "Submitting order"
1329 );
1330
1331 let submit_res = if self.market == MarketKind::Futures {
1332 self.rest_client
1333 .place_futures_market_order(&self.symbol, side, qty, &client_order_id)
1334 .await
1335 } else {
1336 self.rest_client
1337 .place_market_order(&self.symbol, side, qty, &client_order_id)
1338 .await
1339 };
1340
1341 match submit_res {
1342 Ok(response) => {
1343 let update = self.process_order_response(
1344 &intent.intent_id,
1345 &client_order_id,
1346 side,
1347 &response,
1348 );
1349
1350 if matches!(update, OrderUpdate::Filled { .. }) {
1352 if let Err(e) = self.refresh_balances().await {
1353 tracing::warn!(error = %e, "Failed to refresh balances after fill");
1354 }
1355 }
1356
1357 Ok(Some(update))
1358 }
1359 Err(e) => {
1360 tracing::error!(
1361 client_order_id,
1362 error = %e,
1363 "Order rejected"
1364 );
1365 if let Some(order) = self.active_orders.get_mut(&client_order_id) {
1366 order.status = OrderStatus::Rejected;
1367 order.updated_at = chrono::Utc::now();
1368 }
1369 Ok(Some(OrderUpdate::Rejected {
1370 intent_id: intent.intent_id.clone(),
1371 client_order_id,
1372 reason_code: RejectionReasonCode::BrokerSubmitFailed.as_str().to_string(),
1373 reason: e.to_string(),
1374 }))
1375 }
1376 }
1377 }
1378
1379 pub async fn place_protective_stop_for_open_position(
1384 &mut self,
1385 source_tag: &str,
1386 stop_price: f64,
1387 ) -> Result<Option<String>> {
1388 if self.position.is_flat() {
1389 return Ok(None);
1390 }
1391 let stop_side = match self.position.side {
1392 Some(OrderSide::Buy) => OrderSide::Sell,
1393 Some(OrderSide::Sell) => OrderSide::Buy,
1394 None => return Ok(None),
1395 };
1396 let qty = self.position.qty.max(0.0);
1397 if qty <= f64::EPSILON {
1398 return Ok(None);
1399 }
1400
1401 if self.market != MarketKind::Futures {
1402 tracing::warn!(
1403 symbol = %self.symbol,
1404 market = %normalize_market_label(self.market),
1405 source_tag = %source_tag,
1406 stop_price,
1407 "Spot protective stop placement is not implemented yet; skipping"
1408 );
1409 return Ok(None);
1410 }
1411
1412 let client_order_id = format!(
1413 "sq-{}-stp-{}",
1414 source_tag.to_ascii_lowercase(),
1415 &uuid::Uuid::new_v4().to_string()[..8]
1416 );
1417 let res = self
1418 .rest_client
1419 .place_futures_stop_market_order(
1420 &self.symbol,
1421 stop_side,
1422 qty,
1423 stop_price,
1424 &client_order_id,
1425 )
1426 .await?;
1427 tracing::info!(
1428 symbol = %self.symbol,
1429 stop_order_id = res.order_id,
1430 stop_side = %stop_side,
1431 stop_price,
1432 qty,
1433 source_tag = %source_tag,
1434 "Protective stop order submitted"
1435 );
1436 Ok(Some(res.order_id.to_string()))
1437 }
1438
1439 pub async fn ensure_protective_stop(
1445 &mut self,
1446 source_tag: &str,
1447 fallback_stop_price: f64,
1448 ) -> Result<bool> {
1449 if self.position.is_flat() {
1450 return Ok(true);
1451 }
1452 Ok(self
1453 .place_protective_stop_for_open_position(source_tag, fallback_stop_price)
1454 .await?
1455 .is_some())
1456 }
1457
1458 pub async fn emergency_close_position(
1462 &mut self,
1463 source_tag: &str,
1464 reason_code: &str,
1465 ) -> Result<Option<OrderUpdate>> {
1466 if self.position.is_flat() {
1467 return Ok(None);
1468 }
1469 let _ = self.refresh_balances().await;
1472 if self.last_price <= f64::EPSILON && self.position.entry_price > f64::EPSILON {
1473 self.last_price = self.position.entry_price;
1474 self.position.update_unrealized_pnl(self.last_price);
1475 }
1476 tracing::warn!(
1477 symbol = %self.symbol,
1478 market = %normalize_market_label(self.market),
1479 source_tag = %source_tag,
1480 reason_code = %reason_code,
1481 "Emergency close triggered"
1482 );
1483 self.submit_order(Signal::Sell, source_tag).await
1484 }
1485
1486 fn process_order_response(
1487 &mut self,
1488 intent_id: &str,
1489 client_order_id: &str,
1490 side: OrderSide,
1491 response: &BinanceOrderResponse,
1492 ) -> OrderUpdate {
1493 let fills: Vec<Fill> = response
1494 .fills
1495 .iter()
1496 .map(|f| Fill {
1497 price: f.price,
1498 qty: f.qty,
1499 commission: f.commission,
1500 commission_asset: f.commission_asset.clone(),
1501 })
1502 .collect();
1503
1504 let status = OrderStatus::from_binance_str(&response.status);
1505
1506 if let Some(order) = self.active_orders.get_mut(client_order_id) {
1507 order.server_order_id = Some(response.order_id);
1508 order.status = status;
1509 order.fills = fills.clone();
1510 order.updated_at = chrono::Utc::now();
1511 }
1512
1513 if status == OrderStatus::Filled || status == OrderStatus::PartiallyFilled {
1514 self.position.apply_fill(side, &fills);
1515
1516 let avg_price = if fills.is_empty() {
1517 0.0
1518 } else {
1519 let total_value: f64 = fills.iter().map(|f| f.price * f.qty).sum();
1520 let total_qty: f64 = fills.iter().map(|f| f.qty).sum();
1521 total_value / total_qty
1522 };
1523
1524 tracing::info!(
1525 client_order_id,
1526 order_id = response.order_id,
1527 side = %side,
1528 avg_price,
1529 filled_qty = response.executed_qty,
1530 "Order filled"
1531 );
1532
1533 OrderUpdate::Filled {
1534 intent_id: intent_id.to_string(),
1535 client_order_id: client_order_id.to_string(),
1536 side,
1537 fills,
1538 avg_price,
1539 }
1540 } else {
1541 OrderUpdate::Submitted {
1542 intent_id: intent_id.to_string(),
1543 client_order_id: client_order_id.to_string(),
1544 server_order_id: response.order_id,
1545 }
1546 }
1547 }
1548}
1549
1550#[cfg(test)]
1551mod tests {
1552 use super::{
1553 compute_trade_stats_by_source, display_qty_for_history, split_symbol_assets, OrderManager,
1554 };
1555 use crate::binance::rest::BinanceRestClient;
1556 use crate::binance::types::BinanceMyTrade;
1557 use crate::config::{EndpointRateLimitConfig, RiskConfig, SymbolExposureLimitConfig};
1558 use crate::model::order::{Order, OrderSide, OrderStatus, OrderType};
1559 use std::sync::Arc;
1560
1561 fn build_test_order_manager() -> OrderManager {
1562 let rest = Arc::new(BinanceRestClient::new(
1563 "https://demo-api.binance.com",
1564 "https://demo-fapi.binance.com",
1565 "k",
1566 "s",
1567 "fk",
1568 "fs",
1569 5000,
1570 ));
1571 let risk = RiskConfig {
1572 global_rate_limit_per_minute: 600,
1573 default_strategy_cooldown_ms: 3_000,
1574 default_strategy_max_active_orders: 1,
1575 default_symbol_max_exposure_usdt: 200.0,
1576 strategy_limits: vec![],
1577 symbol_exposure_limits: vec![SymbolExposureLimitConfig {
1578 symbol: "BTCUSDT".to_string(),
1579 market: Some("spot".to_string()),
1580 max_exposure_usdt: 150.0,
1581 }],
1582 endpoint_rate_limits: EndpointRateLimitConfig {
1583 orders_per_minute: 240,
1584 account_per_minute: 180,
1585 market_data_per_minute: 360,
1586 },
1587 };
1588 OrderManager::new(
1589 rest,
1590 "BTCUSDT",
1591 crate::order_manager::MarketKind::Spot,
1592 10.0,
1593 &risk,
1594 )
1595 }
1596
1597 #[test]
1598 fn valid_state_transitions() {
1599 let from = OrderStatus::PendingSubmit;
1601 let to = OrderStatus::Submitted;
1602 assert!(!from.is_terminal());
1603 assert!(!to.is_terminal());
1604
1605 let to = OrderStatus::Filled;
1607 assert!(to.is_terminal());
1608
1609 let to = OrderStatus::Rejected;
1611 assert!(to.is_terminal());
1612
1613 let to = OrderStatus::Cancelled;
1615 assert!(to.is_terminal());
1616 }
1617
1618 #[test]
1619 fn from_binance_str_mapping() {
1620 assert_eq!(OrderStatus::from_binance_str("NEW"), OrderStatus::Submitted);
1621 assert_eq!(OrderStatus::from_binance_str("FILLED"), OrderStatus::Filled);
1622 assert_eq!(
1623 OrderStatus::from_binance_str("CANCELED"),
1624 OrderStatus::Cancelled
1625 );
1626 assert_eq!(
1627 OrderStatus::from_binance_str("REJECTED"),
1628 OrderStatus::Rejected
1629 );
1630 assert_eq!(
1631 OrderStatus::from_binance_str("EXPIRED"),
1632 OrderStatus::Expired
1633 );
1634 assert_eq!(
1635 OrderStatus::from_binance_str("PARTIALLY_FILLED"),
1636 OrderStatus::PartiallyFilled
1637 );
1638 }
1639
1640 #[test]
1641 fn order_history_uses_executed_qty_for_filled_states() {
1642 assert!((display_qty_for_history("FILLED", 1.0, 0.4) - 0.4).abs() < f64::EPSILON);
1643 assert!((display_qty_for_history("PARTIALLY_FILLED", 1.0, 0.4) - 0.4).abs() < f64::EPSILON);
1644 }
1645
1646 #[test]
1647 fn order_history_uses_orig_qty_for_non_filled_states() {
1648 assert!((display_qty_for_history("NEW", 1.0, 0.4) - 1.0).abs() < f64::EPSILON);
1649 assert!((display_qty_for_history("CANCELED", 1.0, 0.4) - 1.0).abs() < f64::EPSILON);
1650 assert!((display_qty_for_history("REJECTED", 1.0, 0.0) - 1.0).abs() < f64::EPSILON);
1651 }
1652
1653 #[test]
1654 fn split_symbol_assets_parses_known_quote_suffixes() {
1655 assert_eq!(
1656 split_symbol_assets("ETHUSDT"),
1657 ("ETH".to_string(), "USDT".to_string())
1658 );
1659 assert_eq!(
1660 split_symbol_assets("ETHBTC"),
1661 ("ETH".to_string(), "BTC".to_string())
1662 );
1663 }
1664
1665 #[test]
1666 fn split_symbol_assets_falls_back_when_quote_unknown() {
1667 assert_eq!(
1668 split_symbol_assets("FOOBAR"),
1669 ("FOOBAR".to_string(), String::new())
1670 );
1671 }
1672
1673 #[test]
1674 fn strategy_limit_rejects_when_active_orders_reach_limit() {
1675 let mut mgr = build_test_order_manager();
1676 let client_order_id = "sq-cfg-abcdef12".to_string();
1677 mgr.active_orders.insert(
1678 client_order_id.clone(),
1679 Order {
1680 client_order_id,
1681 server_order_id: None,
1682 symbol: "BTCUSDT".to_string(),
1683 side: OrderSide::Buy,
1684 order_type: OrderType::Market,
1685 quantity: 0.1,
1686 price: None,
1687 status: OrderStatus::Submitted,
1688 created_at: chrono::Utc::now(),
1689 updated_at: chrono::Utc::now(),
1690 fills: vec![],
1691 },
1692 );
1693
1694 let rejected = mgr
1695 .evaluate_strategy_limits("cfg", chrono::Utc::now().timestamp_millis() as u64)
1696 .expect("must be rejected");
1697 assert_eq!(
1698 rejected.0,
1699 "risk.strategy_max_active_orders_exceeded".to_string()
1700 );
1701 }
1702
1703 #[test]
1704 fn strategy_limit_rejects_during_cooldown_window() {
1705 let mut mgr = build_test_order_manager();
1706 let now = chrono::Utc::now().timestamp_millis() as u64;
1707 mgr.mark_strategy_submit("cfg", now);
1708
1709 let rejected = mgr
1710 .evaluate_strategy_limits("cfg", now + 500)
1711 .expect("must be rejected");
1712 assert_eq!(rejected.0, "risk.strategy_cooldown_active".to_string());
1713 }
1714
1715 #[test]
1716 fn symbol_exposure_limit_rejects_when_projected_notional_exceeds_limit() {
1717 let mut mgr = build_test_order_manager();
1718 mgr.last_price = 100.0;
1719 let rejected = mgr
1721 .evaluate_symbol_exposure_limit(OrderSide::Buy, 2.0)
1722 .expect("must be rejected");
1723 assert_eq!(
1724 rejected.0,
1725 "risk.symbol_exposure_limit_exceeded".to_string()
1726 );
1727 }
1728
1729 #[test]
1730 fn symbol_exposure_limit_allows_risk_reducing_order() {
1731 let mut mgr = build_test_order_manager();
1732 mgr.last_price = 100.0;
1733 mgr.position.side = Some(OrderSide::Buy);
1734 mgr.position.qty = 2.0; let rejected = mgr.evaluate_symbol_exposure_limit(OrderSide::Sell, 1.0);
1738 assert!(rejected.is_none());
1739 }
1740
1741 #[test]
1742 fn futures_trade_stats_by_source_use_realized_pnl() {
1743 let trades = vec![
1744 BinanceMyTrade {
1745 symbol: "XRPUSDT".to_string(),
1746 id: 1,
1747 order_id: 1001,
1748 price: 1.0,
1749 qty: 100.0,
1750 commission: 0.0,
1751 commission_asset: "USDT".to_string(),
1752 time: 1,
1753 is_buyer: false,
1754 is_maker: false,
1755 realized_pnl: 5.0,
1756 },
1757 BinanceMyTrade {
1758 symbol: "XRPUSDT".to_string(),
1759 id: 2,
1760 order_id: 1002,
1761 price: 1.0,
1762 qty: 100.0,
1763 commission: 0.0,
1764 commission_asset: "USDT".to_string(),
1765 time: 2,
1766 is_buyer: false,
1767 is_maker: false,
1768 realized_pnl: -2.5,
1769 },
1770 ];
1771 let mut source_by_order = std::collections::HashMap::new();
1772 source_by_order.insert(1001, "c20".to_string());
1773 source_by_order.insert(1002, "c20".to_string());
1774
1775 let stats = compute_trade_stats_by_source(trades, &source_by_order, "XRPUSDT#FUT");
1776 let c20 = stats.get("c20").expect("source tag must exist");
1777 assert_eq!(c20.trade_count, 2);
1778 assert_eq!(c20.win_count, 1);
1779 assert_eq!(c20.lose_count, 1);
1780 assert!((c20.realized_pnl - 2.5).abs() < f64::EPSILON);
1781 }
1782}