use faucet_core::Sink;
use faucet_sink_http::{HttpBatchMode, HttpSink, HttpSinkConfig};
use serde_json::{Value, json};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// Builds `n` JSON records of the form `{"id": <index>, "name": "row"}`,
/// with ids running from `0` to `n - 1`.
fn make_records(n: usize) -> Vec<Value> {
    let mut records = Vec::with_capacity(n);
    for id in 0..n {
        records.push(json!({"id": id, "name": "row"}));
    }
    records
}
/// Starts a mock HTTP server that answers every `POST /ingest` with status 200.
async fn mock_server_with_success() -> MockServer {
    let server = MockServer::start().await;
    let ingest_ok = Mock::given(method("POST"))
        .and(path("/ingest"))
        .respond_with(ResponseTemplate::new(200));
    ingest_ok.mount(&server).await;
    server
}
/// Returns the `/ingest` endpoint URL on the given mock server.
fn url(server: &MockServer) -> String {
    let mut endpoint = server.uri();
    endpoint.push_str("/ingest");
    endpoint
}
/// Array mode with `batch_size = 500` must split a 1,500-record slice into
/// exactly three POSTs, each carrying a JSON array of exactly 500 records.
#[tokio::test]
async fn array_mode_rechunks_into_batch_size_requests() {
    let server = mock_server_with_success().await;
    let config = HttpSinkConfig::new(url(&server))
        .batch_mode(HttpBatchMode::Array)
        .with_batch_size(500);
    let sink = HttpSink::new(config);

    let written = sink.write_batch(&make_records(1_500)).await.unwrap();
    assert_eq!(written, 1_500);

    let requests = server.received_requests().await.unwrap();
    assert_eq!(
        requests.len(),
        3,
        "expected exactly 3 POSTs (1500 records / batch_size=500)"
    );

    // `written` is only the sink's own tally. Three requests can deliver all
    // 1,500 records only if every body holds exactly 500, so pin the exact
    // size on the wire rather than just an upper bound.
    for req in &requests {
        let body: Value = serde_json::from_slice(&req.body).unwrap();
        let arr = body.as_array().expect("body is JSON array");
        assert_eq!(
            arr.len(),
            500,
            "every chunk must carry exactly batch_size records"
        );
    }
}
/// Array mode with `batch_size = 500` and 1,200 records must produce three
/// requests, the third of which carries the 200-record remainder.
#[tokio::test]
async fn array_mode_rechunks_uneven_remainder() {
    let server = mock_server_with_success().await;
    let sink = HttpSink::new(
        HttpSinkConfig::new(url(&server))
            .batch_mode(HttpBatchMode::Array)
            .with_batch_size(500),
    );

    let records = make_records(1_200);
    sink.write_batch(&records).await.unwrap();

    let received = server.received_requests().await.unwrap();
    assert_eq!(received.len(), 3);

    // Inspect the third request recorded by the mock server.
    let tail: Value = serde_json::from_slice(&received[2].body).unwrap();
    let tail_len = tail.as_array().unwrap().len();
    assert_eq!(
        tail_len, 200,
        "tail chunk must hold the remaining 200 records"
    );
}
/// In Array mode, `batch_size = 0` must forward the whole 2,500-record slice
/// in a single POST.
#[tokio::test]
async fn array_mode_sentinel_zero_sends_single_request() {
    let server = mock_server_with_success().await;
    let sink = HttpSink::new(
        HttpSinkConfig::new(url(&server))
            .batch_mode(HttpBatchMode::Array)
            .with_batch_size(0),
    );

    let records = make_records(2_500);
    sink.write_batch(&records).await.unwrap();

    let received = server.received_requests().await.unwrap();
    assert_eq!(
        received.len(),
        1,
        "batch_size = 0 must forward the whole slice in a single POST"
    );

    // The lone request body must contain every record.
    let payload: Value = serde_json::from_slice(&received[0].body).unwrap();
    let payload_len = payload.as_array().unwrap().len();
    assert_eq!(payload_len, 2_500);
}
#[tokio::test]
async fn array_mode_smaller_than_batch_size_makes_one_request() {
let server = mock_server_with_success().await;
let config = HttpSinkConfig::new(url(&server))
.batch_mode(HttpBatchMode::Array)
.with_batch_size(500);
let sink = HttpSink::new(config);
sink.write_batch(&make_records(42)).await.unwrap();
let requests = server.received_requests().await.unwrap();
assert_eq!(requests.len(), 1);
}
#[tokio::test]
async fn array_mode_empty_records_makes_no_requests() {
let server = mock_server_with_success().await;
let config = HttpSinkConfig::new(url(&server))
.batch_mode(HttpBatchMode::Array)
.with_batch_size(500);
let sink = HttpSink::new(config);
let written = sink.write_batch(&[]).await.unwrap();
assert_eq!(written, 0);
let requests = server.received_requests().await.unwrap();
assert!(requests.is_empty());
}
/// Individual mode must send one request per record (10 records, 10 requests)
/// even when `batch_size` is configured.
#[tokio::test]
async fn individual_mode_ignores_batch_size_for_wire_framing() {
    let server = mock_server_with_success().await;
    let sink = HttpSink::new(
        HttpSinkConfig::new(url(&server))
            .batch_mode(HttpBatchMode::Individual)
            .with_batch_size(500)
            .concurrency(16),
    );

    let records = make_records(10);
    sink.write_batch(&records).await.unwrap();

    let received = server.received_requests().await.unwrap();
    assert_eq!(
        received.len(),
        10,
        "Individual mode sends one request per record regardless of batch_size"
    );
}
/// Regression test for #59: in Individual mode with `concurrency(4)`, writing
/// 100 records must complete within 30 seconds and send 100 requests.
#[tokio::test]
async fn individual_mode_does_not_deadlock_when_records_exceed_concurrency() {
    let server = mock_server_with_success().await;
    let sink = HttpSink::new(
        HttpSinkConfig::new(url(&server))
            .batch_mode(HttpBatchMode::Individual)
            .concurrency(4),
    );

    let records = make_records(100);
    let deadline = std::time::Duration::from_secs(30);

    // The outer `Result` reports a timeout (treated as a deadlock); the inner
    // one is the outcome of `write_batch` itself.
    let outcome = tokio::time::timeout(deadline, sink.write_batch(&records)).await;
    outcome
        .expect("Individual mode write_batch deadlocked (regression of #59)")
        .expect("Individual mode write_batch returned an error");

    let received = server.received_requests().await.unwrap();
    assert_eq!(received.len(), 100);
}