1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use anyhow::Result;
6use chrono::TimeZone;
7
8use crate::binance::rest::BinanceRestClient;
9use crate::binance::types::{BinanceMyTrade, BinanceOrderResponse};
10use crate::config::RiskConfig;
11use crate::model::order::{Fill, Order, OrderSide, OrderStatus, OrderType};
12use crate::model::position::Position;
13use crate::model::signal::Signal;
14use crate::order_store;
15use crate::risk_module::{
16 ApiEndpointGroup, EndpointRateLimits, OrderIntent, RateBudgetSnapshot, RejectionReasonCode,
17 RiskModule,
18};
19
20pub use crate::risk_module::MarketKind;
21
22#[derive(Debug, Clone)]
23pub enum OrderUpdate {
24 Submitted {
25 intent_id: String,
26 client_order_id: String,
27 server_order_id: u64,
28 },
29 Filled {
30 intent_id: String,
31 client_order_id: String,
32 side: OrderSide,
33 fills: Vec<Fill>,
34 avg_price: f64,
35 },
36 Rejected {
37 intent_id: String,
38 client_order_id: String,
39 reason_code: String,
40 reason: String,
41 },
42}
43
44#[derive(Debug, Clone, Default)]
45pub struct OrderHistoryStats {
46 pub trade_count: u32,
47 pub win_count: u32,
48 pub lose_count: u32,
49 pub realized_pnl: f64,
50}
51
52#[derive(Debug, Clone, Default)]
53pub struct OrderHistorySnapshot {
54 pub rows: Vec<String>,
55 pub stats: OrderHistoryStats,
56 pub strategy_stats: HashMap<String, OrderHistoryStats>,
57 pub fills: Vec<OrderHistoryFill>,
58 pub open_qty: f64,
59 pub open_entry_price: f64,
60 pub estimated_total_pnl_usdt: Option<f64>,
61 pub trade_data_complete: bool,
62 pub fetched_at_ms: u64,
63 pub fetch_latency_ms: u64,
64 pub latest_event_ms: Option<u64>,
65}
66
67#[derive(Debug, Clone)]
68pub struct OrderHistoryFill {
69 pub timestamp_ms: u64,
70 pub side: OrderSide,
71 pub price: f64,
72}
73
74pub struct OrderManager {
75 rest_client: Arc<BinanceRestClient>,
76 active_orders: HashMap<String, Order>,
77 position: Position,
78 symbol: String,
79 market: MarketKind,
80 order_amount_usdt: f64,
81 balances: HashMap<String, f64>,
82 last_price: f64,
83 risk_module: RiskModule,
84 default_strategy_cooldown_ms: u64,
85 default_strategy_max_active_orders: u32,
86 strategy_limits_by_tag: HashMap<String, StrategyExecutionLimit>,
87 last_strategy_submit_ms: HashMap<String, u64>,
88 default_symbol_max_exposure_usdt: f64,
89 symbol_exposure_limit_by_key: HashMap<String, f64>,
90}
91
92#[derive(Debug, Clone, Copy)]
93struct StrategyExecutionLimit {
94 cooldown_ms: u64,
95 max_active_orders: u32,
96}
97
98fn normalize_market_label(market: MarketKind) -> &'static str {
99 match market {
100 MarketKind::Spot => "spot",
101 MarketKind::Futures => "futures",
102 }
103}
104
105fn symbol_limit_key(symbol: &str, market: MarketKind) -> String {
106 format!(
107 "{}:{}",
108 symbol.trim().to_ascii_uppercase(),
109 normalize_market_label(market)
110 )
111}
112
113fn storage_symbol(symbol: &str, market: MarketKind) -> String {
114 match market {
115 MarketKind::Spot => symbol.to_string(),
116 MarketKind::Futures => format!("{}#FUT", symbol),
117 }
118}
119
120fn display_qty_for_history(status: &str, orig_qty: f64, executed_qty: f64) -> f64 {
121 match status {
122 "FILLED" | "PARTIALLY_FILLED" => executed_qty,
123 _ => orig_qty,
124 }
125}
126
127fn format_history_time(timestamp_ms: u64) -> String {
128 chrono::Utc
129 .timestamp_millis_opt(timestamp_ms as i64)
130 .single()
131 .map(|dt| {
132 dt.with_timezone(&chrono::Local)
133 .format("%H:%M:%S")
134 .to_string()
135 })
136 .unwrap_or_else(|| "--:--:--".to_string())
137}
138
139fn format_order_history_row(
140 timestamp_ms: u64,
141 status: &str,
142 side: &str,
143 qty: f64,
144 avg_price: f64,
145 client_order_id: &str,
146) -> String {
147 format!(
148 "{} {:<10} {:<4} {:.5} @ {:.2} {}",
149 format_history_time(timestamp_ms),
150 status,
151 side,
152 qty,
153 avg_price,
154 client_order_id
155 )
156}
157
158fn source_label_from_client_order_id(client_order_id: &str) -> String {
159 if client_order_id.contains("-mnl-") {
160 "MANUAL".to_string()
161 } else if client_order_id.contains("-cfg-") {
162 "MA(Config)".to_string()
163 } else if client_order_id.contains("-fst-") {
164 "MA(Fast 5/20)".to_string()
165 } else if client_order_id.contains("-slw-") {
166 "MA(Slow 20/60)".to_string()
167 } else if let Some(source_tag) = parse_source_tag_from_client_order_id(client_order_id) {
168 source_tag.to_ascii_lowercase()
169 } else {
170 "UNKNOWN".to_string()
171 }
172}
173
174fn parse_source_tag_from_client_order_id(client_order_id: &str) -> Option<&str> {
175 let body = client_order_id.strip_prefix("sq-")?;
176 let (source_tag, _) = body.split_once('-')?;
177 if source_tag.is_empty() {
178 None
179 } else {
180 Some(source_tag)
181 }
182}
183
184fn format_trade_history_row(t: &BinanceMyTrade, source: &str) -> String {
185 let side = if t.is_buyer { "BUY" } else { "SELL" };
186 format_order_history_row(
187 t.time,
188 "FILLED",
189 side,
190 t.qty,
191 t.price,
192 &format!("order#{}#T{} [{}]", t.order_id, t.id, source),
193 )
194}
195
196fn split_symbol_assets(symbol: &str) -> (String, String) {
197 const QUOTE_SUFFIXES: [&str; 10] = [
198 "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
199 ];
200 for q in QUOTE_SUFFIXES {
201 if let Some(base) = symbol.strip_suffix(q) {
202 if !base.is_empty() {
203 return (base.to_string(), q.to_string());
204 }
205 }
206 }
207 (symbol.to_string(), String::new())
208}
209
210#[derive(Clone, Copy, Default)]
211struct LongPos {
212 qty: f64,
213 cost_quote: f64,
214}
215
216fn apply_spot_trade_with_fee(
217 pos: &mut LongPos,
218 stats: &mut OrderHistoryStats,
219 t: &BinanceMyTrade,
220 base_asset: &str,
221 quote_asset: &str,
222) {
223 let qty = t.qty.max(0.0);
224 if qty <= f64::EPSILON {
225 return;
226 }
227 let fee_asset = t.commission_asset.as_str();
228 let fee_is_base = !base_asset.is_empty() && fee_asset.eq_ignore_ascii_case(base_asset);
229 let fee_is_quote = !quote_asset.is_empty() && fee_asset.eq_ignore_ascii_case(quote_asset);
230
231 if t.is_buyer {
232 let net_qty = (qty
233 - if fee_is_base {
234 t.commission.max(0.0)
235 } else {
236 0.0
237 })
238 .max(0.0);
239 if net_qty <= f64::EPSILON {
240 return;
241 }
242 let fee_quote = if fee_is_quote {
243 t.commission.max(0.0)
244 } else {
245 0.0
246 };
247 pos.qty += net_qty;
248 pos.cost_quote += qty * t.price + fee_quote;
249 return;
250 }
251
252 if pos.qty <= f64::EPSILON {
254 return;
255 }
256 let close_qty = qty.min(pos.qty);
257 if close_qty <= f64::EPSILON {
258 return;
259 }
260 let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
261 let fee_quote_total = if fee_is_quote {
262 t.commission.max(0.0)
263 } else if fee_is_base {
264 t.commission.max(0.0) * t.price
266 } else {
267 0.0
268 };
269 let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
270 let pnl_delta = (close_qty * t.price - fee_quote) - (avg_cost * close_qty);
271 if pnl_delta > 0.0 {
272 stats.win_count += 1;
273 stats.trade_count += 1;
274 } else if pnl_delta < 0.0 {
275 stats.lose_count += 1;
276 stats.trade_count += 1;
277 }
278 stats.realized_pnl += pnl_delta;
279
280 pos.qty -= close_qty;
281 pos.cost_quote -= avg_cost * close_qty;
282 if pos.qty <= f64::EPSILON {
283 pos.qty = 0.0;
284 pos.cost_quote = 0.0;
285 }
286}
287
288fn compute_trade_state(
289 mut trades: Vec<BinanceMyTrade>,
290 symbol: &str,
291) -> (OrderHistoryStats, LongPos) {
292 trades.sort_by_key(|t| (t.time, t.id));
293 let (base_asset, quote_asset) = split_symbol_assets(symbol);
294 let mut pos = LongPos::default();
295 let mut stats = OrderHistoryStats::default();
296 for t in trades {
297 apply_spot_trade_with_fee(&mut pos, &mut stats, &t, &base_asset, "e_asset);
298 }
299 (stats, pos)
300}
301
302fn compute_trade_stats_by_source(
303 mut trades: Vec<BinanceMyTrade>,
304 order_source_by_id: &HashMap<u64, String>,
305 symbol: &str,
306) -> HashMap<String, OrderHistoryStats> {
307 trades.sort_by_key(|t| (t.time, t.id));
308 let (base_asset, quote_asset) = split_symbol_assets(symbol);
309 let mut pos_by_source: HashMap<String, LongPos> = HashMap::new();
310 let mut stats_by_source: HashMap<String, OrderHistoryStats> = HashMap::new();
311
312 for t in trades {
313 let source = order_source_by_id
314 .get(&t.order_id)
315 .cloned()
316 .unwrap_or_else(|| "UNKNOWN".to_string());
317 let pos = pos_by_source.entry(source.clone()).or_default();
318 let stats = stats_by_source.entry(source).or_default();
319 apply_spot_trade_with_fee(pos, stats, &t, &base_asset, "e_asset);
320 }
321
322 stats_by_source
323}
324
325fn to_persistable_stats_map(
326 strategy_stats: &HashMap<String, OrderHistoryStats>,
327) -> HashMap<String, order_store::StrategyScopedStats> {
328 strategy_stats
329 .iter()
330 .map(|(k, v)| {
331 (
332 k.clone(),
333 order_store::StrategyScopedStats {
334 trade_count: v.trade_count,
335 win_count: v.win_count,
336 lose_count: v.lose_count,
337 realized_pnl: v.realized_pnl,
338 },
339 )
340 })
341 .collect()
342}
343
344fn from_persisted_stats_map(
345 persisted: HashMap<String, order_store::StrategyScopedStats>,
346) -> HashMap<String, OrderHistoryStats> {
347 persisted
348 .into_iter()
349 .map(|(k, v)| {
350 (
351 k,
352 OrderHistoryStats {
353 trade_count: v.trade_count,
354 win_count: v.win_count,
355 lose_count: v.lose_count,
356 realized_pnl: v.realized_pnl,
357 },
358 )
359 })
360 .collect()
361}
362
363impl OrderManager {
364 pub fn new(
373 rest_client: Arc<BinanceRestClient>,
374 symbol: &str,
375 market: MarketKind,
376 order_amount_usdt: f64,
377 risk_config: &RiskConfig,
378 ) -> Self {
379 let mut strategy_limits_by_tag = HashMap::new();
380 let mut symbol_exposure_limit_by_key = HashMap::new();
381 let default_strategy_cooldown_ms = risk_config.default_strategy_cooldown_ms;
382 let default_strategy_max_active_orders = risk_config.default_strategy_max_active_orders.max(1);
383 let default_symbol_max_exposure_usdt = risk_config.default_symbol_max_exposure_usdt.max(0.0);
384 for profile in &risk_config.strategy_limits {
385 let source_tag = profile.source_tag.trim().to_ascii_lowercase();
386 if source_tag.is_empty() {
387 continue;
388 }
389 strategy_limits_by_tag.insert(
390 source_tag,
391 StrategyExecutionLimit {
392 cooldown_ms: profile
393 .cooldown_ms
394 .unwrap_or(default_strategy_cooldown_ms),
395 max_active_orders: profile
396 .max_active_orders
397 .unwrap_or(default_strategy_max_active_orders)
398 .max(1),
399 },
400 );
401 }
402 for limit in &risk_config.symbol_exposure_limits {
403 let symbol = limit.symbol.trim().to_ascii_uppercase();
404 if symbol.is_empty() {
405 continue;
406 }
407 let market = match limit
408 .market
409 .as_deref()
410 .unwrap_or("spot")
411 .trim()
412 .to_ascii_lowercase()
413 .as_str()
414 {
415 "spot" => MarketKind::Spot,
416 "futures" | "future" | "fut" => MarketKind::Futures,
417 _ => continue,
418 };
419 symbol_exposure_limit_by_key.insert(
420 symbol_limit_key(&symbol, market),
421 limit.max_exposure_usdt.max(0.0),
422 );
423 }
424 Self {
425 rest_client: rest_client.clone(),
426 active_orders: HashMap::new(),
427 position: Position::new(symbol.to_string()),
428 symbol: symbol.to_string(),
429 market,
430 order_amount_usdt,
431 balances: HashMap::new(),
432 last_price: 0.0,
433 risk_module: RiskModule::new(
434 rest_client.clone(),
435 risk_config.global_rate_limit_per_minute,
436 EndpointRateLimits {
437 orders_per_minute: risk_config.endpoint_rate_limits.orders_per_minute,
438 account_per_minute: risk_config.endpoint_rate_limits.account_per_minute,
439 market_data_per_minute: risk_config
440 .endpoint_rate_limits
441 .market_data_per_minute,
442 },
443 ),
444 default_strategy_cooldown_ms,
445 default_strategy_max_active_orders,
446 strategy_limits_by_tag,
447 last_strategy_submit_ms: HashMap::new(),
448 default_symbol_max_exposure_usdt,
449 symbol_exposure_limit_by_key,
450 }
451 }
452
453 pub fn position(&self) -> &Position {
458 &self.position
459 }
460
461 pub fn balances(&self) -> &HashMap<String, f64> {
466 &self.balances
467 }
468
469 pub fn update_unrealized_pnl(&mut self, current_price: f64) {
475 self.last_price = current_price;
476 self.position.update_unrealized_pnl(current_price);
477 }
478
479 pub fn rate_budget_snapshot(&self) -> RateBudgetSnapshot {
483 self.risk_module.rate_budget_snapshot()
484 }
485
486 pub fn orders_rate_budget_snapshot(&self) -> RateBudgetSnapshot {
487 self.risk_module
488 .endpoint_budget_snapshot(ApiEndpointGroup::Orders)
489 }
490
491 pub fn account_rate_budget_snapshot(&self) -> RateBudgetSnapshot {
492 self.risk_module
493 .endpoint_budget_snapshot(ApiEndpointGroup::Account)
494 }
495
496 pub fn market_data_rate_budget_snapshot(&self) -> RateBudgetSnapshot {
497 self.risk_module
498 .endpoint_budget_snapshot(ApiEndpointGroup::MarketData)
499 }
500
501 fn strategy_limits_for(&self, source_tag: &str) -> StrategyExecutionLimit {
502 self.strategy_limits_by_tag
503 .get(source_tag)
504 .copied()
505 .unwrap_or(StrategyExecutionLimit {
506 cooldown_ms: self.default_strategy_cooldown_ms,
507 max_active_orders: self.default_strategy_max_active_orders,
508 })
509 }
510
511 fn active_order_count_for_source(&self, source_tag: &str) -> u32 {
512 let prefix = format!("sq-{}-", source_tag);
513 self.active_orders
514 .values()
515 .filter(|o| !o.status.is_terminal() && o.client_order_id.starts_with(&prefix))
516 .count() as u32
517 }
518
519 fn evaluate_strategy_limits(
520 &self,
521 source_tag: &str,
522 created_at_ms: u64,
523 ) -> Option<(String, String)> {
524 let limits = self.strategy_limits_for(source_tag);
525 let active_count = self.active_order_count_for_source(source_tag);
526 if active_count >= limits.max_active_orders {
527 return Some((
528 RejectionReasonCode::RiskStrategyMaxActiveOrdersExceeded
529 .as_str()
530 .to_string(),
531 format!(
532 "Strategy '{}' active order limit exceeded (active {}, limit {})",
533 source_tag, active_count, limits.max_active_orders
534 ),
535 ));
536 }
537
538 if limits.cooldown_ms > 0 {
539 if let Some(last_submit_ms) = self.last_strategy_submit_ms.get(source_tag) {
540 let elapsed = created_at_ms.saturating_sub(*last_submit_ms);
541 if elapsed < limits.cooldown_ms {
542 let remaining = limits.cooldown_ms - elapsed;
543 return Some((
544 RejectionReasonCode::RiskStrategyCooldownActive
545 .as_str()
546 .to_string(),
547 format!(
548 "Strategy '{}' cooldown active ({}ms remaining)",
549 source_tag, remaining
550 ),
551 ));
552 }
553 }
554 }
555
556 None
557 }
558
559 fn mark_strategy_submit(&mut self, source_tag: &str, created_at_ms: u64) {
560 self.last_strategy_submit_ms
561 .insert(source_tag.to_string(), created_at_ms);
562 }
563
564 fn max_symbol_exposure_usdt(&self) -> f64 {
565 self.symbol_exposure_limit_by_key
566 .get(&symbol_limit_key(&self.symbol, self.market))
567 .copied()
568 .unwrap_or(self.default_symbol_max_exposure_usdt)
569 }
570
571 fn projected_notional_after_fill(&self, side: OrderSide, qty: f64) -> (f64, f64) {
572 let price = self.last_price.max(0.0);
573 if price <= f64::EPSILON {
574 return (0.0, 0.0);
575 }
576 let current_qty_signed = match self.position.side {
577 Some(OrderSide::Buy) => self.position.qty,
578 Some(OrderSide::Sell) => -self.position.qty,
579 None => 0.0,
580 };
581 let delta = match side {
582 OrderSide::Buy => qty,
583 OrderSide::Sell => -qty,
584 };
585 let projected_qty_signed = current_qty_signed + delta;
586 (
587 current_qty_signed.abs() * price,
588 projected_qty_signed.abs() * price,
589 )
590 }
591
592 fn evaluate_symbol_exposure_limit(&self, side: OrderSide, qty: f64) -> Option<(String, String)> {
593 let max_exposure = self.max_symbol_exposure_usdt();
594 if max_exposure <= f64::EPSILON {
595 return None;
596 }
597 let (current_notional, projected_notional) = self.projected_notional_after_fill(side, qty);
598 if projected_notional > max_exposure && projected_notional > current_notional + f64::EPSILON {
599 return Some((
600 RejectionReasonCode::RiskSymbolExposureLimitExceeded
601 .as_str()
602 .to_string(),
603 format!(
604 "Symbol exposure limit exceeded for {} ({:?}): projected {:.2} USDT > limit {:.2} USDT",
605 self.symbol, self.market, projected_notional, max_exposure
606 ),
607 ));
608 }
609 None
610 }
611
612 pub fn would_exceed_symbol_exposure_limit(&self, side: OrderSide, qty: f64) -> bool {
616 self.evaluate_symbol_exposure_limit(side, qty).is_some()
617 }
618
619 pub async fn refresh_balances(&mut self) -> Result<HashMap<String, f64>> {
631 if !self
632 .risk_module
633 .reserve_endpoint_budget(ApiEndpointGroup::Account)
634 {
635 return Err(anyhow::anyhow!(
636 "Account endpoint budget exceeded; try again after reset"
637 ));
638 }
639 if self.market == MarketKind::Futures {
640 let account = self.rest_client.get_futures_account().await?;
641 self.balances.clear();
642 for a in &account.assets {
643 if a.wallet_balance.abs() > f64::EPSILON {
644 self.balances.insert(a.asset.clone(), a.available_balance);
645 }
646 }
647 return Ok(self.balances.clone());
648 }
649 let account = self.rest_client.get_account().await?;
650 self.balances.clear();
651 for b in &account.balances {
652 let total = b.free + b.locked;
653 if total > 0.0 {
654 self.balances.insert(b.asset.clone(), b.free);
655 }
656 }
657 tracing::info!(balances = ?self.balances, "Balances refreshed");
658 Ok(self.balances.clone())
659 }
660
661 pub async fn refresh_order_history(&mut self, limit: usize) -> Result<OrderHistorySnapshot> {
670 if !self
671 .risk_module
672 .reserve_endpoint_budget(ApiEndpointGroup::Orders)
673 {
674 return Err(anyhow::anyhow!(
675 "Orders endpoint budget exceeded; try again after reset"
676 ));
677 }
678 if self.market == MarketKind::Futures {
679 let fetch_started = Instant::now();
680 let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
681 let orders_result = self
682 .rest_client
683 .get_futures_all_orders(&self.symbol, limit)
684 .await;
685 let trades_result = self
686 .rest_client
687 .get_futures_my_trades_history(&self.symbol, limit.max(1))
688 .await;
689 let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
690
691 if orders_result.is_err() && trades_result.is_err() {
692 let oe = orders_result.err().unwrap();
693 let te = trades_result.err().unwrap();
694 return Err(anyhow::anyhow!(
695 "futures order history fetch failed: allOrders={} | userTrades={}",
696 oe,
697 te
698 ));
699 }
700
701 let mut orders = orders_result.unwrap_or_default();
702 let trades = trades_result.unwrap_or_default();
703 orders.sort_by_key(|o| o.update_time.max(o.time));
704
705 let storage_key = storage_symbol(&self.symbol, self.market);
706 if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &trades) {
707 tracing::warn!(error = %e, "Failed to persist futures order snapshot to sqlite");
708 }
709
710 let mut history = Vec::new();
711 let mut fills = Vec::new();
712 for t in &trades {
713 let side = if t.is_buyer { "BUY" } else { "SELL" };
714 fills.push(OrderHistoryFill {
715 timestamp_ms: t.time,
716 side: if t.is_buyer {
717 OrderSide::Buy
718 } else {
719 OrderSide::Sell
720 },
721 price: t.price,
722 });
723 history.push(format_order_history_row(
724 t.time,
725 "FILLED",
726 side,
727 t.qty,
728 t.price,
729 &format!("order#{}#T{} [FUT]", t.order_id, t.id),
730 ));
731 }
732 for o in &orders {
733 if o.executed_qty <= 0.0 {
734 history.push(format_order_history_row(
735 o.update_time.max(o.time),
736 &o.status,
737 &o.side,
738 display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
739 if o.executed_qty > 0.0 {
740 o.cummulative_quote_qty / o.executed_qty
741 } else {
742 o.price
743 },
744 &o.client_order_id,
745 ));
746 }
747 }
748
749 let mut stats = OrderHistoryStats::default();
750 for t in &trades {
751 if t.realized_pnl > 0.0 {
752 stats.win_count += 1;
753 stats.trade_count += 1;
754 } else if t.realized_pnl < 0.0 {
755 stats.lose_count += 1;
756 stats.trade_count += 1;
757 }
758 stats.realized_pnl += t.realized_pnl;
759 }
760 let estimated_total_pnl_usdt = Some(stats.realized_pnl);
761 let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
762 let latest_trade_event = trades.iter().map(|t| t.time).max();
763 let strategy_stats = match order_store::load_strategy_symbol_stats(&storage_key) {
764 Ok(persisted) => from_persisted_stats_map(persisted),
765 Err(e) => {
766 tracing::warn!(error = %e, "Failed to load persisted strategy stats (futures)");
767 HashMap::new()
768 }
769 };
770 return Ok(OrderHistorySnapshot {
771 rows: history,
772 stats,
773 strategy_stats,
774 fills,
775 open_qty: 0.0,
776 open_entry_price: 0.0,
777 estimated_total_pnl_usdt,
778 trade_data_complete: true,
779 fetched_at_ms,
780 fetch_latency_ms,
781 latest_event_ms: latest_order_event.max(latest_trade_event),
782 });
783 }
784
785 let fetch_started = Instant::now();
786 let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
787 let orders_result = self.rest_client.get_all_orders(&self.symbol, limit).await;
788 let storage_key = storage_symbol(&self.symbol, self.market);
789 let last_trade_id = order_store::load_last_trade_id(&storage_key).ok().flatten();
790 let persisted_trade_count = order_store::load_trade_count(&storage_key).unwrap_or(0);
791 let need_backfill = persisted_trade_count < limit;
792 let trades_result = match (need_backfill, last_trade_id) {
793 (true, _) => {
794 self.rest_client
795 .get_my_trades_history(&self.symbol, limit.max(1))
796 .await
797 }
798 (false, Some(last_id)) => {
799 self.rest_client
800 .get_my_trades_since(&self.symbol, last_id.saturating_add(1), 10)
801 .await
802 }
803 (false, None) => {
804 self.rest_client
805 .get_my_trades_history(&self.symbol, limit.max(1))
806 .await
807 }
808 };
809 let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
810 let trade_data_complete = trades_result.is_ok();
811
812 if orders_result.is_err() && trades_result.is_err() {
813 let oe = orders_result.err().unwrap();
814 let te = trades_result.err().unwrap();
815 return Err(anyhow::anyhow!(
816 "order history fetch failed: allOrders={} | myTrades={}",
817 oe,
818 te
819 ));
820 }
821
822 let mut orders = match orders_result {
823 Ok(v) => v,
824 Err(e) => {
825 tracing::warn!(error = %e, "Failed to fetch allOrders; falling back to trade-only history");
826 Vec::new()
827 }
828 };
829 let recent_trades = match trades_result {
830 Ok(t) => t,
831 Err(e) => {
832 tracing::warn!(error = %e, "Failed to fetch myTrades; falling back to order-only history");
833 Vec::new()
834 }
835 };
836 let mut trades = recent_trades.clone();
837 orders.sort_by_key(|o| o.update_time.max(o.time));
838
839 if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &recent_trades) {
840 tracing::warn!(error = %e, "Failed to persist order snapshot to sqlite");
841 }
842 let mut persisted_source_by_order_id: HashMap<u64, String> = HashMap::new();
843 match order_store::load_persisted_trades(&storage_key) {
844 Ok(saved) => {
845 if !saved.is_empty() {
846 trades = saved.iter().map(|r| r.trade.clone()).collect();
847 for row in saved {
848 persisted_source_by_order_id
849 .entry(row.trade.order_id)
850 .or_insert(row.source);
851 }
852 }
853 }
854 Err(e) => {
855 tracing::warn!(error = %e, "Failed to load persisted trades; using recent API trades");
856 }
857 }
858
859 let (stats, open_pos) = compute_trade_state(trades.clone(), &self.symbol);
860 let estimated_total_pnl_usdt = if self.last_price > 0.0 {
861 Some(stats.realized_pnl + (open_pos.qty * self.last_price - open_pos.cost_quote))
862 } else {
863 Some(stats.realized_pnl)
864 };
865 let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
866 let latest_trade_event = trades.iter().map(|t| t.time).max();
867 let latest_event_ms = latest_order_event.max(latest_trade_event);
868
869 let mut trades_by_order_id: HashMap<u64, Vec<BinanceMyTrade>> = HashMap::new();
870 for trade in &trades {
871 trades_by_order_id
872 .entry(trade.order_id)
873 .or_default()
874 .push(trade.clone());
875 }
876 for bucket in trades_by_order_id.values_mut() {
877 bucket.sort_by_key(|t| t.time);
878 }
879
880 let mut order_source_by_id = HashMap::new();
881 for o in &orders {
882 order_source_by_id.insert(o.order_id, source_label_from_client_order_id(&o.client_order_id));
883 }
884 for (order_id, source) in persisted_source_by_order_id {
885 order_source_by_id.entry(order_id).or_insert(source);
886 }
887 let mut strategy_stats =
888 compute_trade_stats_by_source(trades.clone(), &order_source_by_id, &self.symbol);
889 let persisted_stats = to_persistable_stats_map(&strategy_stats);
890 if let Err(e) = order_store::persist_strategy_symbol_stats(&storage_key, &persisted_stats) {
891 tracing::warn!(error = %e, "Failed to persist strategy+symbol scoped stats");
892 }
893 if strategy_stats.is_empty() {
894 match order_store::load_strategy_symbol_stats(&storage_key) {
895 Ok(persisted) => {
896 strategy_stats = from_persisted_stats_map(persisted);
897 }
898 Err(e) => {
899 tracing::warn!(error = %e, "Failed to load persisted strategy+symbol stats");
900 }
901 }
902 }
903
904 let mut history = Vec::new();
905 let mut fills = Vec::new();
906 let mut used_trade_ids = std::collections::HashSet::new();
907
908 if orders.is_empty() && !trades.is_empty() {
909 let mut sorted = trades;
910 sorted.sort_by_key(|t| (t.time, t.id));
911 history.extend(sorted.iter().map(|t| {
912 fills.push(OrderHistoryFill {
913 timestamp_ms: t.time,
914 side: if t.is_buyer {
915 OrderSide::Buy
916 } else {
917 OrderSide::Sell
918 },
919 price: t.price,
920 });
921 format_trade_history_row(
922 t,
923 order_source_by_id
924 .get(&t.order_id)
925 .map(String::as_str)
926 .unwrap_or("UNKNOWN"),
927 )
928 }));
929 return Ok(OrderHistorySnapshot {
930 rows: history,
931 stats,
932 strategy_stats,
933 fills,
934 open_qty: open_pos.qty,
935 open_entry_price: if open_pos.qty > f64::EPSILON {
936 open_pos.cost_quote / open_pos.qty
937 } else {
938 0.0
939 },
940 estimated_total_pnl_usdt,
941 trade_data_complete,
942 fetched_at_ms,
943 fetch_latency_ms,
944 latest_event_ms,
945 });
946 }
947
948 for o in orders {
949 if o.executed_qty > 0.0 {
950 if let Some(order_trades) = trades_by_order_id.get(&o.order_id) {
951 for t in order_trades {
952 used_trade_ids.insert(t.id);
953 let side = if t.is_buyer { "BUY" } else { "SELL" };
954 fills.push(OrderHistoryFill {
955 timestamp_ms: t.time,
956 side: if t.is_buyer {
957 OrderSide::Buy
958 } else {
959 OrderSide::Sell
960 },
961 price: t.price,
962 });
963 history.push(format_order_history_row(
964 t.time,
965 "FILLED",
966 side,
967 t.qty,
968 t.price,
969 &format!(
970 "{}#T{} [{}]",
971 o.client_order_id,
972 t.id,
973 source_label_from_client_order_id(&o.client_order_id)
974 ),
975 ));
976 }
977 continue;
978 }
979 }
980
981 let avg_price = if o.executed_qty > 0.0 {
982 o.cummulative_quote_qty / o.executed_qty
983 } else {
984 o.price
985 };
986 history.push(format_order_history_row(
987 o.update_time.max(o.time),
988 &o.status,
989 &o.side,
990 display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
991 avg_price,
992 &o.client_order_id,
993 ));
994 }
995
996 for bucket in trades_by_order_id.values() {
998 for t in bucket {
999 if !used_trade_ids.contains(&t.id) {
1000 fills.push(OrderHistoryFill {
1001 timestamp_ms: t.time,
1002 side: if t.is_buyer {
1003 OrderSide::Buy
1004 } else {
1005 OrderSide::Sell
1006 },
1007 price: t.price,
1008 });
1009 history.push(format_trade_history_row(
1010 t,
1011 order_source_by_id
1012 .get(&t.order_id)
1013 .map(String::as_str)
1014 .unwrap_or("UNKNOWN"),
1015 ));
1016 }
1017 }
1018 }
1019 Ok(OrderHistorySnapshot {
1020 rows: history,
1021 stats,
1022 strategy_stats,
1023 fills,
1024 open_qty: open_pos.qty,
1025 open_entry_price: if open_pos.qty > f64::EPSILON {
1026 open_pos.cost_quote / open_pos.qty
1027 } else {
1028 0.0
1029 },
1030 estimated_total_pnl_usdt,
1031 trade_data_complete,
1032 fetched_at_ms,
1033 fetch_latency_ms,
1034 latest_event_ms,
1035 })
1036 }
1037
1038 pub async fn submit_order(
1063 &mut self,
1064 signal: Signal,
1065 source_tag: &str,
1066 ) -> Result<Option<OrderUpdate>> {
1067 let side = match &signal {
1068 Signal::Buy => OrderSide::Buy,
1069 Signal::Sell => OrderSide::Sell,
1070 Signal::Hold => return Ok(None),
1071 };
1072 let source_tag = source_tag.to_ascii_lowercase();
1073 let intent = OrderIntent {
1074 intent_id: format!("intent-{}", &uuid::Uuid::new_v4().to_string()[..8]),
1075 source_tag: source_tag.clone(),
1076 symbol: self.symbol.clone(),
1077 market: self.market,
1078 side,
1079 order_amount_usdt: self.order_amount_usdt,
1080 last_price: self.last_price,
1081 created_at_ms: chrono::Utc::now().timestamp_millis() as u64,
1082 };
1083 if let Some((reason_code, reason)) =
1084 self.evaluate_strategy_limits(&intent.source_tag, intent.created_at_ms)
1085 {
1086 return Ok(Some(OrderUpdate::Rejected {
1087 intent_id: intent.intent_id.clone(),
1088 client_order_id: "n/a".to_string(),
1089 reason_code,
1090 reason,
1091 }));
1092 }
1093 let decision = self
1094 .risk_module
1095 .evaluate_intent(&intent, &self.balances)
1096 .await?;
1097 if !decision.approved {
1098 return Ok(Some(OrderUpdate::Rejected {
1099 intent_id: intent.intent_id.clone(),
1100 client_order_id: "n/a".to_string(),
1101 reason_code: decision
1102 .reason_code
1103 .unwrap_or_else(|| RejectionReasonCode::RiskUnknown.as_str().to_string()),
1104 reason: decision
1105 .reason
1106 .unwrap_or_else(|| "Rejected by RiskModule".to_string()),
1107 }));
1108 }
1109 if !self.risk_module.reserve_rate_budget() {
1110 return Ok(Some(OrderUpdate::Rejected {
1111 intent_id: intent.intent_id.clone(),
1112 client_order_id: "n/a".to_string(),
1113 reason_code: RejectionReasonCode::RateGlobalBudgetExceeded
1114 .as_str()
1115 .to_string(),
1116 reason: "Global rate budget exceeded; try again after reset".to_string(),
1117 }));
1118 }
1119 if !self
1120 .risk_module
1121 .reserve_endpoint_budget(ApiEndpointGroup::Orders)
1122 {
1123 return Ok(Some(OrderUpdate::Rejected {
1124 intent_id: intent.intent_id.clone(),
1125 client_order_id: "n/a".to_string(),
1126 reason_code: RejectionReasonCode::RateEndpointBudgetExceeded
1127 .as_str()
1128 .to_string(),
1129 reason: "Orders endpoint budget exceeded; try again after reset".to_string(),
1130 }));
1131 }
1132 let qty = decision.normalized_qty;
1133 if let Some((reason_code, reason)) = self.evaluate_symbol_exposure_limit(side, qty) {
1134 return Ok(Some(OrderUpdate::Rejected {
1135 intent_id: intent.intent_id.clone(),
1136 client_order_id: "n/a".to_string(),
1137 reason_code,
1138 reason,
1139 }));
1140 }
1141 self.mark_strategy_submit(&intent.source_tag, intent.created_at_ms);
1142
1143 let client_order_id = format!(
1144 "sq-{}-{}",
1145 intent.source_tag,
1146 &uuid::Uuid::new_v4().to_string()[..8]
1147 );
1148
1149 let order = Order {
1150 client_order_id: client_order_id.clone(),
1151 server_order_id: None,
1152 symbol: self.symbol.clone(),
1153 side,
1154 order_type: OrderType::Market,
1155 quantity: qty,
1156 price: None,
1157 status: OrderStatus::PendingSubmit,
1158 created_at: chrono::Utc::now(),
1159 updated_at: chrono::Utc::now(),
1160 fills: vec![],
1161 };
1162
1163 self.active_orders.insert(client_order_id.clone(), order);
1164
1165 tracing::info!(
1166 side = %side,
1167 qty,
1168 usdt_amount = intent.order_amount_usdt,
1169 price = intent.last_price,
1170 intent_id = %intent.intent_id,
1171 created_at_ms = intent.created_at_ms,
1172 "Submitting order"
1173 );
1174
1175 let submit_res = if self.market == MarketKind::Futures {
1176 self.rest_client
1177 .place_futures_market_order(&self.symbol, side, qty, &client_order_id)
1178 .await
1179 } else {
1180 self.rest_client
1181 .place_market_order(&self.symbol, side, qty, &client_order_id)
1182 .await
1183 };
1184
1185 match submit_res {
1186 Ok(response) => {
1187 let update = self.process_order_response(
1188 &intent.intent_id,
1189 &client_order_id,
1190 side,
1191 &response,
1192 );
1193
1194 if matches!(update, OrderUpdate::Filled { .. }) {
1196 if let Err(e) = self.refresh_balances().await {
1197 tracing::warn!(error = %e, "Failed to refresh balances after fill");
1198 }
1199 }
1200
1201 Ok(Some(update))
1202 }
1203 Err(e) => {
1204 tracing::error!(
1205 client_order_id,
1206 error = %e,
1207 "Order rejected"
1208 );
1209 if let Some(order) = self.active_orders.get_mut(&client_order_id) {
1210 order.status = OrderStatus::Rejected;
1211 order.updated_at = chrono::Utc::now();
1212 }
1213 Ok(Some(OrderUpdate::Rejected {
1214 intent_id: intent.intent_id.clone(),
1215 client_order_id,
1216 reason_code: RejectionReasonCode::BrokerSubmitFailed.as_str().to_string(),
1217 reason: e.to_string(),
1218 }))
1219 }
1220 }
1221 }
1222
1223 fn process_order_response(
1224 &mut self,
1225 intent_id: &str,
1226 client_order_id: &str,
1227 side: OrderSide,
1228 response: &BinanceOrderResponse,
1229 ) -> OrderUpdate {
1230 let fills: Vec<Fill> = response
1231 .fills
1232 .iter()
1233 .map(|f| Fill {
1234 price: f.price,
1235 qty: f.qty,
1236 commission: f.commission,
1237 commission_asset: f.commission_asset.clone(),
1238 })
1239 .collect();
1240
1241 let status = OrderStatus::from_binance_str(&response.status);
1242
1243 if let Some(order) = self.active_orders.get_mut(client_order_id) {
1244 order.server_order_id = Some(response.order_id);
1245 order.status = status;
1246 order.fills = fills.clone();
1247 order.updated_at = chrono::Utc::now();
1248 }
1249
1250 if status == OrderStatus::Filled || status == OrderStatus::PartiallyFilled {
1251 self.position.apply_fill(side, &fills);
1252
1253 let avg_price = if fills.is_empty() {
1254 0.0
1255 } else {
1256 let total_value: f64 = fills.iter().map(|f| f.price * f.qty).sum();
1257 let total_qty: f64 = fills.iter().map(|f| f.qty).sum();
1258 total_value / total_qty
1259 };
1260
1261 tracing::info!(
1262 client_order_id,
1263 order_id = response.order_id,
1264 side = %side,
1265 avg_price,
1266 filled_qty = response.executed_qty,
1267 "Order filled"
1268 );
1269
1270 OrderUpdate::Filled {
1271 intent_id: intent_id.to_string(),
1272 client_order_id: client_order_id.to_string(),
1273 side,
1274 fills,
1275 avg_price,
1276 }
1277 } else {
1278 OrderUpdate::Submitted {
1279 intent_id: intent_id.to_string(),
1280 client_order_id: client_order_id.to_string(),
1281 server_order_id: response.order_id,
1282 }
1283 }
1284 }
1285}
1286
1287#[cfg(test)]
1288mod tests {
1289 use super::{display_qty_for_history, split_symbol_assets, OrderManager};
1290 use crate::binance::rest::BinanceRestClient;
1291 use crate::config::{EndpointRateLimitConfig, RiskConfig, SymbolExposureLimitConfig};
1292 use crate::model::order::{Order, OrderSide, OrderStatus, OrderType};
1293 use std::sync::Arc;
1294
1295 fn build_test_order_manager() -> OrderManager {
1296 let rest = Arc::new(BinanceRestClient::new(
1297 "https://demo-api.binance.com",
1298 "https://demo-fapi.binance.com",
1299 "k",
1300 "s",
1301 "fk",
1302 "fs",
1303 5000,
1304 ));
1305 let risk = RiskConfig {
1306 global_rate_limit_per_minute: 600,
1307 default_strategy_cooldown_ms: 3_000,
1308 default_strategy_max_active_orders: 1,
1309 default_symbol_max_exposure_usdt: 200.0,
1310 strategy_limits: vec![],
1311 symbol_exposure_limits: vec![SymbolExposureLimitConfig {
1312 symbol: "BTCUSDT".to_string(),
1313 market: Some("spot".to_string()),
1314 max_exposure_usdt: 150.0,
1315 }],
1316 endpoint_rate_limits: EndpointRateLimitConfig {
1317 orders_per_minute: 240,
1318 account_per_minute: 180,
1319 market_data_per_minute: 360,
1320 },
1321 };
1322 OrderManager::new(
1323 rest,
1324 "BTCUSDT",
1325 crate::order_manager::MarketKind::Spot,
1326 10.0,
1327 &risk,
1328 )
1329 }
1330
1331 #[test]
1332 fn valid_state_transitions() {
1333 let from = OrderStatus::PendingSubmit;
1335 let to = OrderStatus::Submitted;
1336 assert!(!from.is_terminal());
1337 assert!(!to.is_terminal());
1338
1339 let to = OrderStatus::Filled;
1341 assert!(to.is_terminal());
1342
1343 let to = OrderStatus::Rejected;
1345 assert!(to.is_terminal());
1346
1347 let to = OrderStatus::Cancelled;
1349 assert!(to.is_terminal());
1350 }
1351
1352 #[test]
1353 fn from_binance_str_mapping() {
1354 assert_eq!(OrderStatus::from_binance_str("NEW"), OrderStatus::Submitted);
1355 assert_eq!(OrderStatus::from_binance_str("FILLED"), OrderStatus::Filled);
1356 assert_eq!(
1357 OrderStatus::from_binance_str("CANCELED"),
1358 OrderStatus::Cancelled
1359 );
1360 assert_eq!(
1361 OrderStatus::from_binance_str("REJECTED"),
1362 OrderStatus::Rejected
1363 );
1364 assert_eq!(
1365 OrderStatus::from_binance_str("EXPIRED"),
1366 OrderStatus::Expired
1367 );
1368 assert_eq!(
1369 OrderStatus::from_binance_str("PARTIALLY_FILLED"),
1370 OrderStatus::PartiallyFilled
1371 );
1372 }
1373
1374 #[test]
1375 fn order_history_uses_executed_qty_for_filled_states() {
1376 assert!((display_qty_for_history("FILLED", 1.0, 0.4) - 0.4).abs() < f64::EPSILON);
1377 assert!((display_qty_for_history("PARTIALLY_FILLED", 1.0, 0.4) - 0.4).abs() < f64::EPSILON);
1378 }
1379
1380 #[test]
1381 fn order_history_uses_orig_qty_for_non_filled_states() {
1382 assert!((display_qty_for_history("NEW", 1.0, 0.4) - 1.0).abs() < f64::EPSILON);
1383 assert!((display_qty_for_history("CANCELED", 1.0, 0.4) - 1.0).abs() < f64::EPSILON);
1384 assert!((display_qty_for_history("REJECTED", 1.0, 0.0) - 1.0).abs() < f64::EPSILON);
1385 }
1386
1387 #[test]
1388 fn split_symbol_assets_parses_known_quote_suffixes() {
1389 assert_eq!(
1390 split_symbol_assets("ETHUSDT"),
1391 ("ETH".to_string(), "USDT".to_string())
1392 );
1393 assert_eq!(
1394 split_symbol_assets("ETHBTC"),
1395 ("ETH".to_string(), "BTC".to_string())
1396 );
1397 }
1398
1399 #[test]
1400 fn split_symbol_assets_falls_back_when_quote_unknown() {
1401 assert_eq!(
1402 split_symbol_assets("FOOBAR"),
1403 ("FOOBAR".to_string(), String::new())
1404 );
1405 }
1406
1407 #[test]
1408 fn strategy_limit_rejects_when_active_orders_reach_limit() {
1409 let mut mgr = build_test_order_manager();
1410 let client_order_id = "sq-cfg-abcdef12".to_string();
1411 mgr.active_orders.insert(
1412 client_order_id.clone(),
1413 Order {
1414 client_order_id,
1415 server_order_id: None,
1416 symbol: "BTCUSDT".to_string(),
1417 side: OrderSide::Buy,
1418 order_type: OrderType::Market,
1419 quantity: 0.1,
1420 price: None,
1421 status: OrderStatus::Submitted,
1422 created_at: chrono::Utc::now(),
1423 updated_at: chrono::Utc::now(),
1424 fills: vec![],
1425 },
1426 );
1427
1428 let rejected = mgr
1429 .evaluate_strategy_limits("cfg", chrono::Utc::now().timestamp_millis() as u64)
1430 .expect("must be rejected");
1431 assert_eq!(
1432 rejected.0,
1433 "risk.strategy_max_active_orders_exceeded".to_string()
1434 );
1435 }
1436
1437 #[test]
1438 fn strategy_limit_rejects_during_cooldown_window() {
1439 let mut mgr = build_test_order_manager();
1440 let now = chrono::Utc::now().timestamp_millis() as u64;
1441 mgr.mark_strategy_submit("cfg", now);
1442
1443 let rejected = mgr
1444 .evaluate_strategy_limits("cfg", now + 500)
1445 .expect("must be rejected");
1446 assert_eq!(rejected.0, "risk.strategy_cooldown_active".to_string());
1447 }
1448
1449 #[test]
1450 fn symbol_exposure_limit_rejects_when_projected_notional_exceeds_limit() {
1451 let mut mgr = build_test_order_manager();
1452 mgr.last_price = 100.0;
1453 let rejected = mgr
1455 .evaluate_symbol_exposure_limit(OrderSide::Buy, 2.0)
1456 .expect("must be rejected");
1457 assert_eq!(rejected.0, "risk.symbol_exposure_limit_exceeded".to_string());
1458 }
1459
1460 #[test]
1461 fn symbol_exposure_limit_allows_risk_reducing_order() {
1462 let mut mgr = build_test_order_manager();
1463 mgr.last_price = 100.0;
1464 mgr.position.side = Some(OrderSide::Buy);
1465 mgr.position.qty = 2.0; let rejected = mgr.evaluate_symbol_exposure_limit(OrderSide::Sell, 1.0);
1469 assert!(rejected.is_none());
1470 }
1471}