use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{RwLock, Semaphore};
/// Number of permits in the semaphore that gates background persistence
/// tasks. `ProxyMonitor::log_request` takes one permit per task without
/// waiting; when none is free the write is dropped and counted instead of
/// being queued.
const MAX_PERSIST_WRITE_CONCURRENCY: usize = 32;
/// One proxied request as recorded by [`ProxyMonitor`].
///
/// The value is kept in the monitor's in-memory buffer and handed to the
/// persistence layer; it is also (de)serializable through serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProxyRequestLog {
/// Identifier of this log entry.
pub id: String,
/// Time of the request. Presumably Unix epoch milliseconds: the monitor
/// divides it by 1000 before writing the IP access log — confirm with producers.
pub timestamp: i64,
/// Request method, logged together with `url`.
pub method: String,
/// Request URL; stored as the `path` of the IP access log.
pub url: String,
/// Response status. Values in `200..400` are counted as successes by the
/// monitor, everything else as errors.
pub status: u16,
/// Request duration. NOTE(review): unit is not visible here (presumably
/// milliseconds) — confirm.
pub duration: u64,
/// Model name used when recording token usage; `"unknown"` is substituted
/// when absent.
pub model: Option<String>,
/// NOTE(review): presumably the upstream model the request was mapped to —
/// not read in this file.
pub mapped_model: Option<String>,
/// Account that token usage is attributed to, when known.
pub account_email: Option<String>,
/// Client address; when present an IP access log entry is written as well.
pub client_ip: Option<String>,
/// Correlation id attached to the monitor's tracing event when present.
pub correlation_id: Option<String>,
/// Request id attached to the monitor's tracing event when present.
pub request_id: Option<String>,
/// Error text, if any. Not interpreted by the monitor.
pub error: Option<String>,
/// Captured request body, if any. Not interpreted by the monitor.
pub request_body: Option<String>,
/// Captured response body, if any. Not interpreted by the monitor.
pub response_body: Option<String>,
/// Input token count; usage is recorded only when account, input and output
/// are all present.
pub input_tokens: Option<u32>,
/// Output token count; see `input_tokens`.
pub output_tokens: Option<u32>,
/// NOTE(review): presumably the client-facing API protocol (the tests use
/// `"anthropic"`) — not read in this file.
pub protocol: Option<String>,
/// User name copied into the IP access log entry.
pub username: Option<String>,
}
/// Aggregate request counters.
///
/// The in-memory copy held by [`ProxyMonitor`] is updated on every logged
/// request; `Default` yields all-zero counters.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ProxyStats {
/// Number of requests logged.
pub total_requests: u64,
/// Requests whose status was in `200..400`.
pub success_count: u64,
/// Requests whose status was outside `200..400`.
pub error_count: u64,
}
/// In-process monitor for proxied requests.
///
/// Keeps a bounded in-memory buffer of recent requests plus running counters,
/// and forwards each logged request to the persistence layer in the
/// background.
pub struct ProxyMonitor {
/// Recent request logs, newest at the front.
pub logs: RwLock<VecDeque<ProxyRequestLog>>,
/// In-memory counters, used as a fallback when the database cannot be read.
pub stats: RwLock<ProxyStats>,
/// Capacity target for `logs`; the oldest entry is evicted once it is reached.
pub max_logs: usize,
/// Whether logging is active. Starts out `false`.
pub enabled: AtomicBool,
/// Caps the number of concurrently running persistence tasks.
persist_write_semaphore: Arc<Semaphore>,
/// Number of persistence writes skipped because no permit was available.
dropped_persist_writes: Arc<AtomicU64>,
}
impl ProxyMonitor {
/// Creates a disabled monitor whose in-memory buffer is pre-allocated for
/// `max_logs` entries.
///
/// Counters start at zero and the persistence semaphore is created with
/// `MAX_PERSIST_WRITE_CONCURRENCY` permits.
pub fn new(max_logs: usize) -> Self {
    let ring = VecDeque::with_capacity(max_logs);
    let write_permits = Semaphore::new(MAX_PERSIST_WRITE_CONCURRENCY);
    let dropped_counter = AtomicU64::new(0);

    Self {
        logs: RwLock::new(ring),
        stats: RwLock::new(ProxyStats::default()),
        max_logs,
        enabled: AtomicBool::new(false),
        persist_write_semaphore: Arc::new(write_permits),
        dropped_persist_writes: Arc::new(dropped_counter),
    }
}
/// Initializes the proxy log database and purges old entries.
///
/// Both steps run on the blocking thread pool. Failures are logged and
/// otherwise ignored; on success the number of removed rows is reported
/// when it is non-zero.
pub async fn run_startup_maintenance(&self) {
    let outcome = tokio::task::spawn_blocking(|| {
        crate::modules::persistence::proxy_db::init_db()?;
        crate::modules::persistence::proxy_db::cleanup_old_logs(30)
    })
    .await;

    let deleted = match outcome {
        Ok(Ok(count)) => count,
        Ok(Err(e)) => {
            tracing::error!("Monitor startup maintenance failed: {}", e);
            return;
        }
        Err(e) => {
            tracing::error!("Monitor startup maintenance join failed: {}", e);
            return;
        }
    };

    if deleted > 0 {
        tracing::info!("Auto cleanup: removed {} old logs (>30 days)", deleted);
    }
}
/// Turns request logging on or off.
pub fn set_enabled(&self, enabled: bool) {
    let flag = &self.enabled;
    flag.store(enabled, Ordering::Relaxed);
}
/// Reports whether request logging is currently switched on.
pub fn is_enabled(&self) -> bool {
    let flag = &self.enabled;
    flag.load(Ordering::Relaxed)
}
/// Returns how many persistence writes have been skipped so far because no
/// write permit was available.
pub fn dropped_persist_writes(&self) -> u64 {
    let counter = &self.dropped_persist_writes;
    counter.load(Ordering::Relaxed)
}
pub async fn log_request(&self, log: ProxyRequestLog) {
if !self.is_enabled() {
return;
}
match (log.request_id.as_deref(), log.correlation_id.as_deref()) {
(Some(request_id), Some(correlation_id)) => tracing::info!(
request_id = %request_id,
correlation_id = %correlation_id,
"[Monitor] Logging request: {} {}",
log.method,
log.url
),
(Some(request_id), None) => tracing::info!(
request_id = %request_id,
"[Monitor] Logging request: {} {}",
log.method,
log.url
),
(None, Some(correlation_id)) => tracing::info!(
correlation_id = %correlation_id,
"[Monitor] Logging request: {} {}",
log.method,
log.url
),
(None, None) => tracing::info!("[Monitor] Logging request: {} {}", log.method, log.url),
};
{
let mut stats = self.stats.write().await;
stats.total_requests += 1;
if log.status >= 200 && log.status < 400 {
stats.success_count += 1;
} else {
stats.error_count += 1;
}
}
{
let mut logs = self.logs.write().await;
if logs.len() >= self.max_logs {
logs.pop_back();
}
logs.push_front(log.clone());
}
let persist_write_semaphore = self.persist_write_semaphore.clone();
let dropped_persist_writes = self.dropped_persist_writes.clone();
let permit = match persist_write_semaphore.try_acquire_owned() {
Ok(permit) => permit,
Err(_) => {
let dropped = dropped_persist_writes.fetch_add(1, Ordering::Relaxed) + 1;
if dropped == 1 || dropped % 100 == 0 {
tracing::warn!(
"Dropping monitor DB persistence write due to backpressure (dropped={})",
dropped
);
}
return;
}
};
let log_to_save = log.clone();
tokio::spawn(async move {
let _permit = permit;
if let Err(e) = crate::modules::persistence::proxy_db::save_log(&log_to_save) {
tracing::error!("Failed to save proxy log to DB: {}", e);
}
if let Some(ip) = &log_to_save.client_ip {
let security_log = crate::modules::persistence::security_db::IpAccessLog {
id: uuid::Uuid::new_v4().to_string(),
client_ip: ip.clone(),
timestamp: log_to_save.timestamp / 1000,
method: Some(log_to_save.method.clone()),
path: Some(log_to_save.url.clone()),
user_agent: None,
status: Some(log_to_save.status as i32),
duration: Some(log_to_save.duration as i64),
api_key_hash: None,
blocked: false,
block_reason: None,
username: log_to_save.username.clone(),
};
if let Err(e) =
crate::modules::persistence::security_db::save_ip_access_log(&security_log)
{
tracing::error!("Failed to save security log: {}", e);
}
}
if let (Some(account), Some(input), Some(output)) = (
&log_to_save.account_email,
log_to_save.input_tokens,
log_to_save.output_tokens,
) {
let model = log_to_save
.model
.clone()
.unwrap_or_else(|| "unknown".to_string());
if let Err(e) =
crate::modules::stats::token_stats::record_usage(account, &model, input, output)
{
tracing::debug!("Failed to record token stats: {}", e);
}
}
});
}
pub async fn get_logs(&self, limit: usize) -> Vec<ProxyRequestLog> {
let db_result = tokio::task::spawn_blocking(move || {
crate::modules::persistence::proxy_db::get_logs(limit)
})
.await;
match db_result {
Ok(Ok(logs)) => logs,
Ok(Err(e)) => {
tracing::error!("Failed to get logs from DB: {}", e);
let logs = self.logs.read().await;
logs.iter().take(limit).cloned().collect()
}
Err(e) => {
tracing::error!("Spawn blocking failed for get_logs: {}", e);
let logs = self.logs.read().await;
logs.iter().take(limit).cloned().collect()
}
}
}
pub async fn get_stats(&self) -> ProxyStats {
let db_result =
tokio::task::spawn_blocking(crate::modules::persistence::proxy_db::get_stats).await;
match db_result {
Ok(Ok(stats)) => stats,
Ok(Err(e)) => {
tracing::error!("Failed to get stats from DB: {}", e);
self.stats.read().await.clone()
}
Err(e) => {
tracing::error!("Spawn blocking failed for get_stats: {}", e);
self.stats.read().await.clone()
}
}
}
/// Returns one page of request logs from the database.
///
/// * `page` – 1-based page number; `0` is treated like `1`.
/// * `page_size` – maximum number of rows to return.
/// * `search_text` – text passed to the database filter; `None` is sent as
///   an empty string.
/// * `level` – `Some("error")` restricts the result to error entries; any
///   other value applies no level filter.
///
/// # Errors
///
/// Returns the database layer's error, or a `"Spawn blocking failed: …"`
/// message if the blocking task could not be joined.
pub async fn get_logs_filtered(
    &self,
    page: usize,
    page_size: usize,
    search_text: Option<String>,
    level: Option<String>,
) -> Result<Vec<ProxyRequestLog>, String> {
    // Saturate instead of overflowing: an absurdly large page number must not
    // panic (debug builds) or wrap around to an unrelated offset (release).
    let offset = page.saturating_sub(1).saturating_mul(page_size);
    let errors_only = level.as_deref() == Some("error");
    let search = search_text.unwrap_or_default();

    tokio::task::spawn_blocking(move || {
        crate::modules::persistence::proxy_db::get_logs_filtered(
            &search,
            errors_only,
            page_size,
            offset,
        )
    })
    .await
    .unwrap_or_else(|e| Err(format!("Spawn blocking failed: {}", e)))
}
pub async fn clear(&self) {
let mut logs = self.logs.write().await;
logs.clear();
let mut stats = self.stats.write().await;
*stats = ProxyStats::default();
let _ = tokio::task::spawn_blocking(|| {
if let Err(e) = crate::modules::persistence::proxy_db::clear_logs() {
tracing::error!("Failed to clear logs in DB: {}", e);
}
})
.await;
}
}
#[cfg(test)]
mod tests {
    use super::{ProxyMonitor, ProxyRequestLog, MAX_PERSIST_WRITE_CONCURRENCY};

    /// Builds a successful request log with the optional fields used by the
    /// persistence path populated.
    fn sample_log() -> ProxyRequestLog {
        let now_ms = chrono::Utc::now().timestamp_millis();
        ProxyRequestLog {
            id: String::from("log-1"),
            timestamp: now_ms,
            method: String::from("POST"),
            url: String::from("/v1/messages"),
            status: 200,
            duration: 10,
            model: Some(String::from("claude-sonnet-4-5")),
            mapped_model: Some(String::from("gemini-2.5-pro")),
            account_email: Some(String::from("acct@example.com")),
            client_ip: Some(String::from("127.0.0.1")),
            correlation_id: None,
            request_id: None,
            error: None,
            request_body: None,
            response_body: None,
            input_tokens: Some(10),
            output_tokens: Some(20),
            protocol: Some(String::from("anthropic")),
            username: Some(String::from("tester")),
        }
    }

    /// Constructing a monitor must work outside of a Tokio runtime.
    #[test]
    fn constructor_is_runtime_safe() {
        let outcome = std::panic::catch_unwind(|| ProxyMonitor::new(16));
        assert!(
            outcome.is_ok(),
            "constructor should not require Tokio runtime"
        );
    }

    /// A freshly constructed (disabled) monitor ignores logged requests.
    #[tokio::test]
    async fn log_request_is_noop_when_monitor_disabled() {
        let monitor = ProxyMonitor::new(16);

        monitor.log_request(sample_log()).await;

        let counters = monitor.stats.read().await.clone();
        let buffered = monitor.logs.read().await;
        assert_eq!(counters.total_requests, 0);
        assert_eq!(counters.success_count, 0);
        assert_eq!(counters.error_count, 0);
        assert!(buffered.is_empty(), "disabled monitor must not buffer logs");
        assert_eq!(monitor.dropped_persist_writes(), 0);
    }

    /// With every write permit taken, the persistence write is dropped and
    /// counted instead of being queued.
    #[tokio::test]
    async fn log_request_drops_persistence_when_write_concurrency_is_saturated() {
        let monitor = ProxyMonitor::new(16);
        monitor.set_enabled(true);

        // Hold all permits for the rest of the test.
        let semaphore = monitor.persist_write_semaphore.clone();
        let _held_permits = semaphore
            .acquire_many_owned(MAX_PERSIST_WRITE_CONCURRENCY as u32)
            .await
            .expect("acquire all permits");

        monitor.log_request(sample_log()).await;

        assert_eq!(monitor.dropped_persist_writes(), 1);
    }
}