Skip to main content

rivet_cli/
notify.rs

1use crate::config::{NotificationsConfig, NotifyEvent};
2use crate::pipeline::RunSummary;
3
4pub fn maybe_send(config: Option<&NotificationsConfig>, summary: &RunSummary) {
5    let Some(cfg) = config else { return };
6    let Some(slack) = &cfg.slack else { return };
7
8    let triggers = &slack.on;
9    let should_send = triggers.iter().any(|t| match t {
10        NotifyEvent::Failure => summary.status == "failed",
11        NotifyEvent::SchemaChange => summary.schema_changed == Some(true),
12        NotifyEvent::Degraded => summary.status == "degraded",
13    });
14
15    if !should_send {
16        return;
17    }
18
19    let url = match (&slack.webhook_url, &slack.webhook_url_env) {
20        (Some(u), _) => u.clone(),
21        (None, Some(env)) => match std::env::var(env) {
22            Ok(u) => u,
23            Err(_) => {
24                log::warn!("slack notification skipped: env var '{}' not set", env);
25                return;
26            }
27        },
28        (None, None) => {
29            log::warn!("slack notification skipped: no webhook_url or webhook_url_env configured");
30            return;
31        }
32    };
33
34    let color = if summary.status == "failed" {
35        "#e74c3c"
36    } else {
37        "#f39c12"
38    };
39    let text = format!(
40        "*{}* | status: `{}` | rows: {} | duration: {}ms{}{}",
41        summary.export_name,
42        summary.status,
43        summary.total_rows,
44        summary.duration_ms,
45        summary
46            .error_message
47            .as_ref()
48            .map(|e| format!("\nerror: {}", e))
49            .unwrap_or_default(),
50        if summary.schema_changed == Some(true) {
51            "\nschema changed"
52        } else {
53            ""
54        },
55    );
56
57    let payload = serde_json::json!({
58        "attachments": [{
59            "color": color,
60            "title": format!("Rivet: {}", summary.run_id),
61            "text": text,
62            "footer": "rivet export notification",
63        }]
64    });
65
66    match reqwest::blocking::Client::builder()
67        .timeout(std::time::Duration::from_secs(10))
68        .build()
69        .and_then(|c| c.post(&url).json(&payload).send())
70    {
71        Ok(resp) if resp.status().is_success() => {
72            log::info!("slack notification sent for '{}'", summary.export_name);
73        }
74        Ok(resp) => {
75            log::warn!("slack notification failed: HTTP {}", resp.status());
76        }
77        Err(e) => {
78            log::warn!("slack notification failed: {}", e);
79        }
80    }
81}
82
83#[cfg(test)]
84mod tests {
85    use super::*;
86    use crate::config::{NotificationsConfig, NotifyEvent, SlackConfig};
87
88    fn stub_summary(status: &str, schema_changed: Option<bool>) -> RunSummary {
89        RunSummary {
90            run_id: "test_run".into(),
91            export_name: "test_export".into(),
92            status: status.into(),
93            total_rows: 100,
94            files_produced: 1,
95            bytes_written: 1024,
96            duration_ms: 500,
97            peak_rss_mb: 10,
98            retries: 0,
99            validated: None,
100            schema_changed,
101            quality_passed: None,
102            error_message: None,
103            tuning_profile: "balanced".into(),
104            batch_size: 10_000,
105            batch_size_memory_mb: None,
106            format: "parquet".into(),
107            mode: "full".into(),
108            compression: "zstd".into(),
109            source_count: None,
110            reconciled: None,
111        }
112    }
113
114    #[test]
115    fn no_config_does_nothing() {
116        maybe_send(None, &stub_summary("success", None));
117    }
118
119    #[test]
120    fn no_slack_does_nothing() {
121        let cfg = NotificationsConfig { slack: None };
122        maybe_send(Some(&cfg), &stub_summary("failed", None));
123    }
124
125    #[test]
126    fn success_does_not_trigger_failure() {
127        let cfg = NotificationsConfig {
128            slack: Some(SlackConfig {
129                webhook_url: Some("http://localhost:1/noop".into()),
130                webhook_url_env: None,
131                on: vec![NotifyEvent::Failure],
132            }),
133        };
134        // should_send = false, so no HTTP call
135        maybe_send(Some(&cfg), &stub_summary("success", None));
136    }
137
138    #[test]
139    fn degraded_triggers_degraded_event() {
140        let cfg = NotificationsConfig {
141            slack: Some(SlackConfig {
142                webhook_url: Some("http://127.0.0.1:1/noop".into()),
143                webhook_url_env: None,
144                on: vec![NotifyEvent::Degraded],
145            }),
146        };
147        // "degraded" status should match; the HTTP will fail silently (port 1)
148        maybe_send(Some(&cfg), &stub_summary("degraded", None));
149    }
150
151    #[test]
152    fn schema_change_triggers_schema_change_event() {
153        let cfg = NotificationsConfig {
154            slack: Some(SlackConfig {
155                webhook_url: Some("http://127.0.0.1:1/noop".into()),
156                webhook_url_env: None,
157                on: vec![NotifyEvent::SchemaChange],
158            }),
159        };
160        maybe_send(Some(&cfg), &stub_summary("success", Some(true)));
161    }
162
163    #[test]
164    fn schema_change_false_does_not_trigger() {
165        let cfg = NotificationsConfig {
166            slack: Some(SlackConfig {
167                webhook_url: Some("http://127.0.0.1:1/noop".into()),
168                webhook_url_env: None,
169                on: vec![NotifyEvent::SchemaChange],
170            }),
171        };
172        // schema_changed = Some(false) → should_send = false
173        maybe_send(Some(&cfg), &stub_summary("success", Some(false)));
174    }
175
176    #[test]
177    fn missing_webhook_url_env_skips_silently() {
178        unsafe {
179            std::env::remove_var("RIVET_TEST_SLACK_NONEXISTENT");
180        }
181        let cfg = NotificationsConfig {
182            slack: Some(SlackConfig {
183                webhook_url: None,
184                webhook_url_env: Some("RIVET_TEST_SLACK_NONEXISTENT".into()),
185                on: vec![NotifyEvent::Failure],
186            }),
187        };
188        // should_send = true, but env var missing → skips
189        maybe_send(Some(&cfg), &stub_summary("failed", None));
190    }
191
192    #[test]
193    fn no_webhook_url_and_no_env_skips() {
194        let cfg = NotificationsConfig {
195            slack: Some(SlackConfig {
196                webhook_url: None,
197                webhook_url_env: None,
198                on: vec![NotifyEvent::Failure],
199            }),
200        };
201        maybe_send(Some(&cfg), &stub_summary("failed", None));
202    }
203
204    #[test]
205    fn multiple_triggers_any_match_fires() {
206        let cfg = NotificationsConfig {
207            slack: Some(SlackConfig {
208                webhook_url: Some("http://127.0.0.1:1/noop".into()),
209                webhook_url_env: None,
210                on: vec![
211                    NotifyEvent::Failure,
212                    NotifyEvent::SchemaChange,
213                    NotifyEvent::Degraded,
214                ],
215            }),
216        };
217        // "degraded" matches the 3rd trigger
218        maybe_send(Some(&cfg), &stub_summary("degraded", None));
219    }
220
221    #[test]
222    fn error_message_included_in_stub() {
223        let mut s = stub_summary("failed", None);
224        s.error_message = Some("connection refused".into());
225        let cfg = NotificationsConfig {
226            slack: Some(SlackConfig {
227                webhook_url: Some("http://127.0.0.1:1/noop".into()),
228                webhook_url_env: None,
229                on: vec![NotifyEvent::Failure],
230            }),
231        };
232        maybe_send(Some(&cfg), &s);
233    }
234}