Skip to main content

ironflow_engine/notify/
betterstack.rs

1//! [`BetterStackSubscriber`] -- forwards error events to BetterStack Logs.
2
3use std::time::Duration;
4
5use serde::Serialize;
6use tokio::time::sleep;
7use tracing::{error, info, warn};
8
9use super::{Event, EventSubscriber, SubscriberFuture};
10
/// Timeout configured on the HTTP client; it bounds every outbound request.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

/// Total number of delivery attempts made for one payload, the first try
/// included (the delivery loop runs `0..MAX_RETRIES`).
const MAX_RETRIES: u32 = 3;

/// Delay slept after the first failed attempt; doubled after each further
/// failed attempt (`BASE_BACKOFF * 2^attempt`).
const BASE_BACKOFF: Duration = Duration::from_millis(500);

/// Default BetterStack Logs ingestion endpoint.
const DEFAULT_INGEST_URL: &str = "https://in.logs.betterstack.com";
22
/// JSON body sent to the BetterStack Logs API for a single error event.
///
/// Serialized with serde; fields are emitted in declaration order.
#[derive(Debug, Serialize)]
struct LogPayload {
    /// Event timestamp formatted as RFC 3339 (an ISO-8601 profile).
    dt: String,
    /// Log level (always "error" for this subscriber).
    level: &'static str,
    /// Human-readable, single-line summary of the failure.
    message: String,
    /// Structured event data (event type, ids and error details).
    event: serde_json::Value,
}
35
/// Subscriber that forwards error events to [BetterStack Logs](https://betterstack.com/docs/logs/).
///
/// Only acts on events that represent failures:
/// - [`Event::StepFailed`]
/// - [`Event::RunFailed`]
///
/// All other events are silently ignored (filtering by event type
/// should already be done at subscription time, but this subscriber
/// adds an extra safety check).
///
/// Retries failed deliveries with exponential backoff (up to 3 attempts,
/// 5 s timeout per attempt).
///
/// # Examples
///
/// ```no_run
/// use ironflow_engine::notify::{BetterStackSubscriber, Event, EventPublisher};
///
/// let mut publisher = EventPublisher::new();
/// publisher.subscribe(
///     BetterStackSubscriber::new("my-source-token"),
///     &[Event::STEP_FAILED, Event::RUN_FAILED],
/// );
/// ```
pub struct BetterStackSubscriber {
    /// Raw source token, kept so it can be read back via `source_token()`.
    source_token: String,
    /// Pre-formatted `Authorization` header value (`Bearer <token>`), built once
    /// at construction so it is not re-formatted on every request.
    authorization_header: String,
    /// URL every log payload is POSTed to.
    ingest_url: String,
    /// HTTP client, built with the default request timeout.
    client: reqwest::Client,
}
66
67impl BetterStackSubscriber {
68    /// Create a new subscriber with the given BetterStack source token.
69    ///
70    /// Uses the default ingestion endpoint (`https://in.logs.betterstack.com`).
71    ///
72    /// # Panics
73    ///
74    /// Panics if the HTTP client cannot be built (TLS backend unavailable).
75    ///
76    /// # Examples
77    ///
78    /// ```
79    /// use ironflow_engine::notify::BetterStackSubscriber;
80    ///
81    /// let subscriber = BetterStackSubscriber::new("my-source-token");
82    /// assert_eq!(subscriber.source_token(), "my-source-token");
83    /// ```
84    pub fn new(source_token: &str) -> Self {
85        Self::with_url(source_token, DEFAULT_INGEST_URL)
86    }
87
88    /// Create a subscriber with a custom ingestion URL.
89    ///
90    /// Useful for testing or self-hosted BetterStack instances.
91    ///
92    /// # Panics
93    ///
94    /// Panics if the HTTP client cannot be built (TLS backend unavailable).
95    ///
96    /// # Examples
97    ///
98    /// ```
99    /// use ironflow_engine::notify::BetterStackSubscriber;
100    ///
101    /// let subscriber = BetterStackSubscriber::with_url(
102    ///     "my-source-token",
103    ///     "https://custom.logs.example.com",
104    /// );
105    /// assert_eq!(subscriber.ingest_url(), "https://custom.logs.example.com");
106    /// ```
107    pub fn with_url(source_token: &str, ingest_url: &str) -> Self {
108        let client = reqwest::Client::builder()
109            .timeout(DEFAULT_TIMEOUT)
110            .build()
111            .expect("failed to build HTTP client");
112        Self {
113            authorization_header: format!("Bearer {}", source_token),
114            source_token: source_token.to_string(),
115            ingest_url: ingest_url.to_string(),
116            client,
117        }
118    }
119
120    /// Returns the source token.
121    pub fn source_token(&self) -> &str {
122        &self.source_token
123    }
124
125    /// Returns the ingestion URL.
126    pub fn ingest_url(&self) -> &str {
127        &self.ingest_url
128    }
129
130    /// Build a log payload from an error event. Returns `None` for non-error events.
131    fn build_payload(event: &Event) -> Option<LogPayload> {
132        match event {
133            Event::StepFailed {
134                run_id,
135                step_id,
136                step_name,
137                kind,
138                error,
139                at,
140            } => {
141                let message = format!(
142                    "Step '{}' ({}) failed on run {}: {}",
143                    step_name, kind, run_id, error
144                );
145                let event_json = serde_json::json!({
146                    "type": "step_failed",
147                    "run_id": run_id.to_string(),
148                    "step_id": step_id.to_string(),
149                    "step_name": step_name,
150                    "kind": kind.to_string(),
151                    "error": error,
152                });
153                Some(LogPayload {
154                    dt: at.to_rfc3339(),
155                    level: "error",
156                    message,
157                    event: event_json,
158                })
159            }
160            Event::RunFailed {
161                run_id,
162                workflow_name,
163                error,
164                cost_usd,
165                duration_ms,
166                at,
167            } => {
168                let error_detail = error.as_deref().unwrap_or("unknown error");
169                let message = format!(
170                    "Run {} (workflow '{}') failed: {}",
171                    run_id, workflow_name, error_detail
172                );
173                let event_json = serde_json::json!({
174                    "type": "run_failed",
175                    "run_id": run_id.to_string(),
176                    "workflow_name": workflow_name,
177                    "error": error_detail,
178                    "cost_usd": cost_usd.to_string(),
179                    "duration_ms": duration_ms,
180                });
181                Some(LogPayload {
182                    dt: at.to_rfc3339(),
183                    level: "error",
184                    message,
185                    event: event_json,
186                })
187            }
188            _ => None,
189        }
190    }
191
192    /// Deliver with retry + exponential backoff.
193    async fn deliver(&self, payload: &LogPayload) {
194        for attempt in 0..MAX_RETRIES {
195            let result = self
196                .client
197                .post(&self.ingest_url)
198                .header("Authorization", &self.authorization_header)
199                .json(payload)
200                .send()
201                .await;
202
203            match result {
204                Ok(resp) if resp.status().as_u16() == 202 => {
205                    info!(
206                        ingest_url = %self.ingest_url,
207                        message = %payload.message,
208                        "betterstack log delivered"
209                    );
210                    return;
211                }
212                Ok(resp) => {
213                    let status = resp.status();
214                    self.log_retry_or_fail(attempt, &payload.message, &format!("HTTP {status}"));
215                }
216                Err(err) => {
217                    self.log_retry_or_fail(attempt, &payload.message, &err.to_string());
218                }
219            }
220
221            if attempt + 1 < MAX_RETRIES {
222                let delay = BASE_BACKOFF * 2u32.pow(attempt);
223                sleep(delay).await;
224            }
225        }
226    }
227
228    fn log_retry_or_fail(&self, attempt: u32, message: &str, err_msg: &str) {
229        let remaining = MAX_RETRIES - attempt - 1;
230        if remaining > 0 {
231            warn!(
232                ingest_url = %self.ingest_url,
233                message,
234                attempt = attempt + 1,
235                remaining,
236                error = %err_msg,
237                "betterstack delivery failed, retrying"
238            );
239        } else {
240            error!(
241                ingest_url = %self.ingest_url,
242                message,
243                error = %err_msg,
244                "betterstack delivery failed after all retries"
245            );
246        }
247    }
248}
249
250impl EventSubscriber for BetterStackSubscriber {
251    fn name(&self) -> &str {
252        "betterstack"
253    }
254
255    fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
256        Box::pin(async move {
257            if let Some(payload) = Self::build_payload(event) {
258                self.deliver(&payload).await;
259            }
260        })
261    }
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use ironflow_store::models::{RunStatus, StepKind};
    use rust_decimal::Decimal;
    use uuid::Uuid;

    #[test]
    fn new_sets_default_ingest_url() {
        let sub = BetterStackSubscriber::new("token-123");
        assert_eq!(sub.source_token(), "token-123");
        assert_eq!(sub.ingest_url(), DEFAULT_INGEST_URL);
    }

    #[test]
    fn with_url_sets_custom_ingest_url() {
        let sub = BetterStackSubscriber::with_url("token-123", "https://custom.example.com");
        assert_eq!(sub.source_token(), "token-123");
        assert_eq!(sub.ingest_url(), "https://custom.example.com");
    }

    #[test]
    fn name_is_betterstack() {
        let sub = BetterStackSubscriber::new("token");
        assert_eq!(sub.name(), "betterstack");
    }

    #[test]
    fn build_payload_step_failed() {
        let event = Event::StepFailed {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            error: "exit code 1".to_string(),
            at: Utc::now(),
        };

        let payload = BetterStackSubscriber::build_payload(&event)
            .expect("StepFailed must produce a payload");
        assert_eq!(payload.level, "error");
        assert!(payload.message.contains("build"));
        assert!(payload.message.contains("exit code 1"));
        assert_eq!(payload.event["type"], "step_failed");
        assert_eq!(payload.event["error"], "exit code 1");
    }

    #[test]
    fn build_payload_run_failed() {
        let event = Event::RunFailed {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            error: Some("step 'build' failed".to_string()),
            cost_usd: Decimal::new(42, 2),
            duration_ms: 5000,
            at: Utc::now(),
        };

        let payload = BetterStackSubscriber::build_payload(&event)
            .expect("RunFailed must produce a payload");
        assert_eq!(payload.level, "error");
        assert!(payload.message.contains("deploy"));
        assert!(payload.message.contains("step 'build' failed"));
        assert_eq!(payload.event["type"], "run_failed");
        assert_eq!(payload.event["workflow_name"], "deploy");
    }

    #[test]
    fn build_payload_run_failed_without_error_message() {
        let event = Event::RunFailed {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            error: None,
            cost_usd: Decimal::ZERO,
            duration_ms: 1000,
            at: Utc::now(),
        };

        let payload = BetterStackSubscriber::build_payload(&event).unwrap();
        assert!(payload.message.contains("unknown error"));
        assert_eq!(payload.event["error"], "unknown error");
    }

    #[test]
    fn build_payload_run_completed_returns_none() {
        let event = Event::RunStatusChanged {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            from: RunStatus::Running,
            to: RunStatus::Completed,
            error: None,
            cost_usd: Decimal::ZERO,
            duration_ms: 1000,
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_run_created_returns_none() {
        let event = Event::RunCreated {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_step_completed_returns_none() {
        let event = Event::StepCompleted {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            duration_ms: 500,
            cost_usd: Decimal::ZERO,
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_approval_requested_returns_none() {
        let event = Event::ApprovalRequested {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            message: "Deploy to prod?".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_user_signed_in_returns_none() {
        let event = Event::UserSignedIn {
            user_id: Uuid::now_v7(),
            username: "alice".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[tokio::test]
    async fn handle_ignores_non_error_events() {
        let sub = BetterStackSubscriber::with_url("token", "http://127.0.0.1:1");
        let event = Event::RunCreated {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            at: Utc::now(),
        };
        // Should return immediately without attempting HTTP
        sub.handle(&event).await;
    }

    #[tokio::test]
    async fn deliver_to_real_endpoint_returns_202() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        use axum::Router;
        use axum::http::StatusCode;
        use axum::routing::post;
        use tokio::net::TcpListener;

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // Count requests reaching the server so the test fails if nothing is
        // delivered, or if an accepted payload is sent more than once.
        let hits = Arc::new(AtomicUsize::new(0));
        let handler_hits = Arc::clone(&hits);
        let app = Router::new().route(
            "/",
            post(move || async move {
                handler_hits.fetch_add(1, Ordering::SeqCst);
                StatusCode::ACCEPTED
            }),
        );

        tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });

        let sub = BetterStackSubscriber::with_url("test-token", &format!("http://{}", addr));
        let event = Event::StepFailed {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            error: "exit code 1".to_string(),
            at: Utc::now(),
        };

        sub.handle(&event).await;

        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }
}
455}