use std::time::Duration;
use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE};
use reqwest::Body;
use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::RetryTransientMiddleware;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};
use crate::quickwit::{LogDocument, QuickwitError};
/// Action line of an Elasticsearch bulk request, serialized as
/// `{"index":{"_index":"<index id>"}}`.
#[derive(Serialize, Debug)]
struct BulkAction {
    /// Metadata of the `index` action; the JSON key is the field name `index`.
    index: BulkIndex,
}

/// Metadata object of a bulk `index` action.
#[derive(Serialize, Debug)]
struct BulkIndex {
    /// Target index id, emitted under the `_index` key.
    #[serde(rename(serialize = "_index"))]
    index: String,
}
#[derive(Debug, Deserialize)]
struct BulkResponseItem {
index: Option<BulkItemResult>,
}
#[derive(Debug, Deserialize)]
struct BulkItemResult {
#[serde(rename = "_index")]
index: String,
#[serde(rename = "_id")]
id: String,
status: u16,
error: Option<BulkError>,
}
#[derive(Debug, Deserialize)]
struct BulkError {
#[serde(rename = "type")]
error_type: String,
reason: String,
}
#[derive(Debug, Deserialize)]
struct BulkResponse {
took: u64,
errors: bool,
items: Vec<BulkResponseItem>,
}
/// Sink that ships [`LogDocument`] batches to a Quickwit index through its
/// Elasticsearch-compatible `_bulk` endpoint.
#[derive(Clone, Debug)]
pub struct QuickwitEsBulkSink {
    /// HTTP client wrapped with the retry middleware configured in `new`.
    client: reqwest_middleware::ClientWithMiddleware,
    /// Base URL of the Quickwit node, e.g. `http://localhost:7280`.
    endpoint: String,
    /// Id of the Quickwit index that receives the documents.
    index_id: String,
}
impl QuickwitEsBulkSink {
pub fn new(endpoint: String, index_id: String) -> Self {
let retry_policy = ExponentialBackoff::builder()
.base(2)
.retry_bounds(Duration::from_millis(100), Duration::from_secs(10))
.build_with_max_retries(3);
let client = reqwest_middleware::ClientBuilder::new(reqwest::Client::new())
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
Self {
client,
endpoint,
index_id,
}
}
fn bulk_url(&self) -> String {
format!(
"{}/api/v1/_elastic/{}/_bulk?refresh=true",
self.endpoint, self.index_id
)
}
pub async fn send_batch(&self, documents: Vec<LogDocument>) -> Result<usize, QuickwitError> {
if documents.is_empty() {
return Ok(0);
}
let body = self.build_bulk_body(&documents).map_err(|e| {
QuickwitError::SerializationError(format!("failed to build bulk body: {}", e))
})?;
let mut headers = HeaderMap::new();
headers.insert(
CONTENT_TYPE,
HeaderValue::from_static("application/x-ndjson"),
);
let url = self.bulk_url();
debug!(
url = %url,
doc_count = documents.len(),
body_bytes = body.len(),
"sending ES bulk request"
);
let response = self
.client
.post(&url)
.headers(headers)
.body(Body::from(body))
.send()
.await
.map_err(|e| QuickwitError::HttpError(format!("request failed: {}", e)))?;
let status = response.status();
let response_body = response
.text()
.await
.map_err(|e| QuickwitError::HttpError(format!("failed to read response: {}", e)))?;
if !status.is_success() {
warn!(
status = %status,
body = %response_body,
"ES bulk request failed"
);
return Err(QuickwitError::HttpError(format!(
"HTTP {}: {}",
status, response_body
)));
}
let bulk_response: BulkResponse = serde_json::from_str(&response_body).map_err(|e| {
QuickwitError::SerializationError(format!(
"failed to parse bulk response: {} (body: {})",
e, response_body
))
})?;
let success_count = bulk_response
.items
.iter()
.filter(|item| {
item.index
.as_ref()
.map(|idx| idx.status >= 200 && idx.status < 300)
.unwrap_or(false)
})
.count();
let error_count = documents.len() - success_count;
if bulk_response.errors {
warn!(
success = success_count,
errors = error_count,
took_ms = bulk_response.took,
"ES bulk completed with errors"
);
} else {
info!(
success = success_count,
took_ms = bulk_response.took,
"ES bulk completed successfully"
);
}
if error_count > 0 {
for item in &bulk_response.items {
if let Some(idx) = &item.index {
if idx.status >= 400 {
if let Some(error) = &idx.error {
warn!(
index = %idx.index,
id = %idx.id,
status = idx.status,
error_type = %error.error_type,
reason = %error.reason,
"bulk item failed"
);
}
}
}
}
}
Ok(success_count)
}
fn build_bulk_body(&self, documents: &[LogDocument]) -> Result<String, serde_json::Error> {
let mut body = String::new();
for doc in documents {
let action = BulkAction {
index: BulkIndex {
index: self.index_id.clone(),
},
};
let action_json = serde_json::to_string(&action)?;
body.push_str(&action_json);
body.push('\n');
let doc_json = serde_json::to_string(doc)?;
body.push_str(&doc_json);
body.push('\n');
}
Ok(body)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::quickwit::LogDocument;

    /// Builds a sink pointing at a local Quickwit node; no request is sent.
    fn sink(index_id: &str) -> QuickwitEsBulkSink {
        QuickwitEsBulkSink::new("http://localhost:7280".to_string(), index_id.to_string())
    }

    #[test]
    fn test_build_bulk_body() {
        let docs = vec![LogDocument {
            timestamp: "2026-05-11T10:00:00Z".to_string(),
            project_id: "test".to_string(),
            level: "INFO".to_string(),
            agent_name: "test-agent".to_string(),
            layer: "Core".to_string(),
            source: "orchestrator".to_string(),
            message: "test message".to_string(),
            ..Default::default()
        }];
        let body = sink("test-index").build_bulk_body(&docs).unwrap();
        // NDJSON: every line, including the last, is newline-terminated.
        assert!(body.ends_with('\n'));
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 2);
        let action: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(
            action,
            serde_json::json!({ "index": { "_index": "test-index" } })
        );
        let doc: serde_json::Value = serde_json::from_str(lines[1]).unwrap();
        assert!(doc.is_object());
        assert!(lines[1].contains("test-agent"));
    }

    #[test]
    fn test_build_bulk_body_empty() {
        assert_eq!(sink("test-index").build_bulk_body(&[]).unwrap(), "");
    }

    #[test]
    fn test_build_bulk_body_one_action_per_document() {
        let docs: Vec<LogDocument> = (0..3).map(|_| LogDocument::default()).collect();
        let body = sink("test-index").build_bulk_body(&docs).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 6);
        for action in lines.iter().step_by(2) {
            assert_eq!(*action, r#"{"index":{"_index":"test-index"}}"#);
        }
    }

    #[test]
    fn test_parse_bulk_response_with_item_error() {
        let body = r#"{"took":3,"errors":true,"items":[
            {"index":{"_index":"adf-logs","_id":"1","status":201}},
            {"index":{"_index":"adf-logs","_id":"2","status":400,
                      "error":{"type":"mapper_parsing_exception","reason":"bad field"}}}]}"#;
        let response: BulkResponse = serde_json::from_str(body).unwrap();
        assert!(response.errors);
        assert_eq!(response.took, 3);
        assert_eq!(response.items.len(), 2);
        let ok = response.items[0].index.as_ref().unwrap();
        assert_eq!(ok.status, 201);
        assert!(ok.error.is_none());
        let failed = response.items[1].index.as_ref().unwrap();
        assert_eq!(failed.status, 400);
        let error = failed.error.as_ref().unwrap();
        assert_eq!(error.error_type, "mapper_parsing_exception");
        assert_eq!(error.reason, "bad field");
    }

    #[test]
    fn test_bulk_url() {
        assert_eq!(
            sink("adf-logs").bulk_url(),
            "http://localhost:7280/api/v1/_elastic/adf-logs/_bulk?refresh=true"
        );
    }
}