// sandbox_quant/runtime/execution_intent_flow.rs

1use std::collections::HashMap;
2
3use tokio::sync::mpsc;
4
5use crate::event::{AppEvent, LogDomain, LogLevel, LogRecord};
6use crate::order_manager::{OrderHistoryStats, OrderManager};
7
/// Flags returned by `process_execution_intent_for_instrument`.
///
/// Both flags default to `false`. That function sets both to `true` once an
/// order submission has produced an order update; how the flags are consumed
/// is up to the caller (not visible from this definition).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ExecutionIntentProcessResult {
    /// `true` when the caller should emit an asset snapshot.
    pub emit_asset_snapshot: bool,
    /// `true` when the caller should emit a rate snapshot.
    pub emit_rate_snapshot: bool,
}
13
14#[allow(clippy::too_many_arguments)]
15pub async fn process_execution_intent_for_instrument(
16    app_tx: &mpsc::Sender<AppEvent>,
17    mgr: &mut OrderManager,
18    instrument: &str,
19    source_tag: &str,
20    signal: crate::model::signal::Signal,
21    selected_symbol: &str,
22    order_history_limit: usize,
23    strategy_stats_by_instrument: &mut HashMap<String, HashMap<String, OrderHistoryStats>>,
24    realized_pnl_by_symbol: &mut HashMap<String, f64>,
25    build_scoped_stats: fn(
26        &HashMap<String, HashMap<String, OrderHistoryStats>>,
27    ) -> HashMap<String, OrderHistoryStats>,
28) -> ExecutionIntentProcessResult {
29    let mut result = ExecutionIntentProcessResult::default();
30    let source_tag_lc = source_tag.to_ascii_lowercase();
31
32    match mgr.submit_order(signal, &source_tag_lc).await {
33        Ok(Some(ref update)) => {
34            if instrument == selected_symbol {
35                let _ = app_tx.send(AppEvent::OrderUpdate(update.clone())).await;
36            }
37            match mgr.refresh_order_history(order_history_limit).await {
38                Ok(history) => {
39                    strategy_stats_by_instrument
40                        .insert(instrument.to_string(), history.strategy_stats.clone());
41                    realized_pnl_by_symbol
42                        .insert(instrument.to_string(), history.stats.realized_pnl);
43                    if instrument == selected_symbol {
44                        let _ = app_tx.send(AppEvent::OrderHistoryUpdate(history)).await;
45                    }
46                    let _ = app_tx
47                        .send(AppEvent::StrategyStatsUpdate {
48                            strategy_stats: build_scoped_stats(strategy_stats_by_instrument),
49                        })
50                        .await;
51                }
52                Err(e) => {
53                    let _ = app_tx
54                        .send(log_event(
55                            LogLevel::Warn,
56                            LogDomain::Order,
57                            "history.refresh.fail",
58                            format!("Order history refresh failed: {}", e),
59                        ))
60                        .await;
61                }
62            }
63            if let Ok(balances) = mgr.refresh_balances().await {
64                if instrument == selected_symbol {
65                    let _ = app_tx.send(AppEvent::BalanceUpdate(balances)).await;
66                }
67            }
68            result.emit_asset_snapshot = true;
69            result.emit_rate_snapshot = true;
70        }
71        Ok(None) => {}
72        Err(e) => {
73            let _ = app_tx.send(AppEvent::Error(e.to_string())).await;
74        }
75    }
76
77    result
78}
79
80fn log_event(level: LogLevel, domain: LogDomain, event: &'static str, msg: String) -> AppEvent {
81    AppEvent::LogRecord(LogRecord::new(level, domain, event, msg))
82}