use super::{HttpClient, LogtailError, RetryConfig};
use crate::r#struct::betterstack_log_schema::BetterStackLogSchema;
use crate::r#struct::env_config::EnvConfig;
use reqwest::header::{HeaderMap, HeaderValue};
use serde_json::Value;
use std::time::Duration;
pub async fn push_log(
client: &impl HttpClient,
config: &EnvConfig,
log: &BetterStackLogSchema,
) -> Result<Option<Value>, LogtailError> {
let logs_url = "https://in.logs.betterstack.com";
let bearer_header = bearer_headers(config);
let body = serde_json::to_value(log)?;
client.post_json(logs_url, &body, Some(bearer_header)).await
}
pub async fn push_log_with_retry(
client: &impl HttpClient,
config: &EnvConfig,
log: &BetterStackLogSchema,
retry_config: &RetryConfig,
) -> Result<Option<Value>, LogtailError> {
let mut last_err = None;
for attempt in 0..=retry_config.max_retries {
match push_log(client, config, log).await {
Ok(val) => return Ok(val),
Err(err) => {
if !err.is_retryable() || attempt == retry_config.max_retries {
return Err(err);
}
last_err = Some(err);
let base_ms = retry_config
.base_delay
.as_millis()
.saturating_mul(2u128.saturating_pow(attempt))
as u64;
let capped_ms = base_ms.min(retry_config.max_delay.as_millis() as u64);
let delay_ms = if retry_config.jitter && capped_ms > 0 {
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.subsec_nanos() as u64;
nanos % capped_ms
} else {
capped_ms
};
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
}
}
}
Err(last_err.unwrap_or_else(|| LogtailError::Http {
status: 500,
message: "retry exhausted".to_string(),
}))
}
fn bearer_headers(config: &EnvConfig) -> HeaderMap {
let logs_source_token = config.logs_source_token.as_str();
let bearer_value_str = format!("Bearer {}", logs_source_token);
let bearer_value = &bearer_value_str;
let mut headers = HeaderMap::new();
headers.insert(
"Authorization",
HeaderValue::from_str(bearer_value).unwrap(),
);
headers.insert(
"Content-Type",
HeaderValue::from_str("application/json").unwrap(),
);
headers
}
#[cfg(test)]
mod tests {
use super::*;
use crate::http_client::mock::MockHttpClient;
use crate::r#struct::env_config::{EnvConfig, EnvEnum};
use crate::r#struct::log_level::LogLevel;
use std::sync::atomic::Ordering;
fn test_config() -> EnvConfig {
EnvConfig::from_values(
"1.0.0".to_string(),
EnvEnum::QA,
"test-source-token".to_string(),
false,
)
}
fn test_log() -> BetterStackLogSchema {
BetterStackLogSchema {
env: EnvEnum::QA,
message: "test message".to_string(),
context: "test context".to_string(),
level: LogLevel::Info,
app_version: "1.0.0".to_string(),
}
}
#[tokio::test]
async fn calls_correct_url() {
let mock = MockHttpClient::with_success(None);
let _ = push_log(&mock, &test_config(), &test_log()).await;
let url = mock.captured_url.lock().unwrap().clone().unwrap();
assert_eq!(url, "https://in.logs.betterstack.com");
}
#[tokio::test]
async fn sends_bearer_header() {
let mock = MockHttpClient::with_success(None);
let _ = push_log(&mock, &test_config(), &test_log()).await;
let headers = mock.captured_headers.lock().unwrap().clone().unwrap();
assert_eq!(
headers.get("Authorization").unwrap().to_str().unwrap(),
"Bearer test-source-token"
);
}
#[tokio::test]
async fn sends_content_type_json() {
let mock = MockHttpClient::with_success(None);
let _ = push_log(&mock, &test_config(), &test_log()).await;
let headers = mock.captured_headers.lock().unwrap().clone().unwrap();
assert_eq!(
headers.get("Content-Type").unwrap().to_str().unwrap(),
"application/json"
);
}
#[tokio::test]
async fn sends_serialized_log_body() {
let mock = MockHttpClient::with_success(None);
let _ = push_log(&mock, &test_config(), &test_log()).await;
let body = mock.captured_body.lock().unwrap().clone().unwrap();
assert_eq!(body["message"], "test message");
assert_eq!(body["context"], "test context");
assert_eq!(body["level"], "Info");
assert_eq!(body["env"], "QA");
assert_eq!(body["app_version"], "1.0.0");
}
#[tokio::test]
async fn returns_some_on_success() {
let response = serde_json::json!({"status": "ok"});
let mock = MockHttpClient::with_success(Some(response.clone()));
let result = push_log(&mock, &test_config(), &test_log()).await;
assert_eq!(result.unwrap().unwrap(), response);
}
#[tokio::test]
async fn returns_error_on_failure() {
let mock = MockHttpClient::with_error("connection refused");
let result = push_log(&mock, &test_config(), &test_log()).await;
assert!(result.is_err());
}
#[tokio::test]
async fn returns_none_on_empty_body() {
let mock = MockHttpClient::with_success(None);
let result = push_log(&mock, &test_config(), &test_log()).await;
assert!(result.unwrap().is_none());
assert_eq!(mock.call_count.load(Ordering::SeqCst), 1);
}
}