1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, VecDeque};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct RequestLogEntry {
12 pub id: String,
14 pub timestamp: DateTime<Utc>,
16 pub server_type: String,
18 pub method: String,
20 pub path: String,
22 pub status_code: u16,
24 pub response_time_ms: u64,
26 pub client_ip: Option<String>,
28 pub user_agent: Option<String>,
30 pub headers: HashMap<String, String>,
32 pub response_size_bytes: u64,
34 pub error_message: Option<String>,
36 pub metadata: HashMap<String, String>,
38}
39
40#[derive(Debug, Clone)]
42pub struct CentralizedRequestLogger {
43 logs: Arc<RwLock<VecDeque<RequestLogEntry>>>,
45 max_logs: usize,
47}
48
49impl Default for CentralizedRequestLogger {
50 fn default() -> Self {
51 Self::new(1000) }
53}
54
55impl CentralizedRequestLogger {
56 pub fn new(max_logs: usize) -> Self {
58 Self {
59 logs: Arc::new(RwLock::new(VecDeque::new())),
60 max_logs,
61 }
62 }
63
64 pub async fn log_request(&self, entry: RequestLogEntry) {
66 let mut logs = self.logs.write().await;
67
68 logs.push_front(entry);
70
71 while logs.len() > self.max_logs {
73 logs.pop_back();
74 }
75 }
76
77 pub async fn get_recent_logs(&self, limit: Option<usize>) -> Vec<RequestLogEntry> {
79 let logs = self.logs.read().await;
80 let take_count = limit.unwrap_or(logs.len()).min(logs.len());
81 logs.iter().take(take_count).cloned().collect()
82 }
83
84 pub async fn get_logs_by_server(
86 &self,
87 server_type: &str,
88 limit: Option<usize>,
89 ) -> Vec<RequestLogEntry> {
90 let logs = self.logs.read().await;
91 logs.iter()
92 .filter(|log| log.server_type == server_type)
93 .take(limit.unwrap_or(logs.len()))
94 .cloned()
95 .collect()
96 }
97
98 pub async fn get_request_counts_by_server(&self) -> HashMap<String, u64> {
100 let logs = self.logs.read().await;
101 let mut counts = HashMap::new();
102
103 for log in logs.iter() {
104 *counts.entry(log.server_type.clone()).or_insert(0) += 1;
105 }
106
107 counts
108 }
109
110 pub async fn clear_logs(&self) {
112 let mut logs = self.logs.write().await;
113 logs.clear();
114 }
115}
116
/// Process-wide logger slot. It starts empty; `init_global_logger` fills it and
/// `get_global_logger` reads it.
static GLOBAL_LOGGER: once_cell::sync::OnceCell<CentralizedRequestLogger> =
    once_cell::sync::OnceCell::new();
120
121pub fn init_global_logger(max_logs: usize) -> &'static CentralizedRequestLogger {
123 GLOBAL_LOGGER.get_or_init(|| CentralizedRequestLogger::new(max_logs))
124}
125
/// Returns the process-wide logger, or `None` if the global slot has not been filled yet
/// (it is filled by `init_global_logger`).
pub fn get_global_logger() -> Option<&'static CentralizedRequestLogger> {
    GLOBAL_LOGGER.get()
}
130
131pub async fn log_request_global(entry: RequestLogEntry) {
133 if let Some(logger) = get_global_logger() {
134 logger.log_request(entry).await;
135 }
136}
137
138#[allow(clippy::too_many_arguments)]
140pub fn create_http_log_entry(
141 method: &str,
142 path: &str,
143 status_code: u16,
144 response_time_ms: u64,
145 client_ip: Option<String>,
146 user_agent: Option<String>,
147 headers: HashMap<String, String>,
148 response_size_bytes: u64,
149 error_message: Option<String>,
150) -> RequestLogEntry {
151 RequestLogEntry {
152 id: uuid::Uuid::new_v4().to_string(),
153 timestamp: Utc::now(),
154 server_type: "HTTP".to_string(),
155 method: method.to_string(),
156 path: path.to_string(),
157 status_code,
158 response_time_ms,
159 client_ip,
160 user_agent,
161 headers,
162 response_size_bytes,
163 error_message,
164 metadata: HashMap::new(),
165 }
166}
167
168pub fn create_websocket_log_entry(
170 event_type: &str, path: &str,
172 status_code: u16,
173 client_ip: Option<String>,
174 message_size_bytes: u64,
175 error_message: Option<String>,
176) -> RequestLogEntry {
177 let mut metadata = HashMap::new();
178 metadata.insert("event_type".to_string(), event_type.to_string());
179
180 RequestLogEntry {
181 id: uuid::Uuid::new_v4().to_string(),
182 timestamp: Utc::now(),
183 server_type: "WebSocket".to_string(),
184 method: event_type.to_uppercase(),
185 path: path.to_string(),
186 status_code,
187 response_time_ms: 0, client_ip,
189 user_agent: None,
190 headers: HashMap::new(),
191 response_size_bytes: message_size_bytes,
192 error_message,
193 metadata,
194 }
195}
196
197#[allow(clippy::too_many_arguments)]
199pub fn create_grpc_log_entry(
200 service: &str,
201 method: &str,
202 status_code: u16, response_time_ms: u64,
204 client_ip: Option<String>,
205 request_size_bytes: u64,
206 response_size_bytes: u64,
207 error_message: Option<String>,
208) -> RequestLogEntry {
209 let mut metadata = HashMap::new();
210 metadata.insert("service".to_string(), service.to_string());
211 metadata.insert("request_size_bytes".to_string(), request_size_bytes.to_string());
212
213 RequestLogEntry {
214 id: uuid::Uuid::new_v4().to_string(),
215 timestamp: Utc::now(),
216 server_type: "gRPC".to_string(),
217 method: format!("{}/{}", service, method),
218 path: format!("/{}/{}", service, method),
219 status_code,
220 response_time_ms,
221 client_ip,
222 user_agent: None,
223 headers: HashMap::new(),
224 response_size_bytes,
225 error_message,
226 metadata,
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a successful (status 200) entry for `server_type`/`method` with fixed dummy values.
    fn create_test_entry(server_type: &str, method: &str) -> RequestLogEntry {
        RequestLogEntry {
            id: uuid::Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            server_type: server_type.to_string(),
            method: method.to_string(),
            path: "/test".to_string(),
            status_code: 200,
            response_time_ms: 100,
            client_ip: Some("127.0.0.1".to_string()),
            user_agent: Some("test-agent".to_string()),
            headers: HashMap::new(),
            response_size_bytes: 1024,
            error_message: None,
            metadata: HashMap::new(),
        }
    }

    /// Like `create_test_entry("HTTP", "GET")` but with a caller-chosen `id`, so tests can
    /// assert which entries were kept and in what order.
    fn create_test_entry_with_id(id: &str) -> RequestLogEntry {
        let mut entry = create_test_entry("HTTP", "GET");
        entry.id = id.to_string();
        entry
    }

    /// Collects the ids of `logs`, preserving their order.
    fn ids(logs: &[RequestLogEntry]) -> Vec<&str> {
        logs.iter().map(|log| log.id.as_str()).collect()
    }

    #[test]
    fn test_centralized_logger_new() {
        let logger = CentralizedRequestLogger::new(500);
        assert_eq!(logger.max_logs, 500);
    }

    #[test]
    fn test_centralized_logger_default() {
        let logger = CentralizedRequestLogger::default();
        assert_eq!(logger.max_logs, 1000);
    }

    #[tokio::test]
    async fn test_log_request() {
        let logger = CentralizedRequestLogger::new(10);
        let entry = create_test_entry("HTTP", "GET");

        logger.log_request(entry).await;

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].method, "GET");
    }

    #[tokio::test]
    async fn test_log_request_orders_newest_first() {
        let logger = CentralizedRequestLogger::new(10);

        logger.log_request(create_test_entry_with_id("first")).await;
        logger.log_request(create_test_entry_with_id("second")).await;
        logger.log_request(create_test_entry_with_id("third")).await;

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(ids(&logs), vec!["third", "second", "first"]);
    }

    #[tokio::test]
    async fn test_log_request_maintains_size_limit() {
        let logger = CentralizedRequestLogger::new(5);

        for i in 0..10 {
            let entry = create_test_entry_with_id(&format!("entry-{}", i));
            logger.log_request(entry).await;
        }

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(logs.len(), 5);
        // Only the five newest entries survive (newest first); entry-0..entry-4 were evicted.
        assert_eq!(
            ids(&logs),
            vec!["entry-9", "entry-8", "entry-7", "entry-6", "entry-5"]
        );
    }

    #[tokio::test]
    async fn test_log_request_with_zero_capacity_keeps_nothing() {
        let logger = CentralizedRequestLogger::new(0);

        logger.log_request(create_test_entry("HTTP", "GET")).await;

        assert!(logger.get_recent_logs(None).await.is_empty());
        assert!(logger.get_request_counts_by_server().await.is_empty());
    }

    #[tokio::test]
    async fn test_clone_shares_storage() {
        let logger = CentralizedRequestLogger::new(10);
        let handle = logger.clone();

        // Writing through the clone must be visible through the original.
        handle.log_request(create_test_entry("HTTP", "GET")).await;

        assert_eq!(logger.get_recent_logs(None).await.len(), 1);
    }

    #[tokio::test]
    async fn test_get_recent_logs_with_limit() {
        let logger = CentralizedRequestLogger::new(100);

        for i in 0..20 {
            let entry = create_test_entry_with_id(&format!("req-{}", i));
            logger.log_request(entry).await;
        }

        let logs = logger.get_recent_logs(Some(10)).await;
        assert_eq!(logs.len(), 10);
        // The limit keeps the most recent entries, not the oldest ones.
        assert_eq!(logs[0].id, "req-19");
        assert_eq!(logs[9].id, "req-10");

        // A limit of zero yields nothing; a limit above the stored count yields everything.
        assert!(logger.get_recent_logs(Some(0)).await.is_empty());
        assert_eq!(logger.get_recent_logs(Some(1000)).await.len(), 20);
    }

    #[tokio::test]
    async fn test_get_logs_by_server() {
        let logger = CentralizedRequestLogger::new(100);

        logger.log_request(create_test_entry("HTTP", "GET")).await;
        logger.log_request(create_test_entry("HTTP", "POST")).await;
        logger.log_request(create_test_entry("WebSocket", "CONNECT")).await;
        logger.log_request(create_test_entry("gRPC", "Call")).await;

        let http_logs = logger.get_logs_by_server("HTTP", None).await;
        assert_eq!(http_logs.len(), 2);
        assert!(http_logs.iter().all(|log| log.server_type == "HTTP"));

        let ws_logs = logger.get_logs_by_server("WebSocket", None).await;
        assert_eq!(ws_logs.len(), 1);

        let grpc_logs = logger.get_logs_by_server("gRPC", None).await;
        assert_eq!(grpc_logs.len(), 1);

        // The limit applies after filtering and keeps the newest matching entry.
        let newest_http = logger.get_logs_by_server("HTTP", Some(1)).await;
        assert_eq!(newest_http.len(), 1);
        assert_eq!(newest_http[0].method, "POST");

        // A server type that was never logged yields no entries.
        assert!(logger.get_logs_by_server("TCP", None).await.is_empty());
    }

    #[tokio::test]
    async fn test_get_request_counts_by_server() {
        let logger = CentralizedRequestLogger::new(100);

        logger.log_request(create_test_entry("HTTP", "GET")).await;
        logger.log_request(create_test_entry("HTTP", "POST")).await;
        logger.log_request(create_test_entry("HTTP", "PUT")).await;
        logger.log_request(create_test_entry("WebSocket", "CONNECT")).await;
        logger.log_request(create_test_entry("gRPC", "Call")).await;
        logger.log_request(create_test_entry("gRPC", "Stream")).await;

        let counts = logger.get_request_counts_by_server().await;

        assert_eq!(counts.len(), 3);
        assert_eq!(counts.get("HTTP"), Some(&3));
        assert_eq!(counts.get("WebSocket"), Some(&1));
        assert_eq!(counts.get("gRPC"), Some(&2));
    }

    #[tokio::test]
    async fn test_clear_logs() {
        let logger = CentralizedRequestLogger::new(100);

        logger.log_request(create_test_entry("HTTP", "GET")).await;
        logger.log_request(create_test_entry("HTTP", "POST")).await;

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(logs.len(), 2);

        logger.clear_logs().await;

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(logs.len(), 0);
        assert!(logger.get_request_counts_by_server().await.is_empty());
    }

    #[test]
    fn test_init_global_logger_returns_same_instance() {
        // Whichever call initialized the global first, every later call must hand back
        // that same instance and ignore the new capacity.
        let first = init_global_logger(10);
        let second = init_global_logger(20);
        assert!(std::ptr::eq(first, second));
        assert_eq!(first.max_logs, second.max_logs);

        let current = get_global_logger().expect("global logger was just initialized");
        assert!(std::ptr::eq(first, current));
    }

    #[test]
    fn test_create_http_log_entry() {
        let mut headers = HashMap::new();
        headers.insert("Content-Type".to_string(), "application/json".to_string());

        let entry = create_http_log_entry(
            "POST",
            "/api/test",
            201,
            150,
            Some("192.168.1.1".to_string()),
            Some("Mozilla/5.0".to_string()),
            headers.clone(),
            2048,
            None,
        );

        assert_eq!(entry.server_type, "HTTP");
        assert_eq!(entry.method, "POST");
        assert_eq!(entry.path, "/api/test");
        assert_eq!(entry.status_code, 201);
        assert_eq!(entry.response_time_ms, 150);
        assert_eq!(entry.response_size_bytes, 2048);
        assert_eq!(entry.client_ip, Some("192.168.1.1".to_string()));
        assert_eq!(entry.user_agent, Some("Mozilla/5.0".to_string()));
        assert_eq!(entry.headers, headers);
        assert!(entry.error_message.is_none());
        assert!(entry.metadata.is_empty());
        assert!(!entry.id.is_empty());
    }

    #[test]
    fn test_create_websocket_log_entry() {
        let entry = create_websocket_log_entry(
            "connect",
            "/ws/chat",
            101,
            Some("10.0.0.1".to_string()),
            0,
            None,
        );

        assert_eq!(entry.server_type, "WebSocket");
        assert_eq!(entry.method, "CONNECT");
        assert_eq!(entry.path, "/ws/chat");
        assert_eq!(entry.status_code, 101);
        assert_eq!(entry.response_time_ms, 0);
        assert_eq!(entry.response_size_bytes, 0);
        assert_eq!(entry.client_ip, Some("10.0.0.1".to_string()));
        assert!(entry.user_agent.is_none());
        assert!(entry.headers.is_empty());
        assert!(entry.error_message.is_none());
        assert_eq!(entry.metadata.get("event_type"), Some(&"connect".to_string()));
    }

    #[test]
    fn test_create_websocket_log_entry_maps_message_size_and_error() {
        let entry = create_websocket_log_entry(
            "message",
            "/ws/chat",
            1011,
            None,
            256,
            Some("boom".to_string()),
        );

        // The message size is reported through `response_size_bytes`.
        assert_eq!(entry.response_size_bytes, 256);
        assert_eq!(entry.method, "MESSAGE");
        assert_eq!(entry.metadata.get("event_type"), Some(&"message".to_string()));
        assert_eq!(entry.error_message, Some("boom".to_string()));
        assert!(entry.client_ip.is_none());
    }

    #[test]
    fn test_create_grpc_log_entry() {
        let entry = create_grpc_log_entry(
            "UserService",
            "GetUser",
            0,
            50,
            Some("172.16.0.1".to_string()),
            128,
            512,
            None,
        );

        assert_eq!(entry.server_type, "gRPC");
        assert_eq!(entry.method, "UserService/GetUser");
        assert_eq!(entry.path, "/UserService/GetUser");
        assert_eq!(entry.status_code, 0);
        assert_eq!(entry.response_time_ms, 50);
        assert_eq!(entry.response_size_bytes, 512);
        assert_eq!(entry.client_ip, Some("172.16.0.1".to_string()));
        assert!(entry.user_agent.is_none());
        assert!(entry.headers.is_empty());
        assert!(entry.error_message.is_none());
        assert_eq!(entry.metadata.len(), 2);
        assert_eq!(entry.metadata.get("service"), Some(&"UserService".to_string()));
        assert_eq!(entry.metadata.get("request_size_bytes"), Some(&"128".to_string()));
    }

    #[test]
    fn test_request_log_entry_with_error() {
        let entry = create_http_log_entry(
            "GET",
            "/api/error",
            500,
            200,
            None,
            None,
            HashMap::new(),
            0,
            Some("Internal server error".to_string()),
        );

        assert_eq!(entry.status_code, 500);
        assert_eq!(entry.error_message, Some("Internal server error".to_string()));
    }

    #[test]
    fn test_created_entries_get_distinct_ids() {
        let first = create_http_log_entry("GET", "/", 200, 1, None, None, HashMap::new(), 0, None);
        let second = create_http_log_entry("GET", "/", 200, 1, None, None, HashMap::new(), 0, None);

        assert_ne!(first.id, second.id);
    }
}