mod common;
use common::BETTER_STACK_ACCEPTED_STATUS;
use futures::executor::block_on;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};
use tracing_better_stack::{BetterStackConfig, BetterStackLayer};
use tracing_subscriber::prelude::*;
use wiremock::matchers::{bearer_token, method, path};
use wiremock::{Mock, MockServer, Request, ResponseTemplate};
/// HTTP 500 status code that the mocked endpoint returns in the retry tests
/// to simulate a failed delivery.
const HTTP_INTERNAL_SERVER_ERROR: u16 = 500;
/// Emits three events (info, warn, error) through the layer and checks that
/// the mock endpoint receives exactly one authenticated `POST /`.
#[tokio::test]
async fn test_basic_logging() {
    let server = MockServer::start().await;

    // Exactly one matching request is allowed; `verify` fails on any other count.
    let accepted = ResponseTemplate::new(BETTER_STACK_ACCEPTED_STATUS);
    let mock = Mock::given(method("POST"))
        .and(path("/"))
        .and(bearer_token("test-token"))
        .respond_with(accepted)
        .expect(1);
    mock.mount(&server).await;

    let config = BetterStackConfig::builder(common::mock_host(&server), "test-token")
        .batch_size(3)
        .batch_timeout(Duration::from_millis(100))
        .build();
    let subscriber = tracing_subscriber::registry().with(BetterStackLayer::new(config));

    tracing::subscriber::with_default(subscriber, || {
        info!("Test message 1");
        warn!("Test message 2");
        error!("Test message 3");
    });

    // Wait before verifying so the request has time to reach the mock server.
    tokio::time::sleep(Duration::from_millis(200)).await;
    server.verify().await;
}
/// Checks that every two events produce one request when `batch_size` is 2.
///
/// The batch timeout is 5 s while each wait below is capped at 500 ms, so a
/// request that arrives inside the wait window is not attributable to the
/// timeout. The mock expects exactly two requests in total.
#[tokio::test]
async fn test_batching_behavior() {
    let mock_server = MockServer::start().await;

    // `Sender` is already cheap to share, so it is moved straight into the
    // responder instead of being wrapped in an `Arc`.
    let (tx, mut rx) = mpsc::channel::<()>(10);

    Mock::given(method("POST"))
        .and(path("/"))
        .and(bearer_token("test-token"))
        .respond_with(move |_req: &Request| {
            // The responder is synchronous, so signal with the non-blocking
            // `try_send` rather than spawning a detached task. The channel
            // capacity (10) exceeds the expected request count (2); a failed
            // send is ignored, as it was before.
            let _ = tx.try_send(());
            ResponseTemplate::new(BETTER_STACK_ACCEPTED_STATUS)
        })
        .expect(2)
        .mount(&mock_server)
        .await;

    let layer = BetterStackLayer::new(
        BetterStackConfig::builder(common::mock_host(&mock_server), "test-token")
            .batch_size(2)
            .batch_timeout(Duration::from_millis(5000))
            .build(),
    );
    let subscriber = tracing_subscriber::registry().with(layer);
    // Keep the subscriber installed for the rest of the test.
    let _guard = tracing::subscriber::set_default(subscriber);

    info!("Message 1");
    info!("Message 2");
    tokio::time::timeout(Duration::from_millis(500), rx.recv())
        .await
        .expect("First batch timeout")
        .expect("First batch not received");

    info!("Message 3");
    info!("Message 4");
    tokio::time::timeout(Duration::from_millis(500), rx.recv())
        .await
        .expect("Second batch timeout")
        .expect("Second batch not received");

    mock_server.verify().await;
}
/// Logs a single event with `batch_size` 100 and a 200 ms batch timeout, then
/// checks after 400 ms that the mock endpoint received exactly one request.
#[tokio::test]
async fn test_timeout_batching() {
    let server = MockServer::start().await;

    let mock = Mock::given(method("POST"))
        .and(path("/"))
        .and(bearer_token("test-token"))
        .respond_with(ResponseTemplate::new(BETTER_STACK_ACCEPTED_STATUS))
        .expect(1);
    mock.mount(&server).await;

    // One event is far below the batch size of 100.
    let config = BetterStackConfig::builder(common::mock_host(&server), "test-token")
        .batch_size(100)
        .batch_timeout(Duration::from_millis(200))
        .build();
    let subscriber = tracing_subscriber::registry().with(BetterStackLayer::new(config));

    tracing::subscriber::with_default(subscriber, || {
        info!("Single message");
    });

    // Wait for twice the configured batch timeout before verifying.
    tokio::time::sleep(Duration::from_millis(400)).await;
    server.verify().await;
}
/// Points the layer at an endpoint that always answers with HTTP 500 and
/// checks that exactly three requests are made for a single event.
///
/// NOTE(review): three requests presumably means the initial attempt plus the
/// two retries allowed by `max_retries(2)` — confirm against the layer.
#[tokio::test]
async fn test_retry_on_failure() {
    let server = MockServer::start().await;

    let always_failing = ResponseTemplate::new(HTTP_INTERNAL_SERVER_ERROR);
    let mock = Mock::given(method("POST"))
        .and(path("/"))
        .and(bearer_token("test-token"))
        .respond_with(always_failing)
        .expect(3);
    mock.mount(&server).await;

    let config = BetterStackConfig::builder(common::mock_host(&server), "test-token")
        .batch_size(1)
        .max_retries(2)
        .initial_retry_delay(Duration::from_millis(50))
        .build();
    let subscriber = tracing_subscriber::registry().with(BetterStackLayer::new(config));

    tracing::subscriber::with_default(subscriber, || {
        info!("Test message");
    });

    // Leave time for all attempts before checking the request count.
    tokio::time::sleep(Duration::from_millis(300)).await;
    server.verify().await;
}
/// Has the endpoint fail the first two requests with HTTP 500 and accept the
/// third, then checks that exactly three requests were made.
#[tokio::test]
async fn test_retry_with_eventual_success() {
    let server = MockServer::start().await;

    // Shared request counter: the responder increments it, the test reads it.
    let attempt_count = Arc::new(tokio::sync::Mutex::new(0));
    let responder_attempts = attempt_count.clone();

    let mock = Mock::given(method("POST"))
        .and(path("/"))
        .and(bearer_token("test-token"))
        .respond_with(move |_req: &Request| {
            // The responder closure is synchronous, so the async mutex is
            // locked through `block_on`.
            let mut attempts = block_on(responder_attempts.lock());
            *attempts += 1;
            if *attempts > 2 {
                ResponseTemplate::new(BETTER_STACK_ACCEPTED_STATUS)
            } else {
                ResponseTemplate::new(HTTP_INTERNAL_SERVER_ERROR)
            }
        })
        .expect(3);
    mock.mount(&server).await;

    let config = BetterStackConfig::builder(common::mock_host(&server), "test-token")
        .batch_size(1)
        .max_retries(3)
        .initial_retry_delay(Duration::from_millis(50))
        .build();
    let subscriber = tracing_subscriber::registry().with(BetterStackLayer::new(config));

    tracing::subscriber::with_default(subscriber, || {
        info!("Test message that eventually succeeds");
    });

    tokio::time::sleep(Duration::from_millis(300)).await;
    server.verify().await;

    // Cross-check the mock's own expectation against the responder's counter.
    let observed_attempts = *attempt_count.lock().await;
    assert_eq!(observed_attempts, 3, "Should have made exactly 3 attempts");
}
/// Spawns five tasks that each log five events and checks that the mock
/// endpoint receives at least three requests.
///
/// NOTE(review): `set_default` installs the subscriber for the current thread
/// only; this presumably relies on the test runtime polling the spawned tasks
/// on that same thread — confirm if the runtime flavor ever changes.
#[tokio::test]
async fn test_concurrent_logging() {
    let server = MockServer::start().await;

    let mock = Mock::given(method("POST"))
        .and(path("/"))
        .and(bearer_token("test-token"))
        .respond_with(ResponseTemplate::new(BETTER_STACK_ACCEPTED_STATUS))
        .expect(3..);
    mock.mount(&server).await;

    let config = BetterStackConfig::builder(common::mock_host(&server), "test-token")
        .batch_size(10)
        .batch_timeout(Duration::from_millis(100))
        .build();
    let subscriber = tracing_subscriber::registry().with(BetterStackLayer::new(config));
    // The guard keeps the subscriber installed until the end of the test.
    let _default_guard = tracing::subscriber::set_default(subscriber);

    let mut tasks = Vec::with_capacity(5);
    for task_id in 0..5 {
        tasks.push(tokio::spawn(async move {
            for step in 0..5 {
                info!(thread = task_id, iter = step, "Concurrent log");
                // Yield briefly so the tasks interleave their events.
                tokio::time::sleep(Duration::from_micros(100)).await;
            }
        }));
    }
    for task in tasks {
        task.await.unwrap();
    }

    tokio::time::sleep(Duration::from_millis(200)).await;
    server.verify().await;
}
/// Checks that buffered events are still delivered once the subscriber (and
/// with it the layer) goes away.
///
/// Two events are logged with `batch_size` 10 and a 10 s batch timeout, and the
/// test waits at most 500 ms for the request, so neither threshold should be
/// what triggers the delivery.
#[tokio::test]
async fn test_graceful_shutdown() {
    let mock_server = MockServer::start().await;
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);

    Mock::given(method("POST"))
        .and(path("/"))
        .and(bearer_token("test-token"))
        .respond_with(move |_req: &Request| {
            // The responder is synchronous, so signal with the non-blocking
            // `try_send` rather than spawning a detached task. Only one
            // request is expected, which fits the channel capacity of 1; a
            // failed send is ignored, as it was before.
            let _ = shutdown_tx.try_send(());
            ResponseTemplate::new(BETTER_STACK_ACCEPTED_STATUS)
        })
        .expect(1)
        .mount(&mock_server)
        .await;

    {
        let layer = BetterStackLayer::new(
            BetterStackConfig::builder(common::mock_host(&mock_server), "test-token")
                .batch_size(10)
                .batch_timeout(Duration::from_secs(10))
                .build(),
        );
        let subscriber = tracing_subscriber::registry().with(layer);
        // `with_default` takes ownership of the subscriber, so it does not
        // outlive this call.
        tracing::subscriber::with_default(subscriber, || {
            info!("Log before shutdown");
            warn!("This should be sent on drop");
        });
    }

    tokio::time::timeout(Duration::from_millis(500), shutdown_rx.recv())
        .await
        .expect("Shutdown flush timeout")
        .expect("Shutdown batch not received");

    mock_server.verify().await;
}
/// Checks that the config builder stores the values it is given, including
/// empty host and token strings.
///
/// Nothing here is awaited, so this is a plain synchronous test and does not
/// start an async runtime.
#[test]
fn test_configuration_validation() {
    // An empty ingesting host is accepted by the builder and kept as-is.
    let config = BetterStackConfig::builder("", "token")
        .batch_size(1)
        .build();
    assert!(config.ingesting_host.is_empty());

    // Likewise for an empty source token.
    let config = BetterStackConfig::builder("host", "").build();
    assert!(config.source_token.is_empty());

    // The configured batch size must be stored unchanged; the previous `>= 1`
    // check would also have passed for any larger value.
    let config = BetterStackConfig::builder("host", "token")
        .batch_size(1)
        .max_retries(0)
        .build();
    assert_eq!(
        config.batch_size, 1,
        "Batch size should match the configured value"
    );
}
/// Logs one event carrying a 10,000-character field plus four events carrying
/// a field of just over 1,000 characters, and checks that the mock endpoint
/// receives exactly one request.
#[tokio::test]
async fn test_large_payload_handling() {
    let server = MockServer::start().await;

    let mock = Mock::given(method("POST"))
        .and(path("/"))
        .and(bearer_token("test-token"))
        .respond_with(ResponseTemplate::new(BETTER_STACK_ACCEPTED_STATUS))
        .expect(1);
    mock.mount(&server).await;

    // Batch size equals the number of events emitted below (1 + 4).
    let config = BetterStackConfig::builder(common::mock_host(&server), "test-token")
        .batch_size(5)
        .batch_timeout(Duration::from_millis(100))
        .build();
    let subscriber = tracing_subscriber::registry().with(BetterStackLayer::new(config));

    tracing::subscriber::with_default(subscriber, || {
        let oversized = "x".repeat(10000);
        info!(large_field = %oversized, "Large message");

        (0..4).for_each(|id| {
            let payload = format!("data_{}", "y".repeat(1000));
            warn!(id = id, data = %payload, "Large field message");
        });
    });

    tokio::time::sleep(Duration::from_millis(200)).await;
    server.verify().await;
}
/// Emits one event at each of the five levels (trace through error) and checks
/// that the mock endpoint receives exactly one request. The request body is
/// not inspected.
#[tokio::test]
async fn test_different_log_levels() {
    let server = MockServer::start().await;

    let mock = Mock::given(method("POST"))
        .and(path("/"))
        .and(bearer_token("test-token"))
        .respond_with(ResponseTemplate::new(BETTER_STACK_ACCEPTED_STATUS))
        .expect(1);
    mock.mount(&server).await;

    // Batch size equals the number of events emitted below.
    let config = BetterStackConfig::builder(common::mock_host(&server), "test-token")
        .batch_size(5)
        .batch_timeout(Duration::from_millis(100))
        .build();
    let subscriber = tracing_subscriber::registry().with(BetterStackLayer::new(config));

    tracing::subscriber::with_default(subscriber, || {
        trace!("Trace level message");
        debug!("Debug level message");
        info!("Info level message");
        warn!("Warning level message");
        error!("Error level message");
    });

    tokio::time::sleep(Duration::from_millis(200)).await;
    server.verify().await;
}
/// Logs ten sequentially numbered events and checks that they arrive in one
/// batch, in the order they were emitted.
///
/// The request body is captured through `common::BodyCaptureMatcher` and
/// decoded according to the enabled serialization feature (`json` or
/// `message_pack`).
#[tokio::test]
async fn test_message_ordering_preservation() {
    let mock_server = MockServer::start().await;

    let captured_bodies = Arc::new(tokio::sync::Mutex::new(Vec::new()));
    let body_matcher = common::BodyCaptureMatcher {
        captured: captured_bodies.clone(),
    };

    Mock::given(method("POST"))
        .and(path("/"))
        .and(bearer_token("test-token"))
        .and(body_matcher)
        .respond_with(ResponseTemplate::new(BETTER_STACK_ACCEPTED_STATUS))
        .expect(1)
        .mount(&mock_server)
        .await;

    let layer = BetterStackLayer::new(
        BetterStackConfig::builder(common::mock_host(&mock_server), "test-token")
            .batch_size(10)
            .batch_timeout(Duration::from_millis(100))
            .build(),
    );
    let subscriber = tracing_subscriber::registry().with(layer);
    tracing::subscriber::with_default(subscriber, || {
        for i in 0..10 {
            info!(seq = i, "Ordered message {}", i);
        }
    });

    tokio::time::sleep(Duration::from_millis(200)).await;
    mock_server.verify().await;

    let bodies = captured_bodies.lock().await;
    assert_eq!(bodies.len(), 1, "Should have captured one batch");

    #[cfg(feature = "json")]
    {
        let body = &bodies[0];
        let json: serde_json::Value =
            serde_json::from_slice(body).expect("Body should be valid JSON");
        let batch_array = json.as_array().expect("Batch should be an array");
        // Without this check the loop below would pass vacuously on an empty
        // or truncated batch.
        assert_eq!(batch_array.len(), 10, "Batch should contain all 10 messages");
        // Pair each entry with its expected sequence number as an `i64`,
        // avoiding a `usize as i64` cast.
        for (i, log) in (0_i64..).zip(batch_array) {
            assert_eq!(log["fields"]["seq"], i);
            assert_eq!(log["message"], format!("Ordered message {}", i));
        }
    }

    #[cfg(feature = "message_pack")]
    {
        let body = &bodies[0];
        let value: serde_json::Value =
            rmp_serde::from_slice(body).expect("Body should be valid MessagePack");
        let batch_array = value.as_array().expect("Batch should be an array");
        // Without this check the loop below would pass vacuously on an empty
        // or truncated batch.
        assert_eq!(batch_array.len(), 10, "Batch should contain all 10 messages");
        for (i, log) in (0_i64..).zip(batch_array) {
            assert_eq!(log["fields"]["seq"], i);
            assert_eq!(log["message"], format!("Ordered message {}", i));
        }
    }
}