1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use anyhow::Result;
6use chrono::TimeZone;
7
8use crate::binance::rest::BinanceRestClient;
9use crate::binance::types::{BinanceMyTrade, BinanceOrderResponse};
10use crate::config::RiskConfig;
11use crate::model::order::{Fill, Order, OrderSide, OrderStatus, OrderType};
12use crate::model::position::Position;
13use crate::model::signal::Signal;
14use crate::order_store;
15use crate::risk_module::{
16 ApiEndpointGroup, EndpointRateLimits, OrderIntent, RateBudgetSnapshot, RejectionReasonCode,
17 RiskModule,
18};
19
20pub use crate::risk_module::MarketKind;
21
/// Outcome of a single order submission attempt made through `OrderManager::submit_order`.
#[derive(Debug, Clone)]
pub enum OrderUpdate {
    /// The exchange accepted the order, but the reported status is neither
    /// filled nor partially filled.
    Submitted {
        /// Identifier of the intent that produced this order.
        intent_id: String,
        /// Client-generated order id sent to the exchange.
        client_order_id: String,
        /// Order id taken from the exchange response.
        server_order_id: u64,
    },
    /// The exchange reported the order as filled or partially filled.
    Filled {
        /// Identifier of the intent that produced this order.
        intent_id: String,
        /// Client-generated order id sent to the exchange.
        client_order_id: String,
        /// Side of the order.
        side: OrderSide,
        /// Fills copied from the exchange response.
        fills: Vec<Fill>,
        /// Quantity-weighted average fill price; `0.0` when the response carried no fills.
        avg_price: f64,
    },
    /// The order was refused, either by a local pre-submit check or by the exchange call failing.
    Rejected {
        /// Identifier of the intent that was rejected.
        intent_id: String,
        /// Client order id, or `"n/a"` when the rejection happened before an id was generated.
        client_order_id: String,
        /// Machine-readable rejection code.
        reason_code: String,
        /// Human-readable explanation.
        reason: String,
    },
}
43
/// Aggregated win/loss counters and realized PnL derived from trade history.
#[derive(Debug, Clone, Default)]
pub struct OrderHistoryStats {
    /// Number of trades counted as either a win or a loss (zero-PnL trades are not counted).
    pub trade_count: u32,
    /// Number of trades with positive PnL.
    pub win_count: u32,
    /// Number of trades with negative PnL.
    pub lose_count: u32,
    /// Sum of the PnL of all processed trades.
    pub realized_pnl: f64,
}
51
/// Result of one `OrderManager::refresh_order_history` call.
#[derive(Debug, Clone, Default)]
pub struct OrderHistorySnapshot {
    /// Pre-formatted history rows, one per trade or per order without matching trades.
    pub rows: Vec<String>,
    /// Statistics over all trades of the symbol.
    pub stats: OrderHistoryStats,
    /// Statistics keyed by source label.
    pub strategy_stats: HashMap<String, OrderHistoryStats>,
    /// Timestamp, side and price of each executed trade in the snapshot.
    pub fills: Vec<OrderHistoryFill>,
    /// Quantity still open after replaying the trades (the futures path reports `0.0`).
    pub open_qty: f64,
    /// Average cost per unit of the open quantity, or `0.0` when nothing is open.
    pub open_entry_price: f64,
    /// Realized PnL, plus the mark-to-market value of the open quantity when a last price is known.
    pub estimated_total_pnl_usdt: Option<f64>,
    /// On the spot path, `false` when the trade-history request failed for this snapshot.
    pub trade_data_complete: bool,
    /// Wall-clock time (Unix milliseconds) at which the fetch started.
    pub fetched_at_ms: u64,
    /// Time spent waiting for the exchange requests, in milliseconds.
    pub fetch_latency_ms: u64,
    /// Latest order or trade timestamp seen, if any.
    pub latest_event_ms: Option<u64>,
}
66
/// Minimal description of one executed trade in an `OrderHistorySnapshot`.
#[derive(Debug, Clone)]
pub struct OrderHistoryFill {
    /// Trade time as reported by the exchange.
    pub timestamp_ms: u64,
    /// `Buy` when the account was the buyer of the trade, otherwise `Sell`.
    pub side: OrderSide,
    /// Execution price of the trade.
    pub price: f64,
}
73
/// Submits orders for one symbol/market and tracks the resulting position,
/// balances and per-strategy submission limits.
pub struct OrderManager {
    /// Shared REST client used for account, history and order requests.
    rest_client: Arc<BinanceRestClient>,
    /// Orders created by this manager, keyed by client order id.
    active_orders: HashMap<String, Order>,
    /// Position for `symbol`, updated from order fills.
    position: Position,
    /// Symbol this manager trades.
    symbol: String,
    /// Market (spot or futures) this manager trades on.
    market: MarketKind,
    /// Order amount in USDT passed along with every order intent.
    order_amount_usdt: f64,
    /// Asset balances captured by the last successful balance refresh.
    balances: HashMap<String, f64>,
    /// Last price passed to `update_unrealized_pnl`; `0.0` until the first update.
    last_price: f64,
    /// Risk gate and rate-budget bookkeeping.
    risk_module: RiskModule,
    /// Cooldown applied to strategies without a dedicated limit entry.
    default_strategy_cooldown_ms: u64,
    /// Active-order cap applied to strategies without a dedicated limit entry.
    default_strategy_max_active_orders: u32,
    /// Per-strategy limits keyed by trimmed, lowercased source tag.
    strategy_limits_by_tag: HashMap<String, StrategyExecutionLimit>,
    /// Creation time of the last submitted intent per source tag.
    last_strategy_submit_ms: HashMap<String, u64>,
    /// Exposure cap used when no symbol-specific limit exists.
    default_symbol_max_exposure_usdt: f64,
    /// Exposure caps keyed by the string produced by `symbol_limit_key`.
    symbol_exposure_limit_by_key: HashMap<String, f64>,
}
91
/// Submission limits applied to a single strategy (source tag).
#[derive(Debug, Clone, Copy)]
struct StrategyExecutionLimit {
    /// Minimum time between two submissions; `0` disables the cooldown check.
    cooldown_ms: u64,
    /// Maximum number of non-terminal orders the strategy may have at once.
    max_active_orders: u32,
}
97
/// Returns the lowercase label used for `market` when building lookup keys.
fn normalize_market_label(market: MarketKind) -> &'static str {
    match market {
        MarketKind::Spot => "spot",
        MarketKind::Futures => "futures",
    }
}
104
105fn symbol_limit_key(symbol: &str, market: MarketKind) -> String {
106 format!(
107 "{}:{}",
108 symbol.trim().to_ascii_uppercase(),
109 normalize_market_label(market)
110 )
111}
112
113fn storage_symbol(symbol: &str, market: MarketKind) -> String {
114 match market {
115 MarketKind::Spot => symbol.to_string(),
116 MarketKind::Futures => format!("{}#FUT", symbol),
117 }
118}
119
/// Picks the quantity to show in a history row.
///
/// Orders whose status is `FILLED` or `PARTIALLY_FILLED` show the executed
/// quantity; every other status shows the originally requested quantity.
fn display_qty_for_history(status: &str, orig_qty: f64, executed_qty: f64) -> f64 {
    let has_execution = matches!(status, "FILLED" | "PARTIALLY_FILLED");
    if has_execution {
        executed_qty
    } else {
        orig_qty
    }
}
126
127fn format_history_time(timestamp_ms: u64) -> String {
128 chrono::Utc
129 .timestamp_millis_opt(timestamp_ms as i64)
130 .single()
131 .map(|dt| {
132 dt.with_timezone(&chrono::Local)
133 .format("%H:%M:%S")
134 .to_string()
135 })
136 .unwrap_or_else(|| "--:--:--".to_string())
137}
138
139fn format_order_history_row(
140 timestamp_ms: u64,
141 status: &str,
142 side: &str,
143 qty: f64,
144 avg_price: f64,
145 client_order_id: &str,
146) -> String {
147 format!(
148 "{} {:<10} {:<4} {:.5} @ {:.2} {}",
149 format_history_time(timestamp_ms),
150 status,
151 side,
152 qty,
153 avg_price,
154 client_order_id
155 )
156}
157
/// Maps a client order id to a display label based on the marker it contains.
///
/// Markers are checked in a fixed order and the first match wins; ids without
/// any known marker map to `"UNKNOWN"`.
fn source_label_from_client_order_id(client_order_id: &str) -> &'static str {
    const MARKER_LABELS: [(&str, &str); 4] = [
        ("-mnl-", "MANUAL"),
        ("-cfg-", "MA(Config)"),
        ("-fst-", "MA(Fast 5/20)"),
        ("-slw-", "MA(Slow 20/60)"),
    ];

    for (marker, label) in MARKER_LABELS {
        if client_order_id.contains(marker) {
            return label;
        }
    }
    "UNKNOWN"
}
171
172fn format_trade_history_row(t: &BinanceMyTrade, source: &str) -> String {
173 let side = if t.is_buyer { "BUY" } else { "SELL" };
174 format_order_history_row(
175 t.time,
176 "FILLED",
177 side,
178 t.qty,
179 t.price,
180 &format!("order#{}#T{} [{}]", t.order_id, t.id, source),
181 )
182}
183
/// Splits a trading symbol into `(base, quote)` by matching known quote suffixes.
///
/// Suffixes are tried in the listed order and a match is only accepted when
/// the remaining base part is non-empty. When nothing matches, the whole
/// symbol is returned as the base with an empty quote.
fn split_symbol_assets(symbol: &str) -> (String, String) {
    const QUOTE_SUFFIXES: [&str; 10] = [
        "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
    ];

    QUOTE_SUFFIXES
        .iter()
        .find_map(|quote| {
            symbol
                .strip_suffix(*quote)
                .filter(|base| !base.is_empty())
                .map(|base| (base.to_string(), (*quote).to_string()))
        })
        .unwrap_or_else(|| (symbol.to_string(), String::new()))
}
197
/// Running long position used while replaying trade history.
#[derive(Clone, Copy, Default)]
struct LongPos {
    /// Quantity currently held.
    qty: f64,
    /// Total cost of the held quantity, accumulated from buy price and quote-asset fees.
    cost_quote: f64,
}
203
204fn apply_spot_trade_with_fee(
205 pos: &mut LongPos,
206 stats: &mut OrderHistoryStats,
207 t: &BinanceMyTrade,
208 base_asset: &str,
209 quote_asset: &str,
210) {
211 let qty = t.qty.max(0.0);
212 if qty <= f64::EPSILON {
213 return;
214 }
215 let fee_asset = t.commission_asset.as_str();
216 let fee_is_base = !base_asset.is_empty() && fee_asset.eq_ignore_ascii_case(base_asset);
217 let fee_is_quote = !quote_asset.is_empty() && fee_asset.eq_ignore_ascii_case(quote_asset);
218
219 if t.is_buyer {
220 let net_qty = (qty
221 - if fee_is_base {
222 t.commission.max(0.0)
223 } else {
224 0.0
225 })
226 .max(0.0);
227 if net_qty <= f64::EPSILON {
228 return;
229 }
230 let fee_quote = if fee_is_quote {
231 t.commission.max(0.0)
232 } else {
233 0.0
234 };
235 pos.qty += net_qty;
236 pos.cost_quote += qty * t.price + fee_quote;
237 return;
238 }
239
240 if pos.qty <= f64::EPSILON {
242 return;
243 }
244 let close_qty = qty.min(pos.qty);
245 if close_qty <= f64::EPSILON {
246 return;
247 }
248 let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
249 let fee_quote_total = if fee_is_quote {
250 t.commission.max(0.0)
251 } else if fee_is_base {
252 t.commission.max(0.0) * t.price
254 } else {
255 0.0
256 };
257 let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
258 let pnl_delta = (close_qty * t.price - fee_quote) - (avg_cost * close_qty);
259 if pnl_delta > 0.0 {
260 stats.win_count += 1;
261 stats.trade_count += 1;
262 } else if pnl_delta < 0.0 {
263 stats.lose_count += 1;
264 stats.trade_count += 1;
265 }
266 stats.realized_pnl += pnl_delta;
267
268 pos.qty -= close_qty;
269 pos.cost_quote -= avg_cost * close_qty;
270 if pos.qty <= f64::EPSILON {
271 pos.qty = 0.0;
272 pos.cost_quote = 0.0;
273 }
274}
275
276fn compute_trade_state(
277 mut trades: Vec<BinanceMyTrade>,
278 symbol: &str,
279) -> (OrderHistoryStats, LongPos) {
280 trades.sort_by_key(|t| (t.time, t.id));
281 let (base_asset, quote_asset) = split_symbol_assets(symbol);
282 let mut pos = LongPos::default();
283 let mut stats = OrderHistoryStats::default();
284 for t in trades {
285 apply_spot_trade_with_fee(&mut pos, &mut stats, &t, &base_asset, "e_asset);
286 }
287 (stats, pos)
288}
289
290fn compute_trade_stats_by_source(
291 mut trades: Vec<BinanceMyTrade>,
292 order_source_by_id: &HashMap<u64, String>,
293 symbol: &str,
294) -> HashMap<String, OrderHistoryStats> {
295 trades.sort_by_key(|t| (t.time, t.id));
296 let (base_asset, quote_asset) = split_symbol_assets(symbol);
297 let mut pos_by_source: HashMap<String, LongPos> = HashMap::new();
298 let mut stats_by_source: HashMap<String, OrderHistoryStats> = HashMap::new();
299
300 for t in trades {
301 let source = order_source_by_id
302 .get(&t.order_id)
303 .cloned()
304 .unwrap_or_else(|| "UNKNOWN".to_string());
305 let pos = pos_by_source.entry(source.clone()).or_default();
306 let stats = stats_by_source.entry(source).or_default();
307 apply_spot_trade_with_fee(pos, stats, &t, &base_asset, "e_asset);
308 }
309
310 stats_by_source
311}
312
313fn to_persistable_stats_map(
314 strategy_stats: &HashMap<String, OrderHistoryStats>,
315) -> HashMap<String, order_store::StrategyScopedStats> {
316 strategy_stats
317 .iter()
318 .map(|(k, v)| {
319 (
320 k.clone(),
321 order_store::StrategyScopedStats {
322 trade_count: v.trade_count,
323 win_count: v.win_count,
324 lose_count: v.lose_count,
325 realized_pnl: v.realized_pnl,
326 },
327 )
328 })
329 .collect()
330}
331
332fn from_persisted_stats_map(
333 persisted: HashMap<String, order_store::StrategyScopedStats>,
334) -> HashMap<String, OrderHistoryStats> {
335 persisted
336 .into_iter()
337 .map(|(k, v)| {
338 (
339 k,
340 OrderHistoryStats {
341 trade_count: v.trade_count,
342 win_count: v.win_count,
343 lose_count: v.lose_count,
344 realized_pnl: v.realized_pnl,
345 },
346 )
347 })
348 .collect()
349}
350
351impl OrderManager {
352 pub fn new(
361 rest_client: Arc<BinanceRestClient>,
362 symbol: &str,
363 market: MarketKind,
364 order_amount_usdt: f64,
365 risk_config: &RiskConfig,
366 ) -> Self {
367 let mut strategy_limits_by_tag = HashMap::new();
368 let mut symbol_exposure_limit_by_key = HashMap::new();
369 let default_strategy_cooldown_ms = risk_config.default_strategy_cooldown_ms;
370 let default_strategy_max_active_orders = risk_config.default_strategy_max_active_orders.max(1);
371 let default_symbol_max_exposure_usdt = risk_config.default_symbol_max_exposure_usdt.max(0.0);
372 for profile in &risk_config.strategy_limits {
373 let source_tag = profile.source_tag.trim().to_ascii_lowercase();
374 if source_tag.is_empty() {
375 continue;
376 }
377 strategy_limits_by_tag.insert(
378 source_tag,
379 StrategyExecutionLimit {
380 cooldown_ms: profile
381 .cooldown_ms
382 .unwrap_or(default_strategy_cooldown_ms),
383 max_active_orders: profile
384 .max_active_orders
385 .unwrap_or(default_strategy_max_active_orders)
386 .max(1),
387 },
388 );
389 }
390 for limit in &risk_config.symbol_exposure_limits {
391 let symbol = limit.symbol.trim().to_ascii_uppercase();
392 if symbol.is_empty() {
393 continue;
394 }
395 let market = match limit
396 .market
397 .as_deref()
398 .unwrap_or("spot")
399 .trim()
400 .to_ascii_lowercase()
401 .as_str()
402 {
403 "spot" => MarketKind::Spot,
404 "futures" | "future" | "fut" => MarketKind::Futures,
405 _ => continue,
406 };
407 symbol_exposure_limit_by_key.insert(
408 symbol_limit_key(&symbol, market),
409 limit.max_exposure_usdt.max(0.0),
410 );
411 }
412 Self {
413 rest_client: rest_client.clone(),
414 active_orders: HashMap::new(),
415 position: Position::new(symbol.to_string()),
416 symbol: symbol.to_string(),
417 market,
418 order_amount_usdt,
419 balances: HashMap::new(),
420 last_price: 0.0,
421 risk_module: RiskModule::new(
422 rest_client.clone(),
423 risk_config.global_rate_limit_per_minute,
424 EndpointRateLimits {
425 orders_per_minute: risk_config.endpoint_rate_limits.orders_per_minute,
426 account_per_minute: risk_config.endpoint_rate_limits.account_per_minute,
427 market_data_per_minute: risk_config
428 .endpoint_rate_limits
429 .market_data_per_minute,
430 },
431 ),
432 default_strategy_cooldown_ms,
433 default_strategy_max_active_orders,
434 strategy_limits_by_tag,
435 last_strategy_submit_ms: HashMap::new(),
436 default_symbol_max_exposure_usdt,
437 symbol_exposure_limit_by_key,
438 }
439 }
440
    /// Returns the position currently tracked for this manager's symbol.
    pub fn position(&self) -> &Position {
        &self.position
    }
448
    /// Returns the balances captured by the last successful `refresh_balances` call.
    pub fn balances(&self) -> &HashMap<String, f64> {
        &self.balances
    }
456
    /// Records `current_price` as the last known price and forwards it to the
    /// position so it can update its unrealized PnL.
    pub fn update_unrealized_pnl(&mut self, current_price: f64) {
        self.last_price = current_price;
        self.position.update_unrealized_pnl(current_price);
    }
466
    /// Returns the risk module's rate-budget snapshot.
    pub fn rate_budget_snapshot(&self) -> RateBudgetSnapshot {
        self.risk_module.rate_budget_snapshot()
    }
473
    /// Returns the rate-budget snapshot of the orders endpoint group.
    pub fn orders_rate_budget_snapshot(&self) -> RateBudgetSnapshot {
        self.risk_module
            .endpoint_budget_snapshot(ApiEndpointGroup::Orders)
    }
478
    /// Returns the rate-budget snapshot of the account endpoint group.
    pub fn account_rate_budget_snapshot(&self) -> RateBudgetSnapshot {
        self.risk_module
            .endpoint_budget_snapshot(ApiEndpointGroup::Account)
    }
483
    /// Returns the rate-budget snapshot of the market-data endpoint group.
    pub fn market_data_rate_budget_snapshot(&self) -> RateBudgetSnapshot {
        self.risk_module
            .endpoint_budget_snapshot(ApiEndpointGroup::MarketData)
    }
488
489 fn strategy_limits_for(&self, source_tag: &str) -> StrategyExecutionLimit {
490 self.strategy_limits_by_tag
491 .get(source_tag)
492 .copied()
493 .unwrap_or(StrategyExecutionLimit {
494 cooldown_ms: self.default_strategy_cooldown_ms,
495 max_active_orders: self.default_strategy_max_active_orders,
496 })
497 }
498
499 fn active_order_count_for_source(&self, source_tag: &str) -> u32 {
500 let prefix = format!("sq-{}-", source_tag);
501 self.active_orders
502 .values()
503 .filter(|o| !o.status.is_terminal() && o.client_order_id.starts_with(&prefix))
504 .count() as u32
505 }
506
507 fn evaluate_strategy_limits(
508 &self,
509 source_tag: &str,
510 created_at_ms: u64,
511 ) -> Option<(String, String)> {
512 let limits = self.strategy_limits_for(source_tag);
513 let active_count = self.active_order_count_for_source(source_tag);
514 if active_count >= limits.max_active_orders {
515 return Some((
516 RejectionReasonCode::RiskStrategyMaxActiveOrdersExceeded
517 .as_str()
518 .to_string(),
519 format!(
520 "Strategy '{}' active order limit exceeded (active {}, limit {})",
521 source_tag, active_count, limits.max_active_orders
522 ),
523 ));
524 }
525
526 if limits.cooldown_ms > 0 {
527 if let Some(last_submit_ms) = self.last_strategy_submit_ms.get(source_tag) {
528 let elapsed = created_at_ms.saturating_sub(*last_submit_ms);
529 if elapsed < limits.cooldown_ms {
530 let remaining = limits.cooldown_ms - elapsed;
531 return Some((
532 RejectionReasonCode::RiskStrategyCooldownActive
533 .as_str()
534 .to_string(),
535 format!(
536 "Strategy '{}' cooldown active ({}ms remaining)",
537 source_tag, remaining
538 ),
539 ));
540 }
541 }
542 }
543
544 None
545 }
546
    /// Records `created_at_ms` as the latest submission time of `source_tag`;
    /// the cooldown check in `evaluate_strategy_limits` reads this value.
    fn mark_strategy_submit(&mut self, source_tag: &str, created_at_ms: u64) {
        self.last_strategy_submit_ms
            .insert(source_tag.to_string(), created_at_ms);
    }
551
552 fn max_symbol_exposure_usdt(&self) -> f64 {
553 self.symbol_exposure_limit_by_key
554 .get(&symbol_limit_key(&self.symbol, self.market))
555 .copied()
556 .unwrap_or(self.default_symbol_max_exposure_usdt)
557 }
558
559 fn projected_notional_after_fill(&self, side: OrderSide, qty: f64) -> (f64, f64) {
560 let price = self.last_price.max(0.0);
561 if price <= f64::EPSILON {
562 return (0.0, 0.0);
563 }
564 let current_qty_signed = match self.position.side {
565 Some(OrderSide::Buy) => self.position.qty,
566 Some(OrderSide::Sell) => -self.position.qty,
567 None => 0.0,
568 };
569 let delta = match side {
570 OrderSide::Buy => qty,
571 OrderSide::Sell => -qty,
572 };
573 let projected_qty_signed = current_qty_signed + delta;
574 (
575 current_qty_signed.abs() * price,
576 projected_qty_signed.abs() * price,
577 )
578 }
579
580 fn evaluate_symbol_exposure_limit(&self, side: OrderSide, qty: f64) -> Option<(String, String)> {
581 let max_exposure = self.max_symbol_exposure_usdt();
582 if max_exposure <= f64::EPSILON {
583 return None;
584 }
585 let (current_notional, projected_notional) = self.projected_notional_after_fill(side, qty);
586 if projected_notional > max_exposure && projected_notional > current_notional + f64::EPSILON {
587 return Some((
588 RejectionReasonCode::RiskSymbolExposureLimitExceeded
589 .as_str()
590 .to_string(),
591 format!(
592 "Symbol exposure limit exceeded for {} ({:?}): projected {:.2} USDT > limit {:.2} USDT",
593 self.symbol, self.market, projected_notional, max_exposure
594 ),
595 ));
596 }
597 None
598 }
599
    /// Returns `true` when `evaluate_symbol_exposure_limit` would reject an
    /// order of `qty` on `side`.
    pub fn would_exceed_symbol_exposure_limit(&self, side: OrderSide, qty: f64) -> bool {
        self.evaluate_symbol_exposure_limit(side, qty).is_some()
    }
606
607 pub async fn refresh_balances(&mut self) -> Result<HashMap<String, f64>> {
619 if !self
620 .risk_module
621 .reserve_endpoint_budget(ApiEndpointGroup::Account)
622 {
623 return Err(anyhow::anyhow!(
624 "Account endpoint budget exceeded; try again after reset"
625 ));
626 }
627 if self.market == MarketKind::Futures {
628 let account = self.rest_client.get_futures_account().await?;
629 self.balances.clear();
630 for a in &account.assets {
631 if a.wallet_balance.abs() > f64::EPSILON {
632 self.balances.insert(a.asset.clone(), a.available_balance);
633 }
634 }
635 return Ok(self.balances.clone());
636 }
637 let account = self.rest_client.get_account().await?;
638 self.balances.clear();
639 for b in &account.balances {
640 let total = b.free + b.locked;
641 if total > 0.0 {
642 self.balances.insert(b.asset.clone(), b.free);
643 }
644 }
645 tracing::info!(balances = ?self.balances, "Balances refreshed");
646 Ok(self.balances.clone())
647 }
648
649 pub async fn refresh_order_history(&mut self, limit: usize) -> Result<OrderHistorySnapshot> {
658 if !self
659 .risk_module
660 .reserve_endpoint_budget(ApiEndpointGroup::Orders)
661 {
662 return Err(anyhow::anyhow!(
663 "Orders endpoint budget exceeded; try again after reset"
664 ));
665 }
666 if self.market == MarketKind::Futures {
667 let fetch_started = Instant::now();
668 let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
669 let orders_result = self
670 .rest_client
671 .get_futures_all_orders(&self.symbol, limit)
672 .await;
673 let trades_result = self
674 .rest_client
675 .get_futures_my_trades_history(&self.symbol, limit.max(1))
676 .await;
677 let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
678
679 if orders_result.is_err() && trades_result.is_err() {
680 let oe = orders_result.err().unwrap();
681 let te = trades_result.err().unwrap();
682 return Err(anyhow::anyhow!(
683 "futures order history fetch failed: allOrders={} | userTrades={}",
684 oe,
685 te
686 ));
687 }
688
689 let mut orders = orders_result.unwrap_or_default();
690 let trades = trades_result.unwrap_or_default();
691 orders.sort_by_key(|o| o.update_time.max(o.time));
692
693 let storage_key = storage_symbol(&self.symbol, self.market);
694 if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &trades) {
695 tracing::warn!(error = %e, "Failed to persist futures order snapshot to sqlite");
696 }
697
698 let mut history = Vec::new();
699 let mut fills = Vec::new();
700 for t in &trades {
701 let side = if t.is_buyer { "BUY" } else { "SELL" };
702 fills.push(OrderHistoryFill {
703 timestamp_ms: t.time,
704 side: if t.is_buyer {
705 OrderSide::Buy
706 } else {
707 OrderSide::Sell
708 },
709 price: t.price,
710 });
711 history.push(format_order_history_row(
712 t.time,
713 "FILLED",
714 side,
715 t.qty,
716 t.price,
717 &format!("order#{}#T{} [FUT]", t.order_id, t.id),
718 ));
719 }
720 for o in &orders {
721 if o.executed_qty <= 0.0 {
722 history.push(format_order_history_row(
723 o.update_time.max(o.time),
724 &o.status,
725 &o.side,
726 display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
727 if o.executed_qty > 0.0 {
728 o.cummulative_quote_qty / o.executed_qty
729 } else {
730 o.price
731 },
732 &o.client_order_id,
733 ));
734 }
735 }
736
737 let mut stats = OrderHistoryStats::default();
738 for t in &trades {
739 if t.realized_pnl > 0.0 {
740 stats.win_count += 1;
741 stats.trade_count += 1;
742 } else if t.realized_pnl < 0.0 {
743 stats.lose_count += 1;
744 stats.trade_count += 1;
745 }
746 stats.realized_pnl += t.realized_pnl;
747 }
748 let estimated_total_pnl_usdt = Some(stats.realized_pnl);
749 let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
750 let latest_trade_event = trades.iter().map(|t| t.time).max();
751 let strategy_stats = match order_store::load_strategy_symbol_stats(&storage_key) {
752 Ok(persisted) => from_persisted_stats_map(persisted),
753 Err(e) => {
754 tracing::warn!(error = %e, "Failed to load persisted strategy stats (futures)");
755 HashMap::new()
756 }
757 };
758 return Ok(OrderHistorySnapshot {
759 rows: history,
760 stats,
761 strategy_stats,
762 fills,
763 open_qty: 0.0,
764 open_entry_price: 0.0,
765 estimated_total_pnl_usdt,
766 trade_data_complete: true,
767 fetched_at_ms,
768 fetch_latency_ms,
769 latest_event_ms: latest_order_event.max(latest_trade_event),
770 });
771 }
772
773 let fetch_started = Instant::now();
774 let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
775 let orders_result = self.rest_client.get_all_orders(&self.symbol, limit).await;
776 let storage_key = storage_symbol(&self.symbol, self.market);
777 let last_trade_id = order_store::load_last_trade_id(&storage_key).ok().flatten();
778 let persisted_trade_count = order_store::load_trade_count(&storage_key).unwrap_or(0);
779 let need_backfill = persisted_trade_count < limit;
780 let trades_result = match (need_backfill, last_trade_id) {
781 (true, _) => {
782 self.rest_client
783 .get_my_trades_history(&self.symbol, limit.max(1))
784 .await
785 }
786 (false, Some(last_id)) => {
787 self.rest_client
788 .get_my_trades_since(&self.symbol, last_id.saturating_add(1), 10)
789 .await
790 }
791 (false, None) => {
792 self.rest_client
793 .get_my_trades_history(&self.symbol, limit.max(1))
794 .await
795 }
796 };
797 let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
798 let trade_data_complete = trades_result.is_ok();
799
800 if orders_result.is_err() && trades_result.is_err() {
801 let oe = orders_result.err().unwrap();
802 let te = trades_result.err().unwrap();
803 return Err(anyhow::anyhow!(
804 "order history fetch failed: allOrders={} | myTrades={}",
805 oe,
806 te
807 ));
808 }
809
810 let mut orders = match orders_result {
811 Ok(v) => v,
812 Err(e) => {
813 tracing::warn!(error = %e, "Failed to fetch allOrders; falling back to trade-only history");
814 Vec::new()
815 }
816 };
817 let recent_trades = match trades_result {
818 Ok(t) => t,
819 Err(e) => {
820 tracing::warn!(error = %e, "Failed to fetch myTrades; falling back to order-only history");
821 Vec::new()
822 }
823 };
824 let mut trades = recent_trades.clone();
825 orders.sort_by_key(|o| o.update_time.max(o.time));
826
827 if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &recent_trades) {
828 tracing::warn!(error = %e, "Failed to persist order snapshot to sqlite");
829 }
830 let mut persisted_source_by_order_id: HashMap<u64, String> = HashMap::new();
831 match order_store::load_persisted_trades(&storage_key) {
832 Ok(saved) => {
833 if !saved.is_empty() {
834 trades = saved.iter().map(|r| r.trade.clone()).collect();
835 for row in saved {
836 persisted_source_by_order_id
837 .entry(row.trade.order_id)
838 .or_insert(row.source);
839 }
840 }
841 }
842 Err(e) => {
843 tracing::warn!(error = %e, "Failed to load persisted trades; using recent API trades");
844 }
845 }
846
847 let (stats, open_pos) = compute_trade_state(trades.clone(), &self.symbol);
848 let estimated_total_pnl_usdt = if self.last_price > 0.0 {
849 Some(stats.realized_pnl + (open_pos.qty * self.last_price - open_pos.cost_quote))
850 } else {
851 Some(stats.realized_pnl)
852 };
853 let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
854 let latest_trade_event = trades.iter().map(|t| t.time).max();
855 let latest_event_ms = latest_order_event.max(latest_trade_event);
856
857 let mut trades_by_order_id: HashMap<u64, Vec<BinanceMyTrade>> = HashMap::new();
858 for trade in &trades {
859 trades_by_order_id
860 .entry(trade.order_id)
861 .or_default()
862 .push(trade.clone());
863 }
864 for bucket in trades_by_order_id.values_mut() {
865 bucket.sort_by_key(|t| t.time);
866 }
867
868 let mut order_source_by_id = HashMap::new();
869 for o in &orders {
870 order_source_by_id.insert(
871 o.order_id,
872 source_label_from_client_order_id(&o.client_order_id).to_string(),
873 );
874 }
875 for (order_id, source) in persisted_source_by_order_id {
876 order_source_by_id.entry(order_id).or_insert(source);
877 }
878 let mut strategy_stats =
879 compute_trade_stats_by_source(trades.clone(), &order_source_by_id, &self.symbol);
880 let persisted_stats = to_persistable_stats_map(&strategy_stats);
881 if let Err(e) = order_store::persist_strategy_symbol_stats(&storage_key, &persisted_stats) {
882 tracing::warn!(error = %e, "Failed to persist strategy+symbol scoped stats");
883 }
884 if strategy_stats.is_empty() {
885 match order_store::load_strategy_symbol_stats(&storage_key) {
886 Ok(persisted) => {
887 strategy_stats = from_persisted_stats_map(persisted);
888 }
889 Err(e) => {
890 tracing::warn!(error = %e, "Failed to load persisted strategy+symbol stats");
891 }
892 }
893 }
894
895 let mut history = Vec::new();
896 let mut fills = Vec::new();
897 let mut used_trade_ids = std::collections::HashSet::new();
898
899 if orders.is_empty() && !trades.is_empty() {
900 let mut sorted = trades;
901 sorted.sort_by_key(|t| (t.time, t.id));
902 history.extend(sorted.iter().map(|t| {
903 fills.push(OrderHistoryFill {
904 timestamp_ms: t.time,
905 side: if t.is_buyer {
906 OrderSide::Buy
907 } else {
908 OrderSide::Sell
909 },
910 price: t.price,
911 });
912 format_trade_history_row(
913 t,
914 order_source_by_id
915 .get(&t.order_id)
916 .map(String::as_str)
917 .unwrap_or("UNKNOWN"),
918 )
919 }));
920 return Ok(OrderHistorySnapshot {
921 rows: history,
922 stats,
923 strategy_stats,
924 fills,
925 open_qty: open_pos.qty,
926 open_entry_price: if open_pos.qty > f64::EPSILON {
927 open_pos.cost_quote / open_pos.qty
928 } else {
929 0.0
930 },
931 estimated_total_pnl_usdt,
932 trade_data_complete,
933 fetched_at_ms,
934 fetch_latency_ms,
935 latest_event_ms,
936 });
937 }
938
939 for o in orders {
940 if o.executed_qty > 0.0 {
941 if let Some(order_trades) = trades_by_order_id.get(&o.order_id) {
942 for t in order_trades {
943 used_trade_ids.insert(t.id);
944 let side = if t.is_buyer { "BUY" } else { "SELL" };
945 fills.push(OrderHistoryFill {
946 timestamp_ms: t.time,
947 side: if t.is_buyer {
948 OrderSide::Buy
949 } else {
950 OrderSide::Sell
951 },
952 price: t.price,
953 });
954 history.push(format_order_history_row(
955 t.time,
956 "FILLED",
957 side,
958 t.qty,
959 t.price,
960 &format!(
961 "{}#T{} [{}]",
962 o.client_order_id,
963 t.id,
964 source_label_from_client_order_id(&o.client_order_id)
965 ),
966 ));
967 }
968 continue;
969 }
970 }
971
972 let avg_price = if o.executed_qty > 0.0 {
973 o.cummulative_quote_qty / o.executed_qty
974 } else {
975 o.price
976 };
977 history.push(format_order_history_row(
978 o.update_time.max(o.time),
979 &o.status,
980 &o.side,
981 display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
982 avg_price,
983 &o.client_order_id,
984 ));
985 }
986
987 for bucket in trades_by_order_id.values() {
989 for t in bucket {
990 if !used_trade_ids.contains(&t.id) {
991 fills.push(OrderHistoryFill {
992 timestamp_ms: t.time,
993 side: if t.is_buyer {
994 OrderSide::Buy
995 } else {
996 OrderSide::Sell
997 },
998 price: t.price,
999 });
1000 history.push(format_trade_history_row(
1001 t,
1002 order_source_by_id
1003 .get(&t.order_id)
1004 .map(String::as_str)
1005 .unwrap_or("UNKNOWN"),
1006 ));
1007 }
1008 }
1009 }
1010 Ok(OrderHistorySnapshot {
1011 rows: history,
1012 stats,
1013 strategy_stats,
1014 fills,
1015 open_qty: open_pos.qty,
1016 open_entry_price: if open_pos.qty > f64::EPSILON {
1017 open_pos.cost_quote / open_pos.qty
1018 } else {
1019 0.0
1020 },
1021 estimated_total_pnl_usdt,
1022 trade_data_complete,
1023 fetched_at_ms,
1024 fetch_latency_ms,
1025 latest_event_ms,
1026 })
1027 }
1028
1029 pub async fn submit_order(
1054 &mut self,
1055 signal: Signal,
1056 source_tag: &str,
1057 ) -> Result<Option<OrderUpdate>> {
1058 let side = match &signal {
1059 Signal::Buy => OrderSide::Buy,
1060 Signal::Sell => OrderSide::Sell,
1061 Signal::Hold => return Ok(None),
1062 };
1063 let source_tag = source_tag.to_ascii_lowercase();
1064 let intent = OrderIntent {
1065 intent_id: format!("intent-{}", &uuid::Uuid::new_v4().to_string()[..8]),
1066 source_tag: source_tag.clone(),
1067 symbol: self.symbol.clone(),
1068 market: self.market,
1069 side,
1070 order_amount_usdt: self.order_amount_usdt,
1071 last_price: self.last_price,
1072 created_at_ms: chrono::Utc::now().timestamp_millis() as u64,
1073 };
1074 if let Some((reason_code, reason)) =
1075 self.evaluate_strategy_limits(&intent.source_tag, intent.created_at_ms)
1076 {
1077 return Ok(Some(OrderUpdate::Rejected {
1078 intent_id: intent.intent_id.clone(),
1079 client_order_id: "n/a".to_string(),
1080 reason_code,
1081 reason,
1082 }));
1083 }
1084 let decision = self
1085 .risk_module
1086 .evaluate_intent(&intent, &self.balances)
1087 .await?;
1088 if !decision.approved {
1089 return Ok(Some(OrderUpdate::Rejected {
1090 intent_id: intent.intent_id.clone(),
1091 client_order_id: "n/a".to_string(),
1092 reason_code: decision
1093 .reason_code
1094 .unwrap_or_else(|| RejectionReasonCode::RiskUnknown.as_str().to_string()),
1095 reason: decision
1096 .reason
1097 .unwrap_or_else(|| "Rejected by RiskModule".to_string()),
1098 }));
1099 }
1100 if !self.risk_module.reserve_rate_budget() {
1101 return Ok(Some(OrderUpdate::Rejected {
1102 intent_id: intent.intent_id.clone(),
1103 client_order_id: "n/a".to_string(),
1104 reason_code: RejectionReasonCode::RateGlobalBudgetExceeded
1105 .as_str()
1106 .to_string(),
1107 reason: "Global rate budget exceeded; try again after reset".to_string(),
1108 }));
1109 }
1110 if !self
1111 .risk_module
1112 .reserve_endpoint_budget(ApiEndpointGroup::Orders)
1113 {
1114 return Ok(Some(OrderUpdate::Rejected {
1115 intent_id: intent.intent_id.clone(),
1116 client_order_id: "n/a".to_string(),
1117 reason_code: RejectionReasonCode::RateEndpointBudgetExceeded
1118 .as_str()
1119 .to_string(),
1120 reason: "Orders endpoint budget exceeded; try again after reset".to_string(),
1121 }));
1122 }
1123 let qty = decision.normalized_qty;
1124 if let Some((reason_code, reason)) = self.evaluate_symbol_exposure_limit(side, qty) {
1125 return Ok(Some(OrderUpdate::Rejected {
1126 intent_id: intent.intent_id.clone(),
1127 client_order_id: "n/a".to_string(),
1128 reason_code,
1129 reason,
1130 }));
1131 }
1132 self.mark_strategy_submit(&intent.source_tag, intent.created_at_ms);
1133
1134 let client_order_id = format!(
1135 "sq-{}-{}",
1136 intent.source_tag,
1137 &uuid::Uuid::new_v4().to_string()[..8]
1138 );
1139
1140 let order = Order {
1141 client_order_id: client_order_id.clone(),
1142 server_order_id: None,
1143 symbol: self.symbol.clone(),
1144 side,
1145 order_type: OrderType::Market,
1146 quantity: qty,
1147 price: None,
1148 status: OrderStatus::PendingSubmit,
1149 created_at: chrono::Utc::now(),
1150 updated_at: chrono::Utc::now(),
1151 fills: vec![],
1152 };
1153
1154 self.active_orders.insert(client_order_id.clone(), order);
1155
1156 tracing::info!(
1157 side = %side,
1158 qty,
1159 usdt_amount = intent.order_amount_usdt,
1160 price = intent.last_price,
1161 intent_id = %intent.intent_id,
1162 created_at_ms = intent.created_at_ms,
1163 "Submitting order"
1164 );
1165
1166 let submit_res = if self.market == MarketKind::Futures {
1167 self.rest_client
1168 .place_futures_market_order(&self.symbol, side, qty, &client_order_id)
1169 .await
1170 } else {
1171 self.rest_client
1172 .place_market_order(&self.symbol, side, qty, &client_order_id)
1173 .await
1174 };
1175
1176 match submit_res {
1177 Ok(response) => {
1178 let update = self.process_order_response(
1179 &intent.intent_id,
1180 &client_order_id,
1181 side,
1182 &response,
1183 );
1184
1185 if matches!(update, OrderUpdate::Filled { .. }) {
1187 if let Err(e) = self.refresh_balances().await {
1188 tracing::warn!(error = %e, "Failed to refresh balances after fill");
1189 }
1190 }
1191
1192 Ok(Some(update))
1193 }
1194 Err(e) => {
1195 tracing::error!(
1196 client_order_id,
1197 error = %e,
1198 "Order rejected"
1199 );
1200 if let Some(order) = self.active_orders.get_mut(&client_order_id) {
1201 order.status = OrderStatus::Rejected;
1202 order.updated_at = chrono::Utc::now();
1203 }
1204 Ok(Some(OrderUpdate::Rejected {
1205 intent_id: intent.intent_id.clone(),
1206 client_order_id,
1207 reason_code: RejectionReasonCode::BrokerSubmitFailed.as_str().to_string(),
1208 reason: e.to_string(),
1209 }))
1210 }
1211 }
1212 }
1213
1214 fn process_order_response(
1215 &mut self,
1216 intent_id: &str,
1217 client_order_id: &str,
1218 side: OrderSide,
1219 response: &BinanceOrderResponse,
1220 ) -> OrderUpdate {
1221 let fills: Vec<Fill> = response
1222 .fills
1223 .iter()
1224 .map(|f| Fill {
1225 price: f.price,
1226 qty: f.qty,
1227 commission: f.commission,
1228 commission_asset: f.commission_asset.clone(),
1229 })
1230 .collect();
1231
1232 let status = OrderStatus::from_binance_str(&response.status);
1233
1234 if let Some(order) = self.active_orders.get_mut(client_order_id) {
1235 order.server_order_id = Some(response.order_id);
1236 order.status = status;
1237 order.fills = fills.clone();
1238 order.updated_at = chrono::Utc::now();
1239 }
1240
1241 if status == OrderStatus::Filled || status == OrderStatus::PartiallyFilled {
1242 self.position.apply_fill(side, &fills);
1243
1244 let avg_price = if fills.is_empty() {
1245 0.0
1246 } else {
1247 let total_value: f64 = fills.iter().map(|f| f.price * f.qty).sum();
1248 let total_qty: f64 = fills.iter().map(|f| f.qty).sum();
1249 total_value / total_qty
1250 };
1251
1252 tracing::info!(
1253 client_order_id,
1254 order_id = response.order_id,
1255 side = %side,
1256 avg_price,
1257 filled_qty = response.executed_qty,
1258 "Order filled"
1259 );
1260
1261 OrderUpdate::Filled {
1262 intent_id: intent_id.to_string(),
1263 client_order_id: client_order_id.to_string(),
1264 side,
1265 fills,
1266 avg_price,
1267 }
1268 } else {
1269 OrderUpdate::Submitted {
1270 intent_id: intent_id.to_string(),
1271 client_order_id: client_order_id.to_string(),
1272 server_order_id: response.order_id,
1273 }
1274 }
1275 }
1276}
1277
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::{display_qty_for_history, split_symbol_assets, OrderManager};
    use crate::binance::rest::BinanceRestClient;
    use crate::config::{EndpointRateLimitConfig, RiskConfig, SymbolExposureLimitConfig};
    use crate::model::order::{Order, OrderSide, OrderStatus, OrderType};

    /// Current wall-clock time in milliseconds since the Unix epoch.
    fn now_ms() -> u64 {
        chrono::Utc::now().timestamp_millis() as u64
    }

    /// True when `actual` and `expected` differ by less than `f64::EPSILON`.
    fn approx_eq(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < f64::EPSILON
    }

    /// Builds a spot `BTCUSDT` manager pointed at the demo endpoints with dummy credentials.
    ///
    /// The risk configuration allows one active order per strategy, a 3 000 ms strategy
    /// cooldown, and caps `BTCUSDT` spot exposure at 150 USDT.
    fn build_test_order_manager() -> OrderManager {
        let rest_client = Arc::new(BinanceRestClient::new(
            "https://demo-api.binance.com",
            "https://demo-fapi.binance.com",
            "k",
            "s",
            "fk",
            "fs",
            5000,
        ));

        let btc_spot_limit = SymbolExposureLimitConfig {
            symbol: "BTCUSDT".to_string(),
            market: Some("spot".to_string()),
            max_exposure_usdt: 150.0,
        };
        let endpoint_rate_limits = EndpointRateLimitConfig {
            orders_per_minute: 240,
            account_per_minute: 180,
            market_data_per_minute: 360,
        };
        let risk_config = RiskConfig {
            global_rate_limit_per_minute: 600,
            default_strategy_cooldown_ms: 3_000,
            default_strategy_max_active_orders: 1,
            default_symbol_max_exposure_usdt: 200.0,
            strategy_limits: vec![],
            symbol_exposure_limits: vec![btc_spot_limit],
            endpoint_rate_limits,
        };

        OrderManager::new(
            rest_client,
            "BTCUSDT",
            crate::order_manager::MarketKind::Spot,
            10.0,
            &risk_config,
        )
    }

    /// `PendingSubmit` and `Submitted` are live states; `Filled`, `Rejected` and
    /// `Cancelled` are terminal.
    #[test]
    fn valid_state_transitions() {
        for live in [OrderStatus::PendingSubmit, OrderStatus::Submitted] {
            assert!(!live.is_terminal());
        }

        for done in [
            OrderStatus::Filled,
            OrderStatus::Rejected,
            OrderStatus::Cancelled,
        ] {
            assert!(done.is_terminal());
        }
    }

    /// Each Binance status string maps onto the matching local `OrderStatus`.
    #[test]
    fn from_binance_str_mapping() {
        let cases = [
            ("NEW", OrderStatus::Submitted),
            ("FILLED", OrderStatus::Filled),
            ("CANCELED", OrderStatus::Cancelled),
            ("REJECTED", OrderStatus::Rejected),
            ("EXPIRED", OrderStatus::Expired),
            ("PARTIALLY_FILLED", OrderStatus::PartiallyFilled),
        ];

        for (raw, expected) in cases {
            assert_eq!(OrderStatus::from_binance_str(raw), expected);
        }
    }

    /// Filled and partially filled rows display the executed quantity.
    #[test]
    fn order_history_uses_executed_qty_for_filled_states() {
        for status in ["FILLED", "PARTIALLY_FILLED"] {
            let shown = display_qty_for_history(status, 1.0, 0.4);
            assert!(approx_eq(shown, 0.4));
        }
    }

    /// Rows in any other state display the original order quantity.
    #[test]
    fn order_history_uses_orig_qty_for_non_filled_states() {
        let cases = [("NEW", 0.4), ("CANCELED", 0.4), ("REJECTED", 0.0)];

        for (status, executed_qty) in cases {
            let shown = display_qty_for_history(status, 1.0, executed_qty);
            assert!(approx_eq(shown, 1.0));
        }
    }

    /// Symbols ending in a recognised quote asset split into `(base, quote)`.
    #[test]
    fn split_symbol_assets_parses_known_quote_suffixes() {
        let cases = [("ETHUSDT", "ETH", "USDT"), ("ETHBTC", "ETH", "BTC")];

        for (symbol, base, quote) in cases {
            let expected = (base.to_string(), quote.to_string());
            assert_eq!(split_symbol_assets(symbol), expected);
        }
    }

    /// A symbol without a recognised quote suffix is returned whole with an empty quote.
    #[test]
    fn split_symbol_assets_falls_back_when_quote_unknown() {
        let expected = ("FOOBAR".to_string(), String::new());
        assert_eq!(split_symbol_assets("FOOBAR"), expected);
    }

    /// With a per-strategy limit of one active order, a second `cfg` submission is
    /// rejected while a `Submitted` order with id `sq-cfg-abcdef12` is still tracked.
    #[test]
    fn strategy_limit_rejects_when_active_orders_reach_limit() {
        let mut manager = build_test_order_manager();

        let order_id = "sq-cfg-abcdef12".to_string();
        let open_order = Order {
            client_order_id: order_id.clone(),
            server_order_id: None,
            symbol: "BTCUSDT".to_string(),
            side: OrderSide::Buy,
            order_type: OrderType::Market,
            quantity: 0.1,
            price: None,
            status: OrderStatus::Submitted,
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            fills: vec![],
        };
        manager.active_orders.insert(order_id, open_order);

        let rejection = manager
            .evaluate_strategy_limits("cfg", now_ms())
            .expect("must be rejected");

        assert_eq!(
            rejection.0,
            "risk.strategy_max_active_orders_exceeded".to_string()
        );
    }

    /// A submission 500 ms after the previous one falls inside the 3 000 ms cooldown.
    #[test]
    fn strategy_limit_rejects_during_cooldown_window() {
        let mut manager = build_test_order_manager();
        let submitted_at = now_ms();
        manager.mark_strategy_submit("cfg", submitted_at);

        let rejection = manager
            .evaluate_strategy_limits("cfg", submitted_at + 500)
            .expect("must be rejected");

        assert_eq!(rejection.0, "risk.strategy_cooldown_active".to_string());
    }

    /// Buying 2.0 at a last price of 100.0 is rejected under the 150 USDT symbol limit.
    #[test]
    fn symbol_exposure_limit_rejects_when_projected_notional_exceeds_limit() {
        let mut manager = build_test_order_manager();
        manager.last_price = 100.0;

        let rejection = manager
            .evaluate_symbol_exposure_limit(OrderSide::Buy, 2.0)
            .expect("must be rejected");

        assert_eq!(
            rejection.0,
            "risk.symbol_exposure_limit_exceeded".to_string()
        );
    }

    /// Selling 1.0 against an existing 2.0 long position is not rejected.
    #[test]
    fn symbol_exposure_limit_allows_risk_reducing_order() {
        let mut manager = build_test_order_manager();
        manager.last_price = 100.0;
        manager.position.side = Some(OrderSide::Buy);
        manager.position.qty = 2.0;

        let rejection = manager.evaluate_symbol_exposure_limit(OrderSide::Sell, 1.0);

        assert!(rejection.is_none());
    }
}