1use std::time::Duration;
4
5use serde::Serialize;
6use tokio::time::sleep;
7use tracing::{error, info, warn};
8
9use ironflow_store::models::RunStatus;
10
11use super::{Event, EventSubscriber, SubscriberFuture};
12
/// Per-request timeout applied to the HTTP client used for log delivery.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

/// Total number of delivery attempts made per payload, the first try included
/// (the delivery loop runs `0..MAX_RETRIES`).
const MAX_RETRIES: u32 = 3;

/// Delay slept after the first failed attempt; it doubles after every further
/// failed attempt (`BASE_BACKOFF * 2^attempt`).
const BASE_BACKOFF: Duration = Duration::from_millis(500);

/// Ingest endpoint used when the caller does not supply a custom URL.
const DEFAULT_INGEST_URL: &str = "https://in.logs.betterstack.com";
24
/// JSON body posted to the ingest endpoint for a single log entry.
///
/// Field order is the order in which the fields are serialized.
#[derive(Debug, Serialize)]
struct LogPayload {
    /// Timestamp of the originating event, formatted as RFC 3339.
    dt: String,
    /// Severity label; `build_payload` always sets this to `"error"`.
    level: &'static str,
    /// Human-readable one-line summary of the failure.
    message: String,
    /// Structured details of the event (type tag, ids, error text, ...).
    event: serde_json::Value,
}
37
/// Event subscriber that forwards failure events to a Better Stack ingest
/// endpoint over HTTP.
///
/// Only step failures and runs that transition to `RunStatus::Failed` produce
/// a log entry; every other event is ignored.
pub struct BetterStackSubscriber {
    /// Source token exactly as supplied by the caller.
    source_token: String,
    /// Precomputed `Authorization` header value (`Bearer <source_token>`).
    authorization_header: String,
    /// URL that log payloads are POSTed to.
    ingest_url: String,
    /// HTTP client, built once with `DEFAULT_TIMEOUT` as its request timeout.
    client: reqwest::Client,
}
68
69impl BetterStackSubscriber {
70 pub fn new(source_token: &str) -> Self {
87 Self::with_url(source_token, DEFAULT_INGEST_URL)
88 }
89
90 pub fn with_url(source_token: &str, ingest_url: &str) -> Self {
110 let client = reqwest::Client::builder()
111 .timeout(DEFAULT_TIMEOUT)
112 .build()
113 .expect("failed to build HTTP client");
114 Self {
115 authorization_header: format!("Bearer {}", source_token),
116 source_token: source_token.to_string(),
117 ingest_url: ingest_url.to_string(),
118 client,
119 }
120 }
121
122 pub fn source_token(&self) -> &str {
124 &self.source_token
125 }
126
127 pub fn ingest_url(&self) -> &str {
129 &self.ingest_url
130 }
131
132 fn build_payload(event: &Event) -> Option<LogPayload> {
134 match event {
135 Event::StepFailed {
136 run_id,
137 step_id,
138 step_name,
139 kind,
140 error,
141 at,
142 } => {
143 let message = format!(
144 "Step '{}' ({}) failed on run {}: {}",
145 step_name, kind, run_id, error
146 );
147 let event_json = serde_json::json!({
148 "type": "step_failed",
149 "run_id": run_id.to_string(),
150 "step_id": step_id.to_string(),
151 "step_name": step_name,
152 "kind": kind.to_string(),
153 "error": error,
154 });
155 Some(LogPayload {
156 dt: at.to_rfc3339(),
157 level: "error",
158 message,
159 event: event_json,
160 })
161 }
162 Event::RunStatusChanged {
163 run_id,
164 workflow_name,
165 to: RunStatus::Failed,
166 error,
167 cost_usd,
168 duration_ms,
169 at,
170 ..
171 } => {
172 let error_detail = error.as_deref().unwrap_or("unknown error");
173 let message = format!(
174 "Run {} (workflow '{}') failed: {}",
175 run_id, workflow_name, error_detail
176 );
177 let event_json = serde_json::json!({
178 "type": "run_failed",
179 "run_id": run_id.to_string(),
180 "workflow_name": workflow_name,
181 "error": error_detail,
182 "cost_usd": cost_usd.to_string(),
183 "duration_ms": duration_ms,
184 });
185 Some(LogPayload {
186 dt: at.to_rfc3339(),
187 level: "error",
188 message,
189 event: event_json,
190 })
191 }
192 _ => None,
193 }
194 }
195
196 async fn deliver(&self, payload: &LogPayload) {
198 for attempt in 0..MAX_RETRIES {
199 let result = self
200 .client
201 .post(&self.ingest_url)
202 .header("Authorization", &self.authorization_header)
203 .json(payload)
204 .send()
205 .await;
206
207 match result {
208 Ok(resp) if resp.status().as_u16() == 202 => {
209 info!(
210 ingest_url = %self.ingest_url,
211 message = %payload.message,
212 "betterstack log delivered"
213 );
214 return;
215 }
216 Ok(resp) => {
217 let status = resp.status();
218 self.log_retry_or_fail(attempt, &payload.message, &format!("HTTP {status}"));
219 }
220 Err(err) => {
221 self.log_retry_or_fail(attempt, &payload.message, &err.to_string());
222 }
223 }
224
225 if attempt + 1 < MAX_RETRIES {
226 let delay = BASE_BACKOFF * 2u32.pow(attempt);
227 sleep(delay).await;
228 }
229 }
230 }
231
232 fn log_retry_or_fail(&self, attempt: u32, message: &str, err_msg: &str) {
233 let remaining = MAX_RETRIES - attempt - 1;
234 if remaining > 0 {
235 warn!(
236 ingest_url = %self.ingest_url,
237 message,
238 attempt = attempt + 1,
239 remaining,
240 error = %err_msg,
241 "betterstack delivery failed, retrying"
242 );
243 } else {
244 error!(
245 ingest_url = %self.ingest_url,
246 message,
247 error = %err_msg,
248 "betterstack delivery failed after all retries"
249 );
250 }
251 }
252}
253
254impl EventSubscriber for BetterStackSubscriber {
255 fn name(&self) -> &str {
256 "betterstack"
257 }
258
259 fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
260 Box::pin(async move {
261 if let Some(payload) = Self::build_payload(event) {
262 self.deliver(&payload).await;
263 }
264 })
265 }
266}
267
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use axum::Router;
    use axum::http::{HeaderMap, StatusCode};
    use axum::routing::post;
    use chrono::Utc;
    use ironflow_store::models::{RunStatus, StepKind};
    use rust_decimal::Decimal;
    use tokio::net::TcpListener;
    use uuid::Uuid;

    use super::*;

    /// `Authorization` header values captured by the local test server, one
    /// entry per received request (`None` when the header was absent).
    type SeenAuth = Arc<Mutex<Vec<Option<String>>>>;

    /// A step failure for step `build` with error text `exit code 1`.
    fn step_failed_event() -> Event {
        Event::StepFailed {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            error: "exit code 1".to_string(),
            at: Utc::now(),
        }
    }

    /// A `Running -> to` transition of workflow `deploy` costing 0.42 USD over 5000 ms.
    fn run_status_event(to: RunStatus, error: Option<&str>) -> Event {
        Event::RunStatusChanged {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            from: RunStatus::Running,
            to,
            error: error.map(str::to_owned),
            cost_usd: Decimal::new(42, 2),
            duration_ms: 5000,
            at: Utc::now(),
        }
    }

    /// A run-created event for workflow `deploy`.
    fn run_created_event() -> Event {
        Event::RunCreated {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            at: Utc::now(),
        }
    }

    /// Starts a local HTTP server on an ephemeral port that answers every
    /// `POST /` with `202 Accepted` and records the `Authorization` header of
    /// each request. Returns the server's base URL and the shared record.
    async fn spawn_ingest_server() -> (String, SeenAuth) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        let seen: SeenAuth = Arc::new(Mutex::new(Vec::new()));
        let recorder = Arc::clone(&seen);
        let app = Router::new().route(
            "/",
            post(move |headers: HeaderMap| {
                let recorder = Arc::clone(&recorder);
                async move {
                    let auth = headers
                        .get("authorization")
                        .and_then(|value| value.to_str().ok())
                        .map(str::to_owned);
                    recorder.lock().unwrap().push(auth);
                    StatusCode::ACCEPTED
                }
            }),
        );

        tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });

        (format!("http://{addr}"), seen)
    }

    #[test]
    fn new_sets_default_ingest_url() {
        let sub = BetterStackSubscriber::new("token-123");
        assert_eq!(sub.source_token(), "token-123");
        assert_eq!(sub.ingest_url(), DEFAULT_INGEST_URL);
    }

    #[test]
    fn with_url_sets_custom_ingest_url() {
        let sub = BetterStackSubscriber::with_url("token-123", "https://custom.example.com");
        assert_eq!(sub.source_token(), "token-123");
        assert_eq!(sub.ingest_url(), "https://custom.example.com");
    }

    #[test]
    fn name_is_betterstack() {
        let sub = BetterStackSubscriber::new("token");
        assert_eq!(sub.name(), "betterstack");
    }

    #[test]
    fn build_payload_step_failed() {
        let payload = BetterStackSubscriber::build_payload(&step_failed_event())
            .expect("a failed step must produce a payload");

        assert_eq!(payload.level, "error");
        assert!(payload.message.contains("build"));
        assert!(payload.message.contains("exit code 1"));
        assert_eq!(payload.event["type"], "step_failed");
        assert_eq!(payload.event["error"], "exit code 1");
    }

    #[test]
    fn build_payload_run_failed() {
        let event = run_status_event(RunStatus::Failed, Some("step 'build' failed"));

        let payload = BetterStackSubscriber::build_payload(&event)
            .expect("a failed run must produce a payload");

        assert_eq!(payload.level, "error");
        assert!(payload.message.contains("deploy"));
        assert!(payload.message.contains("step 'build' failed"));
        assert_eq!(payload.event["type"], "run_failed");
        assert_eq!(payload.event["workflow_name"], "deploy");
        assert_eq!(payload.event["cost_usd"], "0.42");
        assert_eq!(payload.event["duration_ms"], 5000);
    }

    #[test]
    fn build_payload_run_failed_without_error_message() {
        let event = run_status_event(RunStatus::Failed, None);

        let payload = BetterStackSubscriber::build_payload(&event)
            .expect("a failed run must produce a payload");

        assert!(payload.message.contains("unknown error"));
        assert_eq!(payload.event["error"], "unknown error");
    }

    #[test]
    fn build_payload_run_completed_returns_none() {
        let event = run_status_event(RunStatus::Completed, None);
        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_run_created_returns_none() {
        assert!(BetterStackSubscriber::build_payload(&run_created_event()).is_none());
    }

    #[test]
    fn build_payload_step_completed_returns_none() {
        let event = Event::StepCompleted {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            duration_ms: 500,
            cost_usd: Decimal::ZERO,
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_approval_requested_returns_none() {
        let event = Event::ApprovalRequested {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            message: "Deploy to prod?".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_user_signed_in_returns_none() {
        let event = Event::UserSignedIn {
            user_id: Uuid::now_v7(),
            username: "alice".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[tokio::test]
    async fn handle_ignores_non_error_events() {
        let (url, seen) = spawn_ingest_server().await;
        let sub = BetterStackSubscriber::with_url("token", &url);

        sub.handle(&run_created_event()).await;

        // A non-failure event must not trigger any HTTP request at all.
        assert!(
            seen.lock().unwrap().is_empty(),
            "non-error events must not be delivered"
        );
    }

    #[tokio::test]
    async fn deliver_to_real_endpoint_returns_202() {
        let (url, seen) = spawn_ingest_server().await;
        let sub = BetterStackSubscriber::with_url("test-token", &url);

        sub.handle(&step_failed_event()).await;

        // Exactly one request, carrying the bearer token: a 202 must not be retried.
        let seen = seen.lock().unwrap();
        assert_eq!(
            *seen,
            vec![Some("Bearer test-token".to_string())],
            "expected a single authenticated delivery"
        );
    }
}