1use std::time::Duration;
4
5use serde::Serialize;
6use tokio::time::sleep;
7use tracing::{error, info, warn};
8
9use super::{Event, EventSubscriber, SubscriberFuture};
10
/// Per-request timeout configured on the HTTP client used for log delivery.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

/// Total number of delivery attempts made for one payload (the first try
/// included) before delivery is abandoned.
const MAX_RETRIES: u32 = 3;

/// Pause before the second attempt; the pause doubles after each further
/// failed attempt.
const BASE_BACKOFF: Duration = Duration::from_millis(500);

/// Ingest endpoint used when the caller does not supply a custom URL.
const DEFAULT_INGEST_URL: &str = "https://in.logs.betterstack.com";
22
23#[derive(Debug, Serialize)]
25struct LogPayload {
26 dt: String,
28 level: &'static str,
30 message: String,
32 event: serde_json::Value,
34}
35
36pub struct BetterStackSubscriber {
61 source_token: String,
62 authorization_header: String,
63 ingest_url: String,
64 client: reqwest::Client,
65}
66
67impl BetterStackSubscriber {
68 pub fn new(source_token: &str) -> Self {
85 Self::with_url(source_token, DEFAULT_INGEST_URL)
86 }
87
88 pub fn with_url(source_token: &str, ingest_url: &str) -> Self {
108 let client = reqwest::Client::builder()
109 .timeout(DEFAULT_TIMEOUT)
110 .build()
111 .expect("failed to build HTTP client");
112 Self {
113 authorization_header: format!("Bearer {}", source_token),
114 source_token: source_token.to_string(),
115 ingest_url: ingest_url.to_string(),
116 client,
117 }
118 }
119
120 pub fn source_token(&self) -> &str {
122 &self.source_token
123 }
124
125 pub fn ingest_url(&self) -> &str {
127 &self.ingest_url
128 }
129
130 fn build_payload(event: &Event) -> Option<LogPayload> {
132 match event {
133 Event::StepFailed {
134 run_id,
135 step_id,
136 step_name,
137 kind,
138 error,
139 at,
140 } => {
141 let message = format!(
142 "Step '{}' ({}) failed on run {}: {}",
143 step_name, kind, run_id, error
144 );
145 let event_json = serde_json::json!({
146 "type": "step_failed",
147 "run_id": run_id.to_string(),
148 "step_id": step_id.to_string(),
149 "step_name": step_name,
150 "kind": kind.to_string(),
151 "error": error,
152 });
153 Some(LogPayload {
154 dt: at.to_rfc3339(),
155 level: "error",
156 message,
157 event: event_json,
158 })
159 }
160 Event::RunFailed {
161 run_id,
162 workflow_name,
163 error,
164 cost_usd,
165 duration_ms,
166 at,
167 } => {
168 let error_detail = error.as_deref().unwrap_or("unknown error");
169 let message = format!(
170 "Run {} (workflow '{}') failed: {}",
171 run_id, workflow_name, error_detail
172 );
173 let event_json = serde_json::json!({
174 "type": "run_failed",
175 "run_id": run_id.to_string(),
176 "workflow_name": workflow_name,
177 "error": error_detail,
178 "cost_usd": cost_usd.to_string(),
179 "duration_ms": duration_ms,
180 });
181 Some(LogPayload {
182 dt: at.to_rfc3339(),
183 level: "error",
184 message,
185 event: event_json,
186 })
187 }
188 _ => None,
189 }
190 }
191
192 async fn deliver(&self, payload: &LogPayload) {
194 for attempt in 0..MAX_RETRIES {
195 let result = self
196 .client
197 .post(&self.ingest_url)
198 .header("Authorization", &self.authorization_header)
199 .json(payload)
200 .send()
201 .await;
202
203 match result {
204 Ok(resp) if resp.status().as_u16() == 202 => {
205 info!(
206 ingest_url = %self.ingest_url,
207 message = %payload.message,
208 "betterstack log delivered"
209 );
210 return;
211 }
212 Ok(resp) => {
213 let status = resp.status();
214 self.log_retry_or_fail(attempt, &payload.message, &format!("HTTP {status}"));
215 }
216 Err(err) => {
217 self.log_retry_or_fail(attempt, &payload.message, &err.to_string());
218 }
219 }
220
221 if attempt + 1 < MAX_RETRIES {
222 let delay = BASE_BACKOFF * 2u32.pow(attempt);
223 sleep(delay).await;
224 }
225 }
226 }
227
228 fn log_retry_or_fail(&self, attempt: u32, message: &str, err_msg: &str) {
229 let remaining = MAX_RETRIES - attempt - 1;
230 if remaining > 0 {
231 warn!(
232 ingest_url = %self.ingest_url,
233 message,
234 attempt = attempt + 1,
235 remaining,
236 error = %err_msg,
237 "betterstack delivery failed, retrying"
238 );
239 } else {
240 error!(
241 ingest_url = %self.ingest_url,
242 message,
243 error = %err_msg,
244 "betterstack delivery failed after all retries"
245 );
246 }
247 }
248}
249
250impl EventSubscriber for BetterStackSubscriber {
251 fn name(&self) -> &str {
252 "betterstack"
253 }
254
255 fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
256 Box::pin(async move {
257 if let Some(payload) = Self::build_payload(event) {
258 self.deliver(&payload).await;
259 }
260 })
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use ironflow_store::models::{RunStatus, StepKind};
    use rust_decimal::Decimal;
    use uuid::Uuid;

    /// `StepFailed` fixture shared by the payload and delivery tests.
    fn step_failed_event() -> Event {
        Event::StepFailed {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            error: "exit code 1".to_string(),
            at: Utc::now(),
        }
    }

    #[test]
    fn new_sets_default_ingest_url() {
        let sub = BetterStackSubscriber::new("token-123");
        assert_eq!(sub.source_token(), "token-123");
        assert_eq!(sub.ingest_url(), DEFAULT_INGEST_URL);
    }

    #[test]
    fn with_url_sets_custom_ingest_url() {
        let sub = BetterStackSubscriber::with_url("token-123", "https://custom.example.com");
        assert_eq!(sub.source_token(), "token-123");
        assert_eq!(sub.ingest_url(), "https://custom.example.com");
    }

    #[test]
    fn name_is_betterstack() {
        let sub = BetterStackSubscriber::new("token");
        assert_eq!(sub.name(), "betterstack");
    }

    #[test]
    fn build_payload_step_failed() {
        let event = step_failed_event();

        let payload = BetterStackSubscriber::build_payload(&event)
            .expect("StepFailed must produce a payload");
        assert_eq!(payload.level, "error");
        assert!(payload.message.contains("build"));
        assert!(payload.message.contains("exit code 1"));
        assert_eq!(payload.event["type"], "step_failed");
        assert_eq!(payload.event["error"], "exit code 1");
    }

    #[test]
    fn build_payload_run_failed() {
        let event = Event::RunFailed {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            error: Some("step 'build' failed".to_string()),
            cost_usd: Decimal::new(42, 2),
            duration_ms: 5000,
            at: Utc::now(),
        };

        let payload = BetterStackSubscriber::build_payload(&event)
            .expect("RunFailed must produce a payload");
        assert_eq!(payload.level, "error");
        assert!(payload.message.contains("deploy"));
        assert!(payload.message.contains("step 'build' failed"));
        assert_eq!(payload.event["type"], "run_failed");
        assert_eq!(payload.event["workflow_name"], "deploy");
    }

    #[test]
    fn build_payload_run_failed_without_error_message() {
        let event = Event::RunFailed {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            error: None,
            cost_usd: Decimal::ZERO,
            duration_ms: 1000,
            at: Utc::now(),
        };

        let payload = BetterStackSubscriber::build_payload(&event)
            .expect("RunFailed must produce a payload");
        assert!(payload.message.contains("unknown error"));
        assert_eq!(payload.event["error"], "unknown error");
    }

    #[test]
    fn build_payload_run_completed_returns_none() {
        let event = Event::RunStatusChanged {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            from: RunStatus::Running,
            to: RunStatus::Completed,
            error: None,
            cost_usd: Decimal::ZERO,
            duration_ms: 1000,
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_run_created_returns_none() {
        let event = Event::RunCreated {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_step_completed_returns_none() {
        let event = Event::StepCompleted {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            duration_ms: 500,
            cost_usd: Decimal::ZERO,
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_approval_requested_returns_none() {
        let event = Event::ApprovalRequested {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            message: "Deploy to prod?".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_user_signed_in_returns_none() {
        let event = Event::UserSignedIn {
            user_id: Uuid::now_v7(),
            username: "alice".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[tokio::test]
    async fn handle_ignores_non_error_events() {
        // `RunCreated` yields no payload, so `handle` should finish without
        // ever contacting the URL below.
        let sub = BetterStackSubscriber::with_url("token", "http://127.0.0.1:1");
        let event = Event::RunCreated {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            at: Utc::now(),
        };
        sub.handle(&event).await;
    }

    #[tokio::test]
    async fn deliver_to_real_endpoint_returns_202() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        use axum::Router;
        use axum::http::{HeaderMap, StatusCode};
        use axum::routing::post;
        use tokio::net::TcpListener;

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // Counts requests that carried the expected bearer token. Delivery
        // failures are only logged, so without this counter the test would
        // pass even if nothing reached the endpoint.
        let accepted = Arc::new(AtomicUsize::new(0));
        let handler_accepted = Arc::clone(&accepted);

        let app = Router::new().route(
            "/",
            post(move |headers: HeaderMap| {
                let accepted = Arc::clone(&handler_accepted);
                async move {
                    let authorized = headers
                        .get("authorization")
                        .and_then(|value| value.to_str().ok())
                        == Some("Bearer test-token");
                    if authorized {
                        accepted.fetch_add(1, Ordering::SeqCst);
                        StatusCode::ACCEPTED
                    } else {
                        StatusCode::UNAUTHORIZED
                    }
                }
            }),
        );

        tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });

        let sub = BetterStackSubscriber::with_url("test-token", &format!("http://{}", addr));
        let event = step_failed_event();

        sub.handle(&event).await;

        // Exactly one request: accepted on the first attempt, so no retries.
        assert_eq!(accepted.load(Ordering::SeqCst), 1);
    }
}