1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, VecDeque};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct RequestLogEntry {
12 pub id: String,
14 pub timestamp: DateTime<Utc>,
16 pub server_type: String,
18 pub method: String,
20 pub path: String,
22 pub status_code: u16,
24 pub response_time_ms: u64,
26 pub client_ip: Option<String>,
28 pub user_agent: Option<String>,
30 pub headers: HashMap<String, String>,
32 pub response_size_bytes: u64,
34 pub error_message: Option<String>,
36 pub metadata: HashMap<String, String>,
38}
39
40#[derive(Debug, Clone)]
42pub struct CentralizedRequestLogger {
43 logs: Arc<RwLock<VecDeque<RequestLogEntry>>>,
45 max_logs: usize,
47}
48
49impl Default for CentralizedRequestLogger {
50 fn default() -> Self {
51 Self::new(1000) }
53}
54
55impl CentralizedRequestLogger {
56 pub fn new(max_logs: usize) -> Self {
58 Self {
59 logs: Arc::new(RwLock::new(VecDeque::new())),
60 max_logs,
61 }
62 }
63
64 pub async fn log_request(&self, entry: RequestLogEntry) {
66 let mut logs = self.logs.write().await;
67
68 logs.push_front(entry);
70
71 while logs.len() > self.max_logs {
73 logs.pop_back();
74 }
75 }
76
77 pub async fn get_recent_logs(&self, limit: Option<usize>) -> Vec<RequestLogEntry> {
79 let logs = self.logs.read().await;
80 let take_count = limit.unwrap_or(logs.len()).min(logs.len());
81 logs.iter().take(take_count).cloned().collect()
82 }
83
84 pub async fn get_logs_by_server(
86 &self,
87 server_type: &str,
88 limit: Option<usize>,
89 ) -> Vec<RequestLogEntry> {
90 let logs = self.logs.read().await;
91 logs.iter()
92 .filter(|log| log.server_type == server_type)
93 .take(limit.unwrap_or(logs.len()))
94 .cloned()
95 .collect()
96 }
97
98 pub async fn get_request_counts_by_server(&self) -> HashMap<String, u64> {
100 let logs = self.logs.read().await;
101 let mut counts = HashMap::new();
102
103 for log in logs.iter() {
104 *counts.entry(log.server_type.clone()).or_insert(0) += 1;
105 }
106
107 counts
108 }
109
110 pub async fn clear_logs(&self) {
112 let mut logs = self.logs.write().await;
113 logs.clear();
114 }
115
116 pub async fn find_matching_requests(
122 &self,
123 pattern: &crate::verification::VerificationRequest,
124 ) -> Vec<RequestLogEntry> {
125 let logs = self.logs.read().await;
126 logs.iter()
127 .filter(|entry| crate::verification::matches_verification_pattern(entry, pattern))
128 .cloned()
129 .collect()
130 }
131
132 pub async fn count_matching_requests(
138 &self,
139 pattern: &crate::verification::VerificationRequest,
140 ) -> usize {
141 let logs = self.logs.read().await;
142 logs.iter()
143 .filter(|entry| crate::verification::matches_verification_pattern(entry, pattern))
144 .count()
145 }
146
147 pub async fn get_request_sequence(
153 &self,
154 patterns: &[crate::verification::VerificationRequest],
155 ) -> Vec<RequestLogEntry> {
156 let logs = self.logs.read().await;
157 let mut log_idx = 0;
158 let mut all_matches = Vec::new();
159
160 for pattern in patterns {
161 let mut found = false;
163 while log_idx < logs.len() {
164 if crate::verification::matches_verification_pattern(&logs[log_idx], pattern) {
165 all_matches.push(logs[log_idx].clone());
166 log_idx += 1;
167 found = true;
168 break;
169 }
170 log_idx += 1;
171 }
172
173 if !found {
174 break;
176 }
177 }
178
179 all_matches
180 }
181}
182
183static GLOBAL_LOGGER: once_cell::sync::OnceCell<CentralizedRequestLogger> =
185 once_cell::sync::OnceCell::new();
186
187pub fn init_global_logger(max_logs: usize) -> &'static CentralizedRequestLogger {
189 GLOBAL_LOGGER.get_or_init(|| CentralizedRequestLogger::new(max_logs))
190}
191
192pub fn get_global_logger() -> Option<&'static CentralizedRequestLogger> {
194 GLOBAL_LOGGER.get()
195}
196
197pub async fn log_request_global(entry: RequestLogEntry) {
199 if let Some(logger) = get_global_logger() {
200 logger.log_request(entry).await;
201 }
202}
203
204#[allow(clippy::too_many_arguments)]
206pub fn create_http_log_entry(
207 method: &str,
208 path: &str,
209 status_code: u16,
210 response_time_ms: u64,
211 client_ip: Option<String>,
212 user_agent: Option<String>,
213 headers: HashMap<String, String>,
214 response_size_bytes: u64,
215 error_message: Option<String>,
216) -> RequestLogEntry {
217 RequestLogEntry {
218 id: uuid::Uuid::new_v4().to_string(),
219 timestamp: Utc::now(),
220 server_type: "HTTP".to_string(),
221 method: method.to_string(),
222 path: path.to_string(),
223 status_code,
224 response_time_ms,
225 client_ip,
226 user_agent,
227 headers,
228 response_size_bytes,
229 error_message,
230 metadata: HashMap::new(),
231 }
232}
233
234pub fn create_websocket_log_entry(
236 event_type: &str, path: &str,
238 status_code: u16,
239 client_ip: Option<String>,
240 message_size_bytes: u64,
241 error_message: Option<String>,
242) -> RequestLogEntry {
243 let mut metadata = HashMap::new();
244 metadata.insert("event_type".to_string(), event_type.to_string());
245
246 RequestLogEntry {
247 id: uuid::Uuid::new_v4().to_string(),
248 timestamp: Utc::now(),
249 server_type: "WebSocket".to_string(),
250 method: event_type.to_uppercase(),
251 path: path.to_string(),
252 status_code,
253 response_time_ms: 0, client_ip,
255 user_agent: None,
256 headers: HashMap::new(),
257 response_size_bytes: message_size_bytes,
258 error_message,
259 metadata,
260 }
261}
262
263#[allow(clippy::too_many_arguments)]
265pub fn create_grpc_log_entry(
266 service: &str,
267 method: &str,
268 status_code: u16, response_time_ms: u64,
270 client_ip: Option<String>,
271 request_size_bytes: u64,
272 response_size_bytes: u64,
273 error_message: Option<String>,
274) -> RequestLogEntry {
275 let mut metadata = HashMap::new();
276 metadata.insert("service".to_string(), service.to_string());
277 metadata.insert("request_size_bytes".to_string(), request_size_bytes.to_string());
278
279 RequestLogEntry {
280 id: uuid::Uuid::new_v4().to_string(),
281 timestamp: Utc::now(),
282 server_type: "gRPC".to_string(),
283 method: format!("{}/{}", service, method),
284 path: format!("/{}/{}", service, method),
285 status_code,
286 response_time_ms,
287 client_ip,
288 user_agent: None,
289 headers: HashMap::new(),
290 response_size_bytes,
291 error_message,
292 metadata,
293 }
294}
295
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fixed 200 entry for `/test`; only the server type and method
    /// vary between calls.
    fn create_test_entry(server_type: &str, method: &str) -> RequestLogEntry {
        let id = uuid::Uuid::new_v4().to_string();
        let timestamp = Utc::now();

        RequestLogEntry {
            id,
            timestamp,
            server_type: server_type.to_owned(),
            method: method.to_owned(),
            path: String::from("/test"),
            status_code: 200,
            response_time_ms: 100,
            client_ip: Some(String::from("127.0.0.1")),
            user_agent: Some(String::from("test-agent")),
            headers: HashMap::new(),
            response_size_bytes: 1024,
            error_message: None,
            metadata: HashMap::new(),
        }
    }

    #[test]
    fn test_centralized_logger_new() {
        let capacity = CentralizedRequestLogger::new(500).max_logs;
        assert_eq!(capacity, 500);
    }

    #[test]
    fn test_centralized_logger_default() {
        let capacity = CentralizedRequestLogger::default().max_logs;
        assert_eq!(capacity, 1000);
    }

    #[tokio::test]
    async fn test_log_request() {
        let logger = CentralizedRequestLogger::new(10);
        logger.log_request(create_test_entry("HTTP", "GET")).await;

        let stored = logger.get_recent_logs(None).await;
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].method, "GET");
    }

    #[tokio::test]
    async fn test_log_request_maintains_size_limit() {
        let logger = CentralizedRequestLogger::new(5);

        // Log twice as many entries as the logger may retain.
        for i in 0..10 {
            let mut entry = create_test_entry("HTTP", "GET");
            entry.id = format!("entry-{}", i);
            logger.log_request(entry).await;
        }

        let stored = logger.get_recent_logs(None).await;
        assert_eq!(stored.len(), 5);
    }

    #[tokio::test]
    async fn test_get_recent_logs_with_limit() {
        let logger = CentralizedRequestLogger::new(100);
        for _ in 0..20 {
            logger.log_request(create_test_entry("HTTP", "GET")).await;
        }

        let limited = logger.get_recent_logs(Some(10)).await;
        assert_eq!(limited.len(), 10);
    }

    #[tokio::test]
    async fn test_get_logs_by_server() {
        let logger = CentralizedRequestLogger::new(100);

        let traffic = [
            ("HTTP", "GET"),
            ("HTTP", "POST"),
            ("WebSocket", "CONNECT"),
            ("gRPC", "Call"),
        ];
        for &(server, method) in &traffic {
            logger.log_request(create_test_entry(server, method)).await;
        }

        let expectations = [("HTTP", 2usize), ("WebSocket", 1), ("gRPC", 1)];
        for &(server, expected) in &expectations {
            let selected = logger.get_logs_by_server(server, None).await;
            assert_eq!(selected.len(), expected);
        }
    }

    #[tokio::test]
    async fn test_get_request_counts_by_server() {
        let logger = CentralizedRequestLogger::new(100);

        let traffic = [
            ("HTTP", "GET"),
            ("HTTP", "POST"),
            ("HTTP", "PUT"),
            ("WebSocket", "CONNECT"),
            ("gRPC", "Call"),
            ("gRPC", "Stream"),
        ];
        for &(server, method) in &traffic {
            logger.log_request(create_test_entry(server, method)).await;
        }

        let counts = logger.get_request_counts_by_server().await;

        let expectations = [("HTTP", 3u64), ("WebSocket", 1), ("gRPC", 2)];
        for &(server, expected) in &expectations {
            assert_eq!(counts.get(server), Some(&expected));
        }
    }

    #[tokio::test]
    async fn test_clear_logs() {
        let logger = CentralizedRequestLogger::new(100);
        for &method in &["GET", "POST"] {
            logger.log_request(create_test_entry("HTTP", method)).await;
        }

        let before = logger.get_recent_logs(None).await;
        assert_eq!(before.len(), 2);

        logger.clear_logs().await;

        let after = logger.get_recent_logs(None).await;
        assert_eq!(after.len(), 0);
    }

    #[test]
    fn test_create_http_log_entry() {
        let mut headers = HashMap::new();
        headers.insert(
            String::from("Content-Type"),
            String::from("application/json"),
        );

        let entry = create_http_log_entry(
            "POST",
            "/api/test",
            201,
            150,
            Some(String::from("192.168.1.1")),
            Some(String::from("Mozilla/5.0")),
            headers,
            2048,
            None,
        );

        assert_eq!(entry.server_type, "HTTP");
        assert_eq!(entry.method, "POST");
        assert_eq!(entry.path, "/api/test");
        assert_eq!(entry.status_code, 201);
        assert_eq!(entry.response_time_ms, 150);
        assert_eq!(entry.response_size_bytes, 2048);
        assert_eq!(entry.client_ip.as_deref(), Some("192.168.1.1"));
        assert_eq!(entry.user_agent.as_deref(), Some("Mozilla/5.0"));
        assert_eq!(
            entry.headers.get("Content-Type").map(String::as_str),
            Some("application/json")
        );
        assert!(entry.error_message.is_none());
    }

    #[test]
    fn test_create_websocket_log_entry() {
        let client = Some(String::from("10.0.0.1"));
        let entry = create_websocket_log_entry("connect", "/ws/chat", 101, client, 0, None);

        assert_eq!(entry.server_type, "WebSocket");
        assert_eq!(entry.method, "CONNECT");
        assert_eq!(entry.path, "/ws/chat");
        assert_eq!(entry.status_code, 101);
        assert_eq!(entry.response_time_ms, 0);
        assert_eq!(
            entry.metadata.get("event_type").map(String::as_str),
            Some("connect")
        );
    }

    #[test]
    fn test_create_grpc_log_entry() {
        let client = Some(String::from("172.16.0.1"));
        let entry = create_grpc_log_entry("UserService", "GetUser", 0, 50, client, 128, 512, None);

        assert_eq!(entry.server_type, "gRPC");
        assert_eq!(entry.method, "UserService/GetUser");
        assert_eq!(entry.path, "/UserService/GetUser");
        assert_eq!(entry.status_code, 0);
        assert_eq!(entry.response_time_ms, 50);
        assert_eq!(entry.response_size_bytes, 512);
        assert_eq!(
            entry.metadata.get("service").map(String::as_str),
            Some("UserService")
        );
        assert_eq!(
            entry.metadata.get("request_size_bytes").map(String::as_str),
            Some("128")
        );
    }

    #[test]
    fn test_request_log_entry_with_error() {
        let failure = Some(String::from("Internal server error"));
        let entry = create_http_log_entry(
            "GET",
            "/api/error",
            500,
            200,
            None,
            None,
            HashMap::new(),
            0,
            failure,
        );

        assert_eq!(entry.status_code, 500);
        assert_eq!(
            entry.error_message.as_deref(),
            Some("Internal server error")
        );
    }
}