Skip to main content

ironflow_engine/notify/
betterstack.rs

1//! [`BetterStackSubscriber`] -- forwards error events to BetterStack Logs.
2
3use reqwest::Client;
4use serde::Serialize;
5
6use super::retry::{RetryConfig, deliver_with_retry, is_accepted_202};
7use super::{Event, EventSubscriber, SubscriberFuture};
8
9/// Default BetterStack Logs ingestion endpoint.
10const DEFAULT_INGEST_URL: &str = "https://in.logs.betterstack.com";
11
12/// Payload sent to BetterStack Logs API.
13#[derive(Debug, Serialize)]
14struct LogPayload {
15    /// ISO-8601 timestamp.
16    dt: String,
17    /// Log level (always "error" for this subscriber).
18    level: &'static str,
19    /// Human-readable message.
20    message: String,
21    /// Structured event data.
22    event: serde_json::Value,
23}
24
25/// Subscriber that forwards error events to [BetterStack Logs](https://betterstack.com/docs/logs/).
26///
27/// Only acts on events that represent failures:
28/// - [`Event::StepFailed`]
29/// - [`Event::RunFailed`]
30///
31/// All other events are silently ignored (filtering by event type
32/// should already be done at subscription time, but this subscriber
33/// adds an extra safety check).
34///
35/// Retries failed deliveries with exponential backoff (up to 3 attempts,
36/// 5 s timeout per attempt).
37///
38/// # Examples
39///
40/// ```no_run
41/// use ironflow_engine::notify::{BetterStackSubscriber, Event, EventPublisher};
42///
43/// let mut publisher = EventPublisher::new();
44/// publisher.subscribe(
45///     BetterStackSubscriber::new("my-source-token"),
46///     &[Event::STEP_FAILED, Event::RUN_FAILED],
47/// );
48/// ```
49pub struct BetterStackSubscriber {
50    source_token: String,
51    authorization_header: String,
52    ingest_url: String,
53    client: Client,
54    retry_config: RetryConfig,
55}
56
57impl BetterStackSubscriber {
58    /// Create a new subscriber with the given BetterStack source token.
59    ///
60    /// Uses the default ingestion endpoint (`https://in.logs.betterstack.com`)
61    /// and default [`RetryConfig`].
62    ///
63    /// # Panics
64    ///
65    /// Panics if the HTTP client cannot be built (TLS backend unavailable).
66    ///
67    /// # Examples
68    ///
69    /// ```
70    /// use ironflow_engine::notify::BetterStackSubscriber;
71    ///
72    /// let subscriber = BetterStackSubscriber::new("my-source-token");
73    /// assert_eq!(subscriber.source_token(), "my-source-token");
74    /// ```
75    pub fn new(source_token: &str) -> Self {
76        Self::with_url(source_token, DEFAULT_INGEST_URL)
77    }
78
79    /// Create a subscriber with a custom ingestion URL.
80    ///
81    /// Useful for testing or self-hosted BetterStack instances.
82    ///
83    /// # Panics
84    ///
85    /// Panics if the HTTP client cannot be built (TLS backend unavailable).
86    ///
87    /// # Examples
88    ///
89    /// ```
90    /// use ironflow_engine::notify::BetterStackSubscriber;
91    ///
92    /// let subscriber = BetterStackSubscriber::with_url(
93    ///     "my-source-token",
94    ///     "https://custom.logs.example.com",
95    /// );
96    /// assert_eq!(subscriber.ingest_url(), "https://custom.logs.example.com");
97    /// ```
98    pub fn with_url(source_token: &str, ingest_url: &str) -> Self {
99        Self::with_url_and_retry(source_token, ingest_url, RetryConfig::default())
100    }
101
102    /// Create a subscriber with a custom ingestion URL and retry configuration.
103    ///
104    /// # Panics
105    ///
106    /// Panics if the HTTP client cannot be built (TLS backend unavailable).
107    ///
108    /// # Examples
109    ///
110    /// ```
111    /// use ironflow_engine::notify::{BetterStackSubscriber, RetryConfig};
112    ///
113    /// let config = RetryConfig::new(
114    ///     5,
115    ///     std::time::Duration::from_secs(10),
116    ///     std::time::Duration::from_secs(1),
117    /// );
118    /// let subscriber = BetterStackSubscriber::with_url_and_retry(
119    ///     "my-source-token",
120    ///     "https://custom.logs.example.com",
121    ///     config,
122    /// );
123    /// ```
124    pub fn with_url_and_retry(
125        source_token: &str,
126        ingest_url: &str,
127        retry_config: RetryConfig,
128    ) -> Self {
129        let client = retry_config.build_client();
130        Self {
131            authorization_header: format!("Bearer {}", source_token),
132            source_token: source_token.to_string(),
133            ingest_url: ingest_url.to_string(),
134            client,
135            retry_config,
136        }
137    }
138
139    /// Returns the source token.
140    pub fn source_token(&self) -> &str {
141        &self.source_token
142    }
143
144    /// Returns the ingestion URL.
145    pub fn ingest_url(&self) -> &str {
146        &self.ingest_url
147    }
148
149    /// Build a log payload from an error event. Returns `None` for non-error events.
150    fn build_payload(event: &Event) -> Option<LogPayload> {
151        match event {
152            Event::StepFailed {
153                run_id,
154                step_id,
155                step_name,
156                kind,
157                error,
158                at,
159            } => {
160                let message = format!(
161                    "Step '{}' ({}) failed on run {}: {}",
162                    step_name, kind, run_id, error
163                );
164                let event_json = serde_json::json!({
165                    "type": "step_failed",
166                    "run_id": run_id.to_string(),
167                    "step_id": step_id.to_string(),
168                    "step_name": step_name,
169                    "kind": kind.to_string(),
170                    "error": error,
171                });
172                Some(LogPayload {
173                    dt: at.to_rfc3339(),
174                    level: "error",
175                    message,
176                    event: event_json,
177                })
178            }
179            Event::RunFailed {
180                run_id,
181                workflow_name,
182                error,
183                cost_usd,
184                duration_ms,
185                at,
186            } => {
187                let error_detail = error.as_deref().unwrap_or("unknown error");
188                let message = format!(
189                    "Run {} (workflow '{}') failed: {}",
190                    run_id, workflow_name, error_detail
191                );
192                let event_json = serde_json::json!({
193                    "type": "run_failed",
194                    "run_id": run_id.to_string(),
195                    "workflow_name": workflow_name,
196                    "error": error_detail,
197                    "cost_usd": cost_usd.to_string(),
198                    "duration_ms": duration_ms,
199                });
200                Some(LogPayload {
201                    dt: at.to_rfc3339(),
202                    level: "error",
203                    message,
204                    event: event_json,
205                })
206            }
207            _ => None,
208        }
209    }
210}
211
212impl EventSubscriber for BetterStackSubscriber {
213    fn name(&self) -> &str {
214        "betterstack"
215    }
216
217    fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
218        Box::pin(async move {
219            if let Some(payload) = Self::build_payload(event) {
220                deliver_with_retry(
221                    &self.retry_config,
222                    || {
223                        self.client
224                            .post(&self.ingest_url)
225                            .header("Authorization", &self.authorization_header)
226                            .json(&payload)
227                    },
228                    is_accepted_202,
229                    "betterstack",
230                    &payload.message,
231                )
232                .await;
233            }
234        })
235    }
236}
237
238#[cfg(test)]
239mod tests {
240    use super::*;
241    use chrono::Utc;
242    use ironflow_store::models::{RunStatus, StepKind};
243    use rust_decimal::Decimal;
244    use uuid::Uuid;
245
246    #[test]
247    fn new_sets_default_ingest_url() {
248        let sub = BetterStackSubscriber::new("token-123");
249        assert_eq!(sub.source_token(), "token-123");
250        assert_eq!(sub.ingest_url(), DEFAULT_INGEST_URL);
251    }
252
253    #[test]
254    fn with_url_sets_custom_ingest_url() {
255        let sub = BetterStackSubscriber::with_url("token-123", "https://custom.example.com");
256        assert_eq!(sub.source_token(), "token-123");
257        assert_eq!(sub.ingest_url(), "https://custom.example.com");
258    }
259
260    #[test]
261    fn name_is_betterstack() {
262        let sub = BetterStackSubscriber::new("token");
263        assert_eq!(sub.name(), "betterstack");
264    }
265
266    #[test]
267    fn build_payload_step_failed() {
268        let event = Event::StepFailed {
269            run_id: Uuid::now_v7(),
270            step_id: Uuid::now_v7(),
271            step_name: "build".to_string(),
272            kind: StepKind::Shell,
273            error: "exit code 1".to_string(),
274            at: Utc::now(),
275        };
276
277        let payload = BetterStackSubscriber::build_payload(&event);
278        assert!(payload.is_some());
279        let payload = payload.unwrap();
280        assert_eq!(payload.level, "error");
281        assert!(payload.message.contains("build"));
282        assert!(payload.message.contains("exit code 1"));
283        assert_eq!(payload.event["type"], "step_failed");
284        assert_eq!(payload.event["error"], "exit code 1");
285    }
286
287    #[test]
288    fn build_payload_run_failed() {
289        let event = Event::RunFailed {
290            run_id: Uuid::now_v7(),
291            workflow_name: "deploy".to_string(),
292            error: Some("step 'build' failed".to_string()),
293            cost_usd: Decimal::new(42, 2),
294            duration_ms: 5000,
295            at: Utc::now(),
296        };
297
298        let payload = BetterStackSubscriber::build_payload(&event);
299        assert!(payload.is_some());
300        let payload = payload.unwrap();
301        assert_eq!(payload.level, "error");
302        assert!(payload.message.contains("deploy"));
303        assert!(payload.message.contains("step 'build' failed"));
304        assert_eq!(payload.event["type"], "run_failed");
305        assert_eq!(payload.event["workflow_name"], "deploy");
306    }
307
308    #[test]
309    fn build_payload_run_failed_without_error_message() {
310        let event = Event::RunFailed {
311            run_id: Uuid::now_v7(),
312            workflow_name: "deploy".to_string(),
313            error: None,
314            cost_usd: Decimal::ZERO,
315            duration_ms: 1000,
316            at: Utc::now(),
317        };
318
319        let payload = BetterStackSubscriber::build_payload(&event).unwrap();
320        assert!(payload.message.contains("unknown error"));
321        assert_eq!(payload.event["error"], "unknown error");
322    }
323
324    #[test]
325    fn build_payload_run_completed_returns_none() {
326        let event = Event::RunStatusChanged {
327            run_id: Uuid::now_v7(),
328            workflow_name: "deploy".to_string(),
329            from: RunStatus::Running,
330            to: RunStatus::Completed,
331            error: None,
332            cost_usd: Decimal::ZERO,
333            duration_ms: 1000,
334            at: Utc::now(),
335        };
336
337        assert!(BetterStackSubscriber::build_payload(&event).is_none());
338    }
339
340    #[test]
341    fn build_payload_run_created_returns_none() {
342        let event = Event::RunCreated {
343            run_id: Uuid::now_v7(),
344            workflow_name: "deploy".to_string(),
345            at: Utc::now(),
346        };
347
348        assert!(BetterStackSubscriber::build_payload(&event).is_none());
349    }
350
351    #[test]
352    fn build_payload_step_completed_returns_none() {
353        let event = Event::StepCompleted {
354            run_id: Uuid::now_v7(),
355            step_id: Uuid::now_v7(),
356            step_name: "build".to_string(),
357            kind: StepKind::Shell,
358            duration_ms: 500,
359            cost_usd: Decimal::ZERO,
360            at: Utc::now(),
361        };
362
363        assert!(BetterStackSubscriber::build_payload(&event).is_none());
364    }
365
366    #[test]
367    fn build_payload_approval_requested_returns_none() {
368        let event = Event::ApprovalRequested {
369            run_id: Uuid::now_v7(),
370            step_id: Uuid::now_v7(),
371            message: "Deploy to prod?".to_string(),
372            at: Utc::now(),
373        };
374
375        assert!(BetterStackSubscriber::build_payload(&event).is_none());
376    }
377
378    #[test]
379    fn build_payload_user_signed_in_returns_none() {
380        let event = Event::UserSignedIn {
381            user_id: Uuid::now_v7(),
382            username: "alice".to_string(),
383            at: Utc::now(),
384        };
385
386        assert!(BetterStackSubscriber::build_payload(&event).is_none());
387    }
388
389    #[tokio::test]
390    async fn handle_ignores_non_error_events() {
391        let sub = BetterStackSubscriber::with_url("token", "http://127.0.0.1:1");
392        let event = Event::RunCreated {
393            run_id: Uuid::now_v7(),
394            workflow_name: "deploy".to_string(),
395            at: Utc::now(),
396        };
397        // Should return immediately without attempting HTTP
398        sub.handle(&event).await;
399    }
400
401    #[tokio::test]
402    async fn deliver_to_real_endpoint_returns_202() {
403        use axum::Router;
404        use axum::http::StatusCode;
405        use axum::routing::post;
406        use tokio::net::TcpListener;
407
408        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
409        let addr = listener.local_addr().unwrap();
410
411        let app = Router::new().route("/", post(|| async { StatusCode::ACCEPTED }));
412
413        tokio::spawn(async move {
414            axum::serve(listener, app).await.unwrap();
415        });
416
417        let sub = BetterStackSubscriber::with_url("test-token", &format!("http://{}", addr));
418        let event = Event::StepFailed {
419            run_id: Uuid::now_v7(),
420            step_id: Uuid::now_v7(),
421            step_name: "build".to_string(),
422            kind: StepKind::Shell,
423            error: "exit code 1".to_string(),
424            at: Utc::now(),
425        };
426
427        sub.handle(&event).await;
428    }
429}