1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use crossbeam::channel::{bounded, Sender};
use lazy_static::lazy_static;
use log::error;
use parking_lot::RwLock;
use serde_json::Value;
use std::{sync::Arc, time::Duration};
use crate::utils::DeliverReceipt;
lazy_static! {
pub(crate) static ref PAGERDUTY_INTEGRATION_KEY: Arc<RwLock<Option<String>>> =
Default::default();
pub(crate) static ref HUB: RwLock<Hub> = Default::default();
}
const BUFFER_SIZE: usize = 100;
pub(crate) enum Message {
Alert(Value, DeliverReceipt),
Terminate(DeliverReceipt),
}
pub(crate) enum Hub {
Empty,
Configured {
routing_key: String,
sender: Sender<Message>,
},
}
impl Default for Hub {
fn default() -> Self {
Self::Empty
}
}
impl Hub {
pub(crate) fn dispatch_and_block<F: Fn() -> Value>(&self, f: F) {
if let Some(receipt) = self.dispatch(f) {
receipt.wait()
}
}
pub(crate) fn dispatch<F: Fn() -> Value>(&self, f: F) -> Option<DeliverReceipt> {
if let Hub::Configured {
sender,
routing_key,
} = self
{
let mut dispatched = f();
dispatched["routing_key"] = Value::String(routing_key.clone());
let receipt = DeliverReceipt::default();
if sender
.try_send(Message::Alert(dispatched, receipt.clone()))
.is_err()
{
error!("Failed sending airbag alert: buffer is full");
}
Some(receipt)
} else {
None
}
}
}
#[must_use = "Airbag guard must be stored to flush messages on program end"]
pub fn configure_pagerduty(routing_key: impl Into<String>) -> AirbagGuard {
let (sender, receiver) = bounded(BUFFER_SIZE);
let guard = AirbagGuard {
sender: sender.clone(),
};
*HUB.write() = Hub::Configured {
routing_key: routing_key.into(),
sender,
};
std::thread::spawn(move || {
let client = reqwest::blocking::Client::builder()
.timeout(Duration::from_secs(10))
.build()
.expect("Can't create HTTP client");
while let Some(event) = receiver.recv().ok() {
match &event {
Message::Alert(alert, receipt) => {
log::debug!("Got PD alert to send");
while let Err(e) = client
.post("https://events.pagerduty.com/v2/enqueue")
.json(alert)
.send()
.and_then(|resp| resp.error_for_status())
{
error!("Failed dispatching PD event ({:?}). Going to retry...", e);
std::thread::sleep(Duration::from_secs(5));
}
log::info!("Sent successfully");
receipt.signal()
}
Message::Terminate(receipt) => {
receipt.signal();
break;
}
}
}
});
crate::panic_handler::install();
guard
}
pub struct AirbagGuard {
sender: Sender<Message>,
}
impl Drop for AirbagGuard {
fn drop(&mut self) {
let receipt = DeliverReceipt::default();
let _ = self.sender.send(Message::Terminate(receipt.clone()));
log::info!("Waiting for Airbag message to flush...");
receipt.wait()
}
}