1use crate::config::{NotificationsConfig, NotifyEvent};
2use crate::pipeline::RunSummary;
3
4pub fn maybe_send(config: Option<&NotificationsConfig>, summary: &RunSummary) {
5 let Some(cfg) = config else { return };
6 let Some(slack) = &cfg.slack else { return };
7
8 let triggers = &slack.on;
9 let should_send = triggers.iter().any(|t| match t {
10 NotifyEvent::Failure => summary.status == "failed",
11 NotifyEvent::SchemaChange => summary.schema_changed == Some(true),
12 NotifyEvent::Degraded => summary.status == "degraded",
13 });
14
15 if !should_send {
16 return;
17 }
18
19 let url = match (&slack.webhook_url, &slack.webhook_url_env) {
20 (Some(u), _) => u.clone(),
21 (None, Some(env)) => match std::env::var(env) {
22 Ok(u) => u,
23 Err(_) => {
24 log::warn!("slack notification skipped: env var '{}' not set", env);
25 return;
26 }
27 },
28 (None, None) => {
29 log::warn!("slack notification skipped: no webhook_url or webhook_url_env configured");
30 return;
31 }
32 };
33
34 let color = if summary.status == "failed" {
35 "#e74c3c"
36 } else {
37 "#f39c12"
38 };
39 let text = format!(
40 "*{}* | status: `{}` | rows: {} | duration: {}ms{}{}",
41 summary.export_name,
42 summary.status,
43 summary.total_rows,
44 summary.duration_ms,
45 summary
46 .error_message
47 .as_ref()
48 .map(|e| format!("\nerror: {}", e))
49 .unwrap_or_default(),
50 if summary.schema_changed == Some(true) {
51 "\nschema changed"
52 } else {
53 ""
54 },
55 );
56
57 let payload = serde_json::json!({
58 "attachments": [{
59 "color": color,
60 "title": format!("Rivet: {}", summary.run_id),
61 "text": text,
62 "footer": "rivet export notification",
63 }]
64 });
65
66 match reqwest::blocking::Client::builder()
67 .timeout(std::time::Duration::from_secs(10))
68 .build()
69 .and_then(|c| c.post(&url).json(&payload).send())
70 {
71 Ok(resp) if resp.status().is_success() => {
72 log::info!("slack notification sent for '{}'", summary.export_name);
73 }
74 Ok(resp) => {
75 log::warn!("slack notification failed: HTTP {}", resp.status());
76 }
77 Err(e) => {
78 log::warn!("slack notification failed: {}", e);
79 }
80 }
81}
82
83#[cfg(test)]
84mod tests {
85 use super::*;
86 use crate::config::{NotificationsConfig, NotifyEvent, SlackConfig};
87
88 fn stub_summary(status: &str, schema_changed: Option<bool>) -> RunSummary {
89 RunSummary {
90 run_id: "test_run".into(),
91 export_name: "test_export".into(),
92 status: status.into(),
93 total_rows: 100,
94 files_produced: 1,
95 bytes_written: 1024,
96 duration_ms: 500,
97 peak_rss_mb: 10,
98 retries: 0,
99 validated: None,
100 schema_changed,
101 quality_passed: None,
102 error_message: None,
103 tuning_profile: "balanced".into(),
104 batch_size: 10_000,
105 batch_size_memory_mb: None,
106 format: "parquet".into(),
107 mode: "full".into(),
108 compression: "zstd".into(),
109 source_count: None,
110 reconciled: None,
111 }
112 }
113
114 #[test]
115 fn no_config_does_nothing() {
116 maybe_send(None, &stub_summary("success", None));
117 }
118
119 #[test]
120 fn no_slack_does_nothing() {
121 let cfg = NotificationsConfig { slack: None };
122 maybe_send(Some(&cfg), &stub_summary("failed", None));
123 }
124
125 #[test]
126 fn success_does_not_trigger_failure() {
127 let cfg = NotificationsConfig {
128 slack: Some(SlackConfig {
129 webhook_url: Some("http://localhost:1/noop".into()),
130 webhook_url_env: None,
131 on: vec![NotifyEvent::Failure],
132 }),
133 };
134 maybe_send(Some(&cfg), &stub_summary("success", None));
136 }
137
138 #[test]
139 fn degraded_triggers_degraded_event() {
140 let cfg = NotificationsConfig {
141 slack: Some(SlackConfig {
142 webhook_url: Some("http://127.0.0.1:1/noop".into()),
143 webhook_url_env: None,
144 on: vec![NotifyEvent::Degraded],
145 }),
146 };
147 maybe_send(Some(&cfg), &stub_summary("degraded", None));
149 }
150
151 #[test]
152 fn schema_change_triggers_schema_change_event() {
153 let cfg = NotificationsConfig {
154 slack: Some(SlackConfig {
155 webhook_url: Some("http://127.0.0.1:1/noop".into()),
156 webhook_url_env: None,
157 on: vec![NotifyEvent::SchemaChange],
158 }),
159 };
160 maybe_send(Some(&cfg), &stub_summary("success", Some(true)));
161 }
162
163 #[test]
164 fn schema_change_false_does_not_trigger() {
165 let cfg = NotificationsConfig {
166 slack: Some(SlackConfig {
167 webhook_url: Some("http://127.0.0.1:1/noop".into()),
168 webhook_url_env: None,
169 on: vec![NotifyEvent::SchemaChange],
170 }),
171 };
172 maybe_send(Some(&cfg), &stub_summary("success", Some(false)));
174 }
175
176 #[test]
177 fn missing_webhook_url_env_skips_silently() {
178 unsafe {
179 std::env::remove_var("RIVET_TEST_SLACK_NONEXISTENT");
180 }
181 let cfg = NotificationsConfig {
182 slack: Some(SlackConfig {
183 webhook_url: None,
184 webhook_url_env: Some("RIVET_TEST_SLACK_NONEXISTENT".into()),
185 on: vec![NotifyEvent::Failure],
186 }),
187 };
188 maybe_send(Some(&cfg), &stub_summary("failed", None));
190 }
191
192 #[test]
193 fn no_webhook_url_and_no_env_skips() {
194 let cfg = NotificationsConfig {
195 slack: Some(SlackConfig {
196 webhook_url: None,
197 webhook_url_env: None,
198 on: vec![NotifyEvent::Failure],
199 }),
200 };
201 maybe_send(Some(&cfg), &stub_summary("failed", None));
202 }
203
204 #[test]
205 fn multiple_triggers_any_match_fires() {
206 let cfg = NotificationsConfig {
207 slack: Some(SlackConfig {
208 webhook_url: Some("http://127.0.0.1:1/noop".into()),
209 webhook_url_env: None,
210 on: vec![
211 NotifyEvent::Failure,
212 NotifyEvent::SchemaChange,
213 NotifyEvent::Degraded,
214 ],
215 }),
216 };
217 maybe_send(Some(&cfg), &stub_summary("degraded", None));
219 }
220
221 #[test]
222 fn error_message_included_in_stub() {
223 let mut s = stub_summary("failed", None);
224 s.error_message = Some("connection refused".into());
225 let cfg = NotificationsConfig {
226 slack: Some(SlackConfig {
227 webhook_url: Some("http://127.0.0.1:1/noop".into()),
228 webhook_url_env: None,
229 on: vec![NotifyEvent::Failure],
230 }),
231 };
232 maybe_send(Some(&cfg), &s);
233 }
234}