1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use reqwest::Client;
6use rust_decimal::{prelude::ToPrimitive, Decimal};
7use serde_json::json;
8use tesser_config::AlertingConfig;
9use tokio::sync::Mutex;
10use tokio::time::interval;
11use tracing::{error, warn};
12
13#[derive(Clone)]
14pub struct AlertDispatcher {
15 client: Client,
16 webhook: Option<String>,
17}
18
19impl AlertDispatcher {
20 pub fn new(webhook: Option<String>) -> Self {
21 Self {
22 client: Client::builder().build().expect("reqwest client"),
23 webhook,
24 }
25 }
26
27 pub async fn notify(&self, title: &str, message: &str) {
28 warn!(%title, %message, "alert raised");
29 let Some(url) = self.webhook.as_ref() else {
30 return;
31 };
32 let payload = json!({ "title": title, "message": message });
33 if let Err(err) = self.client.post(url).json(&payload).send().await {
34 error!(error = %err, "failed to send alert webhook");
35 }
36 }
37}
38
39struct AlertState {
40 last_data: Instant,
41 consecutive_failures: u32,
42 peak_equity: Decimal,
43 drawdown_triggered: bool,
44 data_gap_triggered: bool,
45 last_public_connection: Instant,
46 last_private_connection: Instant,
47 public_alerted: bool,
48 private_alerted: bool,
49 checksum_alerts: HashMap<String, Instant>,
50}
51
52pub struct AlertManager {
53 config: AlertingConfig,
54 dispatcher: AlertDispatcher,
55 state: Arc<Mutex<AlertState>>,
56 public_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
57 private_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
58}
59
60impl AlertManager {
61 pub fn new(
62 config: AlertingConfig,
63 dispatcher: AlertDispatcher,
64 public_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
65 private_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
66 ) -> Self {
67 let state = AlertState {
68 last_data: Instant::now(),
69 consecutive_failures: 0,
70 peak_equity: Decimal::ZERO,
71 drawdown_triggered: false,
72 data_gap_triggered: false,
73 last_public_connection: Instant::now(),
74 last_private_connection: Instant::now(),
75 public_alerted: false,
76 private_alerted: false,
77 checksum_alerts: HashMap::new(),
78 };
79 Self {
80 config,
81 dispatcher,
82 state: Arc::new(Mutex::new(state)),
83 public_connection,
84 private_connection,
85 }
86 }
87
88 pub async fn heartbeat(&self) {
89 let mut state = self.state.lock().await;
90 state.last_data = Instant::now();
91 state.data_gap_triggered = false;
92 }
93
94 pub async fn order_failure(&self, reason: &str) {
95 let mut state = self.state.lock().await;
96 state.consecutive_failures += 1;
97 let max_failures = self.config.max_order_failures.max(1);
98 if state.consecutive_failures >= max_failures {
99 drop(state);
100 self.dispatcher
101 .notify(
102 "Execution failures",
103 &format!("{} consecutive order failures ({reason})", max_failures),
104 )
105 .await;
106 let mut state = self.state.lock().await;
107 state.consecutive_failures = 0;
108 }
109 }
110
111 pub async fn reset_order_failures(&self) {
112 let mut state = self.state.lock().await;
113 state.consecutive_failures = 0;
114 }
115
116 pub async fn order_book_checksum_mismatch(
117 &self,
118 driver: &str,
119 symbol: &str,
120 expected: u32,
121 actual: u32,
122 ) {
123 let mut state = self.state.lock().await;
124 let key = format!("{driver}:{symbol}");
125 let now = Instant::now();
126 if let Some(last) = state.checksum_alerts.get(&key) {
127 if now.duration_since(*last) < Duration::from_secs(30) {
128 return;
129 }
130 }
131 state.checksum_alerts.insert(key, now);
132 drop(state);
133 self.dispatcher
134 .notify(
135 "Order book checksum mismatch",
136 &format!(
137 "Driver {driver} symbol {symbol} checksum mismatch (expected {expected}, local {actual})"
138 ),
139 )
140 .await;
141 }
142
143 pub async fn notify(&self, title: &str, message: &str) {
144 self.dispatcher.notify(title, message).await;
145 }
146
147 pub async fn update_equity(&self, equity: Decimal) {
148 if equity <= Decimal::ZERO {
149 return;
150 }
151 let mut state = self.state.lock().await;
152 if equity > state.peak_equity {
153 state.peak_equity = equity;
154 state.drawdown_triggered = false;
155 return;
156 }
157 if state.peak_equity <= Decimal::ZERO {
158 state.peak_equity = equity;
159 return;
160 }
161 let drawdown = (state.peak_equity - equity) / state.peak_equity;
162 if drawdown >= self.config.max_drawdown && !state.drawdown_triggered {
163 state.drawdown_triggered = true;
164 let peak = state.peak_equity;
165 drop(state);
166 let equity_val = equity.to_f64().unwrap_or(0.0);
167 let peak_val = peak.to_f64().unwrap_or(0.0);
168 let drawdown_pct = drawdown.to_f64().unwrap_or(0.0) * 100.0;
169 self.dispatcher
170 .notify(
171 "Drawdown limit breached",
172 &format!(
173 "Current equity {:.2} vs peak {:.2} (drawdown {:.2}%)",
174 equity_val, peak_val, drawdown_pct
175 ),
176 )
177 .await;
178 }
179 }
180
181 pub fn spawn_watchdog(&self) -> Option<tokio::task::JoinHandle<()>> {
182 let threshold = self.config.max_data_gap_secs;
183 if threshold == 0 {
184 return None;
185 }
186 let dispatcher = self.dispatcher.clone();
187 let state = self.state.clone();
188 let public_connection = self.public_connection.clone();
189 let private_connection = self.private_connection.clone();
190 let period = Duration::from_secs(threshold);
191 Some(tokio::spawn(async move {
192 let mut ticker = interval(Duration::from_secs(30));
193 loop {
194 ticker.tick().await;
195 let mut guard = state.lock().await;
196 let now = Instant::now();
197 if let Some(flag) = &public_connection {
198 if flag.load(std::sync::atomic::Ordering::SeqCst) {
199 guard.last_public_connection = now;
200 guard.public_alerted = false;
201 } else if !guard.public_alerted
202 && now.duration_since(guard.last_public_connection)
203 >= Duration::from_secs(60)
204 {
205 guard.public_alerted = true;
206 drop(guard);
207 dispatcher
208 .notify(
209 "Exchange connection lost (public)",
210 "Public stream disconnected for over 60s",
211 )
212 .await;
213 guard = state.lock().await;
214 }
215 }
216 if let Some(flag) = &private_connection {
217 if flag.load(std::sync::atomic::Ordering::SeqCst) {
218 guard.last_private_connection = now;
219 guard.private_alerted = false;
220 } else if !guard.private_alerted
221 && now.duration_since(guard.last_private_connection)
222 >= Duration::from_secs(60)
223 {
224 guard.private_alerted = true;
225 drop(guard);
226 dispatcher
227 .notify(
228 "Exchange connection lost (private)",
229 "Private stream disconnected for over 60s",
230 )
231 .await;
232 guard = state.lock().await;
233 }
234 }
235 if guard.last_data.elapsed() >= period && !guard.data_gap_triggered {
236 guard.data_gap_triggered = true;
237 drop(guard);
238 dispatcher
239 .notify("Market data stalled", "No heartbeat in configured window")
240 .await;
241 }
242 }
243 }))
244 }
245}
246
/// Normalises an optional webhook URL.
///
/// Leading and trailing whitespace is stripped; a missing, empty or
/// whitespace-only value yields `None`.
pub fn sanitize_webhook(input: Option<String>) -> Option<String> {
    input
        .map(|raw| raw.trim().to_string())
        .filter(|url| !url.is_empty())
}