solana_notifier/
lib.rs

1#![cfg_attr(
2    not(feature = "agave-unstable-api"),
3    deprecated(
4        since = "3.1.0",
5        note = "This crate has been marked for formal inclusion in the Agave Unstable API. From \
6                v4.0.0 onward, the `agave-unstable-api` crate feature must be specified to \
7                acknowledge use of an interface that may break without warning."
8    )
9)]
10/// To activate Slack, Discord, PagerDuty and/or Telegram notifications, define these environment variables
11/// before using the `Notifier`
12/// ```bash
13/// export SLACK_WEBHOOK=...
14/// export DISCORD_WEBHOOK=...
15/// ```
16///
17/// Telegram requires the following two variables:
18/// ```bash
19/// export TELEGRAM_BOT_TOKEN=...
20/// export TELEGRAM_CHAT_ID=...
21/// ```
22///
23/// PagerDuty requires an Integration Key from the Events API v2 (Add this integration to your PagerDuty service to get this)
24///
25/// ```bash
26/// export PAGERDUTY_INTEGRATION_KEY=...
27/// ```
28///
/// To receive a Twilio SMS notification on failure, you need a Twilio account
/// and a sending number owned by that account. Then define the following
/// environment variable before running `agave-watchtower`:
32/// ```bash
33/// export TWILIO_CONFIG='ACCOUNT=<account>,TOKEN=<securityToken>,TO=<receivingNumber>,FROM=<sendingNumber>'
34/// ```
35use log::*;
36use {
37    reqwest::{blocking::Client, StatusCode},
38    serde_json::json,
39    solana_hash::Hash,
40    std::{env, str::FromStr, thread::sleep, time::Duration},
41};
42
/// Settings for the Telegram channel, built by `Notifier::new` when both
/// `{env_prefix}TELEGRAM_BOT_TOKEN` and `{env_prefix}TELEGRAM_CHAT_ID` are set.
struct TelegramWebHook {
    /// Bot token; interpolated into the `sendMessage` request URL.
    bot_token: String,
    /// Destination chat; sent as the `chat_id` field of the JSON payload.
    chat_id: String,
}
47
/// Twilio SMS settings, one field per `NAME=value` entry of `TWILIO_CONFIG`.
///
/// `Default` produces a value with every field empty, which is the starting
/// point the parser fills in entry by entry.
#[derive(Debug, Default)]
struct TwilioWebHook {
    /// Value of the `ACCOUNT` entry.
    account: String,
    /// Value of the `TOKEN` entry.
    token: String,
    /// Value of the `TO` entry (receiving number).
    to: String,
    /// Value of the `FROM` entry (sending number).
    from: String,
}

impl TwilioWebHook {
    /// Returns `true` only when all four fields are non-empty.
    fn complete(&self) -> bool {
        [&self.account, &self.token, &self.to, &self.from]
            .iter()
            .all(|field| !field.is_empty())
    }
}
64
65fn get_twilio_config() -> Result<Option<TwilioWebHook>, String> {
66    let config_var = env::var("TWILIO_CONFIG");
67
68    if config_var.is_err() {
69        info!("Twilio notifications disabled");
70        return Ok(None);
71    }
72
73    let mut config = TwilioWebHook::default();
74
75    for pair in config_var.unwrap().split(',') {
76        let nv: Vec<_> = pair.split('=').collect();
77        if nv.len() != 2 {
78            return Err(format!("TWILIO_CONFIG is invalid: '{pair}'"));
79        }
80        let v = nv[1].to_string();
81        match nv[0] {
82            "ACCOUNT" => config.account = v,
83            "TOKEN" => config.token = v,
84            "TO" => config.to = v,
85            "FROM" => config.from = v,
86            _ => return Err(format!("TWILIO_CONFIG is invalid: '{pair}'")),
87        }
88    }
89
90    if !config.complete() {
91        return Err("TWILIO_CONFIG is incomplete".to_string());
92    }
93    Ok(Some(config))
94}
95
/// A configured destination that `Notifier::send` delivers messages to.
enum NotificationChannel {
    /// Discord webhook URL; the message is posted one line at a time.
    Discord(String),
    /// Slack webhook URL; the whole message is posted as a single `text` payload.
    Slack(String),
    /// PagerDuty routing key, sent with each event to the Events API v2 enqueue endpoint.
    PagerDuty(String),
    /// Telegram bot token and chat id.
    Telegram(TelegramWebHook),
    /// Twilio account, token and phone numbers used to send an SMS.
    Twilio(TwilioWebHook),
    /// Writes the message through the `log!` macro at the given level.
    Log(Level),
}
104
/// Whether a notification opens or closes an incident.
///
/// Within `Notifier::send` only the PagerDuty channel reads this value: the variant selects
/// the `event_action` (`"trigger"` or `"resolve"`) and `incident` is rendered as the
/// `dedup_key` of the event.
#[derive(Clone)]
pub enum NotificationType {
    /// Sent as a PagerDuty `"trigger"` event for `incident`.
    Trigger { incident: Hash },
    /// Sent as a PagerDuty `"resolve"` event for `incident`.
    Resolve { incident: Hash },
}
110
/// Fans a message out to every notification channel configured through the environment.
pub struct Notifier {
    /// Blocking HTTP client used for all webhook/API requests.
    client: Client,
    /// Enabled channels, in the order they are notified.
    notifiers: Vec<NotificationChannel>,
}
115
impl Default for Notifier {
    /// Equivalent to `Notifier::new("")`: the environment variables are read without a prefix.
    fn default() -> Self {
        Self::new("")
    }
}
121
122impl Notifier {
123    pub fn new(env_prefix: &str) -> Self {
124        info!("Initializing {env_prefix}Notifier");
125
126        let mut notifiers = vec![];
127
128        if let Ok(webhook) = env::var(format!("{env_prefix}DISCORD_WEBHOOK")) {
129            notifiers.push(NotificationChannel::Discord(webhook));
130        }
131        if let Ok(webhook) = env::var(format!("{env_prefix}SLACK_WEBHOOK")) {
132            notifiers.push(NotificationChannel::Slack(webhook));
133        }
134        if let Ok(routing_key) = env::var(format!("{env_prefix}PAGERDUTY_INTEGRATION_KEY")) {
135            notifiers.push(NotificationChannel::PagerDuty(routing_key));
136        }
137
138        if let (Ok(bot_token), Ok(chat_id)) = (
139            env::var(format!("{env_prefix}TELEGRAM_BOT_TOKEN")),
140            env::var(format!("{env_prefix}TELEGRAM_CHAT_ID")),
141        ) {
142            notifiers.push(NotificationChannel::Telegram(TelegramWebHook {
143                bot_token,
144                chat_id,
145            }));
146        }
147
148        if let Ok(Some(webhook)) = get_twilio_config() {
149            notifiers.push(NotificationChannel::Twilio(webhook));
150        }
151
152        if let Ok(log_level) = env::var(format!("{env_prefix}LOG_NOTIFIER_LEVEL")) {
153            match Level::from_str(&log_level) {
154                Ok(level) => notifiers.push(NotificationChannel::Log(level)),
155                Err(e) => {
156                    warn!("could not parse specified log notifier level string ({log_level}): {e}")
157                }
158            }
159        }
160
161        info!("{} notifiers", notifiers.len());
162
163        Notifier {
164            client: Client::new(),
165            notifiers,
166        }
167    }
168
169    pub fn is_empty(&self) -> bool {
170        self.notifiers.is_empty()
171    }
172
173    pub fn send(&self, msg: &str, notification_type: &NotificationType) {
174        for notifier in &self.notifiers {
175            match notifier {
176                NotificationChannel::Discord(webhook) => {
177                    for line in msg.split('\n') {
178                        // Discord rate limiting is aggressive, limit to 1 message a second
179                        sleep(Duration::from_millis(1000));
180
181                        info!("Sending {line}");
182                        let data = json!({ "content": line });
183
184                        loop {
185                            let response = self.client.post(webhook).json(&data).send();
186
187                            if let Err(err) = response {
188                                warn!("Failed to send Discord message: \"{line}\": {err:?}");
189                                break;
190                            } else if let Ok(response) = response {
191                                info!("response status: {}", response.status());
192                                if response.status() == StatusCode::TOO_MANY_REQUESTS {
193                                    warn!("rate limited!...");
194                                    warn!("response text: {:?}", response.text());
195                                    sleep(Duration::from_secs(2));
196                                } else {
197                                    break;
198                                }
199                            }
200                        }
201                    }
202                }
203                NotificationChannel::Slack(webhook) => {
204                    let data = json!({ "text": msg });
205                    if let Err(err) = self.client.post(webhook).json(&data).send() {
206                        warn!("Failed to send Slack message: {err:?}");
207                    }
208                }
209                NotificationChannel::PagerDuty(routing_key) => {
210                    let event_action = match notification_type {
211                        NotificationType::Trigger { incident: _ } => String::from("trigger"),
212                        NotificationType::Resolve { incident: _ } => String::from("resolve"),
213                    };
214                    let dedup_key = match notification_type {
215                        NotificationType::Trigger { ref incident } => incident.clone().to_string(),
216                        NotificationType::Resolve { ref incident } => incident.clone().to_string(),
217                    };
218
219                    let data = json!({"payload":{"summary":msg,"source":"agave-watchtower","severity":"critical"},"routing_key":routing_key,"event_action":event_action,"dedup_key":dedup_key});
220                    let url = "https://events.pagerduty.com/v2/enqueue";
221
222                    if let Err(err) = self.client.post(url).json(&data).send() {
223                        warn!("Failed to send PagerDuty alert: {err:?}");
224                    }
225                }
226
227                NotificationChannel::Telegram(TelegramWebHook { chat_id, bot_token }) => {
228                    let data = json!({ "chat_id": chat_id, "text": msg });
229                    let url = format!("https://api.telegram.org/bot{bot_token}/sendMessage");
230
231                    if let Err(err) = self.client.post(url).json(&data).send() {
232                        warn!("Failed to send Telegram message: {err:?}");
233                    }
234                }
235
236                NotificationChannel::Twilio(TwilioWebHook {
237                    account,
238                    token,
239                    to,
240                    from,
241                }) => {
242                    let url = format!(
243                        "https://{account}:{token}@api.twilio.com/2010-04-01/Accounts/{account}/Messages.json"
244                    );
245                    let params = [("To", to), ("From", from), ("Body", &msg.to_string())];
246                    if let Err(err) = self.client.post(url).form(&params).send() {
247                        warn!("Failed to send Twilio message: {err:?}");
248                    }
249                }
250                NotificationChannel::Log(level) => {
251                    log!(*level, "{msg}")
252                }
253            }
254        }
255    }
256}