use crate::error::{McpError, McpResult};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::path::Path;
use tracing::debug;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscribeRecordParams {
pub socket_path: String,
pub record_name: String,
pub max_samples: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UnsubscribeRecordParams {
pub subscription_id: String,
}
fn sanitize_record_name(name: &str) -> String {
name.chars()
.map(|c| match c {
'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-' => c,
':' => '_', _ => '_',
})
.collect()
}
fn get_notification_file_path(notification_dir: &Path, record_name: &str) -> String {
let date = chrono::Utc::now().format("%Y-%m-%d").to_string();
let sanitized_record = sanitize_record_name(record_name);
let filename = format!("{}__{}.jsonl", date, sanitized_record);
notification_dir.join(filename).display().to_string()
}
pub async fn subscribe_record(args: Option<Value>) -> McpResult<Value> {
let params: SubscribeRecordParams = serde_json::from_value(
args.ok_or_else(|| McpError::InvalidParams("Missing arguments".to_string()))?,
)
.map_err(|e| McpError::InvalidParams(format!("Invalid parameters: {}", e)))?;
debug!(
"🔔 Subscribing to record: {} at {}",
params.record_name, params.socket_path
);
let manager = crate::tools::subscription_manager()
.ok_or_else(|| McpError::Internal("Subscription manager not initialized".to_string()))?;
let subscription_id = manager
.subscribe(
std::path::PathBuf::from(¶ms.socket_path),
params.record_name.clone(),
10, params.max_samples,
)
.await?;
let info = manager
.get_subscription(&subscription_id)
.await
.ok_or_else(|| McpError::Internal("Subscription vanished after creation".to_string()))?;
let notification_file = crate::tools::notification_dir()
.map(|dir| get_notification_file_path(dir, ¶ms.record_name));
let mut result = json!({
"subscription_id": info.subscription_id,
"socket_path": info.socket_path.display().to_string(),
"record_name": info.record_name,
"aimx_subscription_id": info.aimx_subscription_id,
"created_at": info.created_at
});
if let Some(file_path) = notification_file {
result
.as_object_mut()
.unwrap()
.insert("notification_file".to_string(), json!(file_path));
}
Ok(result)
}
pub async fn unsubscribe_record(args: Option<Value>) -> McpResult<Value> {
let params: UnsubscribeRecordParams = serde_json::from_value(
args.ok_or_else(|| McpError::InvalidParams("Missing arguments".to_string()))?,
)
.map_err(|e| McpError::InvalidParams(format!("Invalid parameters: {}", e)))?;
debug!("🔕 Unsubscribing: {}", params.subscription_id);
let manager = crate::tools::subscription_manager()
.ok_or_else(|| McpError::Internal("Subscription manager not initialized".to_string()))?;
manager.unsubscribe(¶ms.subscription_id).await?;
Ok(json!({
"success": true,
"subscription_id": params.subscription_id
}))
}
pub async fn list_subscriptions(_args: Option<Value>) -> McpResult<Value> {
debug!("📋 Listing active subscriptions");
let manager = crate::tools::subscription_manager()
.ok_or_else(|| McpError::Internal("Subscription manager not initialized".to_string()))?;
let subscriptions = manager.list_subscriptions().await;
let notification_dir = crate::tools::notification_dir();
let subs_json: Vec<Value> = subscriptions
.iter()
.map(|info| {
let mut sub_json = json!({
"subscription_id": info.subscription_id,
"socket_path": info.socket_path.display().to_string(),
"record_name": info.record_name,
"aimx_subscription_id": info.aimx_subscription_id,
"created_at": info.created_at
});
if let Some(dir) = notification_dir {
let file_path = get_notification_file_path(dir, &info.record_name);
sub_json
.as_object_mut()
.unwrap()
.insert("notification_file".to_string(), json!(file_path));
}
sub_json
})
.collect();
Ok(json!({
"count": subs_json.len(),
"subscriptions": subs_json
}))
}
pub async fn get_notification_directory(_args: Option<Value>) -> McpResult<Value> {
debug!("📁 Getting notification directory");
let notification_dir = crate::tools::notification_dir()
.ok_or_else(|| McpError::Internal("Notification directory not initialized".to_string()))?;
Ok(json!({
"notification_directory": notification_dir.display().to_string(),
"enabled": true,
"file_pattern": "{date}__{record_name}.jsonl",
"example": format!("{}/2025-11-04__server__Temperature.jsonl", notification_dir.display()),
"tip": "Use list_dir to see available files, then read_file to access notification data"
}))
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_subscribe_record_missing_args() {
let result = subscribe_record(None).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), McpError::InvalidParams(_)));
}
#[tokio::test]
async fn test_unsubscribe_record_missing_args() {
let result = unsubscribe_record(None).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), McpError::InvalidParams(_)));
}
}