1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::collections::{HashMap, VecDeque};
6use std::sync::Arc;
7use tokio::sync::RwLock;
8
/// Coarse classification of a blend ratio (see `RealityContinuumType::from_blend_ratio`).
///
/// Serialized and deserialized with lowercase variant names
/// (`"synthetic"`, `"blended"`, `"live"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RealityContinuumType {
    /// Chosen by `from_blend_ratio` for ratios at or below `0.0`.
    Synthetic,
    /// Chosen by `from_blend_ratio` for every ratio that is neither `<= 0.0` nor `>= 1.0`.
    Blended,
    /// Chosen by `from_blend_ratio` for ratios at or above `1.0`.
    Live,
}
22
23impl RealityContinuumType {
24 pub fn from_blend_ratio(ratio: f64) -> Self {
26 if ratio <= 0.0 {
27 Self::Synthetic
28 } else if ratio >= 1.0 {
29 Self::Live
30 } else {
31 Self::Blended
32 }
33 }
34
35 pub fn name(&self) -> &'static str {
37 match self {
38 RealityContinuumType::Synthetic => "Synthetic",
39 RealityContinuumType::Blended => "Blended",
40 RealityContinuumType::Live => "Live",
41 }
42 }
43}
44
/// Percentage split of a response's data across three sources.
///
/// Each field carries `#[serde(default)]`, so a field missing from the input
/// deserializes to `0.0` (the `f64` default). Note that this differs from
/// `DataSourceBreakdown::default()`, which sets `generator_percent` to `100.0`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSourceBreakdown {
    /// Percentage attributed to recorded data.
    #[serde(default)]
    pub recorded_percent: f64,
    /// Percentage attributed to the generator.
    #[serde(default)]
    pub generator_percent: f64,
    /// Percentage attributed to the upstream.
    #[serde(default)]
    pub upstream_percent: f64,
}
58
59impl Default for DataSourceBreakdown {
60 fn default() -> Self {
61 Self {
62 recorded_percent: 0.0,
63 generator_percent: 100.0,
64 upstream_percent: 0.0,
65 }
66 }
67}
68
69impl DataSourceBreakdown {
70 pub fn from_blend_ratio(blend_ratio: f64, recorded_ratio: f64) -> Self {
75 let upstream = blend_ratio * (1.0 - recorded_ratio);
76 let generator = (1.0 - blend_ratio) * (1.0 - recorded_ratio);
77 let recorded = recorded_ratio;
78
79 Self {
80 recorded_percent: recorded * 100.0,
81 generator_percent: generator * 100.0,
82 upstream_percent: upstream * 100.0,
83 }
84 }
85
86 pub fn normalize(&mut self) {
88 let total = self.recorded_percent + self.generator_percent + self.upstream_percent;
89 if total > 0.0 {
90 self.recorded_percent = (self.recorded_percent / total) * 100.0;
91 self.generator_percent = (self.generator_percent / total) * 100.0;
92 self.upstream_percent = (self.upstream_percent / total) * 100.0;
93 }
94 }
95}
96
/// Reality-related context attached to a logged request
/// (see `RequestLogEntry::reality_metadata`).
///
/// `Option` fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RealityTraceMetadata {
    /// Reality level; `from_unified_state` copies it from the unified state.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reality_level: Option<crate::reality::RealityLevel>,
    /// Classification derived from the blend ratio.
    pub reality_continuum_type: RealityContinuumType,
    /// Blend ratio this metadata was built from; `0.0` when absent from the input.
    #[serde(default)]
    pub blend_ratio: f64,
    /// Percentage split across recorded, generator and upstream data.
    pub data_source_breakdown: DataSourceBreakdown,
    /// Id of the active persona, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active_persona_id: Option<String>,
    /// Active scenario, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub active_scenario: Option<String>,
    /// Names of the active chaos rules; empty when absent from the input.
    #[serde(default)]
    pub active_chaos_profiles: Vec<String>,
    /// Names of the active latency profiles; empty when absent from the input.
    #[serde(default)]
    pub active_latency_profiles: Vec<String>,
}
126
127impl Default for RealityTraceMetadata {
128 fn default() -> Self {
129 Self {
130 reality_level: None,
131 reality_continuum_type: RealityContinuumType::Synthetic,
132 blend_ratio: 0.0,
133 data_source_breakdown: DataSourceBreakdown::default(),
134 active_persona_id: None,
135 active_scenario: None,
136 active_chaos_profiles: Vec::new(),
137 active_latency_profiles: Vec::new(),
138 }
139 }
140}
141
142impl RealityTraceMetadata {
143 pub fn from_unified_state(
148 unified_state: &crate::consistency::types::UnifiedState,
149 blend_ratio: f64,
150 _path: &str,
151 ) -> Self {
152 let reality_continuum_type = RealityContinuumType::from_blend_ratio(blend_ratio);
153
154 let active_chaos_profiles: Vec<String> = unified_state
156 .active_chaos_rules
157 .iter()
158 .filter_map(|r| r.get("name").and_then(|v| v.as_str()).map(|s| s.to_string()))
159 .collect();
160
161 let active_latency_profiles = Vec::new();
164
165 let mut breakdown = DataSourceBreakdown::from_blend_ratio(blend_ratio, 0.0);
168 breakdown.normalize();
169
170 Self {
171 reality_level: Some(unified_state.reality_level),
172 reality_continuum_type,
173 blend_ratio,
174 data_source_breakdown: breakdown,
175 active_persona_id: unified_state.active_persona.as_ref().map(|p| p.id.clone()),
176 active_scenario: unified_state.active_scenario.clone(),
177 active_chaos_profiles,
178 active_latency_profiles,
179 }
180 }
181}
182
/// A single logged request, shared by all server types.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestLogEntry {
    /// Entry identifier; the `create_*_log_entry` helpers use a UUID v4 string.
    pub id: String,
    /// Time of the entry; the helpers use the time the entry was created.
    pub timestamp: DateTime<Utc>,
    /// Server type label; the helpers use `"HTTP"`, `"WebSocket"` or `"gRPC"`.
    pub server_type: String,
    /// Request method (HTTP verb, upper-cased WebSocket event, or `service/method` for gRPC).
    pub method: String,
    /// Request path.
    pub path: String,
    /// Status code reported for the request.
    pub status_code: u16,
    /// Response time in milliseconds.
    pub response_time_ms: u64,
    /// Client IP address, if known.
    pub client_ip: Option<String>,
    /// User agent, if known.
    pub user_agent: Option<String>,
    /// Request headers.
    pub headers: HashMap<String, String>,
    /// Query parameters; omitted from serialized output when empty and
    /// defaulting to empty when absent from the input.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub query_params: HashMap<String, String>,
    /// Response size in bytes.
    pub response_size_bytes: u64,
    /// Error message, if the request failed.
    pub error_message: Option<String>,
    /// Free-form extra key/value data (e.g. `event_type`, `service`).
    pub metadata: HashMap<String, String>,
    /// Optional reality metadata; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reality_metadata: Option<RealityTraceMetadata>,
}
222
/// In-memory, size-bounded request log.
///
/// Cloning is cheap and yields a handle to the same underlying log, because the
/// entries live behind an `Arc`.
#[derive(Debug, Clone)]
pub struct CentralizedRequestLogger {
    /// Logged entries, newest at the front (see `log_request`).
    logs: Arc<RwLock<VecDeque<RequestLogEntry>>>,
    /// Maximum number of entries retained.
    max_logs: usize,
}
231
232impl Default for CentralizedRequestLogger {
233 fn default() -> Self {
234 Self::new(1000) }
236}
237
238impl CentralizedRequestLogger {
239 pub fn new(max_logs: usize) -> Self {
241 Self {
242 logs: Arc::new(RwLock::new(VecDeque::new())),
243 max_logs,
244 }
245 }
246
247 pub async fn log_request(&self, entry: RequestLogEntry) {
249 let mut logs = self.logs.write().await;
250
251 logs.push_front(entry);
253
254 while logs.len() > self.max_logs {
256 logs.pop_back();
257 }
258 }
259
260 pub async fn get_recent_logs(&self, limit: Option<usize>) -> Vec<RequestLogEntry> {
262 let logs = self.logs.read().await;
263 let take_count = limit.unwrap_or(logs.len()).min(logs.len());
264 logs.iter().take(take_count).cloned().collect()
265 }
266
267 pub async fn get_logs_by_server(
269 &self,
270 server_type: &str,
271 limit: Option<usize>,
272 ) -> Vec<RequestLogEntry> {
273 let logs = self.logs.read().await;
274 logs.iter()
275 .filter(|log| log.server_type == server_type)
276 .take(limit.unwrap_or(logs.len()))
277 .cloned()
278 .collect()
279 }
280
281 pub async fn get_request_counts_by_server(&self) -> HashMap<String, u64> {
283 let logs = self.logs.read().await;
284 let mut counts = HashMap::new();
285
286 for log in logs.iter() {
287 *counts.entry(log.server_type.clone()).or_insert(0) += 1;
288 }
289
290 counts
291 }
292
293 pub async fn clear_logs(&self) {
295 let mut logs = self.logs.write().await;
296 logs.clear();
297 }
298
299 pub async fn find_matching_requests(
305 &self,
306 pattern: &crate::verification::VerificationRequest,
307 ) -> Vec<RequestLogEntry> {
308 let logs = self.logs.read().await;
309 logs.iter()
310 .filter(|entry| crate::verification::matches_verification_pattern(entry, pattern))
311 .cloned()
312 .collect()
313 }
314
315 pub async fn count_matching_requests(
321 &self,
322 pattern: &crate::verification::VerificationRequest,
323 ) -> usize {
324 let logs = self.logs.read().await;
325 logs.iter()
326 .filter(|entry| crate::verification::matches_verification_pattern(entry, pattern))
327 .count()
328 }
329
330 pub async fn get_request_sequence(
336 &self,
337 patterns: &[crate::verification::VerificationRequest],
338 ) -> Vec<RequestLogEntry> {
339 let logs = self.logs.read().await;
340 let mut log_idx = 0;
341 let mut all_matches = Vec::new();
342
343 for pattern in patterns {
344 let mut found = false;
346 while log_idx < logs.len() {
347 if crate::verification::matches_verification_pattern(&logs[log_idx], pattern) {
348 all_matches.push(logs[log_idx].clone());
349 log_idx += 1;
350 found = true;
351 break;
352 }
353 log_idx += 1;
354 }
355
356 if !found {
357 break;
359 }
360 }
361
362 all_matches
363 }
364}
365
/// Process-wide logger instance; empty until `init_global_logger` is first called.
static GLOBAL_LOGGER: once_cell::sync::OnceCell<CentralizedRequestLogger> =
    once_cell::sync::OnceCell::new();
369
370pub fn init_global_logger(max_logs: usize) -> &'static CentralizedRequestLogger {
372 GLOBAL_LOGGER.get_or_init(|| CentralizedRequestLogger::new(max_logs))
373}
374
375pub fn get_global_logger() -> Option<&'static CentralizedRequestLogger> {
377 GLOBAL_LOGGER.get()
378}
379
/// Description of a single route held in the global route store.
///
/// `Option` fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalRouteInfo {
    /// Route method.
    pub method: String,
    /// Route path.
    pub path: String,
    /// Operation id, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_id: Option<String>,
    /// Short summary, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub summary: Option<String>,
    /// Longer description, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Parameter names; empty when absent from the input.
    #[serde(default)]
    pub parameters: Vec<String>,
}
402
/// Process-wide route list, created lazily by `route_store` and guarded by a
/// standard-library `RwLock`.
static GLOBAL_ROUTE_STORE: once_cell::sync::OnceCell<std::sync::RwLock<Vec<GlobalRouteInfo>>> =
    once_cell::sync::OnceCell::new();
406
407fn route_store() -> &'static std::sync::RwLock<Vec<GlobalRouteInfo>> {
408 GLOBAL_ROUTE_STORE.get_or_init(|| std::sync::RwLock::new(Vec::new()))
409}
410
411pub fn set_global_routes(routes: Vec<GlobalRouteInfo>) {
413 let mut store = route_store().write().expect("route store poisoned");
414 *store = routes;
415}
416
417pub fn get_global_routes() -> Vec<GlobalRouteInfo> {
419 let store = route_store().read().expect("route store poisoned");
420 store.clone()
421}
422
423pub async fn log_request_global(entry: RequestLogEntry) {
425 if let Some(logger) = get_global_logger() {
426 logger.log_request(entry).await;
427 }
428}
429
430#[allow(clippy::too_many_arguments)]
432pub fn create_http_log_entry(
433 method: &str,
434 path: &str,
435 status_code: u16,
436 response_time_ms: u64,
437 client_ip: Option<String>,
438 user_agent: Option<String>,
439 headers: HashMap<String, String>,
440 response_size_bytes: u64,
441 error_message: Option<String>,
442) -> RequestLogEntry {
443 create_http_log_entry_with_query(
444 method,
445 path,
446 status_code,
447 response_time_ms,
448 client_ip,
449 user_agent,
450 headers,
451 HashMap::new(), response_size_bytes,
453 error_message,
454 )
455}
456
457#[allow(clippy::too_many_arguments)]
459pub fn create_http_log_entry_with_query(
460 method: &str,
461 path: &str,
462 status_code: u16,
463 response_time_ms: u64,
464 client_ip: Option<String>,
465 user_agent: Option<String>,
466 headers: HashMap<String, String>,
467 query_params: HashMap<String, String>,
468 response_size_bytes: u64,
469 error_message: Option<String>,
470) -> RequestLogEntry {
471 RequestLogEntry {
472 id: uuid::Uuid::new_v4().to_string(),
473 timestamp: Utc::now(),
474 server_type: "HTTP".to_string(),
475 method: method.to_string(),
476 path: path.to_string(),
477 status_code,
478 response_time_ms,
479 client_ip,
480 user_agent,
481 headers,
482 query_params,
483 response_size_bytes,
484 error_message,
485 metadata: HashMap::new(),
486 reality_metadata: None,
487 }
488}
489
490pub fn create_websocket_log_entry(
492 event_type: &str, path: &str,
494 status_code: u16,
495 client_ip: Option<String>,
496 message_size_bytes: u64,
497 error_message: Option<String>,
498) -> RequestLogEntry {
499 let mut metadata = HashMap::new();
500 metadata.insert("event_type".to_string(), event_type.to_string());
501
502 RequestLogEntry {
503 id: uuid::Uuid::new_v4().to_string(),
504 timestamp: Utc::now(),
505 server_type: "WebSocket".to_string(),
506 method: event_type.to_uppercase(),
507 path: path.to_string(),
508 status_code,
509 response_time_ms: 0, client_ip,
511 user_agent: None,
512 headers: HashMap::new(),
513 query_params: HashMap::new(),
514 response_size_bytes: message_size_bytes,
515 error_message,
516 metadata,
517 reality_metadata: None,
518 }
519}
520
521#[allow(clippy::too_many_arguments)]
523pub fn create_grpc_log_entry(
524 service: &str,
525 method: &str,
526 status_code: u16, response_time_ms: u64,
528 client_ip: Option<String>,
529 request_size_bytes: u64,
530 response_size_bytes: u64,
531 error_message: Option<String>,
532) -> RequestLogEntry {
533 let mut metadata = HashMap::new();
534 metadata.insert("service".to_string(), service.to_string());
535 metadata.insert("request_size_bytes".to_string(), request_size_bytes.to_string());
536
537 RequestLogEntry {
538 id: uuid::Uuid::new_v4().to_string(),
539 timestamp: Utc::now(),
540 server_type: "gRPC".to_string(),
541 method: format!("{}/{}", service, method),
542 path: format!("/{}/{}", service, method),
543 status_code,
544 response_time_ms,
545 client_ip,
546 user_agent: None,
547 headers: HashMap::new(),
548 query_params: HashMap::new(),
549 response_size_bytes,
550 error_message,
551 metadata,
552 reality_metadata: None,
553 }
554}
555
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal successful entry for the given server type and method.
    fn create_test_entry(server_type: &str, method: &str) -> RequestLogEntry {
        RequestLogEntry {
            id: uuid::Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            server_type: server_type.to_string(),
            method: method.to_string(),
            path: "/test".to_string(),
            status_code: 200,
            response_time_ms: 100,
            client_ip: Some("127.0.0.1".to_string()),
            user_agent: Some("test-agent".to_string()),
            headers: HashMap::new(),
            query_params: HashMap::new(),
            response_size_bytes: 1024,
            error_message: None,
            metadata: HashMap::new(),
            reality_metadata: None,
        }
    }

    #[test]
    fn test_centralized_logger_new() {
        let logger = CentralizedRequestLogger::new(500);
        assert_eq!(logger.max_logs, 500);
    }

    #[test]
    fn test_centralized_logger_default() {
        let logger = CentralizedRequestLogger::default();
        assert_eq!(logger.max_logs, 1000);
    }

    #[tokio::test]
    async fn test_log_request() {
        let logger = CentralizedRequestLogger::new(10);
        let entry = create_test_entry("HTTP", "GET");

        logger.log_request(entry).await;

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].method, "GET");
    }

    #[tokio::test]
    async fn test_log_request_maintains_size_limit() {
        let logger = CentralizedRequestLogger::new(5);

        for i in 0..10 {
            let mut entry = create_test_entry("HTTP", "GET");
            entry.id = format!("entry-{}", i);
            logger.log_request(entry).await;
        }

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(logs.len(), 5);

        // The newest five entries must survive eviction, returned newest first.
        let ids: Vec<&str> = logs.iter().map(|log| log.id.as_str()).collect();
        assert_eq!(ids, ["entry-9", "entry-8", "entry-7", "entry-6", "entry-5"]);
    }

    #[tokio::test]
    async fn test_get_recent_logs_with_limit() {
        let logger = CentralizedRequestLogger::new(100);

        for _ in 0..20 {
            logger.log_request(create_test_entry("HTTP", "GET")).await;
        }

        let logs = logger.get_recent_logs(Some(10)).await;
        assert_eq!(logs.len(), 10);
    }

    #[tokio::test]
    async fn test_get_logs_by_server() {
        let logger = CentralizedRequestLogger::new(100);

        logger.log_request(create_test_entry("HTTP", "GET")).await;
        logger.log_request(create_test_entry("HTTP", "POST")).await;
        logger.log_request(create_test_entry("WebSocket", "CONNECT")).await;
        logger.log_request(create_test_entry("gRPC", "Call")).await;

        let http_logs = logger.get_logs_by_server("HTTP", None).await;
        assert_eq!(http_logs.len(), 2);

        let ws_logs = logger.get_logs_by_server("WebSocket", None).await;
        assert_eq!(ws_logs.len(), 1);

        let grpc_logs = logger.get_logs_by_server("gRPC", None).await;
        assert_eq!(grpc_logs.len(), 1);
    }

    #[tokio::test]
    async fn test_get_request_counts_by_server() {
        let logger = CentralizedRequestLogger::new(100);

        logger.log_request(create_test_entry("HTTP", "GET")).await;
        logger.log_request(create_test_entry("HTTP", "POST")).await;
        logger.log_request(create_test_entry("HTTP", "PUT")).await;
        logger.log_request(create_test_entry("WebSocket", "CONNECT")).await;
        logger.log_request(create_test_entry("gRPC", "Call")).await;
        logger.log_request(create_test_entry("gRPC", "Stream")).await;

        let counts = logger.get_request_counts_by_server().await;

        assert_eq!(counts.get("HTTP"), Some(&3));
        assert_eq!(counts.get("WebSocket"), Some(&1));
        assert_eq!(counts.get("gRPC"), Some(&2));
    }

    #[tokio::test]
    async fn test_clear_logs() {
        let logger = CentralizedRequestLogger::new(100);

        logger.log_request(create_test_entry("HTTP", "GET")).await;
        logger.log_request(create_test_entry("HTTP", "POST")).await;

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(logs.len(), 2);

        logger.clear_logs().await;

        let logs = logger.get_recent_logs(None).await;
        assert_eq!(logs.len(), 0);
    }

    #[test]
    fn test_create_http_log_entry() {
        let mut headers = HashMap::new();
        headers.insert("Content-Type".to_string(), "application/json".to_string());

        let entry = create_http_log_entry(
            "POST",
            "/api/test",
            201,
            150,
            Some("192.168.1.1".to_string()),
            Some("Mozilla/5.0".to_string()),
            headers,
            2048,
            None,
        );

        assert_eq!(entry.server_type, "HTTP");
        assert_eq!(entry.method, "POST");
        assert_eq!(entry.path, "/api/test");
        assert_eq!(entry.status_code, 201);
        assert_eq!(entry.response_time_ms, 150);
        assert_eq!(entry.response_size_bytes, 2048);
        assert_eq!(entry.client_ip, Some("192.168.1.1".to_string()));
        assert_eq!(entry.user_agent, Some("Mozilla/5.0".to_string()));
        assert_eq!(entry.headers.get("Content-Type"), Some(&"application/json".to_string()));
        assert!(entry.error_message.is_none());
    }

    #[test]
    fn test_create_websocket_log_entry() {
        let entry = create_websocket_log_entry(
            "connect",
            "/ws/chat",
            101,
            Some("10.0.0.1".to_string()),
            0,
            None,
        );

        assert_eq!(entry.server_type, "WebSocket");
        assert_eq!(entry.method, "CONNECT");
        assert_eq!(entry.path, "/ws/chat");
        assert_eq!(entry.status_code, 101);
        assert_eq!(entry.response_time_ms, 0);
        assert_eq!(entry.metadata.get("event_type"), Some(&"connect".to_string()));
    }

    #[test]
    fn test_create_grpc_log_entry() {
        let entry = create_grpc_log_entry(
            "UserService",
            "GetUser",
            0,
            50,
            Some("172.16.0.1".to_string()),
            128,
            512,
            None,
        );

        assert_eq!(entry.server_type, "gRPC");
        assert_eq!(entry.method, "UserService/GetUser");
        assert_eq!(entry.path, "/UserService/GetUser");
        assert_eq!(entry.status_code, 0);
        assert_eq!(entry.response_time_ms, 50);
        assert_eq!(entry.response_size_bytes, 512);
        assert_eq!(entry.metadata.get("service"), Some(&"UserService".to_string()));
        assert_eq!(entry.metadata.get("request_size_bytes"), Some(&"128".to_string()));
    }

    #[test]
    fn test_request_log_entry_with_error() {
        let entry = create_http_log_entry(
            "GET",
            "/api/error",
            500,
            200,
            None,
            None,
            HashMap::new(),
            0,
            Some("Internal server error".to_string()),
        );

        assert_eq!(entry.status_code, 500);
        assert_eq!(entry.error_message, Some("Internal server error".to_string()));
    }
}