Skip to main content

logtail_rust/http_client/
service.rs

1use super::{HttpClient, LogtailError, RetryConfig};
2use crate::r#struct::betterstack_log_schema::BetterStackLogSchema;
3use crate::r#struct::env_config::EnvConfig;
4use reqwest::header::{HeaderMap, HeaderValue};
5use serde_json::Value;
6use std::time::Duration;
7
8/// Pushes a log to the BetterStack logs server asynchronously and returns a value.
9///
10/// # Arguments
11///
12/// * `client` - The HTTP client to use for sending the request.
13/// * `config` - The configuration of the server.
14/// * `log` - The log to be pushed.
15///
16/// # Returns
17///
18/// * `Ok(Some(value))` if the log is sent successfully and a response body is returned.
19/// * `Ok(None)` if the log is sent successfully but no response body is returned.
20/// * `Err(LogtailError)` if there is an error sending the log.
21pub async fn push_log(
22    client: &impl HttpClient,
23    config: &EnvConfig,
24    log: &BetterStackLogSchema,
25) -> Result<Option<Value>, LogtailError> {
26    let logs_url = "https://in.logs.betterstack.com";
27    let bearer_header = bearer_headers(config);
28    let body = serde_json::to_value(log)?;
29
30    client.post_json(logs_url, &body, Some(bearer_header)).await
31}
32
33/// Pushes a log with automatic retry on transient failures.
34///
35/// Uses exponential backoff with optional jitter. Only retries on errors
36/// where `is_retryable()` returns true (5xx HTTP errors and network errors).
37pub async fn push_log_with_retry(
38    client: &impl HttpClient,
39    config: &EnvConfig,
40    log: &BetterStackLogSchema,
41    retry_config: &RetryConfig,
42) -> Result<Option<Value>, LogtailError> {
43    let mut last_err = None;
44
45    for attempt in 0..=retry_config.max_retries {
46        match push_log(client, config, log).await {
47            Ok(val) => return Ok(val),
48            Err(err) => {
49                if !err.is_retryable() || attempt == retry_config.max_retries {
50                    return Err(err);
51                }
52                last_err = Some(err);
53
54                let base_ms = retry_config
55                    .base_delay
56                    .as_millis()
57                    .saturating_mul(2u128.saturating_pow(attempt))
58                    as u64;
59                let capped_ms = base_ms.min(retry_config.max_delay.as_millis() as u64);
60
61                let delay_ms = if retry_config.jitter && capped_ms > 0 {
62                    // Cheap jitter using timestamp nanos
63                    let nanos = std::time::SystemTime::now()
64                        .duration_since(std::time::UNIX_EPOCH)
65                        .unwrap_or_default()
66                        .subsec_nanos() as u64;
67                    nanos % capped_ms
68                } else {
69                    capped_ms
70                };
71
72                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
73            }
74        }
75    }
76
77    Err(last_err.unwrap_or_else(|| LogtailError::Http {
78        status: 500,
79        message: "retry exhausted".to_string(),
80    }))
81}
82
83/// Generate a bearer header for the given server configuration.
84fn bearer_headers(config: &EnvConfig) -> HeaderMap {
85    let logs_source_token = config.logs_source_token.as_str();
86    let bearer_value_str = format!("Bearer {}", logs_source_token);
87    let bearer_value = &bearer_value_str;
88
89    let mut headers = HeaderMap::new();
90
91    headers.insert(
92        "Authorization",
93        HeaderValue::from_str(bearer_value).unwrap(),
94    );
95    headers.insert(
96        "Content-Type",
97        HeaderValue::from_str("application/json").unwrap(),
98    );
99
100    headers
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use crate::http_client::mock::MockHttpClient;
    use crate::r#struct::env_config::{EnvConfig, EnvEnum};
    use crate::r#struct::log_level::LogLevel;
    use std::sync::atomic::Ordering;

    /// Endpoint every request is expected to target.
    const EXPECTED_URL: &str = "https://in.logs.betterstack.com";

    /// Configuration fixture with a fixed source token.
    fn test_config() -> EnvConfig {
        let version = "1.0.0".to_string();
        let token = "test-source-token".to_string();
        EnvConfig::from_values(version, EnvEnum::QA, token, false)
    }

    /// Log entry fixture shared by all tests.
    fn test_log() -> BetterStackLogSchema {
        BetterStackLogSchema {
            env: EnvEnum::QA,
            message: "test message".to_string(),
            context: "test context".to_string(),
            level: LogLevel::Info,
            app_version: "1.0.0".to_string(),
        }
    }

    /// Returns the textual value of header `name` as captured by the mock.
    fn captured_header(mock: &MockHttpClient, name: &str) -> String {
        let headers = mock.captured_headers.lock().unwrap().clone().unwrap();
        headers.get(name).unwrap().to_str().unwrap().to_string()
    }

    /// The request must go to the BetterStack ingestion URL.
    #[tokio::test]
    async fn calls_correct_url() {
        let client = MockHttpClient::with_success(None);
        let _ = push_log(&client, &test_config(), &test_log()).await;

        let requested = client.captured_url.lock().unwrap().clone().unwrap();
        assert_eq!(requested, EXPECTED_URL);
    }

    /// The source token must be sent as a bearer `Authorization` header.
    #[tokio::test]
    async fn sends_bearer_header() {
        let client = MockHttpClient::with_success(None);
        let _ = push_log(&client, &test_config(), &test_log()).await;

        assert_eq!(
            captured_header(&client, "Authorization"),
            "Bearer test-source-token"
        );
    }

    /// The request must declare a JSON content type.
    #[tokio::test]
    async fn sends_content_type_json() {
        let client = MockHttpClient::with_success(None);
        let _ = push_log(&client, &test_config(), &test_log()).await;

        assert_eq!(
            captured_header(&client, "Content-Type"),
            "application/json"
        );
    }

    /// Every field of the log entry must appear in the serialized body.
    #[tokio::test]
    async fn sends_serialized_log_body() {
        let client = MockHttpClient::with_success(None);
        let _ = push_log(&client, &test_config(), &test_log()).await;

        let sent = client.captured_body.lock().unwrap().clone().unwrap();
        let expected_fields = [
            ("message", "test message"),
            ("context", "test context"),
            ("level", "Info"),
            ("env", "QA"),
            ("app_version", "1.0.0"),
        ];
        for (field, expected) in expected_fields {
            assert_eq!(sent[field], expected, "unexpected value for `{field}`");
        }
    }

    /// A response body from the client is passed through to the caller.
    #[tokio::test]
    async fn returns_some_on_success() {
        let expected = serde_json::json!({"status": "ok"});
        let client = MockHttpClient::with_success(Some(expected.clone()));

        let returned = push_log(&client, &test_config(), &test_log())
            .await
            .unwrap();
        assert_eq!(returned, Some(expected));
    }

    /// A client failure surfaces as an error.
    #[tokio::test]
    async fn returns_error_on_failure() {
        let client = MockHttpClient::with_error("connection refused");

        let outcome = push_log(&client, &test_config(), &test_log()).await;
        assert!(outcome.is_err());
    }

    /// An empty response body yields `Ok(None)` after exactly one request.
    #[tokio::test]
    async fn returns_none_on_empty_body() {
        let client = MockHttpClient::with_success(None);

        let outcome = push_log(&client, &test_config(), &test_log()).await;
        assert!(matches!(outcome, Ok(None)));
        assert_eq!(client.call_count.load(Ordering::SeqCst), 1);
    }
}
201}