Skip to main content

sandbox_quant/
order_manager.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use anyhow::Result;
6use chrono::TimeZone;
7
8use crate::binance::rest::BinanceRestClient;
9use crate::binance::types::{BinanceMyTrade, BinanceOrderResponse};
10use crate::config::RiskConfig;
11use crate::model::order::{Fill, Order, OrderSide, OrderStatus, OrderType};
12use crate::model::position::Position;
13use crate::model::signal::Signal;
14use crate::order_store;
15use crate::risk_module::{OrderIntent, RateBudgetSnapshot, RejectionReasonCode, RiskModule};
16
17pub use crate::risk_module::MarketKind;
18
19#[derive(Debug, Clone)]
20pub enum OrderUpdate {
21    Submitted {
22        intent_id: String,
23        client_order_id: String,
24        server_order_id: u64,
25    },
26    Filled {
27        intent_id: String,
28        client_order_id: String,
29        side: OrderSide,
30        fills: Vec<Fill>,
31        avg_price: f64,
32    },
33    Rejected {
34        intent_id: String,
35        client_order_id: String,
36        reason_code: String,
37        reason: String,
38    },
39}
40
41#[derive(Debug, Clone, Default)]
42pub struct OrderHistoryStats {
43    pub trade_count: u32,
44    pub win_count: u32,
45    pub lose_count: u32,
46    pub realized_pnl: f64,
47}
48
49#[derive(Debug, Clone, Default)]
50pub struct OrderHistorySnapshot {
51    pub rows: Vec<String>,
52    pub stats: OrderHistoryStats,
53    pub strategy_stats: HashMap<String, OrderHistoryStats>,
54    pub fills: Vec<OrderHistoryFill>,
55    pub open_qty: f64,
56    pub open_entry_price: f64,
57    pub estimated_total_pnl_usdt: Option<f64>,
58    pub trade_data_complete: bool,
59    pub fetched_at_ms: u64,
60    pub fetch_latency_ms: u64,
61    pub latest_event_ms: Option<u64>,
62}
63
64#[derive(Debug, Clone)]
65pub struct OrderHistoryFill {
66    pub timestamp_ms: u64,
67    pub side: OrderSide,
68    pub price: f64,
69}
70
71pub struct OrderManager {
72    rest_client: Arc<BinanceRestClient>,
73    active_orders: HashMap<String, Order>,
74    position: Position,
75    symbol: String,
76    market: MarketKind,
77    order_amount_usdt: f64,
78    balances: HashMap<String, f64>,
79    last_price: f64,
80    risk_module: RiskModule,
81    default_strategy_cooldown_ms: u64,
82    default_strategy_max_active_orders: u32,
83    strategy_limits_by_tag: HashMap<String, StrategyExecutionLimit>,
84    last_strategy_submit_ms: HashMap<String, u64>,
85}
86
87#[derive(Debug, Clone, Copy)]
88struct StrategyExecutionLimit {
89    cooldown_ms: u64,
90    max_active_orders: u32,
91}
92
93fn storage_symbol(symbol: &str, market: MarketKind) -> String {
94    match market {
95        MarketKind::Spot => symbol.to_string(),
96        MarketKind::Futures => format!("{}#FUT", symbol),
97    }
98}
99
100fn display_qty_for_history(status: &str, orig_qty: f64, executed_qty: f64) -> f64 {
101    match status {
102        "FILLED" | "PARTIALLY_FILLED" => executed_qty,
103        _ => orig_qty,
104    }
105}
106
107fn format_history_time(timestamp_ms: u64) -> String {
108    chrono::Utc
109        .timestamp_millis_opt(timestamp_ms as i64)
110        .single()
111        .map(|dt| {
112            dt.with_timezone(&chrono::Local)
113                .format("%H:%M:%S")
114                .to_string()
115        })
116        .unwrap_or_else(|| "--:--:--".to_string())
117}
118
119fn format_order_history_row(
120    timestamp_ms: u64,
121    status: &str,
122    side: &str,
123    qty: f64,
124    avg_price: f64,
125    client_order_id: &str,
126) -> String {
127    format!(
128        "{} {:<10} {:<4} {:.5} @ {:.2}  {}",
129        format_history_time(timestamp_ms),
130        status,
131        side,
132        qty,
133        avg_price,
134        client_order_id
135    )
136}
137
138fn source_label_from_client_order_id(client_order_id: &str) -> &'static str {
139    if client_order_id.contains("-mnl-") {
140        "MANUAL"
141    } else if client_order_id.contains("-cfg-") {
142        "MA(Config)"
143    } else if client_order_id.contains("-fst-") {
144        "MA(Fast 5/20)"
145    } else if client_order_id.contains("-slw-") {
146        "MA(Slow 20/60)"
147    } else {
148        "UNKNOWN"
149    }
150}
151
152fn format_trade_history_row(t: &BinanceMyTrade, source: &str) -> String {
153    let side = if t.is_buyer { "BUY" } else { "SELL" };
154    format_order_history_row(
155        t.time,
156        "FILLED",
157        side,
158        t.qty,
159        t.price,
160        &format!("order#{}#T{} [{}]", t.order_id, t.id, source),
161    )
162}
163
164fn split_symbol_assets(symbol: &str) -> (String, String) {
165    const QUOTE_SUFFIXES: [&str; 10] = [
166        "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
167    ];
168    for q in QUOTE_SUFFIXES {
169        if let Some(base) = symbol.strip_suffix(q) {
170            if !base.is_empty() {
171                return (base.to_string(), q.to_string());
172            }
173        }
174    }
175    (symbol.to_string(), String::new())
176}
177
178#[derive(Clone, Copy, Default)]
179struct LongPos {
180    qty: f64,
181    cost_quote: f64,
182}
183
184fn apply_spot_trade_with_fee(
185    pos: &mut LongPos,
186    stats: &mut OrderHistoryStats,
187    t: &BinanceMyTrade,
188    base_asset: &str,
189    quote_asset: &str,
190) {
191    let qty = t.qty.max(0.0);
192    if qty <= f64::EPSILON {
193        return;
194    }
195    let fee_asset = t.commission_asset.as_str();
196    let fee_is_base = !base_asset.is_empty() && fee_asset.eq_ignore_ascii_case(base_asset);
197    let fee_is_quote = !quote_asset.is_empty() && fee_asset.eq_ignore_ascii_case(quote_asset);
198
199    if t.is_buyer {
200        let net_qty = (qty
201            - if fee_is_base {
202                t.commission.max(0.0)
203            } else {
204                0.0
205            })
206        .max(0.0);
207        if net_qty <= f64::EPSILON {
208            return;
209        }
210        let fee_quote = if fee_is_quote {
211            t.commission.max(0.0)
212        } else {
213            0.0
214        };
215        pos.qty += net_qty;
216        pos.cost_quote += qty * t.price + fee_quote;
217        return;
218    }
219
220    // Spot sell: close against existing long inventory.
221    if pos.qty <= f64::EPSILON {
222        return;
223    }
224    let close_qty = qty.min(pos.qty);
225    if close_qty <= f64::EPSILON {
226        return;
227    }
228    let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
229    let fee_quote_total = if fee_is_quote {
230        t.commission.max(0.0)
231    } else if fee_is_base {
232        // If fee is charged in base on sell, approximate its quote impact at fill price.
233        t.commission.max(0.0) * t.price
234    } else {
235        0.0
236    };
237    let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
238    let pnl_delta = (close_qty * t.price - fee_quote) - (avg_cost * close_qty);
239    if pnl_delta > 0.0 {
240        stats.win_count += 1;
241        stats.trade_count += 1;
242    } else if pnl_delta < 0.0 {
243        stats.lose_count += 1;
244        stats.trade_count += 1;
245    }
246    stats.realized_pnl += pnl_delta;
247
248    pos.qty -= close_qty;
249    pos.cost_quote -= avg_cost * close_qty;
250    if pos.qty <= f64::EPSILON {
251        pos.qty = 0.0;
252        pos.cost_quote = 0.0;
253    }
254}
255
256fn compute_trade_state(
257    mut trades: Vec<BinanceMyTrade>,
258    symbol: &str,
259) -> (OrderHistoryStats, LongPos) {
260    trades.sort_by_key(|t| (t.time, t.id));
261    let (base_asset, quote_asset) = split_symbol_assets(symbol);
262    let mut pos = LongPos::default();
263    let mut stats = OrderHistoryStats::default();
264    for t in trades {
265        apply_spot_trade_with_fee(&mut pos, &mut stats, &t, &base_asset, &quote_asset);
266    }
267    (stats, pos)
268}
269
270fn compute_trade_stats_by_source(
271    mut trades: Vec<BinanceMyTrade>,
272    order_source_by_id: &HashMap<u64, String>,
273    symbol: &str,
274) -> HashMap<String, OrderHistoryStats> {
275    trades.sort_by_key(|t| (t.time, t.id));
276    let (base_asset, quote_asset) = split_symbol_assets(symbol);
277    let mut pos_by_source: HashMap<String, LongPos> = HashMap::new();
278    let mut stats_by_source: HashMap<String, OrderHistoryStats> = HashMap::new();
279
280    for t in trades {
281        let source = order_source_by_id
282            .get(&t.order_id)
283            .cloned()
284            .unwrap_or_else(|| "UNKNOWN".to_string());
285        let pos = pos_by_source.entry(source.clone()).or_default();
286        let stats = stats_by_source.entry(source).or_default();
287        apply_spot_trade_with_fee(pos, stats, &t, &base_asset, &quote_asset);
288    }
289
290    stats_by_source
291}
292
293impl OrderManager {
294    /// Create a new order manager bound to a single symbol/market context.
295    ///
296    /// The instance keeps in-memory position, cached balances, and an embedded
297    /// `RiskModule` that enforces pre-trade checks and global rate budget.
298    ///
299    /// # Caution
300    /// This manager is stateful (`last_price`, balances, active orders). Reuse
301    /// the same instance for a symbol stream instead of recreating per tick.
302    pub fn new(
303        rest_client: Arc<BinanceRestClient>,
304        symbol: &str,
305        market: MarketKind,
306        order_amount_usdt: f64,
307        risk_config: &RiskConfig,
308    ) -> Self {
309        let mut strategy_limits_by_tag = HashMap::new();
310        let default_strategy_cooldown_ms = risk_config.default_strategy_cooldown_ms;
311        let default_strategy_max_active_orders = risk_config.default_strategy_max_active_orders.max(1);
312        for profile in &risk_config.strategy_limits {
313            let source_tag = profile.source_tag.trim().to_ascii_lowercase();
314            if source_tag.is_empty() {
315                continue;
316            }
317            strategy_limits_by_tag.insert(
318                source_tag,
319                StrategyExecutionLimit {
320                    cooldown_ms: profile
321                        .cooldown_ms
322                        .unwrap_or(default_strategy_cooldown_ms),
323                    max_active_orders: profile
324                        .max_active_orders
325                        .unwrap_or(default_strategy_max_active_orders)
326                        .max(1),
327                },
328            );
329        }
330        Self {
331            rest_client: rest_client.clone(),
332            active_orders: HashMap::new(),
333            position: Position::new(symbol.to_string()),
334            symbol: symbol.to_string(),
335            market,
336            order_amount_usdt,
337            balances: HashMap::new(),
338            last_price: 0.0,
339            risk_module: RiskModule::new(
340                rest_client.clone(),
341                risk_config.global_rate_limit_per_minute,
342            ),
343            default_strategy_cooldown_ms,
344            default_strategy_max_active_orders,
345            strategy_limits_by_tag,
346            last_strategy_submit_ms: HashMap::new(),
347        }
348    }
349
350    /// Return current in-memory position snapshot.
351    ///
352    /// Values reflect fills processed by this process. They are not a full
353    /// exchange reconciliation snapshot.
354    pub fn position(&self) -> &Position {
355        &self.position
356    }
357
358    /// Return latest cached free balances.
359    ///
360    /// Cache is updated by `refresh_balances`. Missing assets should be treated
361    /// as zero balance.
362    pub fn balances(&self) -> &HashMap<String, f64> {
363        &self.balances
364    }
365
366    /// Update last price and recompute unrealized PnL.
367    ///
368    /// # Usage
369    /// Call on every market data tick before `submit_order`, so risk checks use
370    /// a valid `last_price`.
371    pub fn update_unrealized_pnl(&mut self, current_price: f64) {
372        self.last_price = current_price;
373        self.position.update_unrealized_pnl(current_price);
374    }
375
376    /// Return current global rate-budget snapshot from the risk module.
377    ///
378    /// Intended for UI display and observability.
379    pub fn rate_budget_snapshot(&self) -> RateBudgetSnapshot {
380        self.risk_module.rate_budget_snapshot()
381    }
382
383    fn strategy_limits_for(&self, source_tag: &str) -> StrategyExecutionLimit {
384        self.strategy_limits_by_tag
385            .get(source_tag)
386            .copied()
387            .unwrap_or(StrategyExecutionLimit {
388                cooldown_ms: self.default_strategy_cooldown_ms,
389                max_active_orders: self.default_strategy_max_active_orders,
390            })
391    }
392
393    fn active_order_count_for_source(&self, source_tag: &str) -> u32 {
394        let prefix = format!("sq-{}-", source_tag);
395        self.active_orders
396            .values()
397            .filter(|o| !o.status.is_terminal() && o.client_order_id.starts_with(&prefix))
398            .count() as u32
399    }
400
401    fn evaluate_strategy_limits(
402        &self,
403        source_tag: &str,
404        created_at_ms: u64,
405    ) -> Option<(String, String)> {
406        let limits = self.strategy_limits_for(source_tag);
407        let active_count = self.active_order_count_for_source(source_tag);
408        if active_count >= limits.max_active_orders {
409            return Some((
410                RejectionReasonCode::RiskStrategyMaxActiveOrdersExceeded
411                    .as_str()
412                    .to_string(),
413                format!(
414                    "Strategy '{}' active order limit exceeded (active {}, limit {})",
415                    source_tag, active_count, limits.max_active_orders
416                ),
417            ));
418        }
419
420        if limits.cooldown_ms > 0 {
421            if let Some(last_submit_ms) = self.last_strategy_submit_ms.get(source_tag) {
422                let elapsed = created_at_ms.saturating_sub(*last_submit_ms);
423                if elapsed < limits.cooldown_ms {
424                    let remaining = limits.cooldown_ms - elapsed;
425                    return Some((
426                        RejectionReasonCode::RiskStrategyCooldownActive
427                            .as_str()
428                            .to_string(),
429                        format!(
430                            "Strategy '{}' cooldown active ({}ms remaining)",
431                            source_tag, remaining
432                        ),
433                    ));
434                }
435            }
436        }
437
438        None
439    }
440
441    fn mark_strategy_submit(&mut self, source_tag: &str, created_at_ms: u64) {
442        self.last_strategy_submit_ms
443            .insert(source_tag.to_string(), created_at_ms);
444    }
445
446    /// Fetch account balances from Binance and update internal state.
447    ///
448    /// Returns the map `asset -> free` for assets with non-zero total (spot) or
449    /// non-trivial wallet balance (futures).
450    ///
451    /// # Usage
452    /// Refresh before order submission cycles to reduce false "insufficient
453    /// balance" rejections from stale cache.
454    ///
455    /// # Caution
456    /// Network/API failures return `Err(_)` and leave previous cache untouched.
457    pub async fn refresh_balances(&mut self) -> Result<HashMap<String, f64>> {
458        if self.market == MarketKind::Futures {
459            let account = self.rest_client.get_futures_account().await?;
460            self.balances.clear();
461            for a in &account.assets {
462                if a.wallet_balance.abs() > f64::EPSILON {
463                    self.balances.insert(a.asset.clone(), a.available_balance);
464                }
465            }
466            return Ok(self.balances.clone());
467        }
468        let account = self.rest_client.get_account().await?;
469        self.balances.clear();
470        for b in &account.balances {
471            let total = b.free + b.locked;
472            if total > 0.0 {
473                self.balances.insert(b.asset.clone(), b.free);
474            }
475        }
476        tracing::info!(balances = ?self.balances, "Balances refreshed");
477        Ok(self.balances.clone())
478    }
479
480    /// Fetch order history from exchange and format rows for UI display.
481    ///
482    /// This method combines order and trade endpoints, persists snapshots to
483    /// local sqlite, and emits a best-effort history view even if one endpoint
484    /// fails.
485    ///
486    /// # Caution
487    /// `trade_data_complete = false` means derived PnL may be partial.
488    pub async fn refresh_order_history(&self, limit: usize) -> Result<OrderHistorySnapshot> {
489        if self.market == MarketKind::Futures {
490            let fetch_started = Instant::now();
491            let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
492            let orders_result = self
493                .rest_client
494                .get_futures_all_orders(&self.symbol, limit)
495                .await;
496            let trades_result = self
497                .rest_client
498                .get_futures_my_trades_history(&self.symbol, limit.max(1))
499                .await;
500            let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
501
502            if orders_result.is_err() && trades_result.is_err() {
503                let oe = orders_result.err().unwrap();
504                let te = trades_result.err().unwrap();
505                return Err(anyhow::anyhow!(
506                    "futures order history fetch failed: allOrders={} | userTrades={}",
507                    oe,
508                    te
509                ));
510            }
511
512            let mut orders = orders_result.unwrap_or_default();
513            let trades = trades_result.unwrap_or_default();
514            orders.sort_by_key(|o| o.update_time.max(o.time));
515
516            let storage_key = storage_symbol(&self.symbol, self.market);
517            if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &trades) {
518                tracing::warn!(error = %e, "Failed to persist futures order snapshot to sqlite");
519            }
520
521            let mut history = Vec::new();
522            let mut fills = Vec::new();
523            for t in &trades {
524                let side = if t.is_buyer { "BUY" } else { "SELL" };
525                fills.push(OrderHistoryFill {
526                    timestamp_ms: t.time,
527                    side: if t.is_buyer {
528                        OrderSide::Buy
529                    } else {
530                        OrderSide::Sell
531                    },
532                    price: t.price,
533                });
534                history.push(format_order_history_row(
535                    t.time,
536                    "FILLED",
537                    side,
538                    t.qty,
539                    t.price,
540                    &format!("order#{}#T{} [FUT]", t.order_id, t.id),
541                ));
542            }
543            for o in &orders {
544                if o.executed_qty <= 0.0 {
545                    history.push(format_order_history_row(
546                        o.update_time.max(o.time),
547                        &o.status,
548                        &o.side,
549                        display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
550                        if o.executed_qty > 0.0 {
551                            o.cummulative_quote_qty / o.executed_qty
552                        } else {
553                            o.price
554                        },
555                        &o.client_order_id,
556                    ));
557                }
558            }
559
560            let mut stats = OrderHistoryStats::default();
561            for t in &trades {
562                if t.realized_pnl > 0.0 {
563                    stats.win_count += 1;
564                    stats.trade_count += 1;
565                } else if t.realized_pnl < 0.0 {
566                    stats.lose_count += 1;
567                    stats.trade_count += 1;
568                }
569                stats.realized_pnl += t.realized_pnl;
570            }
571            let estimated_total_pnl_usdt = Some(stats.realized_pnl);
572            let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
573            let latest_trade_event = trades.iter().map(|t| t.time).max();
574            return Ok(OrderHistorySnapshot {
575                rows: history,
576                stats,
577                strategy_stats: HashMap::new(),
578                fills,
579                open_qty: 0.0,
580                open_entry_price: 0.0,
581                estimated_total_pnl_usdt,
582                trade_data_complete: true,
583                fetched_at_ms,
584                fetch_latency_ms,
585                latest_event_ms: latest_order_event.max(latest_trade_event),
586            });
587        }
588
589        let fetch_started = Instant::now();
590        let fetched_at_ms = chrono::Utc::now().timestamp_millis() as u64;
591        let orders_result = self.rest_client.get_all_orders(&self.symbol, limit).await;
592        let storage_key = storage_symbol(&self.symbol, self.market);
593        let last_trade_id = order_store::load_last_trade_id(&storage_key).ok().flatten();
594        let persisted_trade_count = order_store::load_trade_count(&storage_key).unwrap_or(0);
595        let need_backfill = persisted_trade_count < limit;
596        let trades_result = match (need_backfill, last_trade_id) {
597            (true, _) => {
598                self.rest_client
599                    .get_my_trades_history(&self.symbol, limit.max(1))
600                    .await
601            }
602            (false, Some(last_id)) => {
603                self.rest_client
604                    .get_my_trades_since(&self.symbol, last_id.saturating_add(1), 10)
605                    .await
606            }
607            (false, None) => {
608                self.rest_client
609                    .get_my_trades_history(&self.symbol, limit.max(1))
610                    .await
611            }
612        };
613        let fetch_latency_ms = fetch_started.elapsed().as_millis() as u64;
614        let trade_data_complete = trades_result.is_ok();
615
616        if orders_result.is_err() && trades_result.is_err() {
617            let oe = orders_result.err().unwrap();
618            let te = trades_result.err().unwrap();
619            return Err(anyhow::anyhow!(
620                "order history fetch failed: allOrders={} | myTrades={}",
621                oe,
622                te
623            ));
624        }
625
626        let mut orders = match orders_result {
627            Ok(v) => v,
628            Err(e) => {
629                tracing::warn!(error = %e, "Failed to fetch allOrders; falling back to trade-only history");
630                Vec::new()
631            }
632        };
633        let recent_trades = match trades_result {
634            Ok(t) => t,
635            Err(e) => {
636                tracing::warn!(error = %e, "Failed to fetch myTrades; falling back to order-only history");
637                Vec::new()
638            }
639        };
640        let mut trades = recent_trades.clone();
641        orders.sort_by_key(|o| o.update_time.max(o.time));
642
643        if let Err(e) = order_store::persist_order_snapshot(&storage_key, &orders, &recent_trades) {
644            tracing::warn!(error = %e, "Failed to persist order snapshot to sqlite");
645        }
646        let mut persisted_source_by_order_id: HashMap<u64, String> = HashMap::new();
647        match order_store::load_persisted_trades(&storage_key) {
648            Ok(saved) => {
649                if !saved.is_empty() {
650                    trades = saved.iter().map(|r| r.trade.clone()).collect();
651                    for row in saved {
652                        persisted_source_by_order_id
653                            .entry(row.trade.order_id)
654                            .or_insert(row.source);
655                    }
656                }
657            }
658            Err(e) => {
659                tracing::warn!(error = %e, "Failed to load persisted trades; using recent API trades");
660            }
661        }
662
663        let (stats, open_pos) = compute_trade_state(trades.clone(), &self.symbol);
664        let estimated_total_pnl_usdt = if self.last_price > 0.0 {
665            Some(stats.realized_pnl + (open_pos.qty * self.last_price - open_pos.cost_quote))
666        } else {
667            Some(stats.realized_pnl)
668        };
669        let latest_order_event = orders.iter().map(|o| o.update_time.max(o.time)).max();
670        let latest_trade_event = trades.iter().map(|t| t.time).max();
671        let latest_event_ms = latest_order_event.max(latest_trade_event);
672
673        let mut trades_by_order_id: HashMap<u64, Vec<BinanceMyTrade>> = HashMap::new();
674        for trade in &trades {
675            trades_by_order_id
676                .entry(trade.order_id)
677                .or_default()
678                .push(trade.clone());
679        }
680        for bucket in trades_by_order_id.values_mut() {
681            bucket.sort_by_key(|t| t.time);
682        }
683
684        let mut order_source_by_id = HashMap::new();
685        for o in &orders {
686            order_source_by_id.insert(
687                o.order_id,
688                source_label_from_client_order_id(&o.client_order_id).to_string(),
689            );
690        }
691        for (order_id, source) in persisted_source_by_order_id {
692            order_source_by_id.entry(order_id).or_insert(source);
693        }
694        let strategy_stats =
695            compute_trade_stats_by_source(trades.clone(), &order_source_by_id, &self.symbol);
696
697        let mut history = Vec::new();
698        let mut fills = Vec::new();
699        let mut used_trade_ids = std::collections::HashSet::new();
700
701        if orders.is_empty() && !trades.is_empty() {
702            let mut sorted = trades;
703            sorted.sort_by_key(|t| (t.time, t.id));
704            history.extend(sorted.iter().map(|t| {
705                fills.push(OrderHistoryFill {
706                    timestamp_ms: t.time,
707                    side: if t.is_buyer {
708                        OrderSide::Buy
709                    } else {
710                        OrderSide::Sell
711                    },
712                    price: t.price,
713                });
714                format_trade_history_row(
715                    t,
716                    order_source_by_id
717                        .get(&t.order_id)
718                        .map(String::as_str)
719                        .unwrap_or("UNKNOWN"),
720                )
721            }));
722            return Ok(OrderHistorySnapshot {
723                rows: history,
724                stats,
725                strategy_stats,
726                fills,
727                open_qty: open_pos.qty,
728                open_entry_price: if open_pos.qty > f64::EPSILON {
729                    open_pos.cost_quote / open_pos.qty
730                } else {
731                    0.0
732                },
733                estimated_total_pnl_usdt,
734                trade_data_complete,
735                fetched_at_ms,
736                fetch_latency_ms,
737                latest_event_ms,
738            });
739        }
740
741        for o in orders {
742            if o.executed_qty > 0.0 {
743                if let Some(order_trades) = trades_by_order_id.get(&o.order_id) {
744                    for t in order_trades {
745                        used_trade_ids.insert(t.id);
746                        let side = if t.is_buyer { "BUY" } else { "SELL" };
747                        fills.push(OrderHistoryFill {
748                            timestamp_ms: t.time,
749                            side: if t.is_buyer {
750                                OrderSide::Buy
751                            } else {
752                                OrderSide::Sell
753                            },
754                            price: t.price,
755                        });
756                        history.push(format_order_history_row(
757                            t.time,
758                            "FILLED",
759                            side,
760                            t.qty,
761                            t.price,
762                            &format!(
763                                "{}#T{} [{}]",
764                                o.client_order_id,
765                                t.id,
766                                source_label_from_client_order_id(&o.client_order_id)
767                            ),
768                        ));
769                    }
770                    continue;
771                }
772            }
773
774            let avg_price = if o.executed_qty > 0.0 {
775                o.cummulative_quote_qty / o.executed_qty
776            } else {
777                o.price
778            };
779            history.push(format_order_history_row(
780                o.update_time.max(o.time),
781                &o.status,
782                &o.side,
783                display_qty_for_history(&o.status, o.orig_qty, o.executed_qty),
784                avg_price,
785                &o.client_order_id,
786            ));
787        }
788
789        // Include trades that did not match fetched order pages.
790        for bucket in trades_by_order_id.values() {
791            for t in bucket {
792                if !used_trade_ids.contains(&t.id) {
793                    fills.push(OrderHistoryFill {
794                        timestamp_ms: t.time,
795                        side: if t.is_buyer {
796                            OrderSide::Buy
797                        } else {
798                            OrderSide::Sell
799                        },
800                        price: t.price,
801                    });
802                    history.push(format_trade_history_row(
803                        t,
804                        order_source_by_id
805                            .get(&t.order_id)
806                            .map(String::as_str)
807                            .unwrap_or("UNKNOWN"),
808                    ));
809                }
810            }
811        }
812        Ok(OrderHistorySnapshot {
813            rows: history,
814            stats,
815            strategy_stats,
816            fills,
817            open_qty: open_pos.qty,
818            open_entry_price: if open_pos.qty > f64::EPSILON {
819                open_pos.cost_quote / open_pos.qty
820            } else {
821                0.0
822            },
823            estimated_total_pnl_usdt,
824            trade_data_complete,
825            fetched_at_ms,
826            fetch_latency_ms,
827            latest_event_ms,
828        })
829    }
830
831    /// Build an order intent, run risk checks, and submit to broker when approved.
832    ///
833    /// # Behavior
834    /// - `Signal::Hold` returns `Ok(None)`.
835    /// - For buy/sell signals, this method:
836    ///   1. Builds `OrderIntent`.
837    ///   2. Calls `RiskModule::evaluate_intent`.
838    ///   3. Reserves one global rate token via `reserve_rate_budget`.
839    ///   4. Submits market order to spot/futures broker endpoint.
840    /// - Rejections are returned as `Ok(Some(OrderUpdate::Rejected { .. }))`
841    ///   with structured `reason_code`.
842    ///
843    /// # Usage
844    /// Recommended sequence:
845    /// 1. `update_unrealized_pnl(last_price)`
846    /// 2. `refresh_balances()` (periodic or before trading loop)
847    /// 3. `submit_order(signal, source_tag)`
848    ///
849    /// # Caution
850    /// - Spot sell requires base-asset balance (e.g. `ETH` for `ETHUSDT`).
851    /// - If balances are stale, you may see "No position to sell" or
852    ///   `"Insufficient <asset>"` even though exchange state changed recently.
853    /// - This method returns transport/runtime errors as `Err(_)`; business
854    ///   rejections are encoded in `OrderUpdate::Rejected`.
855    pub async fn submit_order(
856        &mut self,
857        signal: Signal,
858        source_tag: &str,
859    ) -> Result<Option<OrderUpdate>> {
860        let side = match &signal {
861            Signal::Buy => OrderSide::Buy,
862            Signal::Sell => OrderSide::Sell,
863            Signal::Hold => return Ok(None),
864        };
865        let source_tag = source_tag.to_ascii_lowercase();
866        let intent = OrderIntent {
867            intent_id: format!("intent-{}", &uuid::Uuid::new_v4().to_string()[..8]),
868            source_tag: source_tag.clone(),
869            symbol: self.symbol.clone(),
870            market: self.market,
871            side,
872            order_amount_usdt: self.order_amount_usdt,
873            last_price: self.last_price,
874            created_at_ms: chrono::Utc::now().timestamp_millis() as u64,
875        };
876        if let Some((reason_code, reason)) =
877            self.evaluate_strategy_limits(&intent.source_tag, intent.created_at_ms)
878        {
879            return Ok(Some(OrderUpdate::Rejected {
880                intent_id: intent.intent_id.clone(),
881                client_order_id: "n/a".to_string(),
882                reason_code,
883                reason,
884            }));
885        }
886        let decision = self
887            .risk_module
888            .evaluate_intent(&intent, &self.balances)
889            .await?;
890        if !decision.approved {
891            return Ok(Some(OrderUpdate::Rejected {
892                intent_id: intent.intent_id.clone(),
893                client_order_id: "n/a".to_string(),
894                reason_code: decision
895                    .reason_code
896                    .unwrap_or_else(|| RejectionReasonCode::RiskUnknown.as_str().to_string()),
897                reason: decision
898                    .reason
899                    .unwrap_or_else(|| "Rejected by RiskModule".to_string()),
900            }));
901        }
902        if !self.risk_module.reserve_rate_budget() {
903            return Ok(Some(OrderUpdate::Rejected {
904                intent_id: intent.intent_id.clone(),
905                client_order_id: "n/a".to_string(),
906                reason_code: RejectionReasonCode::RateGlobalBudgetExceeded
907                    .as_str()
908                    .to_string(),
909                reason: "Global rate budget exceeded; try again after reset".to_string(),
910            }));
911        }
912        let qty = decision.normalized_qty;
913        self.mark_strategy_submit(&intent.source_tag, intent.created_at_ms);
914
915        let client_order_id = format!(
916            "sq-{}-{}",
917            intent.source_tag,
918            &uuid::Uuid::new_v4().to_string()[..8]
919        );
920
921        let order = Order {
922            client_order_id: client_order_id.clone(),
923            server_order_id: None,
924            symbol: self.symbol.clone(),
925            side,
926            order_type: OrderType::Market,
927            quantity: qty,
928            price: None,
929            status: OrderStatus::PendingSubmit,
930            created_at: chrono::Utc::now(),
931            updated_at: chrono::Utc::now(),
932            fills: vec![],
933        };
934
935        self.active_orders.insert(client_order_id.clone(), order);
936
937        tracing::info!(
938            side = %side,
939            qty,
940            usdt_amount = intent.order_amount_usdt,
941            price = intent.last_price,
942            intent_id = %intent.intent_id,
943            created_at_ms = intent.created_at_ms,
944            "Submitting order"
945        );
946
947        let submit_res = if self.market == MarketKind::Futures {
948            self.rest_client
949                .place_futures_market_order(&self.symbol, side, qty, &client_order_id)
950                .await
951        } else {
952            self.rest_client
953                .place_market_order(&self.symbol, side, qty, &client_order_id)
954                .await
955        };
956
957        match submit_res {
958            Ok(response) => {
959                let update = self.process_order_response(
960                    &intent.intent_id,
961                    &client_order_id,
962                    side,
963                    &response,
964                );
965
966                // Refresh balances after fill
967                if matches!(update, OrderUpdate::Filled { .. }) {
968                    if let Err(e) = self.refresh_balances().await {
969                        tracing::warn!(error = %e, "Failed to refresh balances after fill");
970                    }
971                }
972
973                Ok(Some(update))
974            }
975            Err(e) => {
976                tracing::error!(
977                    client_order_id,
978                    error = %e,
979                    "Order rejected"
980                );
981                if let Some(order) = self.active_orders.get_mut(&client_order_id) {
982                    order.status = OrderStatus::Rejected;
983                    order.updated_at = chrono::Utc::now();
984                }
985                Ok(Some(OrderUpdate::Rejected {
986                    intent_id: intent.intent_id.clone(),
987                    client_order_id,
988                    reason_code: RejectionReasonCode::BrokerSubmitFailed.as_str().to_string(),
989                    reason: e.to_string(),
990                }))
991            }
992        }
993    }
994
995    fn process_order_response(
996        &mut self,
997        intent_id: &str,
998        client_order_id: &str,
999        side: OrderSide,
1000        response: &BinanceOrderResponse,
1001    ) -> OrderUpdate {
1002        let fills: Vec<Fill> = response
1003            .fills
1004            .iter()
1005            .map(|f| Fill {
1006                price: f.price,
1007                qty: f.qty,
1008                commission: f.commission,
1009                commission_asset: f.commission_asset.clone(),
1010            })
1011            .collect();
1012
1013        let status = OrderStatus::from_binance_str(&response.status);
1014
1015        if let Some(order) = self.active_orders.get_mut(client_order_id) {
1016            order.server_order_id = Some(response.order_id);
1017            order.status = status;
1018            order.fills = fills.clone();
1019            order.updated_at = chrono::Utc::now();
1020        }
1021
1022        if status == OrderStatus::Filled || status == OrderStatus::PartiallyFilled {
1023            self.position.apply_fill(side, &fills);
1024
1025            let avg_price = if fills.is_empty() {
1026                0.0
1027            } else {
1028                let total_value: f64 = fills.iter().map(|f| f.price * f.qty).sum();
1029                let total_qty: f64 = fills.iter().map(|f| f.qty).sum();
1030                total_value / total_qty
1031            };
1032
1033            tracing::info!(
1034                client_order_id,
1035                order_id = response.order_id,
1036                side = %side,
1037                avg_price,
1038                filled_qty = response.executed_qty,
1039                "Order filled"
1040            );
1041
1042            OrderUpdate::Filled {
1043                intent_id: intent_id.to_string(),
1044                client_order_id: client_order_id.to_string(),
1045                side,
1046                fills,
1047                avg_price,
1048            }
1049        } else {
1050            OrderUpdate::Submitted {
1051                intent_id: intent_id.to_string(),
1052                client_order_id: client_order_id.to_string(),
1053                server_order_id: response.order_id,
1054            }
1055        }
1056    }
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061    use super::{display_qty_for_history, split_symbol_assets, OrderManager};
1062    use crate::binance::rest::BinanceRestClient;
1063    use crate::config::RiskConfig;
1064    use crate::model::order::{Order, OrderSide, OrderStatus, OrderType};
1065    use std::sync::Arc;
1066
1067    fn build_test_order_manager() -> OrderManager {
1068        let rest = Arc::new(BinanceRestClient::new(
1069            "https://demo-api.binance.com",
1070            "https://demo-fapi.binance.com",
1071            "k",
1072            "s",
1073            "fk",
1074            "fs",
1075            5000,
1076        ));
1077        let risk = RiskConfig {
1078            global_rate_limit_per_minute: 600,
1079            default_strategy_cooldown_ms: 3_000,
1080            default_strategy_max_active_orders: 1,
1081            strategy_limits: vec![],
1082        };
1083        OrderManager::new(
1084            rest,
1085            "BTCUSDT",
1086            crate::order_manager::MarketKind::Spot,
1087            10.0,
1088            &risk,
1089        )
1090    }
1091
1092    #[test]
1093    fn valid_state_transitions() {
1094        // PendingSubmit -> Submitted
1095        let from = OrderStatus::PendingSubmit;
1096        let to = OrderStatus::Submitted;
1097        assert!(!from.is_terminal());
1098        assert!(!to.is_terminal());
1099
1100        // Submitted -> Filled
1101        let to = OrderStatus::Filled;
1102        assert!(to.is_terminal());
1103
1104        // Submitted -> Rejected
1105        let to = OrderStatus::Rejected;
1106        assert!(to.is_terminal());
1107
1108        // Submitted -> Cancelled
1109        let to = OrderStatus::Cancelled;
1110        assert!(to.is_terminal());
1111    }
1112
1113    #[test]
1114    fn from_binance_str_mapping() {
1115        assert_eq!(OrderStatus::from_binance_str("NEW"), OrderStatus::Submitted);
1116        assert_eq!(OrderStatus::from_binance_str("FILLED"), OrderStatus::Filled);
1117        assert_eq!(
1118            OrderStatus::from_binance_str("CANCELED"),
1119            OrderStatus::Cancelled
1120        );
1121        assert_eq!(
1122            OrderStatus::from_binance_str("REJECTED"),
1123            OrderStatus::Rejected
1124        );
1125        assert_eq!(
1126            OrderStatus::from_binance_str("EXPIRED"),
1127            OrderStatus::Expired
1128        );
1129        assert_eq!(
1130            OrderStatus::from_binance_str("PARTIALLY_FILLED"),
1131            OrderStatus::PartiallyFilled
1132        );
1133    }
1134
1135    #[test]
1136    fn order_history_uses_executed_qty_for_filled_states() {
1137        assert!((display_qty_for_history("FILLED", 1.0, 0.4) - 0.4).abs() < f64::EPSILON);
1138        assert!((display_qty_for_history("PARTIALLY_FILLED", 1.0, 0.4) - 0.4).abs() < f64::EPSILON);
1139    }
1140
1141    #[test]
1142    fn order_history_uses_orig_qty_for_non_filled_states() {
1143        assert!((display_qty_for_history("NEW", 1.0, 0.4) - 1.0).abs() < f64::EPSILON);
1144        assert!((display_qty_for_history("CANCELED", 1.0, 0.4) - 1.0).abs() < f64::EPSILON);
1145        assert!((display_qty_for_history("REJECTED", 1.0, 0.0) - 1.0).abs() < f64::EPSILON);
1146    }
1147
1148    #[test]
1149    fn split_symbol_assets_parses_known_quote_suffixes() {
1150        assert_eq!(
1151            split_symbol_assets("ETHUSDT"),
1152            ("ETH".to_string(), "USDT".to_string())
1153        );
1154        assert_eq!(
1155            split_symbol_assets("ETHBTC"),
1156            ("ETH".to_string(), "BTC".to_string())
1157        );
1158    }
1159
1160    #[test]
1161    fn split_symbol_assets_falls_back_when_quote_unknown() {
1162        assert_eq!(
1163            split_symbol_assets("FOOBAR"),
1164            ("FOOBAR".to_string(), String::new())
1165        );
1166    }
1167
1168    #[test]
1169    fn strategy_limit_rejects_when_active_orders_reach_limit() {
1170        let mut mgr = build_test_order_manager();
1171        let client_order_id = "sq-cfg-abcdef12".to_string();
1172        mgr.active_orders.insert(
1173            client_order_id.clone(),
1174            Order {
1175                client_order_id,
1176                server_order_id: None,
1177                symbol: "BTCUSDT".to_string(),
1178                side: OrderSide::Buy,
1179                order_type: OrderType::Market,
1180                quantity: 0.1,
1181                price: None,
1182                status: OrderStatus::Submitted,
1183                created_at: chrono::Utc::now(),
1184                updated_at: chrono::Utc::now(),
1185                fills: vec![],
1186            },
1187        );
1188
1189        let rejected = mgr
1190            .evaluate_strategy_limits("cfg", chrono::Utc::now().timestamp_millis() as u64)
1191            .expect("must be rejected");
1192        assert_eq!(
1193            rejected.0,
1194            "risk.strategy_max_active_orders_exceeded".to_string()
1195        );
1196    }
1197
1198    #[test]
1199    fn strategy_limit_rejects_during_cooldown_window() {
1200        let mut mgr = build_test_order_manager();
1201        let now = chrono::Utc::now().timestamp_millis() as u64;
1202        mgr.mark_strategy_submit("cfg", now);
1203
1204        let rejected = mgr
1205            .evaluate_strategy_limits("cfg", now + 500)
1206            .expect("must be rejected");
1207        assert_eq!(rejected.0, "risk.strategy_cooldown_active".to_string());
1208    }
1209}