logtail_rust/http_client/
service.rs1use super::{HttpClient, LogtailError, RetryConfig};
2use crate::r#struct::betterstack_log_schema::BetterStackLogSchema;
3use crate::r#struct::env_config::EnvConfig;
4use reqwest::header::{HeaderMap, HeaderValue};
5use serde_json::Value;
6use std::time::Duration;
7
8pub async fn push_log(
22 client: &impl HttpClient,
23 config: &EnvConfig,
24 log: &BetterStackLogSchema,
25) -> Result<Option<Value>, LogtailError> {
26 let logs_url = "https://in.logs.betterstack.com";
27 let bearer_header = bearer_headers(config);
28 let body = serde_json::to_value(log)?;
29
30 client.post_json(logs_url, &body, Some(bearer_header)).await
31}
32
33pub async fn push_log_with_retry(
38 client: &impl HttpClient,
39 config: &EnvConfig,
40 log: &BetterStackLogSchema,
41 retry_config: &RetryConfig,
42) -> Result<Option<Value>, LogtailError> {
43 let mut last_err = None;
44
45 for attempt in 0..=retry_config.max_retries {
46 match push_log(client, config, log).await {
47 Ok(val) => return Ok(val),
48 Err(err) => {
49 if !err.is_retryable() || attempt == retry_config.max_retries {
50 return Err(err);
51 }
52 last_err = Some(err);
53
54 let base_ms = retry_config
55 .base_delay
56 .as_millis()
57 .saturating_mul(2u128.saturating_pow(attempt))
58 as u64;
59 let capped_ms = base_ms.min(retry_config.max_delay.as_millis() as u64);
60
61 let delay_ms = if retry_config.jitter && capped_ms > 0 {
62 let nanos = std::time::SystemTime::now()
64 .duration_since(std::time::UNIX_EPOCH)
65 .unwrap_or_default()
66 .subsec_nanos() as u64;
67 nanos % capped_ms
68 } else {
69 capped_ms
70 };
71
72 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
73 }
74 }
75 }
76
77 Err(last_err.unwrap_or_else(|| LogtailError::Http {
78 status: 500,
79 message: "retry exhausted".to_string(),
80 }))
81}
82
83fn bearer_headers(config: &EnvConfig) -> HeaderMap {
85 let logs_source_token = config.logs_source_token.as_str();
86 let bearer_value_str = format!("Bearer {}", logs_source_token);
87 let bearer_value = &bearer_value_str;
88
89 let mut headers = HeaderMap::new();
90
91 headers.insert(
92 "Authorization",
93 HeaderValue::from_str(bearer_value).unwrap(),
94 );
95 headers.insert(
96 "Content-Type",
97 HeaderValue::from_str("application/json").unwrap(),
98 );
99
100 headers
101}
102
103#[cfg(test)]
104mod tests {
105 use super::*;
106 use crate::http_client::mock::MockHttpClient;
107 use crate::r#struct::env_config::{EnvConfig, EnvEnum};
108 use crate::r#struct::log_level::LogLevel;
109 use std::sync::atomic::Ordering;
110
111 fn test_config() -> EnvConfig {
112 EnvConfig::from_values(
113 "1.0.0".to_string(),
114 EnvEnum::QA,
115 "test-source-token".to_string(),
116 false,
117 )
118 }
119
120 fn test_log() -> BetterStackLogSchema {
121 BetterStackLogSchema {
122 env: EnvEnum::QA,
123 message: "test message".to_string(),
124 context: "test context".to_string(),
125 level: LogLevel::Info,
126 app_version: "1.0.0".to_string(),
127 }
128 }
129
130 #[tokio::test]
131 async fn calls_correct_url() {
132 let mock = MockHttpClient::with_success(None);
133 let _ = push_log(&mock, &test_config(), &test_log()).await;
134
135 let url = mock.captured_url.lock().unwrap().clone().unwrap();
136 assert_eq!(url, "https://in.logs.betterstack.com");
137 }
138
139 #[tokio::test]
140 async fn sends_bearer_header() {
141 let mock = MockHttpClient::with_success(None);
142 let _ = push_log(&mock, &test_config(), &test_log()).await;
143
144 let headers = mock.captured_headers.lock().unwrap().clone().unwrap();
145 assert_eq!(
146 headers.get("Authorization").unwrap().to_str().unwrap(),
147 "Bearer test-source-token"
148 );
149 }
150
151 #[tokio::test]
152 async fn sends_content_type_json() {
153 let mock = MockHttpClient::with_success(None);
154 let _ = push_log(&mock, &test_config(), &test_log()).await;
155
156 let headers = mock.captured_headers.lock().unwrap().clone().unwrap();
157 assert_eq!(
158 headers.get("Content-Type").unwrap().to_str().unwrap(),
159 "application/json"
160 );
161 }
162
163 #[tokio::test]
164 async fn sends_serialized_log_body() {
165 let mock = MockHttpClient::with_success(None);
166 let _ = push_log(&mock, &test_config(), &test_log()).await;
167
168 let body = mock.captured_body.lock().unwrap().clone().unwrap();
169 assert_eq!(body["message"], "test message");
170 assert_eq!(body["context"], "test context");
171 assert_eq!(body["level"], "Info");
172 assert_eq!(body["env"], "QA");
173 assert_eq!(body["app_version"], "1.0.0");
174 }
175
176 #[tokio::test]
177 async fn returns_some_on_success() {
178 let response = serde_json::json!({"status": "ok"});
179 let mock = MockHttpClient::with_success(Some(response.clone()));
180
181 let result = push_log(&mock, &test_config(), &test_log()).await;
182 assert_eq!(result.unwrap().unwrap(), response);
183 }
184
185 #[tokio::test]
186 async fn returns_error_on_failure() {
187 let mock = MockHttpClient::with_error("connection refused");
188
189 let result = push_log(&mock, &test_config(), &test_log()).await;
190 assert!(result.is_err());
191 }
192
193 #[tokio::test]
194 async fn returns_none_on_empty_body() {
195 let mock = MockHttpClient::with_success(None);
196
197 let result = push_log(&mock, &test_config(), &test_log()).await;
198 assert!(result.unwrap().is_none());
199 assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
200 }
201}