use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, warn};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestLogEvent {
pub timestamp: DateTime<Utc>,
pub method: String,
pub path: String,
pub status: u16,
pub latency_ms: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub matched_route: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub client_ip: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub user_agent: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub bytes_in: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub bytes_out: Option<u64>,
}
#[derive(Debug, Serialize)]
struct IngestPayload<'a> {
events: &'a [RequestLogEvent],
}
#[derive(Clone)]
pub struct LogShipperHandle {
inner: Option<Arc<Inner>>,
}
struct Inner {
sender: mpsc::Sender<RequestLogEvent>,
}
impl LogShipperHandle {
pub fn disabled() -> Self {
Self { inner: None }
}
pub fn enqueue(&self, event: RequestLogEvent) {
if let Some(inner) = &self.inner {
let _ = inner.sender.try_send(event);
}
}
pub fn is_active(&self) -> bool {
self.inner.is_some()
}
}
pub fn from_env() -> LogShipperHandle {
let url = match std::env::var("MOCKFORGE_LOG_INGEST_URL") {
Ok(u) if !u.trim().is_empty() => u,
_ => return LogShipperHandle::disabled(),
};
let token = match std::env::var("MOCKFORGE_LOG_INGEST_TOKEN") {
Ok(t) if !t.trim().is_empty() => t,
_ => return LogShipperHandle::disabled(),
};
let batch_size: usize = std::env::var("MOCKFORGE_LOG_INGEST_BATCH_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(50);
let flush_ms: u64 = std::env::var("MOCKFORGE_LOG_INGEST_FLUSH_MS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2000);
let buffer: usize = std::env::var("MOCKFORGE_LOG_INGEST_BUFFER")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(1024);
let client = match reqwest::Client::builder().timeout(Duration::from_secs(5)).build() {
Ok(c) => c,
Err(e) => {
warn!("LogShipper HTTP client init failed: {}", e);
return LogShipperHandle::disabled();
}
};
let (sender, receiver) = mpsc::channel::<RequestLogEvent>(buffer);
let inner = Arc::new(Inner { sender });
tokio::spawn(run(receiver, client, url, token, batch_size, flush_ms));
LogShipperHandle { inner: Some(inner) }
}
async fn run(
mut receiver: mpsc::Receiver<RequestLogEvent>,
client: reqwest::Client,
url: String,
token: String,
batch_size: usize,
flush_ms: u64,
) {
let flush_after = Duration::from_millis(flush_ms);
let mut batch: Vec<RequestLogEvent> = Vec::with_capacity(batch_size);
let mut deadline = tokio::time::Instant::now() + flush_after;
loop {
let timeout = tokio::time::sleep_until(deadline);
tokio::pin!(timeout);
tokio::select! {
biased;
evt = receiver.recv() => {
match evt {
Some(e) => {
batch.push(e);
if batch.len() >= batch_size {
send_batch(&client, &url, &token, &batch).await;
batch.clear();
deadline = tokio::time::Instant::now() + flush_after;
}
}
None => {
if !batch.is_empty() {
send_batch(&client, &url, &token, &batch).await;
}
return;
}
}
}
_ = &mut timeout => {
if !batch.is_empty() {
send_batch(&client, &url, &token, &batch).await;
batch.clear();
}
deadline = tokio::time::Instant::now() + flush_after;
}
}
}
}
async fn send_batch(client: &reqwest::Client, url: &str, token: &str, events: &[RequestLogEvent]) {
let payload = IngestPayload { events };
debug!(count = events.len(), "Shipping log batch");
match client.post(url).bearer_auth(token).json(&payload).send().await {
Ok(resp) if resp.status().is_success() => {}
Ok(resp) => {
warn!(
status = %resp.status(),
count = events.len(),
"Log ingest non-success; dropping batch"
);
}
Err(e) => {
warn!(error = %e, count = events.len(), "Log ingest send failed; dropping batch");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn disabled_handle_is_inert() {
let h = LogShipperHandle::disabled();
assert!(!h.is_active());
h.enqueue(RequestLogEvent {
timestamp: Utc::now(),
method: "GET".into(),
path: "/test".into(),
status: 200,
latency_ms: 1,
matched_route: None,
client_ip: None,
user_agent: None,
request_id: None,
bytes_in: None,
bytes_out: None,
});
}
#[test]
fn from_env_returns_disabled_without_url_or_token() {
std::env::remove_var("MOCKFORGE_LOG_INGEST_URL");
std::env::remove_var("MOCKFORGE_LOG_INGEST_TOKEN");
assert!(!from_env().is_active());
}
}