1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use anyhow::Result;
6use chrono::TimeZone;
7
8use crate::binance::rest::BinanceRestClient;
9use crate::binance::types::{BinanceMyTrade, BinanceOrderResponse};
10use crate::config::RiskConfig;
11use crate::model::order::{Fill, Order, OrderSide, OrderStatus, OrderType};
12use crate::model::position::Position;
13use crate::model::signal::Signal;
14use crate::order_store;
15use crate::risk_module::{OrderIntent, RateBudgetSnapshot, RejectionReasonCode, RiskModule};
16
17pub use crate::risk_module::MarketKind;
18
19#[derive(Debug, Clone)]
20pub enum OrderUpdate {
21 Submitted {
22 intent_id: String,
23 client_order_id: String,
24 server_order_id: u64,
25 },
26 Filled {
27 intent_id: String,
28 client_order_id: String,
29 side: OrderSide,
30 fills: Vec<Fill>,
31 avg_price: f64,
32 },
33 Rejected {
34 intent_id: String,
35 client_order_id: String,
36 reason_code: String,
37 reason: String,
38 },
39}
40
/// Win/loss counters and realized PnL accumulated over a trade history.
#[derive(Debug, Clone, Default)]
pub struct OrderHistoryStats {
    /// Number of trades counted as either a win or a loss.
    pub trade_count: u32,
    /// Trades whose realized PnL was strictly positive.
    pub win_count: u32,
    /// Trades whose realized PnL was strictly negative.
    pub lose_count: u32,
    /// Sum of realized PnL over all processed trades.
    pub realized_pnl: f64,
}
48
49#[derive(Debug, Clone, Default)]
50pub struct OrderHistorySnapshot {
51 pub rows: Vec<String>,
52 pub stats: OrderHistoryStats,
53 pub strategy_stats: HashMap<String, OrderHistoryStats>,
54 pub fills: Vec<OrderHistoryFill>,
55 pub open_qty: f64,
56 pub open_entry_price: f64,
57 pub estimated_total_pnl_usdt: Option<f64>,
58 pub trade_data_complete: bool,
59 pub fetched_at_ms: u64,
60 pub fetch_latency_ms: u64,
61 pub latest_event_ms: Option<u64>,
62}
63
64#[derive(Debug, Clone)]
65pub struct OrderHistoryFill {
66 pub timestamp_ms: u64,
67 pub side: OrderSide,
68 pub price: f64,
69}
70
71pub struct OrderManager {
72 rest_client: Arc<BinanceRestClient>,
73 active_orders: HashMap<String, Order>,
74 position: Position,
75 symbol: String,
76 market: MarketKind,
77 order_amount_usdt: f64,
78 balances: HashMap<String, f64>,
79 last_price: f64,
80 risk_module: RiskModule,
81 default_strategy_cooldown_ms: u64,
82 default_strategy_max_active_orders: u32,
83 strategy_limits_by_tag: HashMap<String, StrategyExecutionLimit>,
84 last_strategy_submit_ms: HashMap<String, u64>,
85}
86
/// Throttling limits that apply to one strategy (source tag).
#[derive(Debug, Clone, Copy)]
struct StrategyExecutionLimit {
    /// Minimum time between two submissions, in milliseconds; `0` disables the cooldown.
    cooldown_ms: u64,
    /// Maximum number of non-terminal orders the strategy may have at once.
    max_active_orders: u32,
}
92
93fn storage_symbol(symbol: &str, market: MarketKind) -> String {
94 match market {
95 MarketKind::Spot => symbol.to_string(),
96 MarketKind::Futures => format!("{}#FUT", symbol),
97 }
98}
99
/// Picks the quantity to show for an order in the history view.
///
/// Orders whose status is `FILLED` or `PARTIALLY_FILLED` show the executed
/// quantity; every other status shows the originally requested quantity.
fn display_qty_for_history(status: &str, orig_qty: f64, executed_qty: f64) -> f64 {
    let has_executions = matches!(status, "FILLED" | "PARTIALLY_FILLED");
    if has_executions {
        executed_qty
    } else {
        orig_qty
    }
}
106
107fn format_history_time(timestamp_ms: u64) -> String {
108 chrono::Utc
109 .timestamp_millis_opt(timestamp_ms as i64)
110 .single()
111 .map(|dt| {
112 dt.with_timezone(&chrono::Local)
113 .format("%H:%M:%S")
114 .to_string()
115 })
116 .unwrap_or_else(|| "--:--:--".to_string())
117}
118
119fn format_order_history_row(
120 timestamp_ms: u64,
121 status: &str,
122 side: &str,
123 qty: f64,
124 avg_price: f64,
125 client_order_id: &str,
126) -> String {
127 format!(
128 "{} {:<10} {:<4} {:.5} @ {:.2} {}",
129 format_history_time(timestamp_ms),
130 status,
131 side,
132 qty,
133 avg_price,
134 client_order_id
135 )
136}
137
/// Maps a client order id to the display label of the strategy that created it.
///
/// The id is searched for known `-<tag>-` markers in a fixed priority order;
/// ids without any known marker are labelled `UNKNOWN`.
fn source_label_from_client_order_id(client_order_id: &str) -> &'static str {
    // Checked top to bottom; the first marker found wins.
    const MARKER_LABELS: [(&str, &str); 4] = [
        ("-mnl-", "MANUAL"),
        ("-cfg-", "MA(Config)"),
        ("-fst-", "MA(Fast 5/20)"),
        ("-slw-", "MA(Slow 20/60)"),
    ];
    for (marker, label) in MARKER_LABELS {
        if client_order_id.contains(marker) {
            return label;
        }
    }
    "UNKNOWN"
}
151
152fn format_trade_history_row(t: &BinanceMyTrade, source: &str) -> String {
153 let side = if t.is_buyer { "BUY" } else { "SELL" };
154 format_order_history_row(
155 t.time,
156 "FILLED",
157 side,
158 t.qty,
159 t.price,
160 &format!("order#{}#T{} [{}]", t.order_id, t.id, source),
161 )
162}
163
/// Splits a symbol such as `ETHUSDT` into `(base, quote)` asset names.
///
/// Known quote suffixes are tried in list order and the first one that leaves
/// a non-empty base wins. When no suffix matches, the whole symbol is returned
/// as the base with an empty quote.
fn split_symbol_assets(symbol: &str) -> (String, String) {
    const QUOTE_SUFFIXES: [&str; 10] = [
        "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
    ];
    QUOTE_SUFFIXES
        .iter()
        .copied()
        .find_map(|quote| {
            symbol
                .strip_suffix(quote)
                .filter(|base| !base.is_empty())
                .map(|base| (base.to_string(), quote.to_string()))
        })
        .unwrap_or_else(|| (symbol.to_string(), String::new()))
}
177
/// Running long position used while replaying trades.
///
/// `Debug` is derived like on every other type in this file so the value
/// returned by `compute_trade_state` can be logged or inspected.
#[derive(Debug, Clone, Copy, Default)]
struct LongPos {
    /// Base-asset quantity currently held.
    qty: f64,
    /// Total quote-asset cost of `qty`.
    cost_quote: f64,
}
183
184fn apply_spot_trade_with_fee(
185 pos: &mut LongPos,
186 stats: &mut OrderHistoryStats,
187 t: &BinanceMyTrade,
188 base_asset: &str,
189 quote_asset: &str,
190) {
191 let qty = t.qty.max(0.0);
192 if qty <= f64::EPSILON {
193 return;
194 }
195 let fee_asset = t.commission_asset.as_str();
196 let fee_is_base = !base_asset.is_empty() && fee_asset.eq_ignore_ascii_case(base_asset);
197 let fee_is_quote = !quote_asset.is_empty() && fee_asset.eq_ignore_ascii_case(quote_asset);
198
199 if t.is_buyer {
200 let net_qty = (qty
201 - if fee_is_base {
202 t.commission.max(0.0)
203 } else {
204 0.0
205 })
206 .max(0.0);
207 if net_qty <= f64::EPSILON {
208 return;
209 }
210 let fee_quote = if fee_is_quote {
211 t.commission.max(0.0)
212 } else {
213 0.0
214 };
215 pos.qty += net_qty;
216 pos.cost_quote += qty * t.price + fee_quote;
217 return;
218 }
219
220 if pos.qty <= f64::EPSILON {
222 return;
223 }
224 let close_qty = qty.min(pos.qty);
225 if close_qty <= f64::EPSILON {
226 return;
227 }
228 let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
229 let fee_quote_total = if fee_is_quote {
230 t.commission.max(0.0)
231 } else if fee_is_base {
232 t.commission.max(0.0) * t.price
234 } else {
235 0.0
236 };
237 let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
238 let pnl_delta = (close_qty * t.price - fee_quote) - (avg_cost * close_qty);
239 if pnl_delta > 0.0 {
240 stats.win_count += 1;
241 stats.trade_count += 1;
242 } else if pnl_delta < 0.0 {
243 stats.lose_count += 1;
244 stats.trade_count += 1;
245 }
246 stats.realized_pnl += pnl_delta;
247
248 pos.qty -= close_qty;
249 pos.cost_quote -= avg_cost * close_qty;
250 if pos.qty <= f64::EPSILON {
251 pos.qty = 0.0;
252 pos.cost_quote = 0.0;
253 }
254}
255
256fn compute_trade_state(
257 mut trades: Vec<BinanceMyTrade>,
258 symbol: &str,
259) -> (OrderHistoryStats, LongPos) {
260 trades.sort_by_key(|t| (t.time, t.id));
261 let (base_asset, quote_asset) = split_symbol_assets(symbol);
262 let mut pos = LongPos::default();
263 let mut stats = OrderHistoryStats::default();
264 for t in trades {
265 apply_spot_trade_with_fee(&mut pos, &mut stats, &t, &base_asset, "e_asset);
266 }
267 (stats, pos)
268}
269
270fn compute_trade_stats_by_source(
271 mut trades: Vec<BinanceMyTrade>,
272 order_source_by_id: &HashMap<u64, String>,
273 symbol: &str,
274) -> HashMap<String, OrderHistoryStats> {
275 trades.sort_by_key(|t| (t.time, t.id));
276 let (base_asset, quote_asset) = split_symbol_assets(symbol);
277 let mut pos_by_source: HashMap<String, LongPos> = HashMap::new();
278 let mut stats_by_source: HashMap<String, OrderHistoryStats> = HashMap::new();
279
280 for t in trades {
281 let source = order_source_by_id
282 .get(&t.order_id)
283 .cloned()
284 .unwrap_or_else(|| "UNKNOWN".to_string());
285 let pos = pos_by_source.entry(source.clone()).or_default();
286 let stats = stats_by_source.entry(source).or_default();
287 apply_spot_trade_with_fee(pos, stats, &t, &base_asset, "e_asset);
288 }
289
290 stats_by_source
291}
292
293impl OrderManager {
294 pub fn new(
303 rest_client: Arc<BinanceRestClient>,
304 symbol: &str,
305 market: MarketKind,
306 order_amount_usdt: f64,
307 risk_config: &RiskConfig,
308 ) -> Self {
309 let mut strategy_limits_by_tag = HashMap::new();
310 let default_strategy_cooldown_ms = risk_config.default_strategy_cooldown_ms;
311 let default_strategy_max_active_orders = risk_config.default_strategy_max_active_orders.max(1);
312 for profile in &risk_config.strategy_limits {
313 let source_tag = profile.source_tag.trim().to_ascii_lowercase();
314 if source_tag.is_empty() {
315 continue;
316 }
317 strategy_limits_by_tag.insert(
318 source_tag,
319 StrategyExecutionLimit {
320 cooldown_ms: profile
321 .cooldown_ms
322 .unwrap_or(default_strategy_cooldown_ms),
323 max_active_orders: profile
324 .max_active_orders
325 .unwrap_or(default_strategy_max_active_orders)
326 .max(1),
327 },
328 );
329 }
330 Self {
331 rest_client: rest_client.clone(),
332 active_orders: HashMap::new(),
333 position: Position::new(symbol.to_string()),
334 symbol: symbol.to_string(),
335 market,
336 order_amount_usdt,
337 balances: HashMap::new(),
338 last_price: 0.0,
339 risk_module: RiskModule::new(
340 rest_client.clone(),
341 risk_config.global_rate_limit_per_minute,
342 ),
343 default_strategy_cooldown_ms,
344 default_strategy_max_active_orders,
345 strategy_limits_by_tag,
346 last_strategy_submit_ms: HashMap::new(),
347 }
348 }
349
350 pub fn position(&self) -> &Position {
355 &self.position
356 }
357
358 pub fn balances(&self) -> &HashMap<String, f64> {
363 &self.balances
364 }
365
366 pub fn update_unrealized_pnl(&mut self, current_price: f64) {
372 self.last_price = current_price;
373 self.position.update_unrealized_pnl(current_price);
374 }
375
376 pub fn rate_budget_snapshot(&self) -> RateBudgetSnapshot {
380 self.risk_module.rate_budget_snapshot()
381 }
382
383 fn strategy_limits_for(&self, source_tag: &str) -> StrategyExecutionLimit {
384 self.strategy_limits_by_tag
385 .get(source_tag)
386 .copied()
387 .unwrap_or(StrategyExecutionLimit {
388 cooldown_ms: self.default_strategy_cooldown_ms,
389 max_active_orders: self.default_strategy_max_active_orders,
390 })
391 }
392
393 fn active_order_count_for_source(&self, source_tag: &str) -> u32 {
394 let prefix = format!("sq-{}-", source_tag);
395 self.active_orders
396 .values()
397 .filter(|o| !o.status.is_terminal() && o.client_order_id.starts_with(&prefix))
398 .count() as u32
399 }
400
401 fn evaluate_strategy_limits(
402 &self,
403 source_tag: &str,
404 created_at_ms: u64,
405 ) -> Option<(String, String)> {
406 let limits = self.strategy_limits_for(source_tag);
407 let active_count = self.active_order_count_for_source(source_tag);
408 if active_count >= limits.max_active_orders {
409 return Some((
410 RejectionReasonCode::RiskStrategyMaxActiveOrdersExceeded
411 .as_str()
412 .to_string(),
413 format!(
414 "Strategy '{}' active order limit exceeded (active {}, limit {})",
415 source_tag, active_count, limits.max_active_orders
416 ),
417 ));
418 }
419
420 if limits.cooldown_ms > 0 {
421 if let Some(last_submit_ms) = self.last_strategy_submit_ms.get(source_tag) {
422 let elapsed = created_at_ms.saturating_sub(*last_submit_ms);
423 if elapsed < limits.cooldown_ms {
424 let remaining = limits.cooldown_ms - elapsed;
425 return Some((
426 RejectionReasonCode::RiskStrategyCooldownActive
427 .as_str()
428 .to_string(),
429 format!(
430 "Strategy '{}' cooldown active ({}ms remaining)",
431 source_tag, remaining
432 ),
433 ));
434 }
435 }
436 }
437
438 None
439 }
440
441 fn mark_strategy_submit(&mut self, source_tag: &str, created_at_ms: u64) {
442 self.last_strategy_submit_ms
443 .insert(source_tag.to_string(), created_at_ms);
444 }
445
446 pub async fn refresh_balances(&mut self) -> Result<HashMap<String, f64>> {
458 if self.market == MarketKind::Futures {
459 let account = self.rest_client.get_futures_account().await?;
460 self.balances.clear();
461 for a in &account.assets {
462 if a.wallet_balance.abs() > f64::EPSILON {
463 self.balances.insert(a.asset.clone(), a.available_balance);
464 }
465 }
466 return Ok(self.balances.clone());
467 }
468 let account = self.rest_client.get_account().await?;
469 self.balances.clear();
470 for b in &account.balances {
471 let total = b.free + b.locked;
472 if total > 0.0 {
473 self.balances.insert(b.asset.clone(), b.free);
474 }
475 }
476 tracing::info!(balances = ?self.balances, "Balances refreshed");
477 Ok(self.balances.clone())
478 }
479
480 pub async fn refresh_order_history(&self, limit: usize) -> Result<OrderHistorySnapshot> {
489 if self.market == MarketKind::Futures {
490 let fetch_started = Instant::now();
491 let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
492 let orders_result = self
493 .rest_client
494 .get_futures_all_orders(&self.symbol, limit)
495 .await;
496 let trades_result = self
497 .rest_client
498 .get_futures_my_trades_history(&self.symbol, limit.max(1))
499 .await;
500 let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
501
502 if orders_result.is_err() && trades_result.is_err() {
503 let oe = orders_result.err().unwrap();
504 let te = trades_result.err().unwrap();
505 return Err(anyhow::anyhow!(
506 "futures order history fetch failed: allOrders={} | userTrades={}",
507 oe,
508 te
509 ));
510 }
511
512 let mut orders = orders_result.unwrap_or_default();
513 let trades = trades_result.unwrap_or_default();
514 orders.sort_by_key(|o| o.update_time.max(o.time));
515
516 let storage_key = storage_symbol(&self.symbol, self.market);
517 if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &trades) {
518 tracing::warn!(error = %e, "Failed to persist futures order snapshot to sqlite");
519 }
520
521 let mut history = Vec::new();
522 let mut fills = Vec::new();
523 for t in &trades {
524 let side = if t.is_buyer { "BUY" } else { "SELL" };
525 fills.push(OrderHistoryFill {
526 timestamp_ms: t.time,
527 side: if t.is_buyer {
528 OrderSide::Buy
529 } else {
530 OrderSide::Sell
531 },
532 price: t.price,
533 });
534 history.push(format_order_history_row(
535 t.time,
536 "FILLED",
537 side,
538 t.qty,
539 t.price,
540 &format!("order#{}#T{} [FUT]", t.order_id, t.id),
541 ));
542 }
543 for o in &orders {
544 if o.executed_qty <= 0.0 {
545 history.push(format_order_history_row(
546 o.update_time.max(o.time),
547 &o.status,
548 &o.side,
549 display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
550 if o.executed_qty > 0.0 {
551 o.cummulative_quote_qty / o.executed_qty
552 } else {
553 o.price
554 },
555 &o.client_order_id,
556 ));
557 }
558 }
559
560 let mut stats = OrderHistoryStats::default();
561 for t in &trades {
562 if t.realized_pnl > 0.0 {
563 stats.win_count += 1;
564 stats.trade_count += 1;
565 } else if t.realized_pnl < 0.0 {
566 stats.lose_count += 1;
567 stats.trade_count += 1;
568 }
569 stats.realized_pnl += t.realized_pnl;
570 }
571 let estimated_total_pnl_usdt = Some(stats.realized_pnl);
572 let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
573 let latest_trade_event = trades.iter().map(|t| t.time).max();
574 return Ok(OrderHistorySnapshot {
575 rows: history,
576 stats,
577 strategy_stats: HashMap::new(),
578 fills,
579 open_qty: 0.0,
580 open_entry_price: 0.0,
581 estimated_total_pnl_usdt,
582 trade_data_complete: true,
583 fetched_at_ms,
584 fetch_latency_ms,
585 latest_event_ms: latest_order_event.max(latest_trade_event),
586 });
587 }
588
589 let fetch_started = Instant::now();
590 let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
591 let orders_result = self.rest_client.get_all_orders(&self.symbol, limit).await;
592 let storage_key = storage_symbol(&self.symbol, self.market);
593 let last_trade_id = order_store::load_last_trade_id(&storage_key).ok().flatten();
594 let persisted_trade_count = order_store::load_trade_count(&storage_key).unwrap_or(0);
595 let need_backfill = persisted_trade_count < limit;
596 let trades_result = match (need_backfill, last_trade_id) {
597 (true, _) => {
598 self.rest_client
599 .get_my_trades_history(&self.symbol, limit.max(1))
600 .await
601 }
602 (false, Some(last_id)) => {
603 self.rest_client
604 .get_my_trades_since(&self.symbol, last_id.saturating_add(1), 10)
605 .await
606 }
607 (false, None) => {
608 self.rest_client
609 .get_my_trades_history(&self.symbol, limit.max(1))
610 .await
611 }
612 };
613 let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
614 let trade_data_complete = trades_result.is_ok();
615
616 if orders_result.is_err() && trades_result.is_err() {
617 let oe = orders_result.err().unwrap();
618 let te = trades_result.err().unwrap();
619 return Err(anyhow::anyhow!(
620 "order history fetch failed: allOrders={} | myTrades={}",
621 oe,
622 te
623 ));
624 }
625
626 let mut orders = match orders_result {
627 Ok(v) => v,
628 Err(e) => {
629 tracing::warn!(error = %e, "Failed to fetch allOrders; falling back to trade-only history");
630 Vec::new()
631 }
632 };
633 let recent_trades = match trades_result {
634 Ok(t) => t,
635 Err(e) => {
636 tracing::warn!(error = %e, "Failed to fetch myTrades; falling back to order-only history");
637 Vec::new()
638 }
639 };
640 let mut trades = recent_trades.clone();
641 orders.sort_by_key(|o| o.update_time.max(o.time));
642
643 if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &recent_trades) {
644 tracing::warn!(error = %e, "Failed to persist order snapshot to sqlite");
645 }
646 let mut persisted_source_by_order_id: HashMap<u64, String> = HashMap::new();
647 match order_store::load_persisted_trades(&storage_key) {
648 Ok(saved) => {
649 if !saved.is_empty() {
650 trades = saved.iter().map(|r| r.trade.clone()).collect();
651 for row in saved {
652 persisted_source_by_order_id
653 .entry(row.trade.order_id)
654 .or_insert(row.source);
655 }
656 }
657 }
658 Err(e) => {
659 tracing::warn!(error = %e, "Failed to load persisted trades; using recent API trades");
660 }
661 }
662
663 let (stats, open_pos) = compute_trade_state(trades.clone(), &self.symbol);
664 let estimated_total_pnl_usdt = if self.last_price > 0.0 {
665 Some(stats.realized_pnl + (open_pos.qty * self.last_price - open_pos.cost_quote))
666 } else {
667 Some(stats.realized_pnl)
668 };
669 let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
670 let latest_trade_event = trades.iter().map(|t| t.time).max();
671 let latest_event_ms = latest_order_event.max(latest_trade_event);
672
673 let mut trades_by_order_id: HashMap<u64, Vec<BinanceMyTrade>> = HashMap::new();
674 for trade in &trades {
675 trades_by_order_id
676 .entry(trade.order_id)
677 .or_default()
678 .push(trade.clone());
679 }
680 for bucket in trades_by_order_id.values_mut() {
681 bucket.sort_by_key(|t| t.time);
682 }
683
684 let mut order_source_by_id = HashMap::new();
685 for o in &orders {
686 order_source_by_id.insert(
687 o.order_id,
688 source_label_from_client_order_id(&o.client_order_id).to_string(),
689 );
690 }
691 for (order_id, source) in persisted_source_by_order_id {
692 order_source_by_id.entry(order_id).or_insert(source);
693 }
694 let strategy_stats =
695 compute_trade_stats_by_source(trades.clone(), &order_source_by_id, &self.symbol);
696
697 let mut history = Vec::new();
698 let mut fills = Vec::new();
699 let mut used_trade_ids = std::collections::HashSet::new();
700
701 if orders.is_empty() && !trades.is_empty() {
702 let mut sorted = trades;
703 sorted.sort_by_key(|t| (t.time, t.id));
704 history.extend(sorted.iter().map(|t| {
705 fills.push(OrderHistoryFill {
706 timestamp_ms: t.time,
707 side: if t.is_buyer {
708 OrderSide::Buy
709 } else {
710 OrderSide::Sell
711 },
712 price: t.price,
713 });
714 format_trade_history_row(
715 t,
716 order_source_by_id
717 .get(&t.order_id)
718 .map(String::as_str)
719 .unwrap_or("UNKNOWN"),
720 )
721 }));
722 return Ok(OrderHistorySnapshot {
723 rows: history,
724 stats,
725 strategy_stats,
726 fills,
727 open_qty: open_pos.qty,
728 open_entry_price: if open_pos.qty > f64::EPSILON {
729 open_pos.cost_quote / open_pos.qty
730 } else {
731 0.0
732 },
733 estimated_total_pnl_usdt,
734 trade_data_complete,
735 fetched_at_ms,
736 fetch_latency_ms,
737 latest_event_ms,
738 });
739 }
740
741 for o in orders {
742 if o.executed_qty > 0.0 {
743 if let Some(order_trades) = trades_by_order_id.get(&o.order_id) {
744 for t in order_trades {
745 used_trade_ids.insert(t.id);
746 let side = if t.is_buyer { "BUY" } else { "SELL" };
747 fills.push(OrderHistoryFill {
748 timestamp_ms: t.time,
749 side: if t.is_buyer {
750 OrderSide::Buy
751 } else {
752 OrderSide::Sell
753 },
754 price: t.price,
755 });
756 history.push(format_order_history_row(
757 t.time,
758 "FILLED",
759 side,
760 t.qty,
761 t.price,
762 &format!(
763 "{}#T{} [{}]",
764 o.client_order_id,
765 t.id,
766 source_label_from_client_order_id(&o.client_order_id)
767 ),
768 ));
769 }
770 continue;
771 }
772 }
773
774 let avg_price = if o.executed_qty > 0.0 {
775 o.cummulative_quote_qty / o.executed_qty
776 } else {
777 o.price
778 };
779 history.push(format_order_history_row(
780 o.update_time.max(o.time),
781 &o.status,
782 &o.side,
783 display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
784 avg_price,
785 &o.client_order_id,
786 ));
787 }
788
789 for bucket in trades_by_order_id.values() {
791 for t in bucket {
792 if !used_trade_ids.contains(&t.id) {
793 fills.push(OrderHistoryFill {
794 timestamp_ms: t.time,
795 side: if t.is_buyer {
796 OrderSide::Buy
797 } else {
798 OrderSide::Sell
799 },
800 price: t.price,
801 });
802 history.push(format_trade_history_row(
803 t,
804 order_source_by_id
805 .get(&t.order_id)
806 .map(String::as_str)
807 .unwrap_or("UNKNOWN"),
808 ));
809 }
810 }
811 }
812 Ok(OrderHistorySnapshot {
813 rows: history,
814 stats,
815 strategy_stats,
816 fills,
817 open_qty: open_pos.qty,
818 open_entry_price: if open_pos.qty > f64::EPSILON {
819 open_pos.cost_quote / open_pos.qty
820 } else {
821 0.0
822 },
823 estimated_total_pnl_usdt,
824 trade_data_complete,
825 fetched_at_ms,
826 fetch_latency_ms,
827 latest_event_ms,
828 })
829 }
830
831 pub async fn submit_order(
856 &mut self,
857 signal: Signal,
858 source_tag: &str,
859 ) -> Result<Option<OrderUpdate>> {
860 let side = match &signal {
861 Signal::Buy => OrderSide::Buy,
862 Signal::Sell => OrderSide::Sell,
863 Signal::Hold => return Ok(None),
864 };
865 let source_tag = source_tag.to_ascii_lowercase();
866 let intent = OrderIntent {
867 intent_id: format!("intent-{}", &uuid::Uuid::new_v4().to_string()[..8]),
868 source_tag: source_tag.clone(),
869 symbol: self.symbol.clone(),
870 market: self.market,
871 side,
872 order_amount_usdt: self.order_amount_usdt,
873 last_price: self.last_price,
874 created_at_ms: chrono::Utc::now().timestamp_millis() as u64,
875 };
876 if let Some((reason_code, reason)) =
877 self.evaluate_strategy_limits(&intent.source_tag, intent.created_at_ms)
878 {
879 return Ok(Some(OrderUpdate::Rejected {
880 intent_id: intent.intent_id.clone(),
881 client_order_id: "n/a".to_string(),
882 reason_code,
883 reason,
884 }));
885 }
886 let decision = self
887 .risk_module
888 .evaluate_intent(&intent, &self.balances)
889 .await?;
890 if !decision.approved {
891 return Ok(Some(OrderUpdate::Rejected {
892 intent_id: intent.intent_id.clone(),
893 client_order_id: "n/a".to_string(),
894 reason_code: decision
895 .reason_code
896 .unwrap_or_else(|| RejectionReasonCode::RiskUnknown.as_str().to_string()),
897 reason: decision
898 .reason
899 .unwrap_or_else(|| "Rejected by RiskModule".to_string()),
900 }));
901 }
902 if !self.risk_module.reserve_rate_budget() {
903 return Ok(Some(OrderUpdate::Rejected {
904 intent_id: intent.intent_id.clone(),
905 client_order_id: "n/a".to_string(),
906 reason_code: RejectionReasonCode::RateGlobalBudgetExceeded
907 .as_str()
908 .to_string(),
909 reason: "Global rate budget exceeded; try again after reset".to_string(),
910 }));
911 }
912 let qty = decision.normalized_qty;
913 self.mark_strategy_submit(&intent.source_tag, intent.created_at_ms);
914
915 let client_order_id = format!(
916 "sq-{}-{}",
917 intent.source_tag,
918 &uuid::Uuid::new_v4().to_string()[..8]
919 );
920
921 let order = Order {
922 client_order_id: client_order_id.clone(),
923 server_order_id: None,
924 symbol: self.symbol.clone(),
925 side,
926 order_type: OrderType::Market,
927 quantity: qty,
928 price: None,
929 status: OrderStatus::PendingSubmit,
930 created_at: chrono::Utc::now(),
931 updated_at: chrono::Utc::now(),
932 fills: vec![],
933 };
934
935 self.active_orders.insert(client_order_id.clone(), order);
936
937 tracing::info!(
938 side = %side,
939 qty,
940 usdt_amount = intent.order_amount_usdt,
941 price = intent.last_price,
942 intent_id = %intent.intent_id,
943 created_at_ms = intent.created_at_ms,
944 "Submitting order"
945 );
946
947 let submit_res = if self.market == MarketKind::Futures {
948 self.rest_client
949 .place_futures_market_order(&self.symbol, side, qty, &client_order_id)
950 .await
951 } else {
952 self.rest_client
953 .place_market_order(&self.symbol, side, qty, &client_order_id)
954 .await
955 };
956
957 match submit_res {
958 Ok(response) => {
959 let update = self.process_order_response(
960 &intent.intent_id,
961 &client_order_id,
962 side,
963 &response,
964 );
965
966 if matches!(update, OrderUpdate::Filled { .. }) {
968 if let Err(e) = self.refresh_balances().await {
969 tracing::warn!(error = %e, "Failed to refresh balances after fill");
970 }
971 }
972
973 Ok(Some(update))
974 }
975 Err(e) => {
976 tracing::error!(
977 client_order_id,
978 error = %e,
979 "Order rejected"
980 );
981 if let Some(order) = self.active_orders.get_mut(&client_order_id) {
982 order.status = OrderStatus::Rejected;
983 order.updated_at = chrono::Utc::now();
984 }
985 Ok(Some(OrderUpdate::Rejected {
986 intent_id: intent.intent_id.clone(),
987 client_order_id,
988 reason_code: RejectionReasonCode::BrokerSubmitFailed.as_str().to_string(),
989 reason: e.to_string(),
990 }))
991 }
992 }
993 }
994
995 fn process_order_response(
996 &mut self,
997 intent_id: &str,
998 client_order_id: &str,
999 side: OrderSide,
1000 response: &BinanceOrderResponse,
1001 ) -> OrderUpdate {
1002 let fills: Vec<Fill> = response
1003 .fills
1004 .iter()
1005 .map(|f| Fill {
1006 price: f.price,
1007 qty: f.qty,
1008 commission: f.commission,
1009 commission_asset: f.commission_asset.clone(),
1010 })
1011 .collect();
1012
1013 let status = OrderStatus::from_binance_str(&response.status);
1014
1015 if let Some(order) = self.active_orders.get_mut(client_order_id) {
1016 order.server_order_id = Some(response.order_id);
1017 order.status = status;
1018 order.fills = fills.clone();
1019 order.updated_at = chrono::Utc::now();
1020 }
1021
1022 if status == OrderStatus::Filled || status == OrderStatus::PartiallyFilled {
1023 self.position.apply_fill(side, &fills);
1024
1025 let avg_price = if fills.is_empty() {
1026 0.0
1027 } else {
1028 let total_value: f64 = fills.iter().map(|f| f.price * f.qty).sum();
1029 let total_qty: f64 = fills.iter().map(|f| f.qty).sum();
1030 total_value / total_qty
1031 };
1032
1033 tracing::info!(
1034 client_order_id,
1035 order_id = response.order_id,
1036 side = %side,
1037 avg_price,
1038 filled_qty = response.executed_qty,
1039 "Order filled"
1040 );
1041
1042 OrderUpdate::Filled {
1043 intent_id: intent_id.to_string(),
1044 client_order_id: client_order_id.to_string(),
1045 side,
1046 fills,
1047 avg_price,
1048 }
1049 } else {
1050 OrderUpdate::Submitted {
1051 intent_id: intent_id.to_string(),
1052 client_order_id: client_order_id.to_string(),
1053 server_order_id: response.order_id,
1054 }
1055 }
1056 }
1057}
1058
#[cfg(test)]
mod tests {
    use super::{display_qty_for_history, split_symbol_assets, OrderManager};
    use crate::binance::rest::BinanceRestClient;
    use crate::config::RiskConfig;
    use crate::model::order::{Order, OrderSide, OrderStatus, OrderType};
    use std::sync::Arc;

    /// Spot `BTCUSDT` manager with a 3 s default cooldown, at most one active
    /// order per strategy, and no per-strategy overrides.
    fn build_test_order_manager() -> OrderManager {
        let risk = RiskConfig {
            global_rate_limit_per_minute: 600,
            default_strategy_cooldown_ms: 3_000,
            default_strategy_max_active_orders: 1,
            strategy_limits: vec![],
        };
        let rest = Arc::new(BinanceRestClient::new(
            "https://demo-api.binance.com",
            "https://demo-fapi.binance.com",
            "k",
            "s",
            "fk",
            "fs",
            5000,
        ));
        OrderManager::new(
            rest,
            "BTCUSDT",
            crate::order_manager::MarketKind::Spot,
            10.0,
            &risk,
        )
    }

    /// Asserts that two quantities differ by less than `f64::EPSILON`.
    fn assert_qty(actual: f64, expected: f64) {
        assert!((actual - expected).abs() < f64::EPSILON);
    }

    #[test]
    fn valid_state_transitions() {
        // Pre-execution states are still live.
        assert!(!OrderStatus::PendingSubmit.is_terminal());
        assert!(!OrderStatus::Submitted.is_terminal());

        // Every end state must be terminal.
        assert!(OrderStatus::Filled.is_terminal());
        assert!(OrderStatus::Rejected.is_terminal());
        assert!(OrderStatus::Cancelled.is_terminal());
    }

    #[test]
    fn from_binance_str_mapping() {
        let expectations = [
            ("NEW", OrderStatus::Submitted),
            ("FILLED", OrderStatus::Filled),
            ("CANCELED", OrderStatus::Cancelled),
            ("REJECTED", OrderStatus::Rejected),
            ("EXPIRED", OrderStatus::Expired),
            ("PARTIALLY_FILLED", OrderStatus::PartiallyFilled),
        ];
        for (raw, expected) in expectations {
            assert_eq!(OrderStatus::from_binance_str(raw), expected);
        }
    }

    #[test]
    fn order_history_uses_executed_qty_for_filled_states() {
        assert_qty(display_qty_for_history("FILLED", 1.0, 0.4), 0.4);
        assert_qty(display_qty_for_history("PARTIALLY_FILLED", 1.0, 0.4), 0.4);
    }

    #[test]
    fn order_history_uses_orig_qty_for_non_filled_states() {
        assert_qty(display_qty_for_history("NEW", 1.0, 0.4), 1.0);
        assert_qty(display_qty_for_history("CANCELED", 1.0, 0.4), 1.0);
        assert_qty(display_qty_for_history("REJECTED", 1.0, 0.0), 1.0);
    }

    #[test]
    fn split_symbol_assets_parses_known_quote_suffixes() {
        let (base, quote) = split_symbol_assets("ETHUSDT");
        assert_eq!((base, quote), ("ETH".to_string(), "USDT".to_string()));

        let (base, quote) = split_symbol_assets("ETHBTC");
        assert_eq!((base, quote), ("ETH".to_string(), "BTC".to_string()));
    }

    #[test]
    fn split_symbol_assets_falls_back_when_quote_unknown() {
        let (base, quote) = split_symbol_assets("FOOBAR");
        assert_eq!((base, quote), ("FOOBAR".to_string(), String::new()));
    }

    #[test]
    fn strategy_limit_rejects_when_active_orders_reach_limit() {
        let mut mgr = build_test_order_manager();

        // One live order for the `cfg` strategy already fills its limit of 1.
        let live_order = Order {
            client_order_id: "sq-cfg-abcdef12".to_string(),
            server_order_id: None,
            symbol: "BTCUSDT".to_string(),
            side: OrderSide::Buy,
            order_type: OrderType::Market,
            quantity: 0.1,
            price: None,
            status: OrderStatus::Submitted,
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            fills: vec![],
        };
        mgr.active_orders
            .insert(live_order.client_order_id.clone(), live_order);

        let now_ms = chrono::Utc::now().timestamp_millis() as u64;
        let (reason_code, _reason) = mgr
            .evaluate_strategy_limits("cfg", now_ms)
            .expect("must be rejected");
        assert_eq!(
            reason_code,
            "risk.strategy_max_active_orders_exceeded".to_string()
        );
    }

    #[test]
    fn strategy_limit_rejects_during_cooldown_window() {
        let mut mgr = build_test_order_manager();
        let submitted_at = chrono::Utc::now().timestamp_millis() as u64;
        mgr.mark_strategy_submit("cfg", submitted_at);

        // 500 ms later is still inside the 3 s default cooldown.
        let (reason_code, _reason) = mgr
            .evaluate_strategy_limits("cfg", submitted_at + 500)
            .expect("must be rejected");
        assert_eq!(reason_code, "risk.strategy_cooldown_active".to_string());
    }
}
1209}