tesser_cli/
alerts.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use reqwest::Client;
6use rust_decimal::{prelude::ToPrimitive, Decimal};
7use serde_json::json;
8use tesser_config::AlertingConfig;
9use tokio::sync::Mutex;
10use tokio::time::interval;
11use tracing::{error, warn};
12
13#[derive(Clone)]
14pub struct AlertDispatcher {
15    client: Client,
16    webhook: Option<String>,
17}
18
19impl AlertDispatcher {
20    pub fn new(webhook: Option<String>) -> Self {
21        Self {
22            client: Client::builder().build().expect("reqwest client"),
23            webhook,
24        }
25    }
26
27    pub async fn notify(&self, title: &str, message: &str) {
28        warn!(%title, %message, "alert raised");
29        let Some(url) = self.webhook.as_ref() else {
30            return;
31        };
32        let payload = json!({ "title": title, "message": message });
33        if let Err(err) = self.client.post(url).json(&payload).send().await {
34            error!(error = %err, "failed to send alert webhook");
35        }
36    }
37}
38
39struct AlertState {
40    last_data: Instant,
41    consecutive_failures: u32,
42    peak_equity: Decimal,
43    drawdown_triggered: bool,
44    data_gap_triggered: bool,
45    last_public_connection: Instant,
46    last_private_connection: Instant,
47    public_alerted: bool,
48    private_alerted: bool,
49    checksum_alerts: HashMap<String, Instant>,
50}
51
52pub struct AlertManager {
53    config: AlertingConfig,
54    dispatcher: AlertDispatcher,
55    state: Arc<Mutex<AlertState>>,
56    public_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
57    private_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
58}
59
60impl AlertManager {
61    pub fn new(
62        config: AlertingConfig,
63        dispatcher: AlertDispatcher,
64        public_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
65        private_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
66    ) -> Self {
67        let state = AlertState {
68            last_data: Instant::now(),
69            consecutive_failures: 0,
70            peak_equity: Decimal::ZERO,
71            drawdown_triggered: false,
72            data_gap_triggered: false,
73            last_public_connection: Instant::now(),
74            last_private_connection: Instant::now(),
75            public_alerted: false,
76            private_alerted: false,
77            checksum_alerts: HashMap::new(),
78        };
79        Self {
80            config,
81            dispatcher,
82            state: Arc::new(Mutex::new(state)),
83            public_connection,
84            private_connection,
85        }
86    }
87
88    pub async fn heartbeat(&self) {
89        let mut state = self.state.lock().await;
90        state.last_data = Instant::now();
91        state.data_gap_triggered = false;
92    }
93
94    pub async fn order_failure(&self, reason: &str) {
95        let mut state = self.state.lock().await;
96        state.consecutive_failures += 1;
97        let max_failures = self.config.max_order_failures.max(1);
98        if state.consecutive_failures >= max_failures {
99            drop(state);
100            self.dispatcher
101                .notify(
102                    "Execution failures",
103                    &format!("{} consecutive order failures ({reason})", max_failures),
104                )
105                .await;
106            let mut state = self.state.lock().await;
107            state.consecutive_failures = 0;
108        }
109    }
110
111    pub async fn reset_order_failures(&self) {
112        let mut state = self.state.lock().await;
113        state.consecutive_failures = 0;
114    }
115
116    pub async fn order_book_checksum_mismatch(
117        &self,
118        driver: &str,
119        symbol: &str,
120        expected: u32,
121        actual: u32,
122    ) {
123        let mut state = self.state.lock().await;
124        let key = format!("{driver}:{symbol}");
125        let now = Instant::now();
126        if let Some(last) = state.checksum_alerts.get(&key) {
127            if now.duration_since(*last) < Duration::from_secs(30) {
128                return;
129            }
130        }
131        state.checksum_alerts.insert(key, now);
132        drop(state);
133        self.dispatcher
134            .notify(
135                "Order book checksum mismatch",
136                &format!(
137                    "Driver {driver} symbol {symbol} checksum mismatch (expected {expected}, local {actual})"
138                ),
139            )
140            .await;
141    }
142
143    pub async fn notify(&self, title: &str, message: &str) {
144        self.dispatcher.notify(title, message).await;
145    }
146
147    pub async fn update_equity(&self, equity: Decimal) {
148        if equity <= Decimal::ZERO {
149            return;
150        }
151        let mut state = self.state.lock().await;
152        if equity > state.peak_equity {
153            state.peak_equity = equity;
154            state.drawdown_triggered = false;
155            return;
156        }
157        if state.peak_equity <= Decimal::ZERO {
158            state.peak_equity = equity;
159            return;
160        }
161        let drawdown = (state.peak_equity - equity) / state.peak_equity;
162        if drawdown >= self.config.max_drawdown && !state.drawdown_triggered {
163            state.drawdown_triggered = true;
164            let peak = state.peak_equity;
165            drop(state);
166            let equity_val = equity.to_f64().unwrap_or(0.0);
167            let peak_val = peak.to_f64().unwrap_or(0.0);
168            let drawdown_pct = drawdown.to_f64().unwrap_or(0.0) * 100.0;
169            self.dispatcher
170                .notify(
171                    "Drawdown limit breached",
172                    &format!(
173                        "Current equity {:.2} vs peak {:.2} (drawdown {:.2}%)",
174                        equity_val, peak_val, drawdown_pct
175                    ),
176                )
177                .await;
178        }
179    }
180
181    pub fn spawn_watchdog(&self) -> Option<tokio::task::JoinHandle<()>> {
182        let threshold = self.config.max_data_gap_secs;
183        if threshold == 0 {
184            return None;
185        }
186        let dispatcher = self.dispatcher.clone();
187        let state = self.state.clone();
188        let public_connection = self.public_connection.clone();
189        let private_connection = self.private_connection.clone();
190        let period = Duration::from_secs(threshold);
191        Some(tokio::spawn(async move {
192            let mut ticker = interval(Duration::from_secs(30));
193            loop {
194                ticker.tick().await;
195                let mut guard = state.lock().await;
196                let now = Instant::now();
197                if let Some(flag) = &public_connection {
198                    if flag.load(std::sync::atomic::Ordering::SeqCst) {
199                        guard.last_public_connection = now;
200                        guard.public_alerted = false;
201                    } else if !guard.public_alerted
202                        && now.duration_since(guard.last_public_connection)
203                            >= Duration::from_secs(60)
204                    {
205                        guard.public_alerted = true;
206                        drop(guard);
207                        dispatcher
208                            .notify(
209                                "Exchange connection lost (public)",
210                                "Public stream disconnected for over 60s",
211                            )
212                            .await;
213                        guard = state.lock().await;
214                    }
215                }
216                if let Some(flag) = &private_connection {
217                    if flag.load(std::sync::atomic::Ordering::SeqCst) {
218                        guard.last_private_connection = now;
219                        guard.private_alerted = false;
220                    } else if !guard.private_alerted
221                        && now.duration_since(guard.last_private_connection)
222                            >= Duration::from_secs(60)
223                    {
224                        guard.private_alerted = true;
225                        drop(guard);
226                        dispatcher
227                            .notify(
228                                "Exchange connection lost (private)",
229                                "Private stream disconnected for over 60s",
230                            )
231                            .await;
232                        guard = state.lock().await;
233                    }
234                }
235                if guard.last_data.elapsed() >= period && !guard.data_gap_triggered {
236                    guard.data_gap_triggered = true;
237                    drop(guard);
238                    dispatcher
239                        .notify("Market data stalled", "No heartbeat in configured window")
240                        .await;
241                }
242            }
243        }))
244    }
245}
246
/// Normalizes an optional webhook URL.
///
/// Leading and trailing whitespace is removed. A missing, empty, or
/// whitespace-only value yields `None`; anything else is returned trimmed.
/// Interior whitespace is left untouched.
pub fn sanitize_webhook(input: Option<String>) -> Option<String> {
    let value = input?;
    let trimmed = value.trim();
    // Allocate only when there is something to keep.
    (!trimmed.is_empty()).then(|| trimmed.to_owned())
}
256}