1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use anyhow::Result;
6use chrono::TimeZone;
7
8use crate::binance::rest::BinanceRestClient;
9use crate::binance::types::{BinanceMyTrade, BinanceOrderResponse};
10use crate::config::RiskConfig;
11use crate::model::order::{Fill, Order, OrderSide, OrderStatus, OrderType};
12use crate::model::position::Position;
13use crate::model::signal::Signal;
14use crate::order_store;
15use crate::risk_module::{
16 ApiEndpointGroup, EndpointRateLimits, OrderIntent, RateBudgetSnapshot, RejectionReasonCode,
17 RiskModule,
18};
19
20pub use crate::risk_module::MarketKind;
21
/// Outcome of a single order submission attempt, as returned by
/// `OrderManager::submit_order`.
#[derive(Debug, Clone)]
pub enum OrderUpdate {
    /// The exchange accepted the order, but the reported status was neither
    /// filled nor partially filled.
    Submitted {
        /// Locally generated id of the order intent that led to this order.
        intent_id: String,
        /// Client order id that was sent to the exchange.
        client_order_id: String,
        /// Order id taken from the exchange response.
        server_order_id: u64,
    },
    /// The exchange reported the order as filled or partially filled.
    Filled {
        /// Locally generated id of the order intent that led to this order.
        intent_id: String,
        /// Client order id that was sent to the exchange.
        client_order_id: String,
        /// Side of the executed order.
        side: OrderSide,
        /// Individual fills copied from the exchange response.
        fills: Vec<Fill>,
        /// Quantity-weighted average fill price (0.0 when no fills were reported).
        avg_price: f64,
    },
    /// The order was rejected, either by a local check before submission or
    /// because the submit call itself failed.
    Rejected {
        /// Locally generated id of the order intent that was rejected.
        intent_id: String,
        /// Client order id, or the literal `"n/a"` when the rejection happened
        /// before a client order id was generated.
        client_order_id: String,
        /// Machine-readable rejection code.
        reason_code: String,
        /// Human-readable explanation of the rejection.
        reason: String,
    },
}
43
/// Aggregated win/loss statistics derived from trade history.
#[derive(Debug, Clone, Default)]
pub struct OrderHistoryStats {
    /// Number of counted trades; in this file it is always incremented
    /// together with either `win_count` or `lose_count`.
    pub trade_count: u32,
    /// Number of trades with a strictly positive realized PnL.
    pub win_count: u32,
    /// Number of trades with a strictly negative realized PnL.
    pub lose_count: u32,
    /// Sum of realized PnL over all processed trades.
    pub realized_pnl: f64,
}
51
/// Result of one `OrderManager::refresh_order_history` call.
#[derive(Debug, Clone, Default)]
pub struct OrderHistorySnapshot {
    /// Pre-formatted, human-readable history rows (one per trade or order).
    pub rows: Vec<String>,
    /// Statistics aggregated over all trades considered.
    pub stats: OrderHistoryStats,
    /// Statistics keyed by source/strategy label.
    pub strategy_stats: HashMap<String, OrderHistoryStats>,
    /// One entry per executed trade that produced a history row.
    pub fills: Vec<OrderHistoryFill>,
    /// Remaining open long quantity reconstructed from trades (0.0 on the futures path).
    pub open_qty: f64,
    /// Average cost per unit of the open quantity, or 0.0 when nothing is open.
    pub open_entry_price: f64,
    /// Realized PnL, plus a mark-to-market estimate of the open quantity on
    /// the spot path when a last price is known.
    pub estimated_total_pnl_usdt: Option<f64>,
    /// Set by `refresh_order_history`; `false` signals that the trade fetch
    /// failed and the statistics may be partial.
    pub trade_data_complete: bool,
    /// Wall-clock time (Unix epoch milliseconds) at which the fetch started.
    pub fetched_at_ms: u64,
    /// Time spent waiting for the exchange requests, in milliseconds.
    pub fetch_latency_ms: u64,
    /// Timestamp of the most recent order or trade event seen, if any.
    pub latest_event_ms: Option<u64>,
}
66
/// Minimal description of one executed trade taken from the history.
#[derive(Debug, Clone)]
pub struct OrderHistoryFill {
    /// Trade time as reported by the exchange (milliseconds).
    pub timestamp_ms: u64,
    /// Buy when the account was the buyer of the trade, otherwise Sell.
    pub side: OrderSide,
    /// Execution price of the trade.
    pub price: f64,
}
73
/// Submits orders for a single symbol/market and tracks the resulting
/// orders, position, balances and per-strategy submission limits.
pub struct OrderManager {
    /// REST client used for account, order and trade requests.
    rest_client: Arc<BinanceRestClient>,
    /// Orders created by `submit_order`, keyed by client order id.
    active_orders: HashMap<String, Order>,
    /// Position updated from fills of submitted orders.
    position: Position,
    /// Symbol this manager trades.
    symbol: String,
    /// Market (spot or futures) this manager trades on.
    market: MarketKind,
    /// Order amount (in USDT) copied into every order intent.
    order_amount_usdt: f64,
    /// Balances per asset, as last stored by `refresh_balances`.
    balances: HashMap<String, f64>,
    /// Last price passed to `update_unrealized_pnl` (0.0 until first update).
    last_price: f64,
    /// Risk module that evaluates intents and tracks rate budgets.
    risk_module: RiskModule,
    /// Cooldown applied to strategies without an explicit profile.
    default_strategy_cooldown_ms: u64,
    /// Active-order cap applied to strategies without an explicit profile.
    default_strategy_max_active_orders: u32,
    /// Per-strategy limits keyed by trimmed, lowercased source tag.
    strategy_limits_by_tag: HashMap<String, StrategyExecutionLimit>,
    /// Creation time (ms) of the last accepted intent per source tag.
    last_strategy_submit_ms: HashMap<String, u64>,
    /// Exposure cap used when no symbol-specific cap is configured.
    default_symbol_max_exposure_usdt: f64,
    /// Exposure caps keyed by `symbol_limit_key` ("SYMBOL:market").
    symbol_exposure_limit_by_key: HashMap<String, f64>,
}
91
/// Submission limits that apply to one strategy (source tag).
#[derive(Debug, Clone, Copy)]
struct StrategyExecutionLimit {
    /// Minimum time between accepted submissions; 0 disables the cooldown check.
    cooldown_ms: u64,
    /// Maximum number of non-terminal orders the strategy may have at once.
    max_active_orders: u32,
}
97
98fn normalize_market_label(market: MarketKind) -> &'static str {
99 match market {
100 MarketKind::Spot => "spot",
101 MarketKind::Futures => "futures",
102 }
103}
104
105fn symbol_limit_key(symbol: &str, market: MarketKind) -> String {
106 format!(
107 "{}:{}",
108 symbol.trim().to_ascii_uppercase(),
109 normalize_market_label(market)
110 )
111}
112
113fn storage_symbol(symbol: &str, market: MarketKind) -> String {
114 match market {
115 MarketKind::Spot => symbol.to_string(),
116 MarketKind::Futures => format!("{}#FUT", symbol),
117 }
118}
119
/// Chooses which quantity to show in a history row: the executed quantity
/// for (partially) filled orders, the originally requested one otherwise.
fn display_qty_for_history(status: &str, orig_qty: f64, executed_qty: f64) -> f64 {
    let has_executions = matches!(status, "FILLED" | "PARTIALLY_FILLED");
    if has_executions {
        executed_qty
    } else {
        orig_qty
    }
}
126
127fn format_history_time(timestamp_ms: u64) -> String {
128 chrono::Utc
129 .timestamp_millis_opt(timestamp_ms as i64)
130 .single()
131 .map(|dt| {
132 dt.with_timezone(&chrono::Local)
133 .format("%H:%M:%S")
134 .to_string()
135 })
136 .unwrap_or_else(|| "--:--:--".to_string())
137}
138
139fn format_order_history_row(
140 timestamp_ms: u64,
141 status: &str,
142 side: &str,
143 qty: f64,
144 avg_price: f64,
145 client_order_id: &str,
146) -> String {
147 format!(
148 "{} {:<10} {:<4} {:.5} @ {:.2} {}",
149 format_history_time(timestamp_ms),
150 status,
151 side,
152 qty,
153 avg_price,
154 client_order_id
155 )
156}
157
/// Derives a display label for the order source from a marker embedded in
/// the client order id. Markers are checked in table order and the first
/// match wins; ids without a known marker map to `"UNKNOWN"`.
fn source_label_from_client_order_id(client_order_id: &str) -> &'static str {
    const MARKER_LABELS: [(&str, &str); 4] = [
        ("-mnl-", "MANUAL"),
        ("-cfg-", "MA(Config)"),
        ("-fst-", "MA(Fast 5/20)"),
        ("-slw-", "MA(Slow 20/60)"),
    ];
    for (marker, label) in MARKER_LABELS {
        if client_order_id.contains(marker) {
            return label;
        }
    }
    "UNKNOWN"
}
171
172fn format_trade_history_row(t: &BinanceMyTrade, source: &str) -> String {
173 let side = if t.is_buyer { "BUY" } else { "SELL" };
174 format_order_history_row(
175 t.time,
176 "FILLED",
177 side,
178 t.qty,
179 t.price,
180 &format!("order#{}#T{} [{}]", t.order_id, t.id, source),
181 )
182}
183
/// Splits a trading symbol such as `BTCUSDT` into `(base, quote)` assets.
///
/// The symbol is trimmed and uppercased before matching, so `" btcusdt "`
/// resolves the same way as `"BTCUSDT"`. Known quote suffixes are tried in
/// table order and the first one that leaves a non-empty base wins. When no
/// suffix matches, the normalized symbol is returned as the base together
/// with an empty quote asset.
fn split_symbol_assets(symbol: &str) -> (String, String) {
    const QUOTE_SUFFIXES: [&str; 10] = [
        "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
    ];
    let normalized = symbol.trim().to_ascii_uppercase();
    for quote in QUOTE_SUFFIXES {
        if let Some(base) = normalized.strip_suffix(quote) {
            // A symbol that *is* a quote asset (e.g. "USDT") has no base part.
            if !base.is_empty() {
                return (base.to_string(), quote.to_string());
            }
        }
    }
    (normalized, String::new())
}
197
/// Running long position reconstructed from a sequence of trades.
#[derive(Clone, Copy, Default)]
struct LongPos {
    /// Base-asset quantity currently held.
    qty: f64,
    /// Total quote-asset cost of the held quantity (buy-side quote fees included).
    cost_quote: f64,
}
203
204fn apply_spot_trade_with_fee(
205 pos: &mut LongPos,
206 stats: &mut OrderHistoryStats,
207 t: &BinanceMyTrade,
208 base_asset: &str,
209 quote_asset: &str,
210) {
211 let qty = t.qty.max(0.0);
212 if qty <= f64::EPSILON {
213 return;
214 }
215 let fee_asset = t.commission_asset.as_str();
216 let fee_is_base = !base_asset.is_empty() && fee_asset.eq_ignore_ascii_case(base_asset);
217 let fee_is_quote = !quote_asset.is_empty() && fee_asset.eq_ignore_ascii_case(quote_asset);
218
219 if t.is_buyer {
220 let net_qty = (qty
221 - if fee_is_base {
222 t.commission.max(0.0)
223 } else {
224 0.0
225 })
226 .max(0.0);
227 if net_qty <= f64::EPSILON {
228 return;
229 }
230 let fee_quote = if fee_is_quote {
231 t.commission.max(0.0)
232 } else {
233 0.0
234 };
235 pos.qty += net_qty;
236 pos.cost_quote += qty * t.price + fee_quote;
237 return;
238 }
239
240 if pos.qty <= f64::EPSILON {
242 return;
243 }
244 let close_qty = qty.min(pos.qty);
245 if close_qty <= f64::EPSILON {
246 return;
247 }
248 let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
249 let fee_quote_total = if fee_is_quote {
250 t.commission.max(0.0)
251 } else if fee_is_base {
252 t.commission.max(0.0) * t.price
254 } else {
255 0.0
256 };
257 let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
258 let pnl_delta = (close_qty * t.price - fee_quote) - (avg_cost * close_qty);
259 if pnl_delta > 0.0 {
260 stats.win_count += 1;
261 stats.trade_count += 1;
262 } else if pnl_delta < 0.0 {
263 stats.lose_count += 1;
264 stats.trade_count += 1;
265 }
266 stats.realized_pnl += pnl_delta;
267
268 pos.qty -= close_qty;
269 pos.cost_quote -= avg_cost * close_qty;
270 if pos.qty <= f64::EPSILON {
271 pos.qty = 0.0;
272 pos.cost_quote = 0.0;
273 }
274}
275
276fn compute_trade_state(
277 mut trades: Vec<BinanceMyTrade>,
278 symbol: &str,
279) -> (OrderHistoryStats, LongPos) {
280 trades.sort_by_key(|t| (t.time, t.id));
281 let (base_asset, quote_asset) = split_symbol_assets(symbol);
282 let mut pos = LongPos::default();
283 let mut stats = OrderHistoryStats::default();
284 for t in trades {
285 apply_spot_trade_with_fee(&mut pos, &mut stats, &t, &base_asset, "e_asset);
286 }
287 (stats, pos)
288}
289
290fn compute_trade_stats_by_source(
291 mut trades: Vec<BinanceMyTrade>,
292 order_source_by_id: &HashMap<u64, String>,
293 symbol: &str,
294) -> HashMap<String, OrderHistoryStats> {
295 trades.sort_by_key(|t| (t.time, t.id));
296 let (base_asset, quote_asset) = split_symbol_assets(symbol);
297 let mut pos_by_source: HashMap<String, LongPos> = HashMap::new();
298 let mut stats_by_source: HashMap<String, OrderHistoryStats> = HashMap::new();
299
300 for t in trades {
301 let source = order_source_by_id
302 .get(&t.order_id)
303 .cloned()
304 .unwrap_or_else(|| "UNKNOWN".to_string());
305 let pos = pos_by_source.entry(source.clone()).or_default();
306 let stats = stats_by_source.entry(source).or_default();
307 apply_spot_trade_with_fee(pos, stats, &t, &base_asset, "e_asset);
308 }
309
310 stats_by_source
311}
312
313fn to_persistable_stats_map(
314 strategy_stats: &HashMap<String, OrderHistoryStats>,
315) -> HashMap<String, order_store::StrategyScopedStats> {
316 strategy_stats
317 .iter()
318 .map(|(k, v)| {
319 (
320 k.clone(),
321 order_store::StrategyScopedStats {
322 trade_count: v.trade_count,
323 win_count: v.win_count,
324 lose_count: v.lose_count,
325 realized_pnl: v.realized_pnl,
326 },
327 )
328 })
329 .collect()
330}
331
332fn from_persisted_stats_map(
333 persisted: HashMap<String, order_store::StrategyScopedStats>,
334) -> HashMap<String, OrderHistoryStats> {
335 persisted
336 .into_iter()
337 .map(|(k, v)| {
338 (
339 k,
340 OrderHistoryStats {
341 trade_count: v.trade_count,
342 win_count: v.win_count,
343 lose_count: v.lose_count,
344 realized_pnl: v.realized_pnl,
345 },
346 )
347 })
348 .collect()
349}
350
351impl OrderManager {
352 pub fn new(
361 rest_client: Arc<BinanceRestClient>,
362 symbol: &str,
363 market: MarketKind,
364 order_amount_usdt: f64,
365 risk_config: &RiskConfig,
366 ) -> Self {
367 let mut strategy_limits_by_tag = HashMap::new();
368 let mut symbol_exposure_limit_by_key = HashMap::new();
369 let default_strategy_cooldown_ms = risk_config.default_strategy_cooldown_ms;
370 let default_strategy_max_active_orders = risk_config.default_strategy_max_active_orders.max(1);
371 let default_symbol_max_exposure_usdt = risk_config.default_symbol_max_exposure_usdt.max(0.0);
372 for profile in &risk_config.strategy_limits {
373 let source_tag = profile.source_tag.trim().to_ascii_lowercase();
374 if source_tag.is_empty() {
375 continue;
376 }
377 strategy_limits_by_tag.insert(
378 source_tag,
379 StrategyExecutionLimit {
380 cooldown_ms: profile
381 .cooldown_ms
382 .unwrap_or(default_strategy_cooldown_ms),
383 max_active_orders: profile
384 .max_active_orders
385 .unwrap_or(default_strategy_max_active_orders)
386 .max(1),
387 },
388 );
389 }
390 for limit in &risk_config.symbol_exposure_limits {
391 let symbol = limit.symbol.trim().to_ascii_uppercase();
392 if symbol.is_empty() {
393 continue;
394 }
395 let market = match limit
396 .market
397 .as_deref()
398 .unwrap_or("spot")
399 .trim()
400 .to_ascii_lowercase()
401 .as_str()
402 {
403 "spot" => MarketKind::Spot,
404 "futures" | "future" | "fut" => MarketKind::Futures,
405 _ => continue,
406 };
407 symbol_exposure_limit_by_key.insert(
408 symbol_limit_key(&symbol, market),
409 limit.max_exposure_usdt.max(0.0),
410 );
411 }
412 Self {
413 rest_client: rest_client.clone(),
414 active_orders: HashMap::new(),
415 position: Position::new(symbol.to_string()),
416 symbol: symbol.to_string(),
417 market,
418 order_amount_usdt,
419 balances: HashMap::new(),
420 last_price: 0.0,
421 risk_module: RiskModule::new(
422 rest_client.clone(),
423 risk_config.global_rate_limit_per_minute,
424 EndpointRateLimits {
425 orders_per_minute: risk_config.endpoint_rate_limits.orders_per_minute,
426 account_per_minute: risk_config.endpoint_rate_limits.account_per_minute,
427 market_data_per_minute: risk_config
428 .endpoint_rate_limits
429 .market_data_per_minute,
430 },
431 ),
432 default_strategy_cooldown_ms,
433 default_strategy_max_active_orders,
434 strategy_limits_by_tag,
435 last_strategy_submit_ms: HashMap::new(),
436 default_symbol_max_exposure_usdt,
437 symbol_exposure_limit_by_key,
438 }
439 }
440
441 pub fn position(&self) -> &Position {
446 &self.position
447 }
448
449 pub fn balances(&self) -> &HashMap<String, f64> {
454 &self.balances
455 }
456
457 pub fn update_unrealized_pnl(&mut self, current_price: f64) {
463 self.last_price = current_price;
464 self.position.update_unrealized_pnl(current_price);
465 }
466
467 pub fn rate_budget_snapshot(&self) -> RateBudgetSnapshot {
471 self.risk_module.rate_budget_snapshot()
472 }
473
474 fn strategy_limits_for(&self, source_tag: &str) -> StrategyExecutionLimit {
475 self.strategy_limits_by_tag
476 .get(source_tag)
477 .copied()
478 .unwrap_or(StrategyExecutionLimit {
479 cooldown_ms: self.default_strategy_cooldown_ms,
480 max_active_orders: self.default_strategy_max_active_orders,
481 })
482 }
483
484 fn active_order_count_for_source(&self, source_tag: &str) -> u32 {
485 let prefix = format!("sq-{}-", source_tag);
486 self.active_orders
487 .values()
488 .filter(|o| !o.status.is_terminal() && o.client_order_id.starts_with(&prefix))
489 .count() as u32
490 }
491
492 fn evaluate_strategy_limits(
493 &self,
494 source_tag: &str,
495 created_at_ms: u64,
496 ) -> Option<(String, String)> {
497 let limits = self.strategy_limits_for(source_tag);
498 let active_count = self.active_order_count_for_source(source_tag);
499 if active_count >= limits.max_active_orders {
500 return Some((
501 RejectionReasonCode::RiskStrategyMaxActiveOrdersExceeded
502 .as_str()
503 .to_string(),
504 format!(
505 "Strategy '{}' active order limit exceeded (active {}, limit {})",
506 source_tag, active_count, limits.max_active_orders
507 ),
508 ));
509 }
510
511 if limits.cooldown_ms > 0 {
512 if let Some(last_submit_ms) = self.last_strategy_submit_ms.get(source_tag) {
513 let elapsed = created_at_ms.saturating_sub(*last_submit_ms);
514 if elapsed < limits.cooldown_ms {
515 let remaining = limits.cooldown_ms - elapsed;
516 return Some((
517 RejectionReasonCode::RiskStrategyCooldownActive
518 .as_str()
519 .to_string(),
520 format!(
521 "Strategy '{}' cooldown active ({}ms remaining)",
522 source_tag, remaining
523 ),
524 ));
525 }
526 }
527 }
528
529 None
530 }
531
532 fn mark_strategy_submit(&mut self, source_tag: &str, created_at_ms: u64) {
533 self.last_strategy_submit_ms
534 .insert(source_tag.to_string(), created_at_ms);
535 }
536
537 fn max_symbol_exposure_usdt(&self) -> f64 {
538 self.symbol_exposure_limit_by_key
539 .get(&symbol_limit_key(&self.symbol, self.market))
540 .copied()
541 .unwrap_or(self.default_symbol_max_exposure_usdt)
542 }
543
544 fn projected_notional_after_fill(&self, side: OrderSide, qty: f64) -> (f64, f64) {
545 let price = self.last_price.max(0.0);
546 if price <= f64::EPSILON {
547 return (0.0, 0.0);
548 }
549 let current_qty_signed = match self.position.side {
550 Some(OrderSide::Buy) => self.position.qty,
551 Some(OrderSide::Sell) => -self.position.qty,
552 None => 0.0,
553 };
554 let delta = match side {
555 OrderSide::Buy => qty,
556 OrderSide::Sell => -qty,
557 };
558 let projected_qty_signed = current_qty_signed + delta;
559 (
560 current_qty_signed.abs() * price,
561 projected_qty_signed.abs() * price,
562 )
563 }
564
565 fn evaluate_symbol_exposure_limit(&self, side: OrderSide, qty: f64) -> Option<(String, String)> {
566 let max_exposure = self.max_symbol_exposure_usdt();
567 if max_exposure <= f64::EPSILON {
568 return None;
569 }
570 let (current_notional, projected_notional) = self.projected_notional_after_fill(side, qty);
571 if projected_notional > max_exposure && projected_notional > current_notional + f64::EPSILON {
572 return Some((
573 RejectionReasonCode::RiskSymbolExposureLimitExceeded
574 .as_str()
575 .to_string(),
576 format!(
577 "Symbol exposure limit exceeded for {} ({:?}): projected {:.2} USDT > limit {:.2} USDT",
578 self.symbol, self.market, projected_notional, max_exposure
579 ),
580 ));
581 }
582 None
583 }
584
585 pub fn would_exceed_symbol_exposure_limit(&self, side: OrderSide, qty: f64) -> bool {
589 self.evaluate_symbol_exposure_limit(side, qty).is_some()
590 }
591
592 pub async fn refresh_balances(&mut self) -> Result<HashMap<String, f64>> {
604 if !self
605 .risk_module
606 .reserve_endpoint_budget(ApiEndpointGroup::Account)
607 {
608 return Err(anyhow::anyhow!(
609 "Account endpoint budget exceeded; try again after reset"
610 ));
611 }
612 if self.market == MarketKind::Futures {
613 let account = self.rest_client.get_futures_account().await?;
614 self.balances.clear();
615 for a in &account.assets {
616 if a.wallet_balance.abs() > f64::EPSILON {
617 self.balances.insert(a.asset.clone(), a.available_balance);
618 }
619 }
620 return Ok(self.balances.clone());
621 }
622 let account = self.rest_client.get_account().await?;
623 self.balances.clear();
624 for b in &account.balances {
625 let total = b.free + b.locked;
626 if total > 0.0 {
627 self.balances.insert(b.asset.clone(), b.free);
628 }
629 }
630 tracing::info!(balances = ?self.balances, "Balances refreshed");
631 Ok(self.balances.clone())
632 }
633
634 pub async fn refresh_order_history(&mut self, limit: usize) -> Result<OrderHistorySnapshot> {
643 if !self
644 .risk_module
645 .reserve_endpoint_budget(ApiEndpointGroup::Orders)
646 {
647 return Err(anyhow::anyhow!(
648 "Orders endpoint budget exceeded; try again after reset"
649 ));
650 }
651 if self.market == MarketKind::Futures {
652 let fetch_started = Instant::now();
653 let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
654 let orders_result = self
655 .rest_client
656 .get_futures_all_orders(&self.symbol, limit)
657 .await;
658 let trades_result = self
659 .rest_client
660 .get_futures_my_trades_history(&self.symbol, limit.max(1))
661 .await;
662 let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
663
664 if orders_result.is_err() && trades_result.is_err() {
665 let oe = orders_result.err().unwrap();
666 let te = trades_result.err().unwrap();
667 return Err(anyhow::anyhow!(
668 "futures order history fetch failed: allOrders={} | userTrades={}",
669 oe,
670 te
671 ));
672 }
673
674 let mut orders = orders_result.unwrap_or_default();
675 let trades = trades_result.unwrap_or_default();
676 orders.sort_by_key(|o| o.update_time.max(o.time));
677
678 let storage_key = storage_symbol(&self.symbol, self.market);
679 if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &trades) {
680 tracing::warn!(error = %e, "Failed to persist futures order snapshot to sqlite");
681 }
682
683 let mut history = Vec::new();
684 let mut fills = Vec::new();
685 for t in &trades {
686 let side = if t.is_buyer { "BUY" } else { "SELL" };
687 fills.push(OrderHistoryFill {
688 timestamp_ms: t.time,
689 side: if t.is_buyer {
690 OrderSide::Buy
691 } else {
692 OrderSide::Sell
693 },
694 price: t.price,
695 });
696 history.push(format_order_history_row(
697 t.time,
698 "FILLED",
699 side,
700 t.qty,
701 t.price,
702 &format!("order#{}#T{} [FUT]", t.order_id, t.id),
703 ));
704 }
705 for o in &orders {
706 if o.executed_qty <= 0.0 {
707 history.push(format_order_history_row(
708 o.update_time.max(o.time),
709 &o.status,
710 &o.side,
711 display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
712 if o.executed_qty > 0.0 {
713 o.cummulative_quote_qty / o.executed_qty
714 } else {
715 o.price
716 },
717 &o.client_order_id,
718 ));
719 }
720 }
721
722 let mut stats = OrderHistoryStats::default();
723 for t in &trades {
724 if t.realized_pnl > 0.0 {
725 stats.win_count += 1;
726 stats.trade_count += 1;
727 } else if t.realized_pnl < 0.0 {
728 stats.lose_count += 1;
729 stats.trade_count += 1;
730 }
731 stats.realized_pnl += t.realized_pnl;
732 }
733 let estimated_total_pnl_usdt = Some(stats.realized_pnl);
734 let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
735 let latest_trade_event = trades.iter().map(|t| t.time).max();
736 let strategy_stats = match order_store::load_strategy_symbol_stats(&storage_key) {
737 Ok(persisted) => from_persisted_stats_map(persisted),
738 Err(e) => {
739 tracing::warn!(error = %e, "Failed to load persisted strategy stats (futures)");
740 HashMap::new()
741 }
742 };
743 return Ok(OrderHistorySnapshot {
744 rows: history,
745 stats,
746 strategy_stats,
747 fills,
748 open_qty: 0.0,
749 open_entry_price: 0.0,
750 estimated_total_pnl_usdt,
751 trade_data_complete: true,
752 fetched_at_ms,
753 fetch_latency_ms,
754 latest_event_ms: latest_order_event.max(latest_trade_event),
755 });
756 }
757
758 let fetch_started = Instant::now();
759 let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
760 let orders_result = self.rest_client.get_all_orders(&self.symbol, limit).await;
761 let storage_key = storage_symbol(&self.symbol, self.market);
762 let last_trade_id = order_store::load_last_trade_id(&storage_key).ok().flatten();
763 let persisted_trade_count = order_store::load_trade_count(&storage_key).unwrap_or(0);
764 let need_backfill = persisted_trade_count < limit;
765 let trades_result = match (need_backfill, last_trade_id) {
766 (true, _) => {
767 self.rest_client
768 .get_my_trades_history(&self.symbol, limit.max(1))
769 .await
770 }
771 (false, Some(last_id)) => {
772 self.rest_client
773 .get_my_trades_since(&self.symbol, last_id.saturating_add(1), 10)
774 .await
775 }
776 (false, None) => {
777 self.rest_client
778 .get_my_trades_history(&self.symbol, limit.max(1))
779 .await
780 }
781 };
782 let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
783 let trade_data_complete = trades_result.is_ok();
784
785 if orders_result.is_err() && trades_result.is_err() {
786 let oe = orders_result.err().unwrap();
787 let te = trades_result.err().unwrap();
788 return Err(anyhow::anyhow!(
789 "order history fetch failed: allOrders={} | myTrades={}",
790 oe,
791 te
792 ));
793 }
794
795 let mut orders = match orders_result {
796 Ok(v) => v,
797 Err(e) => {
798 tracing::warn!(error = %e, "Failed to fetch allOrders; falling back to trade-only history");
799 Vec::new()
800 }
801 };
802 let recent_trades = match trades_result {
803 Ok(t) => t,
804 Err(e) => {
805 tracing::warn!(error = %e, "Failed to fetch myTrades; falling back to order-only history");
806 Vec::new()
807 }
808 };
809 let mut trades = recent_trades.clone();
810 orders.sort_by_key(|o| o.update_time.max(o.time));
811
812 if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &recent_trades) {
813 tracing::warn!(error = %e, "Failed to persist order snapshot to sqlite");
814 }
815 let mut persisted_source_by_order_id: HashMap<u64, String> = HashMap::new();
816 match order_store::load_persisted_trades(&storage_key) {
817 Ok(saved) => {
818 if !saved.is_empty() {
819 trades = saved.iter().map(|r| r.trade.clone()).collect();
820 for row in saved {
821 persisted_source_by_order_id
822 .entry(row.trade.order_id)
823 .or_insert(row.source);
824 }
825 }
826 }
827 Err(e) => {
828 tracing::warn!(error = %e, "Failed to load persisted trades; using recent API trades");
829 }
830 }
831
832 let (stats, open_pos) = compute_trade_state(trades.clone(), &self.symbol);
833 let estimated_total_pnl_usdt = if self.last_price > 0.0 {
834 Some(stats.realized_pnl + (open_pos.qty * self.last_price - open_pos.cost_quote))
835 } else {
836 Some(stats.realized_pnl)
837 };
838 let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
839 let latest_trade_event = trades.iter().map(|t| t.time).max();
840 let latest_event_ms = latest_order_event.max(latest_trade_event);
841
842 let mut trades_by_order_id: HashMap<u64, Vec<BinanceMyTrade>> = HashMap::new();
843 for trade in &trades {
844 trades_by_order_id
845 .entry(trade.order_id)
846 .or_default()
847 .push(trade.clone());
848 }
849 for bucket in trades_by_order_id.values_mut() {
850 bucket.sort_by_key(|t| t.time);
851 }
852
853 let mut order_source_by_id = HashMap::new();
854 for o in &orders {
855 order_source_by_id.insert(
856 o.order_id,
857 source_label_from_client_order_id(&o.client_order_id).to_string(),
858 );
859 }
860 for (order_id, source) in persisted_source_by_order_id {
861 order_source_by_id.entry(order_id).or_insert(source);
862 }
863 let mut strategy_stats =
864 compute_trade_stats_by_source(trades.clone(), &order_source_by_id, &self.symbol);
865 let persisted_stats = to_persistable_stats_map(&strategy_stats);
866 if let Err(e) = order_store::persist_strategy_symbol_stats(&storage_key, &persisted_stats) {
867 tracing::warn!(error = %e, "Failed to persist strategy+symbol scoped stats");
868 }
869 if strategy_stats.is_empty() {
870 match order_store::load_strategy_symbol_stats(&storage_key) {
871 Ok(persisted) => {
872 strategy_stats = from_persisted_stats_map(persisted);
873 }
874 Err(e) => {
875 tracing::warn!(error = %e, "Failed to load persisted strategy+symbol stats");
876 }
877 }
878 }
879
880 let mut history = Vec::new();
881 let mut fills = Vec::new();
882 let mut used_trade_ids = std::collections::HashSet::new();
883
884 if orders.is_empty() && !trades.is_empty() {
885 let mut sorted = trades;
886 sorted.sort_by_key(|t| (t.time, t.id));
887 history.extend(sorted.iter().map(|t| {
888 fills.push(OrderHistoryFill {
889 timestamp_ms: t.time,
890 side: if t.is_buyer {
891 OrderSide::Buy
892 } else {
893 OrderSide::Sell
894 },
895 price: t.price,
896 });
897 format_trade_history_row(
898 t,
899 order_source_by_id
900 .get(&t.order_id)
901 .map(String::as_str)
902 .unwrap_or("UNKNOWN"),
903 )
904 }));
905 return Ok(OrderHistorySnapshot {
906 rows: history,
907 stats,
908 strategy_stats,
909 fills,
910 open_qty: open_pos.qty,
911 open_entry_price: if open_pos.qty > f64::EPSILON {
912 open_pos.cost_quote / open_pos.qty
913 } else {
914 0.0
915 },
916 estimated_total_pnl_usdt,
917 trade_data_complete,
918 fetched_at_ms,
919 fetch_latency_ms,
920 latest_event_ms,
921 });
922 }
923
924 for o in orders {
925 if o.executed_qty > 0.0 {
926 if let Some(order_trades) = trades_by_order_id.get(&o.order_id) {
927 for t in order_trades {
928 used_trade_ids.insert(t.id);
929 let side = if t.is_buyer { "BUY" } else { "SELL" };
930 fills.push(OrderHistoryFill {
931 timestamp_ms: t.time,
932 side: if t.is_buyer {
933 OrderSide::Buy
934 } else {
935 OrderSide::Sell
936 },
937 price: t.price,
938 });
939 history.push(format_order_history_row(
940 t.time,
941 "FILLED",
942 side,
943 t.qty,
944 t.price,
945 &format!(
946 "{}#T{} [{}]",
947 o.client_order_id,
948 t.id,
949 source_label_from_client_order_id(&o.client_order_id)
950 ),
951 ));
952 }
953 continue;
954 }
955 }
956
957 let avg_price = if o.executed_qty > 0.0 {
958 o.cummulative_quote_qty / o.executed_qty
959 } else {
960 o.price
961 };
962 history.push(format_order_history_row(
963 o.update_time.max(o.time),
964 &o.status,
965 &o.side,
966 display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
967 avg_price,
968 &o.client_order_id,
969 ));
970 }
971
972 for bucket in trades_by_order_id.values() {
974 for t in bucket {
975 if !used_trade_ids.contains(&t.id) {
976 fills.push(OrderHistoryFill {
977 timestamp_ms: t.time,
978 side: if t.is_buyer {
979 OrderSide::Buy
980 } else {
981 OrderSide::Sell
982 },
983 price: t.price,
984 });
985 history.push(format_trade_history_row(
986 t,
987 order_source_by_id
988 .get(&t.order_id)
989 .map(String::as_str)
990 .unwrap_or("UNKNOWN"),
991 ));
992 }
993 }
994 }
995 Ok(OrderHistorySnapshot {
996 rows: history,
997 stats,
998 strategy_stats,
999 fills,
1000 open_qty: open_pos.qty,
1001 open_entry_price: if open_pos.qty > f64::EPSILON {
1002 open_pos.cost_quote / open_pos.qty
1003 } else {
1004 0.0
1005 },
1006 estimated_total_pnl_usdt,
1007 trade_data_complete,
1008 fetched_at_ms,
1009 fetch_latency_ms,
1010 latest_event_ms,
1011 })
1012 }
1013
1014 pub async fn submit_order(
1039 &mut self,
1040 signal: Signal,
1041 source_tag: &str,
1042 ) -> Result<Option<OrderUpdate>> {
1043 let side = match &signal {
1044 Signal::Buy => OrderSide::Buy,
1045 Signal::Sell => OrderSide::Sell,
1046 Signal::Hold => return Ok(None),
1047 };
1048 let source_tag = source_tag.to_ascii_lowercase();
1049 let intent = OrderIntent {
1050 intent_id: format!("intent-{}", &uuid::Uuid::new_v4().to_string()[..8]),
1051 source_tag: source_tag.clone(),
1052 symbol: self.symbol.clone(),
1053 market: self.market,
1054 side,
1055 order_amount_usdt: self.order_amount_usdt,
1056 last_price: self.last_price,
1057 created_at_ms: chrono::Utc::now().timestamp_millis() as u64,
1058 };
1059 if let Some((reason_code, reason)) =
1060 self.evaluate_strategy_limits(&intent.source_tag, intent.created_at_ms)
1061 {
1062 return Ok(Some(OrderUpdate::Rejected {
1063 intent_id: intent.intent_id.clone(),
1064 client_order_id: "n/a".to_string(),
1065 reason_code,
1066 reason,
1067 }));
1068 }
1069 let decision = self
1070 .risk_module
1071 .evaluate_intent(&intent, &self.balances)
1072 .await?;
1073 if !decision.approved {
1074 return Ok(Some(OrderUpdate::Rejected {
1075 intent_id: intent.intent_id.clone(),
1076 client_order_id: "n/a".to_string(),
1077 reason_code: decision
1078 .reason_code
1079 .unwrap_or_else(|| RejectionReasonCode::RiskUnknown.as_str().to_string()),
1080 reason: decision
1081 .reason
1082 .unwrap_or_else(|| "Rejected by RiskModule".to_string()),
1083 }));
1084 }
1085 if !self.risk_module.reserve_rate_budget() {
1086 return Ok(Some(OrderUpdate::Rejected {
1087 intent_id: intent.intent_id.clone(),
1088 client_order_id: "n/a".to_string(),
1089 reason_code: RejectionReasonCode::RateGlobalBudgetExceeded
1090 .as_str()
1091 .to_string(),
1092 reason: "Global rate budget exceeded; try again after reset".to_string(),
1093 }));
1094 }
1095 if !self
1096 .risk_module
1097 .reserve_endpoint_budget(ApiEndpointGroup::Orders)
1098 {
1099 return Ok(Some(OrderUpdate::Rejected {
1100 intent_id: intent.intent_id.clone(),
1101 client_order_id: "n/a".to_string(),
1102 reason_code: RejectionReasonCode::RateEndpointBudgetExceeded
1103 .as_str()
1104 .to_string(),
1105 reason: "Orders endpoint budget exceeded; try again after reset".to_string(),
1106 }));
1107 }
1108 let qty = decision.normalized_qty;
1109 if let Some((reason_code, reason)) = self.evaluate_symbol_exposure_limit(side, qty) {
1110 return Ok(Some(OrderUpdate::Rejected {
1111 intent_id: intent.intent_id.clone(),
1112 client_order_id: "n/a".to_string(),
1113 reason_code,
1114 reason,
1115 }));
1116 }
1117 self.mark_strategy_submit(&intent.source_tag, intent.created_at_ms);
1118
1119 let client_order_id = format!(
1120 "sq-{}-{}",
1121 intent.source_tag,
1122 &uuid::Uuid::new_v4().to_string()[..8]
1123 );
1124
1125 let order = Order {
1126 client_order_id: client_order_id.clone(),
1127 server_order_id: None,
1128 symbol: self.symbol.clone(),
1129 side,
1130 order_type: OrderType::Market,
1131 quantity: qty,
1132 price: None,
1133 status: OrderStatus::PendingSubmit,
1134 created_at: chrono::Utc::now(),
1135 updated_at: chrono::Utc::now(),
1136 fills: vec![],
1137 };
1138
1139 self.active_orders.insert(client_order_id.clone(), order);
1140
1141 tracing::info!(
1142 side = %side,
1143 qty,
1144 usdt_amount = intent.order_amount_usdt,
1145 price = intent.last_price,
1146 intent_id = %intent.intent_id,
1147 created_at_ms = intent.created_at_ms,
1148 "Submitting order"
1149 );
1150
1151 let submit_res = if self.market == MarketKind::Futures {
1152 self.rest_client
1153 .place_futures_market_order(&self.symbol, side, qty, &client_order_id)
1154 .await
1155 } else {
1156 self.rest_client
1157 .place_market_order(&self.symbol, side, qty, &client_order_id)
1158 .await
1159 };
1160
1161 match submit_res {
1162 Ok(response) => {
1163 let update = self.process_order_response(
1164 &intent.intent_id,
1165 &client_order_id,
1166 side,
1167 &response,
1168 );
1169
1170 if matches!(update, OrderUpdate::Filled { .. }) {
1172 if let Err(e) = self.refresh_balances().await {
1173 tracing::warn!(error = %e, "Failed to refresh balances after fill");
1174 }
1175 }
1176
1177 Ok(Some(update))
1178 }
1179 Err(e) => {
1180 tracing::error!(
1181 client_order_id,
1182 error = %e,
1183 "Order rejected"
1184 );
1185 if let Some(order) = self.active_orders.get_mut(&client_order_id) {
1186 order.status = OrderStatus::Rejected;
1187 order.updated_at = chrono::Utc::now();
1188 }
1189 Ok(Some(OrderUpdate::Rejected {
1190 intent_id: intent.intent_id.clone(),
1191 client_order_id,
1192 reason_code: RejectionReasonCode::BrokerSubmitFailed.as_str().to_string(),
1193 reason: e.to_string(),
1194 }))
1195 }
1196 }
1197 }
1198
1199 fn process_order_response(
1200 &mut self,
1201 intent_id: &str,
1202 client_order_id: &str,
1203 side: OrderSide,
1204 response: &BinanceOrderResponse,
1205 ) -> OrderUpdate {
1206 let fills: Vec<Fill> = response
1207 .fills
1208 .iter()
1209 .map(|f| Fill {
1210 price: f.price,
1211 qty: f.qty,
1212 commission: f.commission,
1213 commission_asset: f.commission_asset.clone(),
1214 })
1215 .collect();
1216
1217 let status = OrderStatus::from_binance_str(&response.status);
1218
1219 if let Some(order) = self.active_orders.get_mut(client_order_id) {
1220 order.server_order_id = Some(response.order_id);
1221 order.status = status;
1222 order.fills = fills.clone();
1223 order.updated_at = chrono::Utc::now();
1224 }
1225
1226 if status == OrderStatus::Filled || status == OrderStatus::PartiallyFilled {
1227 self.position.apply_fill(side, &fills);
1228
1229 let avg_price = if fills.is_empty() {
1230 0.0
1231 } else {
1232 let total_value: f64 = fills.iter().map(|f| f.price * f.qty).sum();
1233 let total_qty: f64 = fills.iter().map(|f| f.qty).sum();
1234 total_value / total_qty
1235 };
1236
1237 tracing::info!(
1238 client_order_id,
1239 order_id = response.order_id,
1240 side = %side,
1241 avg_price,
1242 filled_qty = response.executed_qty,
1243 "Order filled"
1244 );
1245
1246 OrderUpdate::Filled {
1247 intent_id: intent_id.to_string(),
1248 client_order_id: client_order_id.to_string(),
1249 side,
1250 fills,
1251 avg_price,
1252 }
1253 } else {
1254 OrderUpdate::Submitted {
1255 intent_id: intent_id.to_string(),
1256 client_order_id: client_order_id.to_string(),
1257 server_order_id: response.order_id,
1258 }
1259 }
1260 }
1261}
1262
#[cfg(test)]
mod tests {
    //! Unit tests for the order-status helpers, the order-history formatting
    //! helpers and the pre-trade limit checks on `OrderManager`.

    use super::{display_qty_for_history, split_symbol_assets, OrderManager};
    use crate::binance::rest::BinanceRestClient;
    use crate::config::{EndpointRateLimitConfig, RiskConfig, SymbolExposureLimitConfig};
    use crate::model::order::{Order, OrderSide, OrderStatus, OrderType};
    use std::sync::Arc;

    /// Current wall-clock time as Unix milliseconds.
    fn now_ms() -> u64 {
        chrono::Utc::now().timestamp_millis() as u64
    }

    /// Asserts that two floats differ by less than `f64::EPSILON`.
    fn assert_close(actual: f64, expected: f64) {
        assert!((actual - expected).abs() < f64::EPSILON);
    }

    /// Builds the owned `(base, quote)` tuple that `split_symbol_assets` returns.
    fn asset_pair(base: &str, quote: &str) -> (String, String) {
        (base.to_string(), quote.to_string())
    }

    /// A submitted BTCUSDT market buy with no fills, keyed by `client_order_id`.
    fn resting_buy_order(client_order_id: String) -> Order {
        Order {
            client_order_id,
            server_order_id: None,
            symbol: "BTCUSDT".to_string(),
            side: OrderSide::Buy,
            order_type: OrderType::Market,
            quantity: 0.1,
            price: None,
            status: OrderStatus::Submitted,
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            fills: vec![],
        }
    }

    /// Spot BTCUSDT manager with a 150 USDT symbol exposure cap, a 3 s strategy
    /// cooldown and at most one active order per strategy.
    fn build_test_order_manager() -> OrderManager {
        let rest_client = Arc::new(BinanceRestClient::new(
            "https://demo-api.binance.com",
            "https://demo-fapi.binance.com",
            "k",
            "s",
            "fk",
            "fs",
            5000,
        ));

        let btc_spot_limit = SymbolExposureLimitConfig {
            symbol: "BTCUSDT".to_string(),
            market: Some("spot".to_string()),
            max_exposure_usdt: 150.0,
        };
        let endpoint_limits = EndpointRateLimitConfig {
            orders_per_minute: 240,
            account_per_minute: 180,
            market_data_per_minute: 360,
        };
        let risk_config = RiskConfig {
            global_rate_limit_per_minute: 600,
            default_strategy_cooldown_ms: 3_000,
            default_strategy_max_active_orders: 1,
            default_symbol_max_exposure_usdt: 200.0,
            strategy_limits: vec![],
            symbol_exposure_limits: vec![btc_spot_limit],
            endpoint_rate_limits: endpoint_limits,
        };

        OrderManager::new(
            rest_client,
            "BTCUSDT",
            crate::order_manager::MarketKind::Spot,
            10.0,
            &risk_config,
        )
    }

    /// Pending and submitted orders are live; filled, rejected and cancelled are terminal.
    #[test]
    fn valid_state_transitions() {
        assert!(!OrderStatus::PendingSubmit.is_terminal());
        assert!(!OrderStatus::Submitted.is_terminal());

        assert!(OrderStatus::Filled.is_terminal());
        assert!(OrderStatus::Rejected.is_terminal());
        assert!(OrderStatus::Cancelled.is_terminal());
    }

    /// Every Binance status string maps onto the expected local status.
    #[test]
    fn from_binance_str_mapping() {
        let expectations = [
            ("NEW", OrderStatus::Submitted),
            ("FILLED", OrderStatus::Filled),
            ("CANCELED", OrderStatus::Cancelled),
            ("REJECTED", OrderStatus::Rejected),
            ("EXPIRED", OrderStatus::Expired),
            ("PARTIALLY_FILLED", OrderStatus::PartiallyFilled),
        ];

        for (raw, expected) in expectations.iter() {
            let parsed = OrderStatus::from_binance_str(*raw);
            assert_eq!(&parsed, expected);
        }
    }

    /// Filled and partially filled rows display the executed quantity.
    #[test]
    fn order_history_uses_executed_qty_for_filled_states() {
        assert_close(display_qty_for_history("FILLED", 1.0, 0.4), 0.4);
        assert_close(display_qty_for_history("PARTIALLY_FILLED", 1.0, 0.4), 0.4);
    }

    /// Rows that never filled display the originally requested quantity.
    #[test]
    fn order_history_uses_orig_qty_for_non_filled_states() {
        assert_close(display_qty_for_history("NEW", 1.0, 0.4), 1.0);
        assert_close(display_qty_for_history("CANCELED", 1.0, 0.4), 1.0);
        assert_close(display_qty_for_history("REJECTED", 1.0, 0.0), 1.0);
    }

    /// Symbols ending in a recognised quote asset split into base and quote.
    #[test]
    fn split_symbol_assets_parses_known_quote_suffixes() {
        assert_eq!(split_symbol_assets("ETHUSDT"), asset_pair("ETH", "USDT"));
        assert_eq!(split_symbol_assets("ETHBTC"), asset_pair("ETH", "BTC"));
    }

    /// An unrecognised quote leaves the whole symbol as base with an empty quote.
    #[test]
    fn split_symbol_assets_falls_back_when_quote_unknown() {
        assert_eq!(split_symbol_assets("FOOBAR"), asset_pair("FOOBAR", ""));
    }

    /// With one order already active for the strategy, the next evaluation is rejected.
    #[test]
    fn strategy_limit_rejects_when_active_orders_reach_limit() {
        let mut manager = build_test_order_manager();
        let existing = resting_buy_order("sq-cfg-abcdef12".to_string());
        manager
            .active_orders
            .insert(existing.client_order_id.clone(), existing);

        let rejection = manager
            .evaluate_strategy_limits("cfg", now_ms())
            .expect("must be rejected");

        assert_eq!(
            rejection.0,
            String::from("risk.strategy_max_active_orders_exceeded")
        );
    }

    /// An evaluation 500 ms after a submit falls inside the 3 s cooldown.
    #[test]
    fn strategy_limit_rejects_during_cooldown_window() {
        let mut manager = build_test_order_manager();
        let submitted_at = now_ms();
        manager.mark_strategy_submit("cfg", submitted_at);

        let rejection = manager
            .evaluate_strategy_limits("cfg", submitted_at + 500)
            .expect("must be rejected");

        assert_eq!(rejection.0, String::from("risk.strategy_cooldown_active"));
    }

    /// Buying 2.0 at a last price of 100.0 (200 USDT) exceeds the 150 USDT cap.
    #[test]
    fn symbol_exposure_limit_rejects_when_projected_notional_exceeds_limit() {
        let mut manager = build_test_order_manager();
        manager.last_price = 100.0;

        let rejection = manager
            .evaluate_symbol_exposure_limit(OrderSide::Buy, 2.0)
            .expect("must be rejected");

        assert_eq!(
            rejection.0,
            String::from("risk.symbol_exposure_limit_exceeded")
        );
    }

    /// Selling part of an existing long reduces exposure and is not rejected.
    #[test]
    fn symbol_exposure_limit_allows_risk_reducing_order() {
        let mut manager = build_test_order_manager();
        manager.last_price = 100.0;
        manager.position.side = Some(OrderSide::Buy);
        manager.position.qty = 2.0;

        let outcome = manager.evaluate_symbol_exposure_limit(OrderSide::Sell, 1.0);

        assert!(outcome.is_none());
    }
}