1use reqwest::Client;
4use serde::Serialize;
5
6use super::retry::{RetryConfig, deliver_with_retry, is_accepted_202};
7use super::{Event, EventSubscriber, SubscriberFuture};
8
/// Ingest endpoint used when a subscriber is created with `BetterStackSubscriber::new`.
const DEFAULT_INGEST_URL: &str = "https://in.logs.betterstack.com";
11
/// JSON body sent to the ingest endpoint for a single failure event.
#[derive(Debug, Serialize)]
struct LogPayload {
    /// Event timestamp, formatted as RFC 3339.
    dt: String,
    /// Severity label; every payload built in this file uses `"error"`.
    level: &'static str,
    /// Human-readable one-line summary of the failure.
    message: String,
    /// Structured details of the event as a JSON value.
    event: serde_json::Value,
}
24
/// Event subscriber that posts failure events as JSON log entries to an HTTP ingest endpoint.
pub struct BetterStackSubscriber {
    /// Token supplied at construction; returned by `source_token()`.
    source_token: String,
    /// Precomputed `Authorization` header value (`Bearer <source_token>`).
    authorization_header: String,
    /// URL that payloads are POSTed to.
    ingest_url: String,
    /// HTTP client built from `retry_config` at construction time.
    client: Client,
    /// Retry settings handed to every delivery attempt.
    retry_config: RetryConfig,
}
56
57impl BetterStackSubscriber {
58 pub fn new(source_token: &str) -> Self {
76 Self::with_url(source_token, DEFAULT_INGEST_URL)
77 }
78
79 pub fn with_url(source_token: &str, ingest_url: &str) -> Self {
99 Self::with_url_and_retry(source_token, ingest_url, RetryConfig::default())
100 }
101
102 pub fn with_url_and_retry(
125 source_token: &str,
126 ingest_url: &str,
127 retry_config: RetryConfig,
128 ) -> Self {
129 let client = retry_config.build_client();
130 Self {
131 authorization_header: format!("Bearer {}", source_token),
132 source_token: source_token.to_string(),
133 ingest_url: ingest_url.to_string(),
134 client,
135 retry_config,
136 }
137 }
138
139 pub fn source_token(&self) -> &str {
141 &self.source_token
142 }
143
144 pub fn ingest_url(&self) -> &str {
146 &self.ingest_url
147 }
148
149 fn build_payload(event: &Event) -> Option<LogPayload> {
151 match event {
152 Event::StepFailed {
153 run_id,
154 step_id,
155 step_name,
156 kind,
157 error,
158 at,
159 } => {
160 let message = format!(
161 "Step '{}' ({}) failed on run {}: {}",
162 step_name, kind, run_id, error
163 );
164 let event_json = serde_json::json!({
165 "type": "step_failed",
166 "run_id": run_id.to_string(),
167 "step_id": step_id.to_string(),
168 "step_name": step_name,
169 "kind": kind.to_string(),
170 "error": error,
171 });
172 Some(LogPayload {
173 dt: at.to_rfc3339(),
174 level: "error",
175 message,
176 event: event_json,
177 })
178 }
179 Event::RunFailed {
180 run_id,
181 workflow_name,
182 error,
183 cost_usd,
184 duration_ms,
185 at,
186 } => {
187 let error_detail = error.as_deref().unwrap_or("unknown error");
188 let message = format!(
189 "Run {} (workflow '{}') failed: {}",
190 run_id, workflow_name, error_detail
191 );
192 let event_json = serde_json::json!({
193 "type": "run_failed",
194 "run_id": run_id.to_string(),
195 "workflow_name": workflow_name,
196 "error": error_detail,
197 "cost_usd": cost_usd.to_string(),
198 "duration_ms": duration_ms,
199 });
200 Some(LogPayload {
201 dt: at.to_rfc3339(),
202 level: "error",
203 message,
204 event: event_json,
205 })
206 }
207 _ => None,
208 }
209 }
210}
211
212impl EventSubscriber for BetterStackSubscriber {
213 fn name(&self) -> &str {
214 "betterstack"
215 }
216
217 fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
218 Box::pin(async move {
219 if let Some(payload) = Self::build_payload(event) {
220 deliver_with_retry(
221 &self.retry_config,
222 || {
223 self.client
224 .post(&self.ingest_url)
225 .header("Authorization", &self.authorization_header)
226 .json(&payload)
227 },
228 is_accepted_202,
229 "betterstack",
230 &payload.message,
231 )
232 .await;
233 }
234 })
235 }
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use ironflow_store::models::{RunStatus, StepKind};
    use rust_decimal::Decimal;
    use uuid::Uuid;

    #[test]
    fn new_sets_default_ingest_url() {
        let sub = BetterStackSubscriber::new("token-123");
        assert_eq!(sub.source_token(), "token-123");
        assert_eq!(sub.ingest_url(), DEFAULT_INGEST_URL);
    }

    #[test]
    fn with_url_sets_custom_ingest_url() {
        let sub = BetterStackSubscriber::with_url("token-123", "https://custom.example.com");
        assert_eq!(sub.source_token(), "token-123");
        assert_eq!(sub.ingest_url(), "https://custom.example.com");
    }

    #[test]
    fn name_is_betterstack() {
        let sub = BetterStackSubscriber::new("token");
        assert_eq!(sub.name(), "betterstack");
    }

    #[test]
    fn build_payload_step_failed() {
        let at = Utc::now();
        let event = Event::StepFailed {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            error: "exit code 1".to_string(),
            at,
        };

        let payload = BetterStackSubscriber::build_payload(&event)
            .expect("StepFailed must produce a payload");
        assert_eq!(payload.level, "error");
        assert_eq!(payload.dt, at.to_rfc3339());
        assert!(payload.message.contains("build"));
        assert!(payload.message.contains("exit code 1"));
        assert_eq!(payload.event["type"], "step_failed");
        assert_eq!(payload.event["error"], "exit code 1");
    }

    #[test]
    fn build_payload_run_failed() {
        let at = Utc::now();
        let event = Event::RunFailed {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            error: Some("step 'build' failed".to_string()),
            cost_usd: Decimal::new(42, 2),
            duration_ms: 5000,
            at,
        };

        let payload = BetterStackSubscriber::build_payload(&event)
            .expect("RunFailed must produce a payload");
        assert_eq!(payload.level, "error");
        assert_eq!(payload.dt, at.to_rfc3339());
        assert!(payload.message.contains("deploy"));
        assert!(payload.message.contains("step 'build' failed"));
        assert_eq!(payload.event["type"], "run_failed");
        assert_eq!(payload.event["workflow_name"], "deploy");
        // The cost is serialized through `to_string()`, so it arrives as a JSON string.
        assert_eq!(payload.event["cost_usd"], "0.42");
        assert_eq!(payload.event["duration_ms"], 5000);
    }

    #[test]
    fn build_payload_run_failed_without_error_message() {
        let event = Event::RunFailed {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            error: None,
            cost_usd: Decimal::ZERO,
            duration_ms: 1000,
            at: Utc::now(),
        };

        let payload = BetterStackSubscriber::build_payload(&event)
            .expect("RunFailed must produce a payload even without an error message");
        assert!(payload.message.contains("unknown error"));
        assert_eq!(payload.event["error"], "unknown error");
    }

    #[test]
    fn build_payload_run_completed_returns_none() {
        let event = Event::RunStatusChanged {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            from: RunStatus::Running,
            to: RunStatus::Completed,
            error: None,
            cost_usd: Decimal::ZERO,
            duration_ms: 1000,
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_run_created_returns_none() {
        let event = Event::RunCreated {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_step_completed_returns_none() {
        let event = Event::StepCompleted {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            duration_ms: 500,
            cost_usd: Decimal::ZERO,
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_approval_requested_returns_none() {
        let event = Event::ApprovalRequested {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            message: "Deploy to prod?".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[test]
    fn build_payload_user_signed_in_returns_none() {
        let event = Event::UserSignedIn {
            user_id: Uuid::now_v7(),
            username: "alice".to_string(),
            at: Utc::now(),
        };

        assert!(BetterStackSubscriber::build_payload(&event).is_none());
    }

    #[tokio::test]
    async fn handle_ignores_non_error_events() {
        let sub = BetterStackSubscriber::with_url("token", "http://127.0.0.1:1");
        let event = Event::RunCreated {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            at: Utc::now(),
        };
        // `RunCreated` maps to no payload, so `handle` has nothing to send;
        // returning without a panic is the whole assertion.
        sub.handle(&event).await;
    }

    #[tokio::test]
    async fn deliver_to_real_endpoint_returns_202() {
        use std::sync::{Arc, Mutex};

        use axum::Router;
        use axum::http::{HeaderMap, StatusCode};
        use axum::routing::post;
        use tokio::net::TcpListener;

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // Record the Authorization header of every request the mock endpoint receives.
        // `handle` swallows delivery failures, so without this the test would pass even
        // if no request ever reached the server.
        let received = Arc::new(Mutex::new(Vec::<String>::new()));
        let recorder = Arc::clone(&received);
        let app = Router::new().route(
            "/",
            post(move |headers: HeaderMap| {
                let recorder = Arc::clone(&recorder);
                async move {
                    let auth = headers
                        .get("authorization")
                        .and_then(|value| value.to_str().ok())
                        .unwrap_or_default()
                        .to_string();
                    recorder.lock().unwrap().push(auth);
                    StatusCode::ACCEPTED
                }
            }),
        );

        tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });

        let sub = BetterStackSubscriber::with_url("test-token", &format!("http://{}", addr));
        let event = Event::StepFailed {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            error: "exit code 1".to_string(),
            at: Utc::now(),
        };

        sub.handle(&event).await;

        // Exactly one request: the 202 response must be accepted without a retry,
        // and it must carry the bearer token.
        let received = received.lock().unwrap();
        assert_eq!(*received, vec![String::from("Bearer test-token")]);
    }
}