solana_notifier/
lib.rs

1/// To activate Slack, Discord, PagerDuty and/or Telegram notifications, define these environment variables
2/// before using the `Notifier`
3/// ```bash
4/// export SLACK_WEBHOOK=...
5/// export DISCORD_WEBHOOK=...
6/// ```
7///
8/// Telegram requires the following two variables:
9/// ```bash
10/// export TELEGRAM_BOT_TOKEN=...
11/// export TELEGRAM_CHAT_ID=...
12/// ```
13///
14/// PagerDuty requires an Integration Key from the Events API v2 (Add this integration to your PagerDuty service to get this)
15///
16/// ```bash
17/// export PAGERDUTY_INTEGRATION_KEY=...
18/// ```
19///
20/// To receive a Twilio SMS notification on failure, having a Twilio account,
21/// and a sending number owned by that account,
22/// define environment variable before running `agave-watchtower`:
23/// ```bash
24/// export TWILIO_CONFIG='ACCOUNT=<account>,TOKEN=<securityToken>,TO=<receivingNumber>,FROM=<sendingNumber>'
25/// ```
26use log::*;
27use {
28    reqwest::{blocking::Client, StatusCode},
29    serde_json::json,
30    solana_hash::Hash,
31    std::{env, str::FromStr, thread::sleep, time::Duration},
32};
33
34struct TelegramWebHook {
35    bot_token: String,
36    chat_id: String,
37}
38
39#[derive(Debug, Default)]
40struct TwilioWebHook {
41    account: String,
42    token: String,
43    to: String,
44    from: String,
45}
46
47impl TwilioWebHook {
48    fn complete(&self) -> bool {
49        !(self.account.is_empty()
50            || self.token.is_empty()
51            || self.to.is_empty()
52            || self.from.is_empty())
53    }
54}
55
56fn get_twilio_config() -> Result<Option<TwilioWebHook>, String> {
57    let config_var = env::var("TWILIO_CONFIG");
58
59    if config_var.is_err() {
60        info!("Twilio notifications disabled");
61        return Ok(None);
62    }
63
64    let mut config = TwilioWebHook::default();
65
66    for pair in config_var.unwrap().split(',') {
67        let nv: Vec<_> = pair.split('=').collect();
68        if nv.len() != 2 {
69            return Err(format!("TWILIO_CONFIG is invalid: '{pair}'"));
70        }
71        let v = nv[1].to_string();
72        match nv[0] {
73            "ACCOUNT" => config.account = v,
74            "TOKEN" => config.token = v,
75            "TO" => config.to = v,
76            "FROM" => config.from = v,
77            _ => return Err(format!("TWILIO_CONFIG is invalid: '{pair}'")),
78        }
79    }
80
81    if !config.complete() {
82        return Err("TWILIO_CONFIG is incomplete".to_string());
83    }
84    Ok(Some(config))
85}
86
87enum NotificationChannel {
88    Discord(String),
89    Slack(String),
90    PagerDuty(String),
91    Telegram(TelegramWebHook),
92    Twilio(TwilioWebHook),
93    Log(Level),
94}
95
96#[derive(Clone)]
97pub enum NotificationType {
98    Trigger { incident: Hash },
99    Resolve { incident: Hash },
100}
101
102pub struct Notifier {
103    client: Client,
104    notifiers: Vec<NotificationChannel>,
105}
106
107impl Default for Notifier {
108    fn default() -> Self {
109        Self::new("")
110    }
111}
112
113impl Notifier {
114    pub fn new(env_prefix: &str) -> Self {
115        info!("Initializing {}Notifier", env_prefix);
116
117        let mut notifiers = vec![];
118
119        if let Ok(webhook) = env::var(format!("{env_prefix}DISCORD_WEBHOOK")) {
120            notifiers.push(NotificationChannel::Discord(webhook));
121        }
122        if let Ok(webhook) = env::var(format!("{env_prefix}SLACK_WEBHOOK")) {
123            notifiers.push(NotificationChannel::Slack(webhook));
124        }
125        if let Ok(routing_key) = env::var(format!("{env_prefix}PAGERDUTY_INTEGRATION_KEY")) {
126            notifiers.push(NotificationChannel::PagerDuty(routing_key));
127        }
128
129        if let (Ok(bot_token), Ok(chat_id)) = (
130            env::var(format!("{env_prefix}TELEGRAM_BOT_TOKEN")),
131            env::var(format!("{env_prefix}TELEGRAM_CHAT_ID")),
132        ) {
133            notifiers.push(NotificationChannel::Telegram(TelegramWebHook {
134                bot_token,
135                chat_id,
136            }));
137        }
138
139        if let Ok(Some(webhook)) = get_twilio_config() {
140            notifiers.push(NotificationChannel::Twilio(webhook));
141        }
142
143        if let Ok(log_level) = env::var(format!("{env_prefix}LOG_NOTIFIER_LEVEL")) {
144            match Level::from_str(&log_level) {
145                Ok(level) => notifiers.push(NotificationChannel::Log(level)),
146                Err(e) => warn!(
147                    "could not parse specified log notifier level string ({}): {}",
148                    log_level, e
149                ),
150            }
151        }
152
153        info!("{} notifiers", notifiers.len());
154
155        Notifier {
156            client: Client::new(),
157            notifiers,
158        }
159    }
160
161    pub fn is_empty(&self) -> bool {
162        self.notifiers.is_empty()
163    }
164
165    pub fn send(&self, msg: &str, notification_type: &NotificationType) {
166        for notifier in &self.notifiers {
167            match notifier {
168                NotificationChannel::Discord(webhook) => {
169                    for line in msg.split('\n') {
170                        // Discord rate limiting is aggressive, limit to 1 message a second
171                        sleep(Duration::from_millis(1000));
172
173                        info!("Sending {}", line);
174                        let data = json!({ "content": line });
175
176                        loop {
177                            let response = self.client.post(webhook).json(&data).send();
178
179                            if let Err(err) = response {
180                                warn!("Failed to send Discord message: \"{}\": {:?}", line, err);
181                                break;
182                            } else if let Ok(response) = response {
183                                info!("response status: {}", response.status());
184                                if response.status() == StatusCode::TOO_MANY_REQUESTS {
185                                    warn!("rate limited!...");
186                                    warn!("response text: {:?}", response.text());
187                                    sleep(Duration::from_secs(2));
188                                } else {
189                                    break;
190                                }
191                            }
192                        }
193                    }
194                }
195                NotificationChannel::Slack(webhook) => {
196                    let data = json!({ "text": msg });
197                    if let Err(err) = self.client.post(webhook).json(&data).send() {
198                        warn!("Failed to send Slack message: {:?}", err);
199                    }
200                }
201                NotificationChannel::PagerDuty(routing_key) => {
202                    let event_action = match notification_type {
203                        NotificationType::Trigger { incident: _ } => String::from("trigger"),
204                        NotificationType::Resolve { incident: _ } => String::from("resolve"),
205                    };
206                    let dedup_key = match notification_type {
207                        NotificationType::Trigger { ref incident } => incident.clone().to_string(),
208                        NotificationType::Resolve { ref incident } => incident.clone().to_string(),
209                    };
210
211                    let data = json!({"payload":{"summary":msg,"source":"agave-watchtower","severity":"critical"},"routing_key":routing_key,"event_action":event_action,"dedup_key":dedup_key});
212                    let url = "https://events.pagerduty.com/v2/enqueue";
213
214                    if let Err(err) = self.client.post(url).json(&data).send() {
215                        warn!("Failed to send PagerDuty alert: {:?}", err);
216                    }
217                }
218
219                NotificationChannel::Telegram(TelegramWebHook { chat_id, bot_token }) => {
220                    let data = json!({ "chat_id": chat_id, "text": msg });
221                    let url = format!("https://api.telegram.org/bot{bot_token}/sendMessage");
222
223                    if let Err(err) = self.client.post(url).json(&data).send() {
224                        warn!("Failed to send Telegram message: {:?}", err);
225                    }
226                }
227
228                NotificationChannel::Twilio(TwilioWebHook {
229                    account,
230                    token,
231                    to,
232                    from,
233                }) => {
234                    let url = format!(
235                        "https://{account}:{token}@api.twilio.com/2010-04-01/Accounts/{account}/Messages.json"
236                    );
237                    let params = [("To", to), ("From", from), ("Body", &msg.to_string())];
238                    if let Err(err) = self.client.post(url).form(&params).send() {
239                        warn!("Failed to send Twilio message: {:?}", err);
240                    }
241                }
242                NotificationChannel::Log(level) => {
243                    log!(*level, "{}", msg)
244                }
245            }
246        }
247    }
248}