1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use reqwest::Client;
5use rust_decimal::{prelude::ToPrimitive, Decimal};
6use serde_json::json;
7use tesser_config::AlertingConfig;
8use tokio::sync::Mutex;
9use tokio::time::interval;
10use tracing::{error, warn};
11
12#[derive(Clone)]
13pub struct AlertDispatcher {
14 client: Client,
15 webhook: Option<String>,
16}
17
18impl AlertDispatcher {
19 pub fn new(webhook: Option<String>) -> Self {
20 Self {
21 client: Client::builder().build().expect("reqwest client"),
22 webhook,
23 }
24 }
25
26 pub async fn notify(&self, title: &str, message: &str) {
27 warn!(%title, %message, "alert raised");
28 let Some(url) = self.webhook.as_ref() else {
29 return;
30 };
31 let payload = json!({ "title": title, "message": message });
32 if let Err(err) = self.client.post(url).json(&payload).send().await {
33 error!(error = %err, "failed to send alert webhook");
34 }
35 }
36}
37
38struct AlertState {
39 last_data: Instant,
40 consecutive_failures: u32,
41 peak_equity: Decimal,
42 drawdown_triggered: bool,
43 data_gap_triggered: bool,
44 last_public_connection: Instant,
45 last_private_connection: Instant,
46 public_alerted: bool,
47 private_alerted: bool,
48}
49
50pub struct AlertManager {
51 config: AlertingConfig,
52 dispatcher: AlertDispatcher,
53 state: Arc<Mutex<AlertState>>,
54 public_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
55 private_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
56}
57
58impl AlertManager {
59 pub fn new(
60 config: AlertingConfig,
61 dispatcher: AlertDispatcher,
62 public_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
63 private_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
64 ) -> Self {
65 let state = AlertState {
66 last_data: Instant::now(),
67 consecutive_failures: 0,
68 peak_equity: Decimal::ZERO,
69 drawdown_triggered: false,
70 data_gap_triggered: false,
71 last_public_connection: Instant::now(),
72 last_private_connection: Instant::now(),
73 public_alerted: false,
74 private_alerted: false,
75 };
76 Self {
77 config,
78 dispatcher,
79 state: Arc::new(Mutex::new(state)),
80 public_connection,
81 private_connection,
82 }
83 }
84
85 pub async fn heartbeat(&self) {
86 let mut state = self.state.lock().await;
87 state.last_data = Instant::now();
88 state.data_gap_triggered = false;
89 }
90
91 pub async fn order_failure(&self, reason: &str) {
92 let mut state = self.state.lock().await;
93 state.consecutive_failures += 1;
94 let max_failures = self.config.max_order_failures.max(1);
95 if state.consecutive_failures >= max_failures {
96 drop(state);
97 self.dispatcher
98 .notify(
99 "Execution failures",
100 &format!("{} consecutive order failures ({reason})", max_failures),
101 )
102 .await;
103 let mut state = self.state.lock().await;
104 state.consecutive_failures = 0;
105 }
106 }
107
108 pub async fn reset_order_failures(&self) {
109 let mut state = self.state.lock().await;
110 state.consecutive_failures = 0;
111 }
112
113 pub async fn notify(&self, title: &str, message: &str) {
114 self.dispatcher.notify(title, message).await;
115 }
116
117 pub async fn update_equity(&self, equity: Decimal) {
118 if equity <= Decimal::ZERO {
119 return;
120 }
121 let mut state = self.state.lock().await;
122 if equity > state.peak_equity {
123 state.peak_equity = equity;
124 state.drawdown_triggered = false;
125 return;
126 }
127 if state.peak_equity <= Decimal::ZERO {
128 state.peak_equity = equity;
129 return;
130 }
131 let drawdown = (state.peak_equity - equity) / state.peak_equity;
132 if drawdown >= self.config.max_drawdown && !state.drawdown_triggered {
133 state.drawdown_triggered = true;
134 let peak = state.peak_equity;
135 drop(state);
136 let equity_val = equity.to_f64().unwrap_or(0.0);
137 let peak_val = peak.to_f64().unwrap_or(0.0);
138 let drawdown_pct = drawdown.to_f64().unwrap_or(0.0) * 100.0;
139 self.dispatcher
140 .notify(
141 "Drawdown limit breached",
142 &format!(
143 "Current equity {:.2} vs peak {:.2} (drawdown {:.2}%)",
144 equity_val, peak_val, drawdown_pct
145 ),
146 )
147 .await;
148 }
149 }
150
151 pub fn spawn_watchdog(&self) -> Option<tokio::task::JoinHandle<()>> {
152 let threshold = self.config.max_data_gap_secs;
153 if threshold == 0 {
154 return None;
155 }
156 let dispatcher = self.dispatcher.clone();
157 let state = self.state.clone();
158 let public_connection = self.public_connection.clone();
159 let private_connection = self.private_connection.clone();
160 let period = Duration::from_secs(threshold);
161 Some(tokio::spawn(async move {
162 let mut ticker = interval(Duration::from_secs(30));
163 loop {
164 ticker.tick().await;
165 let mut guard = state.lock().await;
166 let now = Instant::now();
167 if let Some(flag) = &public_connection {
168 if flag.load(std::sync::atomic::Ordering::SeqCst) {
169 guard.last_public_connection = now;
170 guard.public_alerted = false;
171 } else if !guard.public_alerted
172 && now.duration_since(guard.last_public_connection)
173 >= Duration::from_secs(60)
174 {
175 guard.public_alerted = true;
176 drop(guard);
177 dispatcher
178 .notify(
179 "Exchange connection lost (public)",
180 "Public stream disconnected for over 60s",
181 )
182 .await;
183 guard = state.lock().await;
184 }
185 }
186 if let Some(flag) = &private_connection {
187 if flag.load(std::sync::atomic::Ordering::SeqCst) {
188 guard.last_private_connection = now;
189 guard.private_alerted = false;
190 } else if !guard.private_alerted
191 && now.duration_since(guard.last_private_connection)
192 >= Duration::from_secs(60)
193 {
194 guard.private_alerted = true;
195 drop(guard);
196 dispatcher
197 .notify(
198 "Exchange connection lost (private)",
199 "Private stream disconnected for over 60s",
200 )
201 .await;
202 guard = state.lock().await;
203 }
204 }
205 if guard.last_data.elapsed() >= period && !guard.data_gap_triggered {
206 guard.data_gap_triggered = true;
207 drop(guard);
208 dispatcher
209 .notify("Market data stalled", "No heartbeat in configured window")
210 .await;
211 }
212 }
213 }))
214 }
215}
216
/// Normalizes an optional webhook URL.
///
/// Surrounding whitespace is stripped; a missing, empty, or whitespace-only
/// value yields `None`.
pub fn sanitize_webhook(input: Option<String>) -> Option<String> {
    let raw = input?;
    match raw.trim() {
        "" => None,
        url => Some(url.to_owned()),
    }
}