1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, VecDeque};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8
/// Coarse classification of where a response sits between fully synthetic
/// and fully live data.
///
/// Serialized with lowercase variant names (`"synthetic"`, `"blended"`,
/// `"live"`) because of `rename_all = "lowercase"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RealityContinuumType {
    /// Chosen by `from_blend_ratio` for ratios at or below `0.0`.
    Synthetic,
    /// Chosen by `from_blend_ratio` for every ratio that is neither
    /// `<= 0.0` nor `>= 1.0`.
    Blended,
    /// Chosen by `from_blend_ratio` for ratios at or above `1.0`.
    Live,
}
22
23impl RealityContinuumType {
24 pub fn from_blend_ratio(ratio: f64) -> Self {
26 if ratio <= 0.0 {
27 Self::Synthetic
28 } else if ratio >= 1.0 {
29 Self::Live
30 } else {
31 Self::Blended
32 }
33 }
34
35 pub fn name(&self) -> &'static str {
37 match self {
38 RealityContinuumType::Synthetic => "Synthetic",
39 RealityContinuumType::Blended => "Blended",
40 RealityContinuumType::Live => "Live",
41 }
42 }
43}
44
/// Share of a response attributed to each data source, in percent.
///
/// Each field carries a field-level `#[serde(default)]`, so a field missing
/// from the input deserializes to `0.0` (the `f64` default). That is not the
/// same as this type's `Default` impl, which sets `generator_percent` to
/// `100.0`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSourceBreakdown {
    /// Percentage attributed to recorded data.
    #[serde(default)]
    pub recorded_percent: f64,
    /// Percentage attributed to the generator.
    #[serde(default)]
    pub generator_percent: f64,
    /// Percentage attributed to upstream.
    #[serde(default)]
    pub upstream_percent: f64,
}
58
59impl Default for DataSourceBreakdown {
60 fn default() -> Self {
61 Self {
62 recorded_percent: 0.0,
63 generator_percent: 100.0,
64 upstream_percent: 0.0,
65 }
66 }
67}
68
69impl DataSourceBreakdown {
70 pub fn from_blend_ratio(blend_ratio: f64, recorded_ratio: f64) -> Self {
75 let upstream = blend_ratio * (1.0 - recorded_ratio);
76 let generator = (1.0 - blend_ratio) * (1.0 - recorded_ratio);
77 let recorded = recorded_ratio;
78
79 Self {
80 recorded_percent: recorded * 100.0,
81 generator_percent: generator * 100.0,
82 upstream_percent: upstream * 100.0,
83 }
84 }
85
86 pub fn normalize(&mut self) {
88 let total = self.recorded_percent + self.generator_percent + self.upstream_percent;
89 if total > 0.0 {
90 self.recorded_percent = (self.recorded_percent / total) * 100.0;
91 self.generator_percent = (self.generator_percent / total) * 100.0;
92 self.upstream_percent = (self.upstream_percent / total) * 100.0;
93 }
94 }
95}
96
/// Reality-related context that can be attached to a logged request.
///
/// `Option` fields are left out of the serialized form when they are `None`.
/// Fields marked `#[serde(default)]` fall back to their type's default when
/// missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RealityTraceMetadata {
    /// Reality level in effect, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reality_level: Option<crate::reality::RealityLevel>,
    /// Classification derived from the blend ratio.
    pub reality_continuum_type: RealityContinuumType,
    /// Blend ratio this metadata was built with.
    #[serde(default)]
    pub blend_ratio: f64,
    /// Percentage split across recorded, generator and upstream sources.
    pub data_source_breakdown: DataSourceBreakdown,
    /// Identifier of the active persona, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active_persona_id: Option<String>,
    /// Active scenario, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active_scenario: Option<String>,
    /// Names of the active chaos profiles.
    #[serde(default)]
    pub active_chaos_profiles: Vec<String>,
    /// Names of the active latency profiles.
    #[serde(default)]
    pub active_latency_profiles: Vec<String>,
}
126
127impl Default for RealityTraceMetadata {
128 fn default() -> Self {
129 Self {
130 reality_level: None,
131 reality_continuum_type: RealityContinuumType::Synthetic,
132 blend_ratio: 0.0,
133 data_source_breakdown: DataSourceBreakdown::default(),
134 active_persona_id: None,
135 active_scenario: None,
136 active_chaos_profiles: Vec::new(),
137 active_latency_profiles: Vec::new(),
138 }
139 }
140}
141
142impl RealityTraceMetadata {
143 pub fn from_unified_state(
148 unified_state: &crate::consistency::types::UnifiedState,
149 blend_ratio: f64,
150 path: &str,
151 ) -> Self {
152 let reality_continuum_type = RealityContinuumType::from_blend_ratio(blend_ratio);
153
154 let active_chaos_profiles: Vec<String> = unified_state
156 .active_chaos_rules
157 .iter()
158 .filter_map(|r| r.get("name").and_then(|v| v.as_str()).map(|s| s.to_string()))
159 .collect();
160
161 let active_latency_profiles = Vec::new();
164
165 let mut breakdown = DataSourceBreakdown::from_blend_ratio(blend_ratio, 0.0);
168 breakdown.normalize();
169
170 Self {
171 reality_level: Some(unified_state.reality_level),
172 reality_continuum_type,
173 blend_ratio,
174 data_source_breakdown: breakdown,
175 active_persona_id: unified_state.active_persona.as_ref().map(|p| p.id.clone()),
176 active_scenario: unified_state.active_scenario.clone(),
177 active_chaos_profiles,
178 active_latency_profiles,
179 }
180 }
181}
182
/// One logged request or event, shared by all server types.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestLogEntry {
    /// Entry identifier; the `create_*_log_entry` helpers in this file use a
    /// freshly generated UUID v4 string.
    pub id: String,
    /// When the entry was created (UTC).
    pub timestamp: DateTime<Utc>,
    /// Kind of server that produced the entry; the helpers in this file use
    /// `"HTTP"`, `"WebSocket"` and `"gRPC"`.
    pub server_type: String,
    /// Method label; the helpers use the HTTP method, the upper-cased
    /// WebSocket event type, or `service/method` for gRPC.
    pub method: String,
    /// Request path.
    pub path: String,
    /// Status code reported for the request.
    pub status_code: u16,
    /// Response time in milliseconds.
    pub response_time_ms: u64,
    /// Client IP address, if known.
    pub client_ip: Option<String>,
    /// User-agent string, if known.
    pub user_agent: Option<String>,
    /// Headers as name/value pairs.
    pub headers: HashMap<String, String>,
    /// Response size in bytes.
    pub response_size_bytes: u64,
    /// Error message, if the request failed.
    pub error_message: Option<String>,
    /// Free-form extra key/value pairs.
    pub metadata: HashMap<String, String>,
    /// Optional reality trace metadata; omitted from serialized output when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reality_metadata: Option<RealityTraceMetadata>,
}
219
/// Bounded in-memory log of recent requests.
///
/// The buffer sits behind an `Arc`, so clones of a logger share the same
/// entries.
#[derive(Debug, Clone)]
pub struct CentralizedRequestLogger {
    /// Stored entries; `log_request` pushes at the front, so the newest entry
    /// comes first.
    logs: Arc<RwLock<VecDeque<RequestLogEntry>>>,
    /// Maximum number of entries kept in `logs`.
    max_logs: usize,
}
228
229impl Default for CentralizedRequestLogger {
230 fn default() -> Self {
231 Self::new(1000) }
233}
234
235impl CentralizedRequestLogger {
236 pub fn new(max_logs: usize) -> Self {
238 Self {
239 logs: Arc::new(RwLock::new(VecDeque::new())),
240 max_logs,
241 }
242 }
243
244 pub async fn log_request(&self, entry: RequestLogEntry) {
246 let mut logs = self.logs.write().await;
247
248 logs.push_front(entry);
250
251 while logs.len() > self.max_logs {
253 logs.pop_back();
254 }
255 }
256
257 pub async fn get_recent_logs(&self, limit: Option<usize>) -> Vec<RequestLogEntry> {
259 let logs = self.logs.read().await;
260 let take_count = limit.unwrap_or(logs.len()).min(logs.len());
261 logs.iter().take(take_count).cloned().collect()
262 }
263
264 pub async fn get_logs_by_server(
266 &self,
267 server_type: &str,
268 limit: Option<usize>,
269 ) -> Vec<RequestLogEntry> {
270 let logs = self.logs.read().await;
271 logs.iter()
272 .filter(|log| log.server_type == server_type)
273 .take(limit.unwrap_or(logs.len()))
274 .cloned()
275 .collect()
276 }
277
278 pub async fn get_request_counts_by_server(&self) -> HashMap<String, u64> {
280 let logs = self.logs.read().await;
281 let mut counts = HashMap::new();
282
283 for log in logs.iter() {
284 *counts.entry(log.server_type.clone()).or_insert(0) += 1;
285 }
286
287 counts
288 }
289
290 pub async fn clear_logs(&self) {
292 let mut logs = self.logs.write().await;
293 logs.clear();
294 }
295
296 pub async fn find_matching_requests(
302 &self,
303 pattern: &crate::verification::VerificationRequest,
304 ) -> Vec<RequestLogEntry> {
305 let logs = self.logs.read().await;
306 logs.iter()
307 .filter(|entry| crate::verification::matches_verification_pattern(entry, pattern))
308 .cloned()
309 .collect()
310 }
311
312 pub async fn count_matching_requests(
318 &self,
319 pattern: &crate::verification::VerificationRequest,
320 ) -> usize {
321 let logs = self.logs.read().await;
322 logs.iter()
323 .filter(|entry| crate::verification::matches_verification_pattern(entry, pattern))
324 .count()
325 }
326
327 pub async fn get_request_sequence(
333 &self,
334 patterns: &[crate::verification::VerificationRequest],
335 ) -> Vec<RequestLogEntry> {
336 let logs = self.logs.read().await;
337 let mut log_idx = 0;
338 let mut all_matches = Vec::new();
339
340 for pattern in patterns {
341 let mut found = false;
343 while log_idx < logs.len() {
344 if crate::verification::matches_verification_pattern(&logs[log_idx], pattern) {
345 all_matches.push(logs[log_idx].clone());
346 log_idx += 1;
347 found = true;
348 break;
349 }
350 log_idx += 1;
351 }
352
353 if !found {
354 break;
356 }
357 }
358
359 all_matches
360 }
361}
362
/// Process-wide logger slot; stays empty until `init_global_logger` fills it.
static GLOBAL_LOGGER: once_cell::sync::OnceCell<CentralizedRequestLogger> =
    once_cell::sync::OnceCell::new();
366
367pub fn init_global_logger(max_logs: usize) -> &'static CentralizedRequestLogger {
369 GLOBAL_LOGGER.get_or_init(|| CentralizedRequestLogger::new(max_logs))
370}
371
372pub fn get_global_logger() -> Option<&'static CentralizedRequestLogger> {
374 GLOBAL_LOGGER.get()
375}
376
377pub async fn log_request_global(entry: RequestLogEntry) {
379 if let Some(logger) = get_global_logger() {
380 logger.log_request(entry).await;
381 }
382}
383
384#[allow(clippy::too_many_arguments)]
386pub fn create_http_log_entry(
387 method: &str,
388 path: &str,
389 status_code: u16,
390 response_time_ms: u64,
391 client_ip: Option<String>,
392 user_agent: Option<String>,
393 headers: HashMap<String, String>,
394 response_size_bytes: u64,
395 error_message: Option<String>,
396) -> RequestLogEntry {
397 RequestLogEntry {
398 id: uuid::Uuid::new_v4().to_string(),
399 timestamp: Utc::now(),
400 server_type: "HTTP".to_string(),
401 method: method.to_string(),
402 path: path.to_string(),
403 status_code,
404 response_time_ms,
405 client_ip,
406 user_agent,
407 headers,
408 response_size_bytes,
409 error_message,
410 metadata: HashMap::new(),
411 reality_metadata: None,
412 }
413}
414
415pub fn create_websocket_log_entry(
417 event_type: &str, path: &str,
419 status_code: u16,
420 client_ip: Option<String>,
421 message_size_bytes: u64,
422 error_message: Option<String>,
423) -> RequestLogEntry {
424 let mut metadata = HashMap::new();
425 metadata.insert("event_type".to_string(), event_type.to_string());
426
427 RequestLogEntry {
428 id: uuid::Uuid::new_v4().to_string(),
429 timestamp: Utc::now(),
430 server_type: "WebSocket".to_string(),
431 method: event_type.to_uppercase(),
432 path: path.to_string(),
433 status_code,
434 response_time_ms: 0, client_ip,
436 user_agent: None,
437 headers: HashMap::new(),
438 response_size_bytes: message_size_bytes,
439 error_message,
440 metadata,
441 reality_metadata: None,
442 }
443}
444
445#[allow(clippy::too_many_arguments)]
447pub fn create_grpc_log_entry(
448 service: &str,
449 method: &str,
450 status_code: u16, response_time_ms: u64,
452 client_ip: Option<String>,
453 request_size_bytes: u64,
454 response_size_bytes: u64,
455 error_message: Option<String>,
456) -> RequestLogEntry {
457 let mut metadata = HashMap::new();
458 metadata.insert("service".to_string(), service.to_string());
459 metadata.insert("request_size_bytes".to_string(), request_size_bytes.to_string());
460
461 RequestLogEntry {
462 id: uuid::Uuid::new_v4().to_string(),
463 timestamp: Utc::now(),
464 server_type: "gRPC".to_string(),
465 method: format!("{}/{}", service, method),
466 path: format!("/{}/{}", service, method),
467 status_code,
468 response_time_ms,
469 client_ip,
470 user_agent: None,
471 headers: HashMap::new(),
472 response_size_bytes,
473 error_message,
474 metadata,
475 reality_metadata: None,
476 }
477}
478
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a successful (200) entry for `/test` with the given server type
    /// and method.
    fn create_test_entry(server_type: &str, method: &str) -> RequestLogEntry {
        RequestLogEntry {
            id: uuid::Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            server_type: server_type.to_string(),
            method: method.to_string(),
            path: "/test".to_string(),
            status_code: 200,
            response_time_ms: 100,
            client_ip: Some("127.0.0.1".to_string()),
            user_agent: Some("test-agent".to_string()),
            headers: HashMap::new(),
            response_size_bytes: 1024,
            error_message: None,
            metadata: HashMap::new(),
            reality_metadata: None,
        }
    }

    #[test]
    fn test_centralized_logger_new() {
        let logger = CentralizedRequestLogger::new(500);
        assert_eq!(logger.max_logs, 500);
    }

    #[test]
    fn test_centralized_logger_default() {
        let logger = CentralizedRequestLogger::default();
        assert_eq!(logger.max_logs, 1000);
    }

    #[tokio::test]
    async fn test_log_request() {
        let logger = CentralizedRequestLogger::new(10);
        let entry = create_test_entry("HTTP", "GET");

        logger.log_request(entry).await;

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].method, "GET");
    }

    #[tokio::test]
    async fn test_log_request_maintains_size_limit() {
        let logger = CentralizedRequestLogger::new(5);

        for i in 0..10 {
            let mut entry = create_test_entry("HTTP", "GET");
            entry.id = format!("entry-{}", i);
            logger.log_request(entry).await;
        }

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(logs.len(), 5);

        // Eviction must drop the oldest entries and keep the five newest,
        // returned newest first.
        let ids: Vec<&str> = logs.iter().map(|log| log.id.as_str()).collect();
        assert_eq!(ids, ["entry-9", "entry-8", "entry-7", "entry-6", "entry-5"]);
    }

    #[tokio::test]
    async fn test_get_recent_logs_with_limit() {
        let logger = CentralizedRequestLogger::new(100);

        for _ in 0..20 {
            logger.log_request(create_test_entry("HTTP", "GET")).await;
        }

        let logs = logger.get_recent_logs(Some(10)).await;
        assert_eq!(logs.len(), 10);
    }

    #[tokio::test]
    async fn test_get_logs_by_server() {
        let logger = CentralizedRequestLogger::new(100);

        logger.log_request(create_test_entry("HTTP", "GET")).await;
        logger.log_request(create_test_entry("HTTP", "POST")).await;
        logger.log_request(create_test_entry("WebSocket", "CONNECT")).await;
        logger.log_request(create_test_entry("gRPC", "Call")).await;

        let http_logs = logger.get_logs_by_server("HTTP", None).await;
        assert_eq!(http_logs.len(), 2);

        let ws_logs = logger.get_logs_by_server("WebSocket", None).await;
        assert_eq!(ws_logs.len(), 1);

        let grpc_logs = logger.get_logs_by_server("gRPC", None).await;
        assert_eq!(grpc_logs.len(), 1);
    }

    #[tokio::test]
    async fn test_get_logs_by_server_with_limit() {
        let logger = CentralizedRequestLogger::new(100);

        logger.log_request(create_test_entry("HTTP", "GET")).await;
        logger.log_request(create_test_entry("WebSocket", "CONNECT")).await;
        logger.log_request(create_test_entry("HTTP", "POST")).await;
        logger.log_request(create_test_entry("HTTP", "PUT")).await;

        // The limit applies to matching entries, newest first.
        let http_logs = logger.get_logs_by_server("HTTP", Some(2)).await;
        let methods: Vec<&str> = http_logs.iter().map(|log| log.method.as_str()).collect();
        assert_eq!(methods, ["PUT", "POST"]);
    }

    #[tokio::test]
    async fn test_get_request_counts_by_server() {
        let logger = CentralizedRequestLogger::new(100);

        logger.log_request(create_test_entry("HTTP", "GET")).await;
        logger.log_request(create_test_entry("HTTP", "POST")).await;
        logger.log_request(create_test_entry("HTTP", "PUT")).await;
        logger.log_request(create_test_entry("WebSocket", "CONNECT")).await;
        logger.log_request(create_test_entry("gRPC", "Call")).await;
        logger.log_request(create_test_entry("gRPC", "Stream")).await;

        let counts = logger.get_request_counts_by_server().await;

        assert_eq!(counts.get("HTTP"), Some(&3));
        assert_eq!(counts.get("WebSocket"), Some(&1));
        assert_eq!(counts.get("gRPC"), Some(&2));
    }

    #[tokio::test]
    async fn test_clear_logs() {
        let logger = CentralizedRequestLogger::new(100);

        logger.log_request(create_test_entry("HTTP", "GET")).await;
        logger.log_request(create_test_entry("HTTP", "POST")).await;

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(logs.len(), 2);

        logger.clear_logs().await;

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(logs.len(), 0);
    }

    #[test]
    fn test_create_http_log_entry() {
        let mut headers = HashMap::new();
        headers.insert("Content-Type".to_string(), "application/json".to_string());

        let entry = create_http_log_entry(
            "POST",
            "/api/test",
            201,
            150,
            Some("192.168.1.1".to_string()),
            Some("Mozilla/5.0".to_string()),
            headers.clone(),
            2048,
            None,
        );

        assert_eq!(entry.server_type, "HTTP");
        assert_eq!(entry.method, "POST");
        assert_eq!(entry.path, "/api/test");
        assert_eq!(entry.status_code, 201);
        assert_eq!(entry.response_time_ms, 150);
        assert_eq!(entry.response_size_bytes, 2048);
        assert_eq!(entry.client_ip, Some("192.168.1.1".to_string()));
        assert_eq!(entry.user_agent, Some("Mozilla/5.0".to_string()));
        assert_eq!(entry.headers.get("Content-Type"), Some(&"application/json".to_string()));
        assert!(entry.error_message.is_none());
    }

    #[test]
    fn test_create_websocket_log_entry() {
        let entry = create_websocket_log_entry(
            "connect",
            "/ws/chat",
            101,
            Some("10.0.0.1".to_string()),
            0,
            None,
        );

        assert_eq!(entry.server_type, "WebSocket");
        assert_eq!(entry.method, "CONNECT");
        assert_eq!(entry.path, "/ws/chat");
        assert_eq!(entry.status_code, 101);
        assert_eq!(entry.response_time_ms, 0);
        assert_eq!(entry.metadata.get("event_type"), Some(&"connect".to_string()));
    }

    #[test]
    fn test_create_grpc_log_entry() {
        let entry = create_grpc_log_entry(
            "UserService",
            "GetUser",
            0,
            50,
            Some("172.16.0.1".to_string()),
            128,
            512,
            None,
        );

        assert_eq!(entry.server_type, "gRPC");
        assert_eq!(entry.method, "UserService/GetUser");
        assert_eq!(entry.path, "/UserService/GetUser");
        assert_eq!(entry.status_code, 0);
        assert_eq!(entry.response_time_ms, 50);
        assert_eq!(entry.response_size_bytes, 512);
        assert_eq!(entry.metadata.get("service"), Some(&"UserService".to_string()));
        assert_eq!(entry.metadata.get("request_size_bytes"), Some(&"128".to_string()));
    }

    #[test]
    fn test_request_log_entry_with_error() {
        let entry = create_http_log_entry(
            "GET",
            "/api/error",
            500,
            200,
            None,
            None,
            HashMap::new(),
            0,
            Some("Internal server error".to_string()),
        );

        assert_eq!(entry.status_code, 500);
        assert_eq!(entry.error_message, Some("Internal server error".to_string()));
    }
}