tesser_cli/
alerts.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use reqwest::Client;
5use rust_decimal::{prelude::ToPrimitive, Decimal};
6use serde_json::json;
7use tesser_config::AlertingConfig;
8use tokio::sync::Mutex;
9use tokio::time::interval;
10use tracing::{error, warn};
11
12#[derive(Clone)]
13pub struct AlertDispatcher {
14    client: Client,
15    webhook: Option<String>,
16}
17
18impl AlertDispatcher {
19    pub fn new(webhook: Option<String>) -> Self {
20        Self {
21            client: Client::builder().build().expect("reqwest client"),
22            webhook,
23        }
24    }
25
26    pub async fn notify(&self, title: &str, message: &str) {
27        warn!(%title, %message, "alert raised");
28        let Some(url) = self.webhook.as_ref() else {
29            return;
30        };
31        let payload = json!({ "title": title, "message": message });
32        if let Err(err) = self.client.post(url).json(&payload).send().await {
33            error!(error = %err, "failed to send alert webhook");
34        }
35    }
36}
37
38struct AlertState {
39    last_data: Instant,
40    consecutive_failures: u32,
41    peak_equity: Decimal,
42    drawdown_triggered: bool,
43    data_gap_triggered: bool,
44    last_public_connection: Instant,
45    last_private_connection: Instant,
46    public_alerted: bool,
47    private_alerted: bool,
48}
49
50pub struct AlertManager {
51    config: AlertingConfig,
52    dispatcher: AlertDispatcher,
53    state: Arc<Mutex<AlertState>>,
54    public_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
55    private_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
56}
57
58impl AlertManager {
59    pub fn new(
60        config: AlertingConfig,
61        dispatcher: AlertDispatcher,
62        public_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
63        private_connection: Option<Arc<std::sync::atomic::AtomicBool>>,
64    ) -> Self {
65        let state = AlertState {
66            last_data: Instant::now(),
67            consecutive_failures: 0,
68            peak_equity: Decimal::ZERO,
69            drawdown_triggered: false,
70            data_gap_triggered: false,
71            last_public_connection: Instant::now(),
72            last_private_connection: Instant::now(),
73            public_alerted: false,
74            private_alerted: false,
75        };
76        Self {
77            config,
78            dispatcher,
79            state: Arc::new(Mutex::new(state)),
80            public_connection,
81            private_connection,
82        }
83    }
84
85    pub async fn heartbeat(&self) {
86        let mut state = self.state.lock().await;
87        state.last_data = Instant::now();
88        state.data_gap_triggered = false;
89    }
90
91    pub async fn order_failure(&self, reason: &str) {
92        let mut state = self.state.lock().await;
93        state.consecutive_failures += 1;
94        let max_failures = self.config.max_order_failures.max(1);
95        if state.consecutive_failures >= max_failures {
96            drop(state);
97            self.dispatcher
98                .notify(
99                    "Execution failures",
100                    &format!("{} consecutive order failures ({reason})", max_failures),
101                )
102                .await;
103            let mut state = self.state.lock().await;
104            state.consecutive_failures = 0;
105        }
106    }
107
108    pub async fn reset_order_failures(&self) {
109        let mut state = self.state.lock().await;
110        state.consecutive_failures = 0;
111    }
112
113    pub async fn notify(&self, title: &str, message: &str) {
114        self.dispatcher.notify(title, message).await;
115    }
116
117    pub async fn update_equity(&self, equity: Decimal) {
118        if equity <= Decimal::ZERO {
119            return;
120        }
121        let mut state = self.state.lock().await;
122        if equity > state.peak_equity {
123            state.peak_equity = equity;
124            state.drawdown_triggered = false;
125            return;
126        }
127        if state.peak_equity <= Decimal::ZERO {
128            state.peak_equity = equity;
129            return;
130        }
131        let drawdown = (state.peak_equity - equity) / state.peak_equity;
132        if drawdown >= self.config.max_drawdown && !state.drawdown_triggered {
133            state.drawdown_triggered = true;
134            let peak = state.peak_equity;
135            drop(state);
136            let equity_val = equity.to_f64().unwrap_or(0.0);
137            let peak_val = peak.to_f64().unwrap_or(0.0);
138            let drawdown_pct = drawdown.to_f64().unwrap_or(0.0) * 100.0;
139            self.dispatcher
140                .notify(
141                    "Drawdown limit breached",
142                    &format!(
143                        "Current equity {:.2} vs peak {:.2} (drawdown {:.2}%)",
144                        equity_val, peak_val, drawdown_pct
145                    ),
146                )
147                .await;
148        }
149    }
150
151    pub fn spawn_watchdog(&self) -> Option<tokio::task::JoinHandle<()>> {
152        let threshold = self.config.max_data_gap_secs;
153        if threshold == 0 {
154            return None;
155        }
156        let dispatcher = self.dispatcher.clone();
157        let state = self.state.clone();
158        let public_connection = self.public_connection.clone();
159        let private_connection = self.private_connection.clone();
160        let period = Duration::from_secs(threshold);
161        Some(tokio::spawn(async move {
162            let mut ticker = interval(Duration::from_secs(30));
163            loop {
164                ticker.tick().await;
165                let mut guard = state.lock().await;
166                let now = Instant::now();
167                if let Some(flag) = &public_connection {
168                    if flag.load(std::sync::atomic::Ordering::SeqCst) {
169                        guard.last_public_connection = now;
170                        guard.public_alerted = false;
171                    } else if !guard.public_alerted
172                        && now.duration_since(guard.last_public_connection)
173                            >= Duration::from_secs(60)
174                    {
175                        guard.public_alerted = true;
176                        drop(guard);
177                        dispatcher
178                            .notify(
179                                "Exchange connection lost (public)",
180                                "Public stream disconnected for over 60s",
181                            )
182                            .await;
183                        guard = state.lock().await;
184                    }
185                }
186                if let Some(flag) = &private_connection {
187                    if flag.load(std::sync::atomic::Ordering::SeqCst) {
188                        guard.last_private_connection = now;
189                        guard.private_alerted = false;
190                    } else if !guard.private_alerted
191                        && now.duration_since(guard.last_private_connection)
192                            >= Duration::from_secs(60)
193                    {
194                        guard.private_alerted = true;
195                        drop(guard);
196                        dispatcher
197                            .notify(
198                                "Exchange connection lost (private)",
199                                "Private stream disconnected for over 60s",
200                            )
201                            .await;
202                        guard = state.lock().await;
203                    }
204                }
205                if guard.last_data.elapsed() >= period && !guard.data_gap_triggered {
206                    guard.data_gap_triggered = true;
207                    drop(guard);
208                    dispatcher
209                        .notify("Market data stalled", "No heartbeat in configured window")
210                        .await;
211                }
212            }
213        }))
214    }
215}
216
/// Normalizes an optional webhook string.
///
/// Leading and trailing whitespace is stripped. A missing value, or one that
/// is empty after trimming, yields `None`; otherwise the trimmed text is
/// returned.
pub fn sanitize_webhook(input: Option<String>) -> Option<String> {
    let raw = input?;
    match raw.trim() {
        "" => None,
        url => Some(url.to_owned()),
    }
}
226}