Skip to main content

sandbox_quant/runtime/
internal_exit_flow.rs

1use std::collections::HashMap;
2
3use crate::event::{AppEvent, LogDomain, LogLevel, LogRecord};
4use crate::lifecycle::PositionLifecycleEngine;
5use crate::order_manager::{OrderHistoryStats, OrderManager, OrderUpdate};
6use std::collections::HashSet;
7use tokio::sync::mpsc;
8
/// Progress snapshot of a close-all job, produced each time one close attempt
/// is recorded against it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CloseAllUpdate {
    /// Target count for the job; the job is finished once `completed` reaches it.
    pub total: usize,
    /// Close attempts recorded so far, including the one that produced this snapshot.
    pub completed: usize,
    /// Recorded attempts that failed and were not classified as a soft skip.
    pub failed: usize,
    /// `true` when `completed >= total`.
    pub finished: bool,
}
16
/// Flags handed back to the caller after an internal exit has been processed.
///
/// Both flags default to `false`.
// NOTE(review): presumably the caller publishes an asset / rate snapshot when the
// matching flag is set — confirm at the call site.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct InternalExitProcessResult {
    /// Set when the close attempt produced an order update.
    pub emit_asset_snapshot: bool,
    /// Set when the close attempt produced an order update.
    pub emit_rate_snapshot: bool,
}
22
/// Failure details extracted from a single close attempt.
///
/// The default value (both fields `None`) means no failure was recorded.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct CloseAttemptOutcome {
    /// Human-readable reason the close failed, if it did.
    pub close_failed_reason: Option<String>,
    /// Reject code that accompanied the failure, if any.
    pub close_reject_code: Option<String>,
}
28
29pub fn classify_close_update(update: &OrderUpdate) -> CloseAttemptOutcome {
30    match update {
31        OrderUpdate::Rejected {
32            reason_code,
33            reason,
34            ..
35        } => CloseAttemptOutcome {
36            close_failed_reason: Some(reason.clone()),
37            close_reject_code: Some(reason_code.clone()),
38        },
39        _ => CloseAttemptOutcome::default(),
40    }
41}
42
43pub fn advance_close_all_job(
44    close_all_jobs: &mut HashMap<u64, (usize, usize, usize)>,
45    job_id: u64,
46    close_failed_reason: Option<&str>,
47    close_reject_code: Option<&str>,
48    is_soft_skip_reason: fn(&str) -> bool,
49) -> Option<CloseAllUpdate> {
50    let (total, completed, failed) = if let Some(state) = close_all_jobs.get_mut(&job_id) {
51        state.1 = state.1.saturating_add(1);
52        let is_soft_skip = close_reject_code.map(is_soft_skip_reason).unwrap_or(false);
53        if close_failed_reason.is_some() && !is_soft_skip {
54            state.2 = state.2.saturating_add(1);
55        }
56        (state.0, state.1, state.2)
57    } else {
58        (0, 0, 0)
59    };
60
61    if total == 0 && completed == 0 {
62        return None;
63    }
64    let finished = completed >= total;
65    if finished {
66        close_all_jobs.remove(&job_id);
67    }
68    Some(CloseAllUpdate {
69        total,
70        completed,
71        failed,
72        finished,
73    })
74}
75
76#[allow(clippy::too_many_arguments)]
77pub async fn process_internal_exit_for_instrument(
78    app_tx: &mpsc::Sender<AppEvent>,
79    mgr: &mut OrderManager,
80    instrument: &str,
81    source_tag_lc: &str,
82    reason_code: &str,
83    selected_symbol: &str,
84    order_history_limit: usize,
85    close_all_job_id: Option<u64>,
86    close_all_jobs: &mut HashMap<u64, (usize, usize, usize)>,
87    strategy_stats_by_instrument: &mut HashMap<String, HashMap<String, OrderHistoryStats>>,
88    realized_pnl_by_symbol: &mut HashMap<String, f64>,
89    lifecycle_triggered_once: &mut HashSet<String>,
90    lifecycle_engine: &mut PositionLifecycleEngine,
91    close_all_soft_skip_reason: fn(&str) -> bool,
92    build_scoped_stats: fn(
93        &HashMap<String, HashMap<String, OrderHistoryStats>>,
94    ) -> HashMap<String, OrderHistoryStats>,
95) -> InternalExitProcessResult {
96    let mut close_failed_reason: Option<String> = None;
97    let mut close_reject_code: Option<String> = None;
98    let mut result = InternalExitProcessResult::default();
99
100    match mgr
101        .emergency_close_position(source_tag_lc, reason_code)
102        .await
103    {
104        Ok(Some(ref update)) => {
105            let outcome = classify_close_update(update);
106            close_reject_code = outcome.close_reject_code;
107            close_failed_reason = outcome.close_failed_reason;
108            if instrument == selected_symbol {
109                let _ = app_tx.send(AppEvent::OrderUpdate(update.clone())).await;
110            }
111            match mgr.refresh_order_history(order_history_limit).await {
112                Ok(history) => {
113                    strategy_stats_by_instrument
114                        .insert(instrument.to_string(), history.strategy_stats.clone());
115                    realized_pnl_by_symbol
116                        .insert(instrument.to_string(), history.stats.realized_pnl);
117                    if instrument == selected_symbol {
118                        let _ = app_tx.send(AppEvent::OrderHistoryUpdate(history)).await;
119                    }
120                    let _ = app_tx
121                        .send(AppEvent::StrategyStatsUpdate {
122                            strategy_stats: build_scoped_stats(strategy_stats_by_instrument),
123                        })
124                        .await;
125                }
126                Err(e) => {
127                    let _ = app_tx
128                        .send(log_event(
129                            LogLevel::Warn,
130                            LogDomain::Order,
131                            "history.refresh.fail",
132                            format!("Order history refresh failed: {}", e),
133                        ))
134                        .await;
135                }
136            }
137            if let OrderUpdate::Filled { .. } = update {
138                lifecycle_triggered_once.remove(instrument);
139                if let Some(state) = lifecycle_engine.on_position_closed(instrument) {
140                    let _ = app_tx
141                        .send(log_event(
142                            LogLevel::Info,
143                            LogDomain::Risk,
144                            "lifecycle.close.internal",
145                            format!(
146                                "Lifecycle internal close: {} pos={} reason={} mfe={:+.4} mae={:+.4}",
147                                instrument, state.position_id, reason_code, state.mfe_usdt, state.mae_usdt
148                            ),
149                        ))
150                        .await;
151                }
152                if let Ok(balances) = mgr.refresh_balances().await {
153                    if instrument == selected_symbol {
154                        let _ = app_tx.send(AppEvent::BalanceUpdate(balances)).await;
155                    }
156                }
157            }
158            result.emit_asset_snapshot = true;
159            result.emit_rate_snapshot = true;
160        }
161        Ok(None) => {}
162        Err(e) => {
163            close_failed_reason = Some(e.to_string());
164            let _ = app_tx.send(AppEvent::Error(e.to_string())).await;
165        }
166    }
167
168    if let Some(job_id) = close_all_job_id {
169        if let Some(update) = advance_close_all_job(
170            close_all_jobs,
171            job_id,
172            close_failed_reason.as_deref(),
173            close_reject_code.as_deref(),
174            close_all_soft_skip_reason,
175        ) {
176            let _ = app_tx
177                .send(AppEvent::CloseAllProgress {
178                    job_id,
179                    symbol: instrument.to_string(),
180                    completed: update.completed,
181                    total: update.total,
182                    failed: update.failed,
183                    reason: close_failed_reason.clone(),
184                })
185                .await;
186            if update.finished {
187                let _ = app_tx
188                    .send(AppEvent::CloseAllFinished {
189                        job_id,
190                        completed: update.completed,
191                        total: update.total,
192                        failed: update.failed,
193                    })
194                    .await;
195            }
196        }
197    }
198
199    result
200}
201
202fn log_event(level: LogLevel, domain: LogDomain, event: &'static str, msg: String) -> AppEvent {
203    AppEvent::LogRecord(LogRecord::new(level, domain, event, msg))
204}