mockforge_observability/
log_shipper.rs1use chrono::{DateTime, Utc};
28use serde::{Deserialize, Serialize};
29use std::sync::Arc;
30use std::time::Duration;
31use tokio::sync::mpsc;
32use tracing::{debug, warn};
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct RequestLogEvent {
38 pub timestamp: DateTime<Utc>,
39 pub method: String,
40 pub path: String,
41 pub status: u16,
42 pub latency_ms: u32,
43 #[serde(skip_serializing_if = "Option::is_none")]
44 pub matched_route: Option<String>,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub client_ip: Option<String>,
47 #[serde(skip_serializing_if = "Option::is_none")]
48 pub user_agent: Option<String>,
49 #[serde(skip_serializing_if = "Option::is_none")]
50 pub request_id: Option<String>,
51 #[serde(skip_serializing_if = "Option::is_none")]
52 pub bytes_in: Option<u64>,
53 #[serde(skip_serializing_if = "Option::is_none")]
54 pub bytes_out: Option<u64>,
55}
56
57#[derive(Debug, Serialize)]
58struct IngestPayload<'a> {
59 events: &'a [RequestLogEvent],
60}
61
62#[derive(Clone)]
66pub struct LogShipperHandle {
67 inner: Option<Arc<Inner>>,
68}
69
70struct Inner {
71 sender: mpsc::Sender<RequestLogEvent>,
72}
73
74impl LogShipperHandle {
75 pub fn disabled() -> Self {
78 Self { inner: None }
79 }
80
81 pub fn enqueue(&self, event: RequestLogEvent) {
85 if let Some(inner) = &self.inner {
86 let _ = inner.sender.try_send(event);
87 }
88 }
89
90 pub fn is_active(&self) -> bool {
93 self.inner.is_some()
94 }
95}
96
97pub fn from_env() -> LogShipperHandle {
102 let url = match std::env::var("MOCKFORGE_LOG_INGEST_URL") {
103 Ok(u) if !u.trim().is_empty() => u,
104 _ => return LogShipperHandle::disabled(),
105 };
106 let token = match std::env::var("MOCKFORGE_LOG_INGEST_TOKEN") {
107 Ok(t) if !t.trim().is_empty() => t,
108 _ => return LogShipperHandle::disabled(),
109 };
110 let batch_size: usize = std::env::var("MOCKFORGE_LOG_INGEST_BATCH_SIZE")
111 .ok()
112 .and_then(|s| s.parse().ok())
113 .unwrap_or(50);
114 let flush_ms: u64 = std::env::var("MOCKFORGE_LOG_INGEST_FLUSH_MS")
115 .ok()
116 .and_then(|s| s.parse().ok())
117 .unwrap_or(2000);
118 let buffer: usize = std::env::var("MOCKFORGE_LOG_INGEST_BUFFER")
119 .ok()
120 .and_then(|s| s.parse().ok())
121 .unwrap_or(1024);
122
123 let client = match reqwest::Client::builder().timeout(Duration::from_secs(5)).build() {
124 Ok(c) => c,
125 Err(e) => {
126 warn!("LogShipper HTTP client init failed: {}", e);
127 return LogShipperHandle::disabled();
128 }
129 };
130
131 let (sender, receiver) = mpsc::channel::<RequestLogEvent>(buffer);
132 let inner = Arc::new(Inner { sender });
133
134 tokio::spawn(run(receiver, client, url, token, batch_size, flush_ms));
135
136 LogShipperHandle { inner: Some(inner) }
137}
138
139async fn run(
144 mut receiver: mpsc::Receiver<RequestLogEvent>,
145 client: reqwest::Client,
146 url: String,
147 token: String,
148 batch_size: usize,
149 flush_ms: u64,
150) {
151 let flush_after = Duration::from_millis(flush_ms);
152 let mut batch: Vec<RequestLogEvent> = Vec::with_capacity(batch_size);
153 let mut deadline = tokio::time::Instant::now() + flush_after;
154
155 loop {
156 let timeout = tokio::time::sleep_until(deadline);
157 tokio::pin!(timeout);
158
159 tokio::select! {
160 biased;
161 evt = receiver.recv() => {
162 match evt {
163 Some(e) => {
164 batch.push(e);
165 if batch.len() >= batch_size {
166 send_batch(&client, &url, &token, &batch).await;
167 batch.clear();
168 deadline = tokio::time::Instant::now() + flush_after;
169 }
170 }
171 None => {
172 if !batch.is_empty() {
175 send_batch(&client, &url, &token, &batch).await;
176 }
177 return;
178 }
179 }
180 }
181 _ = &mut timeout => {
182 if !batch.is_empty() {
183 send_batch(&client, &url, &token, &batch).await;
184 batch.clear();
185 }
186 deadline = tokio::time::Instant::now() + flush_after;
187 }
188 }
189 }
190}
191
192async fn send_batch(client: &reqwest::Client, url: &str, token: &str, events: &[RequestLogEvent]) {
193 let payload = IngestPayload { events };
194 debug!(count = events.len(), "Shipping log batch");
195 match client.post(url).bearer_auth(token).json(&payload).send().await {
196 Ok(resp) if resp.status().is_success() => {}
197 Ok(resp) => {
198 warn!(
199 status = %resp.status(),
200 count = events.len(),
201 "Log ingest non-success; dropping batch"
202 );
203 }
204 Err(e) => {
205 warn!(error = %e, count = events.len(), "Log ingest send failed; dropping batch");
206 }
207 }
208}
209
210#[cfg(test)]
211mod tests {
212 use super::*;
213
214 #[test]
215 fn disabled_handle_is_inert() {
216 let h = LogShipperHandle::disabled();
217 assert!(!h.is_active());
218 h.enqueue(RequestLogEvent {
220 timestamp: Utc::now(),
221 method: "GET".into(),
222 path: "/test".into(),
223 status: 200,
224 latency_ms: 1,
225 matched_route: None,
226 client_ip: None,
227 user_agent: None,
228 request_id: None,
229 bytes_in: None,
230 bytes_out: None,
231 });
232 }
233
234 #[test]
235 fn from_env_returns_disabled_without_url_or_token() {
236 std::env::remove_var("MOCKFORGE_LOG_INGEST_URL");
237 std::env::remove_var("MOCKFORGE_LOG_INGEST_TOKEN");
238 assert!(!from_env().is_active());
239 }
240}