use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
/// Position on the synthetic-to-live reality continuum, derived from a blend
/// ratio (see `RealityContinuumType::from_blend_ratio`). Serialized in
/// lowercase ("synthetic" / "blended" / "live").
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RealityContinuumType {
/// Fully generated data (blend ratio <= 0.0).
Synthetic,
/// Mixture of generated and live data (ratio strictly between 0 and 1).
Blended,
/// Fully live data (blend ratio >= 1.0).
Live,
}
impl RealityContinuumType {
pub fn from_blend_ratio(ratio: f64) -> Self {
if ratio <= 0.0 {
Self::Synthetic
} else if ratio >= 1.0 {
Self::Live
} else {
Self::Blended
}
}
pub fn name(&self) -> &'static str {
match self {
RealityContinuumType::Synthetic => "Synthetic",
RealityContinuumType::Blended => "Blended",
RealityContinuumType::Live => "Live",
}
}
}
/// Percentage split of where served data originated. The three fields are
/// intended to sum to 100.0 (see `DataSourceBreakdown::normalize`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSourceBreakdown {
/// Share served from recorded data (presumably replayed captures — confirm).
#[serde(default)]
pub recorded_percent: f64,
/// Share produced by the synthetic data generator.
#[serde(default)]
pub generator_percent: f64,
/// Share proxied from a live upstream.
#[serde(default)]
pub upstream_percent: f64,
}
impl Default for DataSourceBreakdown {
/// Default breakdown: everything comes from the generator (100% synthetic).
fn default() -> Self {
Self {
recorded_percent: 0.0,
generator_percent: 100.0,
upstream_percent: 0.0,
}
}
}
impl DataSourceBreakdown {
    /// Build a breakdown from a live/synthetic blend ratio and a recorded-data
    /// ratio.
    ///
    /// Both ratios are clamped to `[0.0, 1.0]` so out-of-range inputs can
    /// never yield negative percentages (previously e.g. `blend_ratio = 1.5`
    /// produced a negative generator share). NaN inputs pass through
    /// `f64::clamp` unchanged, matching the prior behavior for NaN.
    ///
    /// The recorded share is taken first; the remainder is split between
    /// upstream (proportional to the blend ratio) and the generator.
    pub fn from_blend_ratio(blend_ratio: f64, recorded_ratio: f64) -> Self {
        let blend = blend_ratio.clamp(0.0, 1.0);
        let recorded = recorded_ratio.clamp(0.0, 1.0);
        let remainder = 1.0 - recorded;
        Self {
            recorded_percent: recorded * 100.0,
            generator_percent: (1.0 - blend) * remainder * 100.0,
            upstream_percent: blend * remainder * 100.0,
        }
    }

    /// Rescale the three components in place so they sum to 100.0.
    ///
    /// A zero (or negative) total is left untouched to avoid dividing by
    /// zero; callers that construct via `from_blend_ratio` always have a
    /// positive total.
    pub fn normalize(&mut self) {
        let total = self.recorded_percent + self.generator_percent + self.upstream_percent;
        if total > 0.0 {
            self.recorded_percent = (self.recorded_percent / total) * 100.0;
            self.generator_percent = (self.generator_percent / total) * 100.0;
            self.upstream_percent = (self.upstream_percent / total) * 100.0;
        }
    }
}
/// Per-request snapshot of the mock's "reality" configuration, attached to a
/// `RequestLogEntry` when available.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RealityTraceMetadata {
/// Reality level from the unified state, if known; omitted from JSON when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub reality_level: Option<crate::reality::RealityLevel>,
/// Continuum classification derived from `blend_ratio`.
pub reality_continuum_type: RealityContinuumType,
/// Live/synthetic blend ratio (0.0 = fully synthetic, 1.0 = fully live).
#[serde(default)]
pub blend_ratio: f64,
/// Percentage split of data sources for this request.
pub data_source_breakdown: DataSourceBreakdown,
/// Id of the active persona, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub active_persona_id: Option<String>,
/// Name of the active scenario, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub active_scenario: Option<String>,
/// Names of chaos rules in effect when the request was served.
#[serde(default)]
pub active_chaos_profiles: Vec<String>,
/// Names of latency profiles in effect. Currently never populated by
/// `from_unified_state`.
#[serde(default)]
pub active_latency_profiles: Vec<String>,
}
impl Default for RealityTraceMetadata {
/// Fully-synthetic baseline: blend ratio 0.0, no persona/scenario/profiles,
/// and the default (100% generator) source breakdown.
fn default() -> Self {
Self {
reality_level: None,
reality_continuum_type: RealityContinuumType::Synthetic,
blend_ratio: 0.0,
data_source_breakdown: DataSourceBreakdown::default(),
active_persona_id: None,
active_scenario: None,
active_chaos_profiles: Vec::new(),
active_latency_profiles: Vec::new(),
}
}
}
impl RealityTraceMetadata {
    /// Snapshot reality/trace metadata from the current unified state.
    ///
    /// `blend_ratio` drives both the continuum classification and the data
    /// source breakdown (the recorded ratio is currently fixed at 0.0). The
    /// `_path` argument is accepted for interface stability but unused.
    pub fn from_unified_state(
        unified_state: &crate::consistency::types::UnifiedState,
        blend_ratio: f64,
        _path: &str,
    ) -> Self {
        // Pull the string "name" field out of each active chaos rule; rules
        // without one are silently skipped.
        let chaos_profiles: Vec<String> = unified_state
            .active_chaos_rules
            .iter()
            .filter_map(|rule| {
                rule.get("name")
                    .and_then(|value| value.as_str())
                    .map(str::to_string)
            })
            .collect();

        let mut source_breakdown = DataSourceBreakdown::from_blend_ratio(blend_ratio, 0.0);
        source_breakdown.normalize();

        Self {
            reality_level: Some(unified_state.reality_level),
            reality_continuum_type: RealityContinuumType::from_blend_ratio(blend_ratio),
            blend_ratio,
            data_source_breakdown: source_breakdown,
            active_persona_id: unified_state
                .active_persona
                .as_ref()
                .map(|persona| persona.id.clone()),
            active_scenario: unified_state.active_scenario.clone(),
            active_chaos_profiles: chaos_profiles,
            // Latency profiles are not yet sourced from the unified state.
            active_latency_profiles: Vec::new(),
        }
    }
}
/// One logged request/event from any of the protocol servers
/// (HTTP, WebSocket, gRPC — see the `create_*_log_entry` factories).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestLogEntry {
/// Unique id (factories generate a v4 UUID string).
pub id: String,
/// UTC time the entry was created.
pub timestamp: DateTime<Utc>,
/// Originating server: "HTTP", "WebSocket", or "gRPC" from the factories.
pub server_type: String,
/// HTTP verb, uppercased WS event type, or "Service/Method" for gRPC.
pub method: String,
pub path: String,
/// HTTP status for HTTP/WS entries; gRPC status code for gRPC entries.
pub status_code: u16,
pub response_time_ms: u64,
pub client_ip: Option<String>,
pub user_agent: Option<String>,
pub headers: HashMap<String, String>,
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub query_params: HashMap<String, String>,
pub response_size_bytes: u64,
pub error_message: Option<String>,
/// Free-form extras (e.g. "event_type" for WS, "service" for gRPC).
pub metadata: HashMap<String, String>,
/// Reality/continuum snapshot, when one was captured for this request.
#[serde(skip_serializing_if = "Option::is_none")]
pub reality_metadata: Option<RealityTraceMetadata>,
}
/// Bounded, in-memory, async request log shared across protocol servers.
/// Cloning the logger clones the inner `Arc`, so clones share the same log.
#[derive(Debug, Clone)]
pub struct CentralizedRequestLogger {
/// Entries kept newest-first behind a tokio `RwLock`.
logs: Arc<RwLock<VecDeque<RequestLogEntry>>>,
/// Maximum number of retained entries; oldest are evicted past this.
max_logs: usize,
}
impl Default for CentralizedRequestLogger {
    /// A logger retaining at most 1000 entries.
    fn default() -> Self {
        Self::new(1000)
    }
}
impl CentralizedRequestLogger {
    /// Create an empty logger that retains at most `max_logs` entries.
    pub fn new(max_logs: usize) -> Self {
        Self {
            logs: Arc::new(RwLock::new(VecDeque::new())),
            max_logs,
        }
    }

    /// Record one request. Newest entries sit at the front; anything beyond
    /// `max_logs` is evicted from the back (oldest first).
    pub async fn log_request(&self, entry: RequestLogEntry) {
        let mut guard = self.logs.write().await;
        guard.push_front(entry);
        // VecDeque::truncate keeps the first `max_logs` (newest) entries.
        guard.truncate(self.max_logs);
    }

    /// Most recent entries, newest first, capped at `limit` when given.
    pub async fn get_recent_logs(&self, limit: Option<usize>) -> Vec<RequestLogEntry> {
        let guard = self.logs.read().await;
        match limit {
            Some(n) => guard.iter().take(n).cloned().collect(),
            None => guard.iter().cloned().collect(),
        }
    }

    /// Entries whose `server_type` matches exactly, newest first, capped at
    /// `limit` when given.
    pub async fn get_logs_by_server(
        &self,
        server_type: &str,
        limit: Option<usize>,
    ) -> Vec<RequestLogEntry> {
        let guard = self.logs.read().await;
        let cap = limit.unwrap_or(guard.len());
        guard
            .iter()
            .filter(|entry| entry.server_type == server_type)
            .take(cap)
            .cloned()
            .collect()
    }

    /// Number of retained entries per `server_type`.
    pub async fn get_request_counts_by_server(&self) -> HashMap<String, u64> {
        let guard = self.logs.read().await;
        let mut counts: HashMap<String, u64> = HashMap::new();
        for entry in guard.iter() {
            *counts.entry(entry.server_type.clone()).or_default() += 1;
        }
        counts
    }

    /// Drop every retained entry.
    pub async fn clear_logs(&self) {
        self.logs.write().await.clear();
    }

    /// All retained entries matching the verification pattern, newest first.
    pub async fn find_matching_requests(
        &self,
        pattern: &crate::verification::VerificationRequest,
    ) -> Vec<RequestLogEntry> {
        let guard = self.logs.read().await;
        guard
            .iter()
            .filter(|entry| crate::verification::matches_verification_pattern(entry, pattern))
            .cloned()
            .collect()
    }

    /// How many retained entries match the verification pattern.
    pub async fn count_matching_requests(
        &self,
        pattern: &crate::verification::VerificationRequest,
    ) -> usize {
        let guard = self.logs.read().await;
        guard
            .iter()
            .filter(|entry| crate::verification::matches_verification_pattern(entry, pattern))
            .count()
    }

    /// Greedily match `patterns` in order against the log, scanning from the
    /// front (newest entry) toward the back. Each pattern consumes entries up
    /// to and including its first match; matching stops at the first pattern
    /// that cannot be satisfied, returning the matches found so far.
    pub async fn get_request_sequence(
        &self,
        patterns: &[crate::verification::VerificationRequest],
    ) -> Vec<RequestLogEntry> {
        let guard = self.logs.read().await;
        let mut matched = Vec::new();
        let mut cursor = 0usize;
        'next_pattern: for pattern in patterns {
            while cursor < guard.len() {
                let candidate = &guard[cursor];
                cursor += 1;
                if crate::verification::matches_verification_pattern(candidate, pattern) {
                    matched.push(candidate.clone());
                    continue 'next_pattern;
                }
            }
            // Ran out of entries before this pattern matched; later patterns
            // cannot match in order either.
            return matched;
        }
        matched
    }
}
/// Process-wide request logger, initialized once via [`init_global_logger`].
///
/// Uses `std::sync::OnceLock` (stable since Rust 1.70) instead of the
/// third-party `once_cell` crate; the `get_or_init`/`get` API is identical,
/// so behavior is unchanged.
static GLOBAL_LOGGER: std::sync::OnceLock<CentralizedRequestLogger> =
    std::sync::OnceLock::new();

/// Initialize the global logger, or return the existing one if already
/// initialized. NOTE: `max_logs` only takes effect on the first call.
pub fn init_global_logger(max_logs: usize) -> &'static CentralizedRequestLogger {
    GLOBAL_LOGGER.get_or_init(|| CentralizedRequestLogger::new(max_logs))
}

/// The global logger, or `None` if `init_global_logger` was never called.
pub fn get_global_logger() -> Option<&'static CentralizedRequestLogger> {
    GLOBAL_LOGGER.get()
}
/// Description of one route exposed by the mock, kept in the global route
/// store (see `set_global_routes` / `get_global_routes`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalRouteInfo {
pub method: String,
pub path: String,
/// OpenAPI operationId, if the route came from a spec — TODO confirm source.
#[serde(skip_serializing_if = "Option::is_none")]
pub operation_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub summary: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
/// Parameter names accepted by the route.
#[serde(default)]
pub parameters: Vec<String>,
}
static GLOBAL_ROUTE_STORE: once_cell::sync::OnceCell<std::sync::RwLock<Vec<GlobalRouteInfo>>> =
once_cell::sync::OnceCell::new();
fn route_store() -> &'static std::sync::RwLock<Vec<GlobalRouteInfo>> {
GLOBAL_ROUTE_STORE.get_or_init(|| std::sync::RwLock::new(Vec::new()))
}
pub fn set_global_routes(routes: Vec<GlobalRouteInfo>) {
let mut store = route_store().write().expect("route store poisoned");
*store = routes;
}
pub fn get_global_routes() -> Vec<GlobalRouteInfo> {
let store = route_store().read().expect("route store poisoned");
store.clone()
}
/// Forward `entry` to the global logger, silently dropping it when the
/// global logger has not been initialized.
pub async fn log_request_global(entry: RequestLogEntry) {
    let Some(logger) = get_global_logger() else {
        return;
    };
    logger.log_request(entry).await;
}
/// Build an HTTP [`RequestLogEntry`] with no query parameters.
///
/// Thin wrapper around [`create_http_log_entry_with_query`] passing an empty
/// query map; see that function for field semantics.
#[allow(clippy::too_many_arguments)]
pub fn create_http_log_entry(
    method: &str,
    path: &str,
    status_code: u16,
    response_time_ms: u64,
    client_ip: Option<String>,
    user_agent: Option<String>,
    headers: HashMap<String, String>,
    response_size_bytes: u64,
    error_message: Option<String>,
) -> RequestLogEntry {
    let query_params = HashMap::new();
    create_http_log_entry_with_query(
        method,
        path,
        status_code,
        response_time_ms,
        client_ip,
        user_agent,
        headers,
        query_params,
        response_size_bytes,
        error_message,
    )
}
/// Build an HTTP [`RequestLogEntry`], including parsed query parameters.
///
/// Generates a fresh v4 UUID for `id` and stamps `timestamp` with the current
/// UTC time. `server_type` is always `"HTTP"`; `metadata` starts empty and
/// `reality_metadata` unset.
#[allow(clippy::too_many_arguments)]
pub fn create_http_log_entry_with_query(
    method: &str,
    path: &str,
    status_code: u16,
    response_time_ms: u64,
    client_ip: Option<String>,
    user_agent: Option<String>,
    headers: HashMap<String, String>,
    query_params: HashMap<String, String>,
    response_size_bytes: u64,
    error_message: Option<String>,
) -> RequestLogEntry {
    RequestLogEntry {
        id: uuid::Uuid::new_v4().to_string(),
        timestamp: Utc::now(),
        server_type: "HTTP".to_string(),
        method: method.to_owned(),
        path: path.to_owned(),
        status_code,
        response_time_ms,
        response_size_bytes,
        client_ip,
        user_agent,
        headers,
        query_params,
        error_message,
        metadata: HashMap::new(),
        reality_metadata: None,
    }
}
/// Build a WebSocket [`RequestLogEntry`].
///
/// The event type (e.g. "connect") is recorded twice: uppercased as the
/// `method`, and verbatim under the `"event_type"` metadata key.
/// `response_time_ms` is always 0 for WebSocket events.
pub fn create_websocket_log_entry(
    event_type: &str,
    path: &str,
    status_code: u16,
    client_ip: Option<String>,
    message_size_bytes: u64,
    error_message: Option<String>,
) -> RequestLogEntry {
    let metadata: HashMap<String, String> =
        HashMap::from([("event_type".to_string(), event_type.to_string())]);
    RequestLogEntry {
        id: uuid::Uuid::new_v4().to_string(),
        timestamp: Utc::now(),
        server_type: "WebSocket".to_string(),
        method: event_type.to_uppercase(),
        path: path.to_string(),
        status_code,
        response_time_ms: 0,
        client_ip,
        user_agent: None,
        headers: HashMap::new(),
        query_params: HashMap::new(),
        response_size_bytes: message_size_bytes,
        error_message,
        metadata,
        reality_metadata: None,
    }
}
/// Build a gRPC [`RequestLogEntry`].
///
/// `method` becomes `"Service/Method"` and `path` `"/Service/Method"`.
/// `status_code` carries the gRPC status value supplied by the caller (the
/// tests use 0 for success), not an HTTP status. The service name and request
/// payload size are stashed in `metadata`.
#[allow(clippy::too_many_arguments)]
pub fn create_grpc_log_entry(
    service: &str,
    method: &str,
    status_code: u16,
    response_time_ms: u64,
    client_ip: Option<String>,
    request_size_bytes: u64,
    response_size_bytes: u64,
    error_message: Option<String>,
) -> RequestLogEntry {
    let metadata: HashMap<String, String> = HashMap::from([
        ("service".to_string(), service.to_string()),
        (
            "request_size_bytes".to_string(),
            request_size_bytes.to_string(),
        ),
    ]);
    RequestLogEntry {
        id: uuid::Uuid::new_v4().to_string(),
        timestamp: Utc::now(),
        server_type: "gRPC".to_string(),
        method: format!("{}/{}", service, method),
        path: format!("/{}/{}", service, method),
        status_code,
        response_time_ms,
        client_ip,
        user_agent: None,
        headers: HashMap::new(),
        query_params: HashMap::new(),
        response_size_bytes,
        error_message,
        metadata,
        reality_metadata: None,
    }
}
#[cfg(test)]
mod tests {
// Unit tests for the centralized logger and the log-entry factory helpers.
// Logger tests use #[tokio::test] because the logger's API is async.
use super::*;
// Minimal valid entry for logger tests; only server_type and method vary.
fn create_test_entry(server_type: &str, method: &str) -> RequestLogEntry {
RequestLogEntry {
id: uuid::Uuid::new_v4().to_string(),
timestamp: Utc::now(),
server_type: server_type.to_string(),
method: method.to_string(),
path: "/test".to_string(),
status_code: 200,
response_time_ms: 100,
client_ip: Some("127.0.0.1".to_string()),
user_agent: Some("test-agent".to_string()),
headers: HashMap::new(),
query_params: HashMap::new(),
response_size_bytes: 1024,
error_message: None,
metadata: HashMap::new(),
reality_metadata: None,
}
}
#[test]
fn test_centralized_logger_new() {
let logger = CentralizedRequestLogger::new(500);
assert_eq!(logger.max_logs, 500);
}
#[test]
fn test_centralized_logger_default() {
// Default for CentralizedRequestLogger delegates to Self::new(1000).
let logger = CentralizedRequestLogger::default();
assert_eq!(logger.max_logs, 1000);
}
#[tokio::test]
async fn test_log_request() {
let logger = CentralizedRequestLogger::new(10);
let entry = create_test_entry("HTTP", "GET");
logger.log_request(entry).await;
let logs = logger.get_recent_logs(None).await;
assert_eq!(logs.len(), 1);
assert_eq!(logs[0].method, "GET");
}
#[tokio::test]
async fn test_log_request_maintains_size_limit() {
// 10 entries into a capacity-5 logger: only the newest 5 survive.
let logger = CentralizedRequestLogger::new(5);
for i in 0..10 {
let mut entry = create_test_entry("HTTP", "GET");
entry.id = format!("entry-{}", i);
logger.log_request(entry).await;
}
let logs = logger.get_recent_logs(None).await;
assert_eq!(logs.len(), 5); }
#[tokio::test]
async fn test_get_recent_logs_with_limit() {
let logger = CentralizedRequestLogger::new(100);
for _ in 0..20 {
logger.log_request(create_test_entry("HTTP", "GET")).await;
}
let logs = logger.get_recent_logs(Some(10)).await;
assert_eq!(logs.len(), 10);
}
#[tokio::test]
async fn test_get_logs_by_server() {
// Filtering by server_type is an exact string match.
let logger = CentralizedRequestLogger::new(100);
logger.log_request(create_test_entry("HTTP", "GET")).await;
logger.log_request(create_test_entry("HTTP", "POST")).await;
logger.log_request(create_test_entry("WebSocket", "CONNECT")).await;
logger.log_request(create_test_entry("gRPC", "Call")).await;
let http_logs = logger.get_logs_by_server("HTTP", None).await;
assert_eq!(http_logs.len(), 2);
let ws_logs = logger.get_logs_by_server("WebSocket", None).await;
assert_eq!(ws_logs.len(), 1);
let grpc_logs = logger.get_logs_by_server("gRPC", None).await;
assert_eq!(grpc_logs.len(), 1);
}
#[tokio::test]
async fn test_get_request_counts_by_server() {
let logger = CentralizedRequestLogger::new(100);
logger.log_request(create_test_entry("HTTP", "GET")).await;
logger.log_request(create_test_entry("HTTP", "POST")).await;
logger.log_request(create_test_entry("HTTP", "PUT")).await;
logger.log_request(create_test_entry("WebSocket", "CONNECT")).await;
logger.log_request(create_test_entry("gRPC", "Call")).await;
logger.log_request(create_test_entry("gRPC", "Stream")).await;
let counts = logger.get_request_counts_by_server().await;
assert_eq!(counts.get("HTTP"), Some(&3));
assert_eq!(counts.get("WebSocket"), Some(&1));
assert_eq!(counts.get("gRPC"), Some(&2));
}
#[tokio::test]
async fn test_clear_logs() {
let logger = CentralizedRequestLogger::new(100);
logger.log_request(create_test_entry("HTTP", "GET")).await;
logger.log_request(create_test_entry("HTTP", "POST")).await;
let logs = logger.get_recent_logs(None).await;
assert_eq!(logs.len(), 2);
logger.clear_logs().await;
let logs = logger.get_recent_logs(None).await;
assert_eq!(logs.len(), 0);
}
#[test]
fn test_create_http_log_entry() {
// The HTTP factory copies every argument through to the entry.
let mut headers = HashMap::new();
headers.insert("Content-Type".to_string(), "application/json".to_string());
let entry = create_http_log_entry(
"POST",
"/api/test",
201,
150,
Some("192.168.1.1".to_string()),
Some("Mozilla/5.0".to_string()),
headers.clone(),
2048,
None,
);
assert_eq!(entry.server_type, "HTTP");
assert_eq!(entry.method, "POST");
assert_eq!(entry.path, "/api/test");
assert_eq!(entry.status_code, 201);
assert_eq!(entry.response_time_ms, 150);
assert_eq!(entry.response_size_bytes, 2048);
assert_eq!(entry.client_ip, Some("192.168.1.1".to_string()));
assert_eq!(entry.user_agent, Some("Mozilla/5.0".to_string()));
assert_eq!(entry.headers.get("Content-Type"), Some(&"application/json".to_string()));
assert!(entry.error_message.is_none());
}
#[test]
fn test_create_websocket_log_entry() {
// Event type is uppercased into `method` and kept verbatim in metadata.
let entry = create_websocket_log_entry(
"connect",
"/ws/chat",
101,
Some("10.0.0.1".to_string()),
0,
None,
);
assert_eq!(entry.server_type, "WebSocket");
assert_eq!(entry.method, "CONNECT");
assert_eq!(entry.path, "/ws/chat");
assert_eq!(entry.status_code, 101);
assert_eq!(entry.response_time_ms, 0);
assert_eq!(entry.metadata.get("event_type"), Some(&"connect".to_string()));
}
#[test]
fn test_create_grpc_log_entry() {
// Service and method are joined into "Service/Method" and "/Service/Method".
let entry = create_grpc_log_entry(
"UserService",
"GetUser",
0, 50,
Some("172.16.0.1".to_string()),
128,
512,
None,
);
assert_eq!(entry.server_type, "gRPC");
assert_eq!(entry.method, "UserService/GetUser");
assert_eq!(entry.path, "/UserService/GetUser");
assert_eq!(entry.status_code, 0);
assert_eq!(entry.response_time_ms, 50);
assert_eq!(entry.response_size_bytes, 512);
assert_eq!(entry.metadata.get("service"), Some(&"UserService".to_string()));
assert_eq!(entry.metadata.get("request_size_bytes"), Some(&"128".to_string()));
}
#[test]
fn test_request_log_entry_with_error() {
let entry = create_http_log_entry(
"GET",
"/api/error",
500,
200,
None,
None,
HashMap::new(),
0,
Some("Internal server error".to_string()),
);
assert_eq!(entry.status_code, 500);
assert_eq!(entry.error_message, Some("Internal server error".to_string()));
}
}