use rmcp::model::LoggingLevel;
use serde_json::{Value, json};
use std::{sync::Arc, time::Duration};
use tokio::{sync::Mutex, sync::RwLock, time::Instant};
const LOG_LEVEL_MAP: &[(LoggingLevel, u8)] = &[
(LoggingLevel::Emergency, 0),
(LoggingLevel::Alert, 1),
(LoggingLevel::Critical, 2),
(LoggingLevel::Error, 3),
(LoggingLevel::Warning, 4),
(LoggingLevel::Notice, 5),
(LoggingLevel::Info, 6),
(LoggingLevel::Debug, 7),
];
const DEFAULT_MAX_LOGS_PER_WINDOW: u32 = 50;
const DEFAULT_LOG_WINDOW: Duration = Duration::from_secs(1);
const MAX_TEXT_FIELD_CHARS: usize = 2048;
fn level_to_value(level: LoggingLevel) -> u8 {
LOG_LEVEL_MAP
.iter()
.find(|(l, _)| *l == level)
.map(|(_, v)| *v)
.unwrap_or(6) }
pub fn should_log(level: LoggingLevel, min_level: LoggingLevel) -> bool {
let numeric_level = level_to_value(level);
let min_numeric_level = level_to_value(min_level);
numeric_level <= min_numeric_level
}
#[derive(Debug)]
pub(crate) struct LogRateLimiter {
window_started_at: Instant,
emitted_in_window: u32,
max_per_window: u32,
window: Duration,
}
impl Default for LogRateLimiter {
fn default() -> Self {
Self::new(DEFAULT_MAX_LOGS_PER_WINDOW, DEFAULT_LOG_WINDOW)
}
}
impl LogRateLimiter {
pub(crate) fn new(max_per_window: u32, window: Duration) -> Self {
Self {
window_started_at: Instant::now(),
emitted_in_window: 0,
max_per_window,
window,
}
}
pub(crate) fn should_emit(&mut self) -> bool {
if self.window_started_at.elapsed() >= self.window {
self.window_started_at = Instant::now();
self.emitted_in_window = 0;
}
if self.emitted_in_window < self.max_per_window {
self.emitted_in_window += 1;
true
} else {
false
}
}
}
fn truncate_text(text: &str) -> String {
let char_count = text.chars().count();
if char_count <= MAX_TEXT_FIELD_CHARS {
text.to_string()
} else {
let truncated: String = text.chars().take(MAX_TEXT_FIELD_CHARS).collect();
format!("{}...[truncated]", truncated)
}
}
#[derive(Debug, Clone)]
pub struct LogData {
pub message: String,
pub fields: Vec<(String, Value)>,
}
impl LogData {
pub fn new(message: impl Into<String>) -> Self {
Self {
message: message.into(),
fields: Vec::new(),
}
}
pub fn with_field(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
self.fields.push((key.into(), value.into()));
self
}
pub fn to_json(&self) -> Value {
let mut map = serde_json::Map::new();
map.insert("message".to_string(), json!(truncate_text(&self.message)));
for (key, value) in &self.fields {
map.insert(key.clone(), value.clone());
}
json!(map)
}
}
pub async fn send_log(
level: LoggingLevel,
logger: Option<String>,
data: LogData,
min_level: Arc<RwLock<Option<LoggingLevel>>>,
rate_limiter: Arc<Mutex<LogRateLimiter>>,
peer: &rmcp::service::Peer<rmcp::RoleServer>,
) {
tracing::debug!(
"Preparing to send MCP log: level={:?}, logger={:?}, data={:?}",
level,
logger,
data
);
let current_min = min_level.read().await;
if let Some(min) = *current_min {
if !should_log(level, min) {
tracing::debug!("Log level {:?} is below minimum {:?}, not sending log", level, min);
return;
}
} else {
tracing::debug!("No minimum log level set, not sending MCP log");
return;
}
drop(current_min);
{
let mut limiter = rate_limiter.lock().await;
if !limiter.should_emit() {
tracing::debug!(level = ?level, "MCP log rate limit reached, dropping notification");
return;
}
}
if let Err(e) = peer
.notify_logging_message(rmcp::model::LoggingMessageNotificationParam {
level,
logger,
data: data.to_json(),
})
.await
{
tracing::error!(
error = ?e,
level = ?level,
"Failed to send MCP log notification"
);
}
}