Skip to main content

ironflow_engine/notify/
betterstack.rs

1//! [`BetterStackSubscriber`] -- forwards error events to BetterStack Logs.
2
3use std::time::Duration;
4
5use serde::Serialize;
6use tokio::time::sleep;
7use tracing::{error, info, warn};
8
9use ironflow_store::models::RunStatus;
10
11use super::{Event, EventSubscriber, SubscriberFuture};
12
/// Per-request timeout configured on the HTTP client used for outbound calls.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

/// Upper bound on delivery attempts for a single payload, counting the first try.
const MAX_RETRIES: u32 = 3;

/// Delay before the first retry; the delay doubles on each subsequent retry.
const BASE_BACKOFF: Duration = Duration::from_millis(500);

/// BetterStack Logs ingestion endpoint used when no custom URL is supplied.
const DEFAULT_INGEST_URL: &str = "https://in.logs.betterstack.com";
24
25/// Payload sent to BetterStack Logs API.
26#[derive(Debug, Serialize)]
27struct LogPayload {
28    /// ISO-8601 timestamp.
29    dt: String,
30    /// Log level (always "error" for this subscriber).
31    level: &'static str,
32    /// Human-readable message.
33    message: String,
34    /// Structured event data.
35    event: serde_json::Value,
36}
37
38/// Subscriber that forwards error events to [BetterStack Logs](https://betterstack.com/docs/logs/).
39///
40/// Only acts on events that represent failures:
41/// - [`Event::StepFailed`]
42/// - [`Event::RunStatusChanged`] when `to` is [`Failed`](ironflow_store::models::RunStatus::Failed)
43///
44/// All other events are silently ignored (filtering by event type
45/// should already be done at subscription time, but this subscriber
46/// adds an extra safety check).
47///
48/// Retries failed deliveries with exponential backoff (up to 3 attempts,
49/// 5 s timeout per attempt).
50///
51/// # Examples
52///
53/// ```no_run
54/// use ironflow_engine::notify::{BetterStackSubscriber, Event, EventPublisher};
55///
56/// let mut publisher = EventPublisher::new();
57/// publisher.subscribe(
58///     BetterStackSubscriber::new("my-source-token"),
59///     &[Event::STEP_FAILED, Event::RUN_STATUS_CHANGED],
60/// );
61/// ```
62pub struct BetterStackSubscriber {
63    source_token: String,
64    authorization_header: String,
65    ingest_url: String,
66    client: reqwest::Client,
67}
68
69impl BetterStackSubscriber {
70    /// Create a new subscriber with the given BetterStack source token.
71    ///
72    /// Uses the default ingestion endpoint (`https://in.logs.betterstack.com`).
73    ///
74    /// # Panics
75    ///
76    /// Panics if the HTTP client cannot be built (TLS backend unavailable).
77    ///
78    /// # Examples
79    ///
80    /// ```
81    /// use ironflow_engine::notify::BetterStackSubscriber;
82    ///
83    /// let subscriber = BetterStackSubscriber::new("my-source-token");
84    /// assert_eq!(subscriber.source_token(), "my-source-token");
85    /// ```
86    pub fn new(source_token: &str) -> Self {
87        Self::with_url(source_token, DEFAULT_INGEST_URL)
88    }
89
90    /// Create a subscriber with a custom ingestion URL.
91    ///
92    /// Useful for testing or self-hosted BetterStack instances.
93    ///
94    /// # Panics
95    ///
96    /// Panics if the HTTP client cannot be built (TLS backend unavailable).
97    ///
98    /// # Examples
99    ///
100    /// ```
101    /// use ironflow_engine::notify::BetterStackSubscriber;
102    ///
103    /// let subscriber = BetterStackSubscriber::with_url(
104    ///     "my-source-token",
105    ///     "https://custom.logs.example.com",
106    /// );
107    /// assert_eq!(subscriber.ingest_url(), "https://custom.logs.example.com");
108    /// ```
109    pub fn with_url(source_token: &str, ingest_url: &str) -> Self {
110        let client = reqwest::Client::builder()
111            .timeout(DEFAULT_TIMEOUT)
112            .build()
113            .expect("failed to build HTTP client");
114        Self {
115            authorization_header: format!("Bearer {}", source_token),
116            source_token: source_token.to_string(),
117            ingest_url: ingest_url.to_string(),
118            client,
119        }
120    }
121
122    /// Returns the source token.
123    pub fn source_token(&self) -> &str {
124        &self.source_token
125    }
126
127    /// Returns the ingestion URL.
128    pub fn ingest_url(&self) -> &str {
129        &self.ingest_url
130    }
131
132    /// Build a log payload from an error event. Returns `None` for non-error events.
133    fn build_payload(event: &Event) -> Option<LogPayload> {
134        match event {
135            Event::StepFailed {
136                run_id,
137                step_id,
138                step_name,
139                kind,
140                error,
141                at,
142            } => {
143                let message = format!(
144                    "Step '{}' ({}) failed on run {}: {}",
145                    step_name, kind, run_id, error
146                );
147                let event_json = serde_json::json!({
148                    "type": "step_failed",
149                    "run_id": run_id.to_string(),
150                    "step_id": step_id.to_string(),
151                    "step_name": step_name,
152                    "kind": kind.to_string(),
153                    "error": error,
154                });
155                Some(LogPayload {
156                    dt: at.to_rfc3339(),
157                    level: "error",
158                    message,
159                    event: event_json,
160                })
161            }
162            Event::RunStatusChanged {
163                run_id,
164                workflow_name,
165                to: RunStatus::Failed,
166                error,
167                cost_usd,
168                duration_ms,
169                at,
170                ..
171            } => {
172                let error_detail = error.as_deref().unwrap_or("unknown error");
173                let message = format!(
174                    "Run {} (workflow '{}') failed: {}",
175                    run_id, workflow_name, error_detail
176                );
177                let event_json = serde_json::json!({
178                    "type": "run_failed",
179                    "run_id": run_id.to_string(),
180                    "workflow_name": workflow_name,
181                    "error": error_detail,
182                    "cost_usd": cost_usd.to_string(),
183                    "duration_ms": duration_ms,
184                });
185                Some(LogPayload {
186                    dt: at.to_rfc3339(),
187                    level: "error",
188                    message,
189                    event: event_json,
190                })
191            }
192            _ => None,
193        }
194    }
195
196    /// Deliver with retry + exponential backoff.
197    async fn deliver(&self, payload: &LogPayload) {
198        for attempt in 0..MAX_RETRIES {
199            let result = self
200                .client
201                .post(&self.ingest_url)
202                .header("Authorization", &self.authorization_header)
203                .json(payload)
204                .send()
205                .await;
206
207            match result {
208                Ok(resp) if resp.status().as_u16() == 202 => {
209                    info!(
210                        ingest_url = %self.ingest_url,
211                        message = %payload.message,
212                        "betterstack log delivered"
213                    );
214                    return;
215                }
216                Ok(resp) => {
217                    let status = resp.status();
218                    self.log_retry_or_fail(attempt, &payload.message, &format!("HTTP {status}"));
219                }
220                Err(err) => {
221                    self.log_retry_or_fail(attempt, &payload.message, &err.to_string());
222                }
223            }
224
225            if attempt + 1 < MAX_RETRIES {
226                let delay = BASE_BACKOFF * 2u32.pow(attempt);
227                sleep(delay).await;
228            }
229        }
230    }
231
232    fn log_retry_or_fail(&self, attempt: u32, message: &str, err_msg: &str) {
233        let remaining = MAX_RETRIES - attempt - 1;
234        if remaining > 0 {
235            warn!(
236                ingest_url = %self.ingest_url,
237                message,
238                attempt = attempt + 1,
239                remaining,
240                error = %err_msg,
241                "betterstack delivery failed, retrying"
242            );
243        } else {
244            error!(
245                ingest_url = %self.ingest_url,
246                message,
247                error = %err_msg,
248                "betterstack delivery failed after all retries"
249            );
250        }
251    }
252}
253
254impl EventSubscriber for BetterStackSubscriber {
255    fn name(&self) -> &str {
256        "betterstack"
257    }
258
259    fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
260        Box::pin(async move {
261            if let Some(payload) = Self::build_payload(event) {
262                self.deliver(&payload).await;
263            }
264        })
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use ironflow_store::models::{RunStatus, StepKind};
    use rust_decimal::Decimal;
    use uuid::Uuid;

    /// Builds a `StepFailed` event for a shell step named "build".
    fn step_failed_event() -> Event {
        Event::StepFailed {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            error: "exit code 1".to_string(),
            at: Utc::now(),
        }
    }

    /// Builds a `RunStatusChanged` event for workflow "deploy" moving from
    /// `Running` to `to`, with an optional error message.
    fn run_status_changed(to: RunStatus, error: Option<&str>) -> Event {
        Event::RunStatusChanged {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            from: RunStatus::Running,
            to,
            error: error.map(String::from),
            cost_usd: Decimal::new(42, 2),
            duration_ms: 5000,
            at: Utc::now(),
        }
    }

    #[test]
    fn new_sets_default_ingest_url() {
        let sub = BetterStackSubscriber::new("token-123");
        assert_eq!(sub.source_token(), "token-123");
        assert_eq!(sub.ingest_url(), DEFAULT_INGEST_URL);
    }

    #[test]
    fn with_url_sets_custom_ingest_url() {
        let sub = BetterStackSubscriber::with_url("token-123", "https://custom.example.com");
        assert_eq!(sub.source_token(), "token-123");
        assert_eq!(sub.ingest_url(), "https://custom.example.com");
    }

    #[test]
    fn name_is_betterstack() {
        let sub = BetterStackSubscriber::new("token");
        assert_eq!(sub.name(), "betterstack");
    }

    #[test]
    fn build_payload_step_failed() {
        let payload = BetterStackSubscriber::build_payload(&step_failed_event())
            .expect("StepFailed must produce a payload");

        assert_eq!(payload.level, "error");
        assert!(payload.message.contains("build"));
        assert!(payload.message.contains("exit code 1"));
        assert_eq!(payload.event["type"], "step_failed");
        assert_eq!(payload.event["error"], "exit code 1");
    }

    #[test]
    fn build_payload_run_failed() {
        let event = run_status_changed(RunStatus::Failed, Some("step 'build' failed"));

        let payload = BetterStackSubscriber::build_payload(&event)
            .expect("a run moving to Failed must produce a payload");

        assert_eq!(payload.level, "error");
        assert!(payload.message.contains("deploy"));
        assert!(payload.message.contains("step 'build' failed"));
        assert_eq!(payload.event["type"], "run_failed");
        assert_eq!(payload.event["workflow_name"], "deploy");
    }

    #[test]
    fn build_payload_run_failed_without_error_message() {
        let event = run_status_changed(RunStatus::Failed, None);

        let payload = BetterStackSubscriber::build_payload(&event)
            .expect("a run moving to Failed must produce a payload");

        assert!(payload.message.contains("unknown error"));
        assert_eq!(payload.event["error"], "unknown error");
    }

    #[test]
    fn build_payload_run_completed_returns_none() {
        let event = run_status_changed(RunStatus::Completed, None);

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_run_created_returns_none() {
        let event = Event::RunCreated {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_step_completed_returns_none() {
        let event = Event::StepCompleted {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            duration_ms: 500,
            cost_usd: Decimal::ZERO,
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_approval_requested_returns_none() {
        let event = Event::ApprovalRequested {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            message: "Deploy to prod?".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_user_signed_in_returns_none() {
        let event = Event::UserSignedIn {
            user_id: Uuid::now_v7(),
            username: "alice".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[tokio::test]
    async fn handle_ignores_non_error_events() {
        let sub = BetterStackSubscriber::with_url("token", "http://127.0.0.1:1");
        let event = Event::RunCreated {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            at: Utc::now(),
        };

        // If a delivery were attempted against this unreachable address, the
        // retry loop would sleep for at least `BASE_BACKOFF` (500 ms), so a
        // shorter deadline distinguishes "ignored" from "attempted".
        let outcome =
            tokio::time::timeout(std::time::Duration::from_millis(250), sub.handle(&event)).await;
        assert!(
            outcome.is_ok(),
            "non-error events must complete without attempting HTTP"
        );
    }

    #[tokio::test]
    async fn deliver_to_real_endpoint_returns_202() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        use axum::Router;
        use axum::http::{HeaderMap, StatusCode};
        use axum::routing::post;
        use tokio::net::TcpListener;

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // Counts requests that arrive with the expected bearer token, so the
        // test fails if nothing is delivered or the header is wrong.
        let authorized_hits = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&authorized_hits);

        let app = Router::new().route(
            "/",
            post(move |headers: HeaderMap| async move {
                let auth = headers.get("authorization").and_then(|v| v.to_str().ok());
                if auth == Some("Bearer test-token") {
                    counter.fetch_add(1, Ordering::SeqCst);
                }
                StatusCode::ACCEPTED
            }),
        );

        tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });

        let sub = BetterStackSubscriber::with_url("test-token", &format!("http://{}", addr));

        sub.handle(&step_failed_event()).await;

        // A 202 on the first attempt means exactly one request, no retries.
        assert_eq!(authorized_hits.load(Ordering::SeqCst), 1);
    }
}
463}