1use std::collections::HashMap;
2
3use crate::event::{AppEvent, LogDomain, LogLevel, LogRecord};
4use crate::lifecycle::PositionLifecycleEngine;
5use crate::order_manager::{OrderHistoryStats, OrderManager, OrderUpdate};
6use std::collections::HashSet;
7use tokio::sync::mpsc;
8
/// Snapshot of a close-all job's progress after one instrument has been
/// processed (produced by `advance_close_all_job`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CloseAllUpdate {
    /// Number of instruments the job set out to close.
    pub total: usize,
    /// Instruments processed so far, successful or not.
    pub completed: usize,
    /// Instruments whose close failed (soft-skip rejections excluded).
    pub failed: usize,
    /// True once `completed` has reached `total`; the job entry is then
    /// removed from the job map.
    pub finished: bool,
}
16
/// Flags returned by `process_internal_exit_for_instrument` telling the
/// caller which snapshots to re-emit after the close attempt.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct InternalExitProcessResult {
    /// Set when the close produced an order update; caller should emit a
    /// fresh asset snapshot.
    pub emit_asset_snapshot: bool,
    /// Set together with `emit_asset_snapshot`; caller should emit a fresh
    /// rate snapshot.
    pub emit_rate_snapshot: bool,
}
22
/// Failure details extracted from a close attempt's order update.
///
/// Both fields are `None` unless the update was a rejection.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CloseAttemptOutcome {
    /// Human-readable rejection reason, when the close was rejected.
    pub close_failed_reason: Option<String>,
    /// Machine-readable rejection code, when the close was rejected.
    pub close_reject_code: Option<String>,
}
28
29pub fn classify_close_update(update: &OrderUpdate) -> CloseAttemptOutcome {
30 match update {
31 OrderUpdate::Rejected {
32 reason_code,
33 reason,
34 ..
35 } => CloseAttemptOutcome {
36 close_failed_reason: Some(reason.clone()),
37 close_reject_code: Some(reason_code.clone()),
38 },
39 _ => CloseAttemptOutcome::default(),
40 }
41}
42
43pub fn advance_close_all_job(
44 close_all_jobs: &mut HashMap<u64, (usize, usize, usize)>,
45 job_id: u64,
46 close_failed_reason: Option<&str>,
47 close_reject_code: Option<&str>,
48 is_soft_skip_reason: fn(&str) -> bool,
49) -> Option<CloseAllUpdate> {
50 let (total, completed, failed) = if let Some(state) = close_all_jobs.get_mut(&job_id) {
51 state.1 = state.1.saturating_add(1);
52 let is_soft_skip = close_reject_code.map(is_soft_skip_reason).unwrap_or(false);
53 if close_failed_reason.is_some() && !is_soft_skip {
54 state.2 = state.2.saturating_add(1);
55 }
56 (state.0, state.1, state.2)
57 } else {
58 (0, 0, 0)
59 };
60
61 if total == 0 && completed == 0 {
62 return None;
63 }
64 let finished = completed >= total;
65 if finished {
66 close_all_jobs.remove(&job_id);
67 }
68 Some(CloseAllUpdate {
69 total,
70 completed,
71 failed,
72 finished,
73 })
74}
75
76#[allow(clippy::too_many_arguments)]
/// Execute an internal (strategy/risk-driven) close for one instrument and
/// fan the resulting state changes out over `app_tx`.
///
/// Flow, as implemented below:
/// 1. Ask the `OrderManager` to emergency-close the position.
/// 2. On an update: classify rejection info, forward the update / refreshed
///    history / aggregated stats to the UI (per-symbol events only when
///    this instrument is the selected symbol), and — on a fill — reset
///    lifecycle bookkeeping and refresh balances.
/// 3. If the close belongs to a close-all job, advance that job's progress
///    counters and emit progress/finished events.
///
/// Never returns an error: channel sends and refresh failures are logged or
/// ignored (`let _ =`) so a single instrument cannot abort a close-all
/// sweep. The returned flags tell the caller which snapshots to re-emit.
#[allow(clippy::too_many_arguments)]
pub async fn process_internal_exit_for_instrument(
    app_tx: &mpsc::Sender<AppEvent>,
    mgr: &mut OrderManager,
    instrument: &str,
    // Lower-cased source tag forwarded to the close call.
    source_tag_lc: &str,
    reason_code: &str,
    // Symbol currently shown in the UI; gates per-symbol event emission.
    selected_symbol: &str,
    order_history_limit: usize,
    // Present when this close is part of a close-all batch.
    close_all_job_id: Option<u64>,
    // job_id -> (total, completed, failed) progress counters.
    close_all_jobs: &mut HashMap<u64, (usize, usize, usize)>,
    strategy_stats_by_instrument: &mut HashMap<String, HashMap<String, OrderHistoryStats>>,
    realized_pnl_by_symbol: &mut HashMap<String, f64>,
    lifecycle_triggered_once: &mut HashSet<String>,
    lifecycle_engine: &mut PositionLifecycleEngine,
    close_all_soft_skip_reason: fn(&str) -> bool,
    build_scoped_stats: fn(
        &HashMap<String, HashMap<String, OrderHistoryStats>>,
    ) -> HashMap<String, OrderHistoryStats>,
) -> InternalExitProcessResult {
    let mut close_failed_reason: Option<String> = None;
    let mut close_reject_code: Option<String> = None;
    let mut result = InternalExitProcessResult::default();

    match mgr
        .emergency_close_position(source_tag_lc, reason_code)
        .await
    {
        // A close order was placed and produced an update.
        Ok(Some(ref update)) => {
            let outcome = classify_close_update(update);
            close_reject_code = outcome.close_reject_code;
            close_failed_reason = outcome.close_failed_reason;
            // Only the selected symbol's order updates go to the UI.
            if instrument == selected_symbol {
                let _ = app_tx.send(AppEvent::OrderUpdate(update.clone())).await;
            }
            match mgr.refresh_order_history(order_history_limit).await {
                Ok(history) => {
                    strategy_stats_by_instrument
                        .insert(instrument.to_string(), history.strategy_stats.clone());
                    realized_pnl_by_symbol
                        .insert(instrument.to_string(), history.stats.realized_pnl);
                    if instrument == selected_symbol {
                        let _ = app_tx.send(AppEvent::OrderHistoryUpdate(history)).await;
                    }
                    // Aggregated strategy stats are broadcast regardless of
                    // which symbol is selected.
                    let _ = app_tx
                        .send(AppEvent::StrategyStatsUpdate {
                            strategy_stats: build_scoped_stats(strategy_stats_by_instrument),
                        })
                        .await;
                }
                Err(e) => {
                    // Non-fatal: log and continue with stale history.
                    let _ = app_tx
                        .send(log_event(
                            LogLevel::Warn,
                            LogDomain::Order,
                            "history.refresh.fail",
                            format!("Order history refresh failed: {}", e),
                        ))
                        .await;
                }
            }
            // A fill means the position actually closed: reset the one-shot
            // lifecycle trigger and notify the lifecycle engine.
            if let OrderUpdate::Filled { .. } = update {
                lifecycle_triggered_once.remove(instrument);
                if let Some(state) = lifecycle_engine.on_position_closed(instrument) {
                    let _ = app_tx
                        .send(log_event(
                            LogLevel::Info,
                            LogDomain::Risk,
                            "lifecycle.close.internal",
                            format!(
                                "Lifecycle internal close: {} pos={} reason={} mfe={:+.4} mae={:+.4}",
                                instrument, state.position_id, reason_code, state.mfe_usdt, state.mae_usdt
                            ),
                        ))
                        .await;
                }
                // Balances change on a fill; refresh and show for the
                // selected symbol only.
                if let Ok(balances) = mgr.refresh_balances().await {
                    if instrument == selected_symbol {
                        let _ = app_tx.send(AppEvent::BalanceUpdate(balances)).await;
                    }
                }
            }
            result.emit_asset_snapshot = true;
            result.emit_rate_snapshot = true;
        }
        // No position to close: nothing to do for this instrument.
        Ok(None) => {}
        Err(e) => {
            // Record the failure so a close-all job can count it below.
            close_failed_reason = Some(e.to_string());
            let _ = app_tx.send(AppEvent::Error(e.to_string())).await;
        }
    }

    // Close-all progress accounting runs even when the close failed or was
    // a no-op, so the job always converges on its total.
    if let Some(job_id) = close_all_job_id {
        if let Some(update) = advance_close_all_job(
            close_all_jobs,
            job_id,
            close_failed_reason.as_deref(),
            close_reject_code.as_deref(),
            close_all_soft_skip_reason,
        ) {
            let _ = app_tx
                .send(AppEvent::CloseAllProgress {
                    job_id,
                    symbol: instrument.to_string(),
                    completed: update.completed,
                    total: update.total,
                    failed: update.failed,
                    reason: close_failed_reason.clone(),
                })
                .await;
            if update.finished {
                let _ = app_tx
                    .send(AppEvent::CloseAllFinished {
                        job_id,
                        completed: update.completed,
                        total: update.total,
                        failed: update.failed,
                    })
                    .await;
            }
        }
    }

    result
}
201
202fn log_event(level: LogLevel, domain: LogDomain, event: &'static str, msg: String) -> AppEvent {
203 AppEvent::LogRecord(LogRecord::new(level, domain, event, msg))
204}