sandbox_quant/runtime/
execution_intent_flow.rs1use std::collections::HashMap;
2
3use tokio::sync::mpsc;
4
5use crate::event::{AppEvent, LogDomain, LogLevel, LogRecord};
6use crate::order_manager::{OrderHistoryStats, OrderManager};
7
/// Outcome flags returned by `process_execution_intent_for_instrument`.
///
/// Both flags default to `false`; that function sets both to `true` only after an
/// order submission yielded an update.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ExecutionIntentProcessResult {
    /// Set after a submitted order produced an update.
    /// NOTE(review): presumably asks the caller to publish an asset snapshot — confirm
    /// against the call site.
    pub emit_asset_snapshot: bool,
    /// Set after a submitted order produced an update.
    /// NOTE(review): presumably asks the caller to publish a rate snapshot — confirm
    /// against the call site.
    pub emit_rate_snapshot: bool,
}
13
14#[allow(clippy::too_many_arguments)]
15pub async fn process_execution_intent_for_instrument(
16 app_tx: &mpsc::Sender<AppEvent>,
17 mgr: &mut OrderManager,
18 instrument: &str,
19 source_tag: &str,
20 signal: crate::model::signal::Signal,
21 selected_symbol: &str,
22 order_history_limit: usize,
23 strategy_stats_by_instrument: &mut HashMap<String, HashMap<String, OrderHistoryStats>>,
24 realized_pnl_by_symbol: &mut HashMap<String, f64>,
25 build_scoped_stats: fn(
26 &HashMap<String, HashMap<String, OrderHistoryStats>>,
27 ) -> HashMap<String, OrderHistoryStats>,
28) -> ExecutionIntentProcessResult {
29 let mut result = ExecutionIntentProcessResult::default();
30 let source_tag_lc = source_tag.to_ascii_lowercase();
31
32 match mgr.submit_order(signal, &source_tag_lc).await {
33 Ok(Some(ref update)) => {
34 if instrument == selected_symbol {
35 let _ = app_tx.send(AppEvent::OrderUpdate(update.clone())).await;
36 }
37 match mgr.refresh_order_history(order_history_limit).await {
38 Ok(history) => {
39 strategy_stats_by_instrument
40 .insert(instrument.to_string(), history.strategy_stats.clone());
41 realized_pnl_by_symbol
42 .insert(instrument.to_string(), history.stats.realized_pnl);
43 if instrument == selected_symbol {
44 let _ = app_tx.send(AppEvent::OrderHistoryUpdate(history)).await;
45 }
46 let _ = app_tx
47 .send(AppEvent::StrategyStatsUpdate {
48 strategy_stats: build_scoped_stats(strategy_stats_by_instrument),
49 })
50 .await;
51 }
52 Err(e) => {
53 let _ = app_tx
54 .send(log_event(
55 LogLevel::Warn,
56 LogDomain::Order,
57 "history.refresh.fail",
58 format!("Order history refresh failed: {}", e),
59 ))
60 .await;
61 }
62 }
63 if let Ok(balances) = mgr.refresh_balances().await {
64 if instrument == selected_symbol {
65 let _ = app_tx.send(AppEvent::BalanceUpdate(balances)).await;
66 }
67 }
68 result.emit_asset_snapshot = true;
69 result.emit_rate_snapshot = true;
70 }
71 Ok(None) => {}
72 Err(e) => {
73 let _ = app_tx.send(AppEvent::Error(e.to_string())).await;
74 }
75 }
76
77 result
78}
79
80fn log_event(level: LogLevel, domain: LogDomain, event: &'static str, msg: String) -> AppEvent {
81 AppEvent::LogRecord(LogRecord::new(level, domain, event, msg))
82}