1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use anyhow::Result;
6use chrono::TimeZone;
7
8use crate::binance::rest::BinanceRestClient;
9use crate::binance::types::{BinanceMyTrade, BinanceOrderResponse};
10use crate::config::RiskConfig;
11use crate::model::order::{Fill, Order, OrderSide, OrderStatus, OrderType};
12use crate::model::position::Position;
13use crate::model::signal::Signal;
14use crate::order_store;
15use crate::risk_module::{
16 ApiEndpointGroup, EndpointRateLimits, OrderIntent, RateBudgetSnapshot, RejectionReasonCode,
17 RiskModule,
18};
19
20pub use crate::risk_module::MarketKind;
21
/// Outcome of a single order submission attempt, as reported to the caller.
#[derive(Debug, Clone)]
pub enum OrderUpdate {
    /// The exchange accepted the order but did not report it as filled or
    /// partially filled.
    Submitted {
        /// Identifier of the order intent that produced this order.
        intent_id: String,
        /// Client-side order id sent to the exchange.
        client_order_id: String,
        /// Order id assigned by the exchange.
        server_order_id: u64,
    },
    /// The exchange reported the order as filled or partially filled.
    Filled {
        /// Identifier of the order intent that produced this order.
        intent_id: String,
        /// Client-side order id sent to the exchange.
        client_order_id: String,
        /// Side of the executed order.
        side: OrderSide,
        /// Individual fills returned in the exchange response.
        fills: Vec<Fill>,
        /// Quantity-weighted average fill price (0.0 when no fills were returned).
        avg_price: f64,
    },
    /// The order was rejected, either locally before submission or by the
    /// exchange call failing.
    Rejected {
        /// Identifier of the order intent that was rejected.
        intent_id: String,
        /// Client-side order id, or `"n/a"` when the rejection happened before
        /// an id was generated.
        client_order_id: String,
        /// Machine-readable rejection code.
        reason_code: String,
        /// Human-readable rejection message.
        reason: String,
    },
}
43
/// Aggregated win/loss statistics derived from trade history.
#[derive(Debug, Clone, Default)]
pub struct OrderHistoryStats {
    /// Number of trades counted as either a win or a loss.
    pub trade_count: u32,
    /// Number of trades with strictly positive realized PnL.
    pub win_count: u32,
    /// Number of trades with strictly negative realized PnL.
    pub lose_count: u32,
    /// Sum of realized PnL over all processed trades.
    pub realized_pnl: f64,
}
51
/// Result of one order-history refresh: display rows plus derived statistics.
#[derive(Debug, Clone, Default)]
pub struct OrderHistorySnapshot {
    /// Pre-formatted, human-readable history rows.
    pub rows: Vec<String>,
    /// Overall win/loss statistics for the symbol.
    pub stats: OrderHistoryStats,
    /// Win/loss statistics keyed by strategy/source label.
    pub strategy_stats: HashMap<String, OrderHistoryStats>,
    /// One entry per executed trade included in the snapshot.
    pub fills: Vec<OrderHistoryFill>,
    /// Quantity of the still-open long position reconstructed from trades.
    pub open_qty: f64,
    /// Average entry price of the open position (0.0 when there is none).
    pub open_entry_price: f64,
    /// Estimated total PnL in USDT, when it could be computed.
    pub estimated_total_pnl_usdt: Option<f64>,
    /// Whether the trade data behind this snapshot is considered complete.
    pub trade_data_complete: bool,
    /// Wall-clock time (Unix milliseconds) captured when the fetch started.
    pub fetched_at_ms: u64,
    /// Time spent on the exchange requests, in milliseconds.
    pub fetch_latency_ms: u64,
    /// Timestamp (milliseconds) of the most recent order or trade event, if any.
    pub latest_event_ms: Option<u64>,
}
66
/// A single executed trade reduced to the fields kept in a history snapshot.
#[derive(Debug, Clone)]
pub struct OrderHistoryFill {
    /// Trade time in milliseconds, as reported by the exchange.
    pub timestamp_ms: u64,
    /// Whether the account bought or sold in this trade.
    pub side: OrderSide,
    /// Execution price of the trade.
    pub price: f64,
}
73
/// Submits orders for one symbol/market and tracks the resulting orders,
/// position, balances and per-strategy / per-symbol risk limits.
pub struct OrderManager {
    /// REST client used for account, history and order-placement requests.
    rest_client: Arc<BinanceRestClient>,
    /// Orders created by this manager, keyed by client order id.
    active_orders: HashMap<String, Order>,
    /// Position state updated from order fills.
    position: Position,
    /// Trading symbol this manager operates on.
    symbol: String,
    /// Market (spot or futures) this manager operates on.
    market: MarketKind,
    /// Order amount in USDT passed to the risk module with every intent.
    order_amount_usdt: f64,
    /// Balances from the last successful balance refresh, keyed by asset.
    balances: HashMap<String, f64>,
    /// Most recent price passed to `update_unrealized_pnl` (0.0 until then).
    last_price: f64,
    /// Risk module that approves intents and tracks rate budgets.
    risk_module: RiskModule,
    /// Cooldown applied to strategies without a dedicated limit entry.
    default_strategy_cooldown_ms: u64,
    /// Active-order cap applied to strategies without a dedicated limit entry.
    default_strategy_max_active_orders: u32,
    /// Per-strategy limits keyed by lowercased source tag.
    strategy_limits_by_tag: HashMap<String, StrategyExecutionLimit>,
    /// Timestamp (milliseconds) of the last recorded submission per source tag.
    last_strategy_submit_ms: HashMap<String, u64>,
    /// Exposure cap (USDT) used when no symbol-specific cap is configured.
    default_symbol_max_exposure_usdt: f64,
    /// Symbol-specific exposure caps keyed by `SYMBOL:market`.
    symbol_exposure_limit_by_key: HashMap<String, f64>,
}
91
/// Execution limits applied to orders coming from a single strategy/source tag.
#[derive(Debug, Clone, Copy)]
struct StrategyExecutionLimit {
    /// Minimum time between two submissions, in milliseconds (0 disables it).
    cooldown_ms: u64,
    /// Maximum number of non-terminal orders allowed at the same time.
    max_active_orders: u32,
}
97
98fn normalize_market_label(market: MarketKind) -> &'static str {
99 match market {
100 MarketKind::Spot => "spot",
101 MarketKind::Futures => "futures",
102 }
103}
104
105fn symbol_limit_key(symbol: &str, market: MarketKind) -> String {
106 format!(
107 "{}:{}",
108 symbol.trim().to_ascii_uppercase(),
109 normalize_market_label(market)
110 )
111}
112
113fn storage_symbol(symbol: &str, market: MarketKind) -> String {
114 match market {
115 MarketKind::Spot => symbol.to_string(),
116 MarketKind::Futures => format!("{}#FUT", symbol),
117 }
118}
119
/// Picks the quantity to show in a history row.
///
/// Orders whose status is `FILLED` or `PARTIALLY_FILLED` show the executed
/// quantity; every other status shows the originally requested quantity.
fn display_qty_for_history(status: &str, orig_qty: f64, executed_qty: f64) -> f64 {
    let has_executed_status = matches!(status, "FILLED" | "PARTIALLY_FILLED");
    if has_executed_status {
        executed_qty
    } else {
        orig_qty
    }
}
126
127fn format_history_time(timestamp_ms: u64) -> String {
128 chrono::Utc
129 .timestamp_millis_opt(timestamp_ms as i64)
130 .single()
131 .map(|dt| {
132 dt.with_timezone(&chrono::Local)
133 .format("%H:%M:%S")
134 .to_string()
135 })
136 .unwrap_or_else(|| "--:--:--".to_string())
137}
138
139fn format_order_history_row(
140 timestamp_ms: u64,
141 status: &str,
142 side: &str,
143 qty: f64,
144 avg_price: f64,
145 client_order_id: &str,
146) -> String {
147 format!(
148 "{} {:<10} {:<4} {:.5} @ {:.2} {}",
149 format_history_time(timestamp_ms),
150 status,
151 side,
152 qty,
153 avg_price,
154 client_order_id
155 )
156}
157
/// Derives the strategy/source label from markers embedded in a client order id.
///
/// Markers are checked in a fixed priority order; ids without any known marker
/// are labelled `UNKNOWN`.
fn source_label_from_client_order_id(client_order_id: &str) -> &'static str {
    // Ordered by priority: the first marker found in the id wins.
    const MARKER_LABELS: [(&str, &str); 4] = [
        ("-mnl-", "MANUAL"),
        ("-cfg-", "MA(Config)"),
        ("-fst-", "MA(Fast 5/20)"),
        ("-slw-", "MA(Slow 20/60)"),
    ];
    MARKER_LABELS
        .iter()
        .find(|(marker, _)| client_order_id.contains(*marker))
        .map_or("UNKNOWN", |(_, label)| *label)
}
171
172fn format_trade_history_row(t: &BinanceMyTrade, source: &str) -> String {
173 let side = if t.is_buyer { "BUY" } else { "SELL" };
174 format_order_history_row(
175 t.time,
176 "FILLED",
177 side,
178 t.qty,
179 t.price,
180 &format!("order#{}#T{} [{}]", t.order_id, t.id, source),
181 )
182}
183
/// Splits a trading symbol into `(base_asset, quote_asset)`.
///
/// The quote asset is the first known suffix (in priority order) that leaves a
/// non-empty base. When no suffix applies, the whole symbol is returned as the
/// base with an empty quote.
fn split_symbol_assets(symbol: &str) -> (String, String) {
    const QUOTE_SUFFIXES: [&str; 10] = [
        "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
    ];
    QUOTE_SUFFIXES
        .iter()
        .find_map(|quote| {
            symbol
                .strip_suffix(*quote)
                .filter(|base| !base.is_empty())
                .map(|base| (base.to_string(), (*quote).to_string()))
        })
        .unwrap_or_else(|| (symbol.to_string(), String::new()))
}
197
/// Running long-only position reconstructed by replaying trades.
#[derive(Clone, Copy, Default)]
struct LongPos {
    /// Base-asset quantity currently held.
    qty: f64,
    /// Total quote-asset cost of the held quantity (including quote fees on buys).
    cost_quote: f64,
}
203
204fn apply_spot_trade_with_fee(
205 pos: &mut LongPos,
206 stats: &mut OrderHistoryStats,
207 t: &BinanceMyTrade,
208 base_asset: &str,
209 quote_asset: &str,
210) {
211 let qty = t.qty.max(0.0);
212 if qty <= f64::EPSILON {
213 return;
214 }
215 let fee_asset = t.commission_asset.as_str();
216 let fee_is_base = !base_asset.is_empty() && fee_asset.eq_ignore_ascii_case(base_asset);
217 let fee_is_quote = !quote_asset.is_empty() && fee_asset.eq_ignore_ascii_case(quote_asset);
218
219 if t.is_buyer {
220 let net_qty = (qty
221 - if fee_is_base {
222 t.commission.max(0.0)
223 } else {
224 0.0
225 })
226 .max(0.0);
227 if net_qty <= f64::EPSILON {
228 return;
229 }
230 let fee_quote = if fee_is_quote {
231 t.commission.max(0.0)
232 } else {
233 0.0
234 };
235 pos.qty += net_qty;
236 pos.cost_quote += qty * t.price + fee_quote;
237 return;
238 }
239
240 if pos.qty <= f64::EPSILON {
242 return;
243 }
244 let close_qty = qty.min(pos.qty);
245 if close_qty <= f64::EPSILON {
246 return;
247 }
248 let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
249 let fee_quote_total = if fee_is_quote {
250 t.commission.max(0.0)
251 } else if fee_is_base {
252 t.commission.max(0.0) * t.price
254 } else {
255 0.0
256 };
257 let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
258 let pnl_delta = (close_qty * t.price - fee_quote) - (avg_cost * close_qty);
259 if pnl_delta > 0.0 {
260 stats.win_count += 1;
261 stats.trade_count += 1;
262 } else if pnl_delta < 0.0 {
263 stats.lose_count += 1;
264 stats.trade_count += 1;
265 }
266 stats.realized_pnl += pnl_delta;
267
268 pos.qty -= close_qty;
269 pos.cost_quote -= avg_cost * close_qty;
270 if pos.qty <= f64::EPSILON {
271 pos.qty = 0.0;
272 pos.cost_quote = 0.0;
273 }
274}
275
276fn compute_trade_state(
277 mut trades: Vec<BinanceMyTrade>,
278 symbol: &str,
279) -> (OrderHistoryStats, LongPos) {
280 trades.sort_by_key(|t| (t.time, t.id));
281 let (base_asset, quote_asset) = split_symbol_assets(symbol);
282 let mut pos = LongPos::default();
283 let mut stats = OrderHistoryStats::default();
284 for t in trades {
285 apply_spot_trade_with_fee(&mut pos, &mut stats, &t, &base_asset, "e_asset);
286 }
287 (stats, pos)
288}
289
290fn compute_trade_stats_by_source(
291 mut trades: Vec<BinanceMyTrade>,
292 order_source_by_id: &HashMap<u64, String>,
293 symbol: &str,
294) -> HashMap<String, OrderHistoryStats> {
295 trades.sort_by_key(|t| (t.time, t.id));
296 let (base_asset, quote_asset) = split_symbol_assets(symbol);
297 let mut pos_by_source: HashMap<String, LongPos> = HashMap::new();
298 let mut stats_by_source: HashMap<String, OrderHistoryStats> = HashMap::new();
299
300 for t in trades {
301 let source = order_source_by_id
302 .get(&t.order_id)
303 .cloned()
304 .unwrap_or_else(|| "UNKNOWN".to_string());
305 let pos = pos_by_source.entry(source.clone()).or_default();
306 let stats = stats_by_source.entry(source).or_default();
307 apply_spot_trade_with_fee(pos, stats, &t, &base_asset, "e_asset);
308 }
309
310 stats_by_source
311}
312
313fn to_persistable_stats_map(
314 strategy_stats: &HashMap<String, OrderHistoryStats>,
315) -> HashMap<String, order_store::StrategyScopedStats> {
316 strategy_stats
317 .iter()
318 .map(|(k, v)| {
319 (
320 k.clone(),
321 order_store::StrategyScopedStats {
322 trade_count: v.trade_count,
323 win_count: v.win_count,
324 lose_count: v.lose_count,
325 realized_pnl: v.realized_pnl,
326 },
327 )
328 })
329 .collect()
330}
331
332fn from_persisted_stats_map(
333 persisted: HashMap<String, order_store::StrategyScopedStats>,
334) -> HashMap<String, OrderHistoryStats> {
335 persisted
336 .into_iter()
337 .map(|(k, v)| {
338 (
339 k,
340 OrderHistoryStats {
341 trade_count: v.trade_count,
342 win_count: v.win_count,
343 lose_count: v.lose_count,
344 realized_pnl: v.realized_pnl,
345 },
346 )
347 })
348 .collect()
349}
350
351impl OrderManager {
352 pub fn new(
361 rest_client: Arc<BinanceRestClient>,
362 symbol: &str,
363 market: MarketKind,
364 order_amount_usdt: f64,
365 risk_config: &RiskConfig,
366 ) -> Self {
367 let mut strategy_limits_by_tag = HashMap::new();
368 let mut symbol_exposure_limit_by_key = HashMap::new();
369 let default_strategy_cooldown_ms = risk_config.default_strategy_cooldown_ms;
370 let default_strategy_max_active_orders = risk_config.default_strategy_max_active_orders.max(1);
371 let default_symbol_max_exposure_usdt = risk_config.default_symbol_max_exposure_usdt.max(0.0);
372 for profile in &risk_config.strategy_limits {
373 let source_tag = profile.source_tag.trim().to_ascii_lowercase();
374 if source_tag.is_empty() {
375 continue;
376 }
377 strategy_limits_by_tag.insert(
378 source_tag,
379 StrategyExecutionLimit {
380 cooldown_ms: profile
381 .cooldown_ms
382 .unwrap_or(default_strategy_cooldown_ms),
383 max_active_orders: profile
384 .max_active_orders
385 .unwrap_or(default_strategy_max_active_orders)
386 .max(1),
387 },
388 );
389 }
390 for limit in &risk_config.symbol_exposure_limits {
391 let symbol = limit.symbol.trim().to_ascii_uppercase();
392 if symbol.is_empty() {
393 continue;
394 }
395 let market = match limit
396 .market
397 .as_deref()
398 .unwrap_or("spot")
399 .trim()
400 .to_ascii_lowercase()
401 .as_str()
402 {
403 "spot" => MarketKind::Spot,
404 "futures" | "future" | "fut" => MarketKind::Futures,
405 _ => continue,
406 };
407 symbol_exposure_limit_by_key.insert(
408 symbol_limit_key(&symbol, market),
409 limit.max_exposure_usdt.max(0.0),
410 );
411 }
412 Self {
413 rest_client: rest_client.clone(),
414 active_orders: HashMap::new(),
415 position: Position::new(symbol.to_string()),
416 symbol: symbol.to_string(),
417 market,
418 order_amount_usdt,
419 balances: HashMap::new(),
420 last_price: 0.0,
421 risk_module: RiskModule::new(
422 rest_client.clone(),
423 risk_config.global_rate_limit_per_minute,
424 EndpointRateLimits {
425 orders_per_minute: risk_config.endpoint_rate_limits.orders_per_minute,
426 account_per_minute: risk_config.endpoint_rate_limits.account_per_minute,
427 market_data_per_minute: risk_config
428 .endpoint_rate_limits
429 .market_data_per_minute,
430 },
431 ),
432 default_strategy_cooldown_ms,
433 default_strategy_max_active_orders,
434 strategy_limits_by_tag,
435 last_strategy_submit_ms: HashMap::new(),
436 default_symbol_max_exposure_usdt,
437 symbol_exposure_limit_by_key,
438 }
439 }
440
/// Returns the position currently tracked by this manager.
pub fn position(&self) -> &Position {
    &self.position
}
448
/// Returns the balances cached by the last successful balance refresh,
/// keyed by asset.
pub fn balances(&self) -> &HashMap<String, f64> {
    &self.balances
}
456
/// Records `current_price` as the latest known price and forwards it to the
/// position so it can update its unrealized PnL.
pub fn update_unrealized_pnl(&mut self, current_price: f64) {
    // The stored price is also used for exposure checks and order intents.
    self.last_price = current_price;
    self.position.update_unrealized_pnl(current_price);
}
466
/// Returns the risk module's current rate-budget snapshot.
pub fn rate_budget_snapshot(&self) -> RateBudgetSnapshot {
    self.risk_module.rate_budget_snapshot()
}
473
474 fn strategy_limits_for(&self, source_tag: &str) -> StrategyExecutionLimit {
475 self.strategy_limits_by_tag
476 .get(source_tag)
477 .copied()
478 .unwrap_or(StrategyExecutionLimit {
479 cooldown_ms: self.default_strategy_cooldown_ms,
480 max_active_orders: self.default_strategy_max_active_orders,
481 })
482 }
483
484 fn active_order_count_for_source(&self, source_tag: &str) -> u32 {
485 let prefix = format!("sq-{}-", source_tag);
486 self.active_orders
487 .values()
488 .filter(|o| !o.status.is_terminal() && o.client_order_id.starts_with(&prefix))
489 .count() as u32
490 }
491
492 fn evaluate_strategy_limits(
493 &self,
494 source_tag: &str,
495 created_at_ms: u64,
496 ) -> Option<(String, String)> {
497 let limits = self.strategy_limits_for(source_tag);
498 let active_count = self.active_order_count_for_source(source_tag);
499 if active_count >= limits.max_active_orders {
500 return Some((
501 RejectionReasonCode::RiskStrategyMaxActiveOrdersExceeded
502 .as_str()
503 .to_string(),
504 format!(
505 "Strategy '{}' active order limit exceeded (active {}, limit {})",
506 source_tag, active_count, limits.max_active_orders
507 ),
508 ));
509 }
510
511 if limits.cooldown_ms > 0 {
512 if let Some(last_submit_ms) = self.last_strategy_submit_ms.get(source_tag) {
513 let elapsed = created_at_ms.saturating_sub(*last_submit_ms);
514 if elapsed < limits.cooldown_ms {
515 let remaining = limits.cooldown_ms - elapsed;
516 return Some((
517 RejectionReasonCode::RiskStrategyCooldownActive
518 .as_str()
519 .to_string(),
520 format!(
521 "Strategy '{}' cooldown active ({}ms remaining)",
522 source_tag, remaining
523 ),
524 ));
525 }
526 }
527 }
528
529 None
530 }
531
/// Records `created_at_ms` as the latest submission time for `source_tag`;
/// this is the timestamp the strategy cooldown is measured from.
fn mark_strategy_submit(&mut self, source_tag: &str, created_at_ms: u64) {
    self.last_strategy_submit_ms
        .insert(source_tag.to_string(), created_at_ms);
}
536
537 fn max_symbol_exposure_usdt(&self) -> f64 {
538 self.symbol_exposure_limit_by_key
539 .get(&symbol_limit_key(&self.symbol, self.market))
540 .copied()
541 .unwrap_or(self.default_symbol_max_exposure_usdt)
542 }
543
544 fn projected_notional_after_fill(&self, side: OrderSide, qty: f64) -> (f64, f64) {
545 let price = self.last_price.max(0.0);
546 if price <= f64::EPSILON {
547 return (0.0, 0.0);
548 }
549 let current_qty_signed = match self.position.side {
550 Some(OrderSide::Buy) => self.position.qty,
551 Some(OrderSide::Sell) => -self.position.qty,
552 None => 0.0,
553 };
554 let delta = match side {
555 OrderSide::Buy => qty,
556 OrderSide::Sell => -qty,
557 };
558 let projected_qty_signed = current_qty_signed + delta;
559 (
560 current_qty_signed.abs() * price,
561 projected_qty_signed.abs() * price,
562 )
563 }
564
565 fn evaluate_symbol_exposure_limit(&self, side: OrderSide, qty: f64) -> Option<(String, String)> {
566 let max_exposure = self.max_symbol_exposure_usdt();
567 if max_exposure <= f64::EPSILON {
568 return None;
569 }
570 let (current_notional, projected_notional) = self.projected_notional_after_fill(side, qty);
571 if projected_notional > max_exposure && projected_notional > current_notional + f64::EPSILON {
572 return Some((
573 RejectionReasonCode::RiskSymbolExposureLimitExceeded
574 .as_str()
575 .to_string(),
576 format!(
577 "Symbol exposure limit exceeded for {} ({:?}): projected {:.2} USDT > limit {:.2} USDT",
578 self.symbol, self.market, projected_notional, max_exposure
579 ),
580 ));
581 }
582 None
583 }
584
585 pub async fn refresh_balances(&mut self) -> Result<HashMap<String, f64>> {
597 if !self
598 .risk_module
599 .reserve_endpoint_budget(ApiEndpointGroup::Account)
600 {
601 return Err(anyhow::anyhow!(
602 "Account endpoint budget exceeded; try again after reset"
603 ));
604 }
605 if self.market == MarketKind::Futures {
606 let account = self.rest_client.get_futures_account().await?;
607 self.balances.clear();
608 for a in &account.assets {
609 if a.wallet_balance.abs() > f64::EPSILON {
610 self.balances.insert(a.asset.clone(), a.available_balance);
611 }
612 }
613 return Ok(self.balances.clone());
614 }
615 let account = self.rest_client.get_account().await?;
616 self.balances.clear();
617 for b in &account.balances {
618 let total = b.free + b.locked;
619 if total > 0.0 {
620 self.balances.insert(b.asset.clone(), b.free);
621 }
622 }
623 tracing::info!(balances = ?self.balances, "Balances refreshed");
624 Ok(self.balances.clone())
625 }
626
627 pub async fn refresh_order_history(&mut self, limit: usize) -> Result<OrderHistorySnapshot> {
636 if !self
637 .risk_module
638 .reserve_endpoint_budget(ApiEndpointGroup::Orders)
639 {
640 return Err(anyhow::anyhow!(
641 "Orders endpoint budget exceeded; try again after reset"
642 ));
643 }
644 if self.market == MarketKind::Futures {
645 let fetch_started = Instant::now();
646 let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
647 let orders_result = self
648 .rest_client
649 .get_futures_all_orders(&self.symbol, limit)
650 .await;
651 let trades_result = self
652 .rest_client
653 .get_futures_my_trades_history(&self.symbol, limit.max(1))
654 .await;
655 let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
656
657 if orders_result.is_err() && trades_result.is_err() {
658 let oe = orders_result.err().unwrap();
659 let te = trades_result.err().unwrap();
660 return Err(anyhow::anyhow!(
661 "futures order history fetch failed: allOrders={} | userTrades={}",
662 oe,
663 te
664 ));
665 }
666
667 let mut orders = orders_result.unwrap_or_default();
668 let trades = trades_result.unwrap_or_default();
669 orders.sort_by_key(|o| o.update_time.max(o.time));
670
671 let storage_key = storage_symbol(&self.symbol, self.market);
672 if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &trades) {
673 tracing::warn!(error = %e, "Failed to persist futures order snapshot to sqlite");
674 }
675
676 let mut history = Vec::new();
677 let mut fills = Vec::new();
678 for t in &trades {
679 let side = if t.is_buyer { "BUY" } else { "SELL" };
680 fills.push(OrderHistoryFill {
681 timestamp_ms: t.time,
682 side: if t.is_buyer {
683 OrderSide::Buy
684 } else {
685 OrderSide::Sell
686 },
687 price: t.price,
688 });
689 history.push(format_order_history_row(
690 t.time,
691 "FILLED",
692 side,
693 t.qty,
694 t.price,
695 &format!("order#{}#T{} [FUT]", t.order_id, t.id),
696 ));
697 }
698 for o in &orders {
699 if o.executed_qty <= 0.0 {
700 history.push(format_order_history_row(
701 o.update_time.max(o.time),
702 &o.status,
703 &o.side,
704 display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
705 if o.executed_qty > 0.0 {
706 o.cummulative_quote_qty / o.executed_qty
707 } else {
708 o.price
709 },
710 &o.client_order_id,
711 ));
712 }
713 }
714
715 let mut stats = OrderHistoryStats::default();
716 for t in &trades {
717 if t.realized_pnl > 0.0 {
718 stats.win_count += 1;
719 stats.trade_count += 1;
720 } else if t.realized_pnl < 0.0 {
721 stats.lose_count += 1;
722 stats.trade_count += 1;
723 }
724 stats.realized_pnl += t.realized_pnl;
725 }
726 let estimated_total_pnl_usdt = Some(stats.realized_pnl);
727 let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
728 let latest_trade_event = trades.iter().map(|t| t.time).max();
729 let strategy_stats = match order_store::load_strategy_symbol_stats(&storage_key) {
730 Ok(persisted) => from_persisted_stats_map(persisted),
731 Err(e) => {
732 tracing::warn!(error = %e, "Failed to load persisted strategy stats (futures)");
733 HashMap::new()
734 }
735 };
736 return Ok(OrderHistorySnapshot {
737 rows: history,
738 stats,
739 strategy_stats,
740 fills,
741 open_qty: 0.0,
742 open_entry_price: 0.0,
743 estimated_total_pnl_usdt,
744 trade_data_complete: true,
745 fetched_at_ms,
746 fetch_latency_ms,
747 latest_event_ms: latest_order_event.max(latest_trade_event),
748 });
749 }
750
751 let fetch_started = Instant::now();
752 let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
753 let orders_result = self.rest_client.get_all_orders(&self.symbol, limit).await;
754 let storage_key = storage_symbol(&self.symbol, self.market);
755 let last_trade_id = order_store::load_last_trade_id(&storage_key).ok().flatten();
756 let persisted_trade_count = order_store::load_trade_count(&storage_key).unwrap_or(0);
757 let need_backfill = persisted_trade_count < limit;
758 let trades_result = match (need_backfill, last_trade_id) {
759 (true, _) => {
760 self.rest_client
761 .get_my_trades_history(&self.symbol, limit.max(1))
762 .await
763 }
764 (false, Some(last_id)) => {
765 self.rest_client
766 .get_my_trades_since(&self.symbol, last_id.saturating_add(1), 10)
767 .await
768 }
769 (false, None) => {
770 self.rest_client
771 .get_my_trades_history(&self.symbol, limit.max(1))
772 .await
773 }
774 };
775 let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
776 let trade_data_complete = trades_result.is_ok();
777
778 if orders_result.is_err() && trades_result.is_err() {
779 let oe = orders_result.err().unwrap();
780 let te = trades_result.err().unwrap();
781 return Err(anyhow::anyhow!(
782 "order history fetch failed: allOrders={} | myTrades={}",
783 oe,
784 te
785 ));
786 }
787
788 let mut orders = match orders_result {
789 Ok(v) => v,
790 Err(e) => {
791 tracing::warn!(error = %e, "Failed to fetch allOrders; falling back to trade-only history");
792 Vec::new()
793 }
794 };
795 let recent_trades = match trades_result {
796 Ok(t) => t,
797 Err(e) => {
798 tracing::warn!(error = %e, "Failed to fetch myTrades; falling back to order-only history");
799 Vec::new()
800 }
801 };
802 let mut trades = recent_trades.clone();
803 orders.sort_by_key(|o| o.update_time.max(o.time));
804
805 if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &recent_trades) {
806 tracing::warn!(error = %e, "Failed to persist order snapshot to sqlite");
807 }
808 let mut persisted_source_by_order_id: HashMap<u64, String> = HashMap::new();
809 match order_store::load_persisted_trades(&storage_key) {
810 Ok(saved) => {
811 if !saved.is_empty() {
812 trades = saved.iter().map(|r| r.trade.clone()).collect();
813 for row in saved {
814 persisted_source_by_order_id
815 .entry(row.trade.order_id)
816 .or_insert(row.source);
817 }
818 }
819 }
820 Err(e) => {
821 tracing::warn!(error = %e, "Failed to load persisted trades; using recent API trades");
822 }
823 }
824
825 let (stats, open_pos) = compute_trade_state(trades.clone(), &self.symbol);
826 let estimated_total_pnl_usdt = if self.last_price > 0.0 {
827 Some(stats.realized_pnl + (open_pos.qty * self.last_price - open_pos.cost_quote))
828 } else {
829 Some(stats.realized_pnl)
830 };
831 let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
832 let latest_trade_event = trades.iter().map(|t| t.time).max();
833 let latest_event_ms = latest_order_event.max(latest_trade_event);
834
835 let mut trades_by_order_id: HashMap<u64, Vec<BinanceMyTrade>> = HashMap::new();
836 for trade in &trades {
837 trades_by_order_id
838 .entry(trade.order_id)
839 .or_default()
840 .push(trade.clone());
841 }
842 for bucket in trades_by_order_id.values_mut() {
843 bucket.sort_by_key(|t| t.time);
844 }
845
846 let mut order_source_by_id = HashMap::new();
847 for o in &orders {
848 order_source_by_id.insert(
849 o.order_id,
850 source_label_from_client_order_id(&o.client_order_id).to_string(),
851 );
852 }
853 for (order_id, source) in persisted_source_by_order_id {
854 order_source_by_id.entry(order_id).or_insert(source);
855 }
856 let mut strategy_stats =
857 compute_trade_stats_by_source(trades.clone(), &order_source_by_id, &self.symbol);
858 let persisted_stats = to_persistable_stats_map(&strategy_stats);
859 if let Err(e) = order_store::persist_strategy_symbol_stats(&storage_key, &persisted_stats) {
860 tracing::warn!(error = %e, "Failed to persist strategy+symbol scoped stats");
861 }
862 if strategy_stats.is_empty() {
863 match order_store::load_strategy_symbol_stats(&storage_key) {
864 Ok(persisted) => {
865 strategy_stats = from_persisted_stats_map(persisted);
866 }
867 Err(e) => {
868 tracing::warn!(error = %e, "Failed to load persisted strategy+symbol stats");
869 }
870 }
871 }
872
873 let mut history = Vec::new();
874 let mut fills = Vec::new();
875 let mut used_trade_ids = std::collections::HashSet::new();
876
877 if orders.is_empty() && !trades.is_empty() {
878 let mut sorted = trades;
879 sorted.sort_by_key(|t| (t.time, t.id));
880 history.extend(sorted.iter().map(|t| {
881 fills.push(OrderHistoryFill {
882 timestamp_ms: t.time,
883 side: if t.is_buyer {
884 OrderSide::Buy
885 } else {
886 OrderSide::Sell
887 },
888 price: t.price,
889 });
890 format_trade_history_row(
891 t,
892 order_source_by_id
893 .get(&t.order_id)
894 .map(String::as_str)
895 .unwrap_or("UNKNOWN"),
896 )
897 }));
898 return Ok(OrderHistorySnapshot {
899 rows: history,
900 stats,
901 strategy_stats,
902 fills,
903 open_qty: open_pos.qty,
904 open_entry_price: if open_pos.qty > f64::EPSILON {
905 open_pos.cost_quote / open_pos.qty
906 } else {
907 0.0
908 },
909 estimated_total_pnl_usdt,
910 trade_data_complete,
911 fetched_at_ms,
912 fetch_latency_ms,
913 latest_event_ms,
914 });
915 }
916
917 for o in orders {
918 if o.executed_qty > 0.0 {
919 if let Some(order_trades) = trades_by_order_id.get(&o.order_id) {
920 for t in order_trades {
921 used_trade_ids.insert(t.id);
922 let side = if t.is_buyer { "BUY" } else { "SELL" };
923 fills.push(OrderHistoryFill {
924 timestamp_ms: t.time,
925 side: if t.is_buyer {
926 OrderSide::Buy
927 } else {
928 OrderSide::Sell
929 },
930 price: t.price,
931 });
932 history.push(format_order_history_row(
933 t.time,
934 "FILLED",
935 side,
936 t.qty,
937 t.price,
938 &format!(
939 "{}#T{} [{}]",
940 o.client_order_id,
941 t.id,
942 source_label_from_client_order_id(&o.client_order_id)
943 ),
944 ));
945 }
946 continue;
947 }
948 }
949
950 let avg_price = if o.executed_qty > 0.0 {
951 o.cummulative_quote_qty / o.executed_qty
952 } else {
953 o.price
954 };
955 history.push(format_order_history_row(
956 o.update_time.max(o.time),
957 &o.status,
958 &o.side,
959 display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
960 avg_price,
961 &o.client_order_id,
962 ));
963 }
964
965 for bucket in trades_by_order_id.values() {
967 for t in bucket {
968 if !used_trade_ids.contains(&t.id) {
969 fills.push(OrderHistoryFill {
970 timestamp_ms: t.time,
971 side: if t.is_buyer {
972 OrderSide::Buy
973 } else {
974 OrderSide::Sell
975 },
976 price: t.price,
977 });
978 history.push(format_trade_history_row(
979 t,
980 order_source_by_id
981 .get(&t.order_id)
982 .map(String::as_str)
983 .unwrap_or("UNKNOWN"),
984 ));
985 }
986 }
987 }
988 Ok(OrderHistorySnapshot {
989 rows: history,
990 stats,
991 strategy_stats,
992 fills,
993 open_qty: open_pos.qty,
994 open_entry_price: if open_pos.qty > f64::EPSILON {
995 open_pos.cost_quote / open_pos.qty
996 } else {
997 0.0
998 },
999 estimated_total_pnl_usdt,
1000 trade_data_complete,
1001 fetched_at_ms,
1002 fetch_latency_ms,
1003 latest_event_ms,
1004 })
1005 }
1006
1007 pub async fn submit_order(
1032 &mut self,
1033 signal: Signal,
1034 source_tag: &str,
1035 ) -> Result<Option<OrderUpdate>> {
1036 let side = match &signal {
1037 Signal::Buy => OrderSide::Buy,
1038 Signal::Sell => OrderSide::Sell,
1039 Signal::Hold => return Ok(None),
1040 };
1041 let source_tag = source_tag.to_ascii_lowercase();
1042 let intent = OrderIntent {
1043 intent_id: format!("intent-{}", &uuid::Uuid::new_v4().to_string()[..8]),
1044 source_tag: source_tag.clone(),
1045 symbol: self.symbol.clone(),
1046 market: self.market,
1047 side,
1048 order_amount_usdt: self.order_amount_usdt,
1049 last_price: self.last_price,
1050 created_at_ms: chrono::Utc::now().timestamp_millis() as u64,
1051 };
1052 if let Some((reason_code, reason)) =
1053 self.evaluate_strategy_limits(&intent.source_tag, intent.created_at_ms)
1054 {
1055 return Ok(Some(OrderUpdate::Rejected {
1056 intent_id: intent.intent_id.clone(),
1057 client_order_id: "n/a".to_string(),
1058 reason_code,
1059 reason,
1060 }));
1061 }
1062 let decision = self
1063 .risk_module
1064 .evaluate_intent(&intent, &self.balances)
1065 .await?;
1066 if !decision.approved {
1067 return Ok(Some(OrderUpdate::Rejected {
1068 intent_id: intent.intent_id.clone(),
1069 client_order_id: "n/a".to_string(),
1070 reason_code: decision
1071 .reason_code
1072 .unwrap_or_else(|| RejectionReasonCode::RiskUnknown.as_str().to_string()),
1073 reason: decision
1074 .reason
1075 .unwrap_or_else(|| "Rejected by RiskModule".to_string()),
1076 }));
1077 }
1078 if !self.risk_module.reserve_rate_budget() {
1079 return Ok(Some(OrderUpdate::Rejected {
1080 intent_id: intent.intent_id.clone(),
1081 client_order_id: "n/a".to_string(),
1082 reason_code: RejectionReasonCode::RateGlobalBudgetExceeded
1083 .as_str()
1084 .to_string(),
1085 reason: "Global rate budget exceeded; try again after reset".to_string(),
1086 }));
1087 }
1088 if !self
1089 .risk_module
1090 .reserve_endpoint_budget(ApiEndpointGroup::Orders)
1091 {
1092 return Ok(Some(OrderUpdate::Rejected {
1093 intent_id: intent.intent_id.clone(),
1094 client_order_id: "n/a".to_string(),
1095 reason_code: RejectionReasonCode::RateEndpointBudgetExceeded
1096 .as_str()
1097 .to_string(),
1098 reason: "Orders endpoint budget exceeded; try again after reset".to_string(),
1099 }));
1100 }
1101 let qty = decision.normalized_qty;
1102 if let Some((reason_code, reason)) = self.evaluate_symbol_exposure_limit(side, qty) {
1103 return Ok(Some(OrderUpdate::Rejected {
1104 intent_id: intent.intent_id.clone(),
1105 client_order_id: "n/a".to_string(),
1106 reason_code,
1107 reason,
1108 }));
1109 }
1110 self.mark_strategy_submit(&intent.source_tag, intent.created_at_ms);
1111
1112 let client_order_id = format!(
1113 "sq-{}-{}",
1114 intent.source_tag,
1115 &uuid::Uuid::new_v4().to_string()[..8]
1116 );
1117
1118 let order = Order {
1119 client_order_id: client_order_id.clone(),
1120 server_order_id: None,
1121 symbol: self.symbol.clone(),
1122 side,
1123 order_type: OrderType::Market,
1124 quantity: qty,
1125 price: None,
1126 status: OrderStatus::PendingSubmit,
1127 created_at: chrono::Utc::now(),
1128 updated_at: chrono::Utc::now(),
1129 fills: vec![],
1130 };
1131
1132 self.active_orders.insert(client_order_id.clone(), order);
1133
1134 tracing::info!(
1135 side = %side,
1136 qty,
1137 usdt_amount = intent.order_amount_usdt,
1138 price = intent.last_price,
1139 intent_id = %intent.intent_id,
1140 created_at_ms = intent.created_at_ms,
1141 "Submitting order"
1142 );
1143
1144 let submit_res = if self.market == MarketKind::Futures {
1145 self.rest_client
1146 .place_futures_market_order(&self.symbol, side, qty, &client_order_id)
1147 .await
1148 } else {
1149 self.rest_client
1150 .place_market_order(&self.symbol, side, qty, &client_order_id)
1151 .await
1152 };
1153
1154 match submit_res {
1155 Ok(response) => {
1156 let update = self.process_order_response(
1157 &intent.intent_id,
1158 &client_order_id,
1159 side,
1160 &response,
1161 );
1162
1163 if matches!(update, OrderUpdate::Filled { .. }) {
1165 if let Err(e) = self.refresh_balances().await {
1166 tracing::warn!(error = %e, "Failed to refresh balances after fill");
1167 }
1168 }
1169
1170 Ok(Some(update))
1171 }
1172 Err(e) => {
1173 tracing::error!(
1174 client_order_id,
1175 error = %e,
1176 "Order rejected"
1177 );
1178 if let Some(order) = self.active_orders.get_mut(&client_order_id) {
1179 order.status = OrderStatus::Rejected;
1180 order.updated_at = chrono::Utc::now();
1181 }
1182 Ok(Some(OrderUpdate::Rejected {
1183 intent_id: intent.intent_id.clone(),
1184 client_order_id,
1185 reason_code: RejectionReasonCode::BrokerSubmitFailed.as_str().to_string(),
1186 reason: e.to_string(),
1187 }))
1188 }
1189 }
1190 }
1191
1192 fn process_order_response(
1193 &mut self,
1194 intent_id: &str,
1195 client_order_id: &str,
1196 side: OrderSide,
1197 response: &BinanceOrderResponse,
1198 ) -> OrderUpdate {
1199 let fills: Vec<Fill> = response
1200 .fills
1201 .iter()
1202 .map(|f| Fill {
1203 price: f.price,
1204 qty: f.qty,
1205 commission: f.commission,
1206 commission_asset: f.commission_asset.clone(),
1207 })
1208 .collect();
1209
1210 let status = OrderStatus::from_binance_str(&response.status);
1211
1212 if let Some(order) = self.active_orders.get_mut(client_order_id) {
1213 order.server_order_id = Some(response.order_id);
1214 order.status = status;
1215 order.fills = fills.clone();
1216 order.updated_at = chrono::Utc::now();
1217 }
1218
1219 if status == OrderStatus::Filled || status == OrderStatus::PartiallyFilled {
1220 self.position.apply_fill(side, &fills);
1221
1222 let avg_price = if fills.is_empty() {
1223 0.0
1224 } else {
1225 let total_value: f64 = fills.iter().map(|f| f.price * f.qty).sum();
1226 let total_qty: f64 = fills.iter().map(|f| f.qty).sum();
1227 total_value / total_qty
1228 };
1229
1230 tracing::info!(
1231 client_order_id,
1232 order_id = response.order_id,
1233 side = %side,
1234 avg_price,
1235 filled_qty = response.executed_qty,
1236 "Order filled"
1237 );
1238
1239 OrderUpdate::Filled {
1240 intent_id: intent_id.to_string(),
1241 client_order_id: client_order_id.to_string(),
1242 side,
1243 fills,
1244 avg_price,
1245 }
1246 } else {
1247 OrderUpdate::Submitted {
1248 intent_id: intent_id.to_string(),
1249 client_order_id: client_order_id.to_string(),
1250 server_order_id: response.order_id,
1251 }
1252 }
1253 }
1254}
1255
1256#[cfg(test)]
1257mod tests {
1258 use super::{display_qty_for_history, split_symbol_assets, OrderManager};
1259 use crate::binance::rest::BinanceRestClient;
1260 use crate::config::{EndpointRateLimitConfig, RiskConfig, SymbolExposureLimitConfig};
1261 use crate::model::order::{Order, OrderSide, OrderStatus, OrderType};
1262 use std::sync::Arc;
1263
1264 fn build_test_order_manager() -> OrderManager {
1265 let rest = Arc::new(BinanceRestClient::new(
1266 "https://demo-api.binance.com",
1267 "https://demo-fapi.binance.com",
1268 "k",
1269 "s",
1270 "fk",
1271 "fs",
1272 5000,
1273 ));
1274 let risk = RiskConfig {
1275 global_rate_limit_per_minute: 600,
1276 default_strategy_cooldown_ms: 3_000,
1277 default_strategy_max_active_orders: 1,
1278 default_symbol_max_exposure_usdt: 200.0,
1279 strategy_limits: vec![],
1280 symbol_exposure_limits: vec![SymbolExposureLimitConfig {
1281 symbol: "BTCUSDT".to_string(),
1282 market: Some("spot".to_string()),
1283 max_exposure_usdt: 150.0,
1284 }],
1285 endpoint_rate_limits: EndpointRateLimitConfig {
1286 orders_per_minute: 240,
1287 account_per_minute: 180,
1288 market_data_per_minute: 360,
1289 },
1290 };
1291 OrderManager::new(
1292 rest,
1293 "BTCUSDT",
1294 crate::order_manager::MarketKind::Spot,
1295 10.0,
1296 &risk,
1297 )
1298 }
1299
1300 #[test]
1301 fn valid_state_transitions() {
1302 let from = OrderStatus::PendingSubmit;
1304 let to = OrderStatus::Submitted;
1305 assert!(!from.is_terminal());
1306 assert!(!to.is_terminal());
1307
1308 let to = OrderStatus::Filled;
1310 assert!(to.is_terminal());
1311
1312 let to = OrderStatus::Rejected;
1314 assert!(to.is_terminal());
1315
1316 let to = OrderStatus::Cancelled;
1318 assert!(to.is_terminal());
1319 }
1320
1321 #[test]
1322 fn from_binance_str_mapping() {
1323 assert_eq!(OrderStatus::from_binance_str("NEW"), OrderStatus::Submitted);
1324 assert_eq!(OrderStatus::from_binance_str("FILLED"), OrderStatus::Filled);
1325 assert_eq!(
1326 OrderStatus::from_binance_str("CANCELED"),
1327 OrderStatus::Cancelled
1328 );
1329 assert_eq!(
1330 OrderStatus::from_binance_str("REJECTED"),
1331 OrderStatus::Rejected
1332 );
1333 assert_eq!(
1334 OrderStatus::from_binance_str("EXPIRED"),
1335 OrderStatus::Expired
1336 );
1337 assert_eq!(
1338 OrderStatus::from_binance_str("PARTIALLY_FILLED"),
1339 OrderStatus::PartiallyFilled
1340 );
1341 }
1342
1343 #[test]
1344 fn order_history_uses_executed_qty_for_filled_states() {
1345 assert!((display_qty_for_history("FILLED", 1.0, 0.4) - 0.4).abs() < f64::EPSILON);
1346 assert!((display_qty_for_history("PARTIALLY_FILLED", 1.0, 0.4) - 0.4).abs() < f64::EPSILON);
1347 }
1348
1349 #[test]
1350 fn order_history_uses_orig_qty_for_non_filled_states() {
1351 assert!((display_qty_for_history("NEW", 1.0, 0.4) - 1.0).abs() < f64::EPSILON);
1352 assert!((display_qty_for_history("CANCELED", 1.0, 0.4) - 1.0).abs() < f64::EPSILON);
1353 assert!((display_qty_for_history("REJECTED", 1.0, 0.0) - 1.0).abs() < f64::EPSILON);
1354 }
1355
1356 #[test]
1357 fn split_symbol_assets_parses_known_quote_suffixes() {
1358 assert_eq!(
1359 split_symbol_assets("ETHUSDT"),
1360 ("ETH".to_string(), "USDT".to_string())
1361 );
1362 assert_eq!(
1363 split_symbol_assets("ETHBTC"),
1364 ("ETH".to_string(), "BTC".to_string())
1365 );
1366 }
1367
1368 #[test]
1369 fn split_symbol_assets_falls_back_when_quote_unknown() {
1370 assert_eq!(
1371 split_symbol_assets("FOOBAR"),
1372 ("FOOBAR".to_string(), String::new())
1373 );
1374 }
1375
1376 #[test]
1377 fn strategy_limit_rejects_when_active_orders_reach_limit() {
1378 let mut mgr = build_test_order_manager();
1379 let client_order_id = "sq-cfg-abcdef12".to_string();
1380 mgr.active_orders.insert(
1381 client_order_id.clone(),
1382 Order {
1383 client_order_id,
1384 server_order_id: None,
1385 symbol: "BTCUSDT".to_string(),
1386 side: OrderSide::Buy,
1387 order_type: OrderType::Market,
1388 quantity: 0.1,
1389 price: None,
1390 status: OrderStatus::Submitted,
1391 created_at: chrono::Utc::now(),
1392 updated_at: chrono::Utc::now(),
1393 fills: vec![],
1394 },
1395 );
1396
1397 let rejected = mgr
1398 .evaluate_strategy_limits("cfg", chrono::Utc::now().timestamp_millis() as u64)
1399 .expect("must be rejected");
1400 assert_eq!(
1401 rejected.0,
1402 "risk.strategy_max_active_orders_exceeded".to_string()
1403 );
1404 }
1405
1406 #[test]
1407 fn strategy_limit_rejects_during_cooldown_window() {
1408 let mut mgr = build_test_order_manager();
1409 let now = chrono::Utc::now().timestamp_millis() as u64;
1410 mgr.mark_strategy_submit("cfg", now);
1411
1412 let rejected = mgr
1413 .evaluate_strategy_limits("cfg", now + 500)
1414 .expect("must be rejected");
1415 assert_eq!(rejected.0, "risk.strategy_cooldown_active".to_string());
1416 }
1417
1418 #[test]
1419 fn symbol_exposure_limit_rejects_when_projected_notional_exceeds_limit() {
1420 let mut mgr = build_test_order_manager();
1421 mgr.last_price = 100.0;
1422 let rejected = mgr
1424 .evaluate_symbol_exposure_limit(OrderSide::Buy, 2.0)
1425 .expect("must be rejected");
1426 assert_eq!(rejected.0, "risk.symbol_exposure_limit_exceeded".to_string());
1427 }
1428
1429 #[test]
1430 fn symbol_exposure_limit_allows_risk_reducing_order() {
1431 let mut mgr = build_test_order_manager();
1432 mgr.last_price = 100.0;
1433 mgr.position.side = Some(OrderSide::Buy);
1434 mgr.position.qty = 2.0; let rejected = mgr.evaluate_symbol_exposure_limit(OrderSide::Sell, 1.0);
1438 assert!(rejected.is_none());
1439 }
1440}