#![cfg(feature = "mcp")]
use crate::api::client::GdeltClient;
use crate::api::doc::DocApi;
use crate::api::geo::GeoApi;
use crate::api::tv::TvApi;
use crate::db::AnalyticsDb;
use crate::error::{GdeltError, Result};
use serde_json::Value;
use std::io::{BufRead, Write};
use tracing::{debug, error, info};
use super::tools::*;
pub async fn start_mcp_server(transport: Transport, host: &str, port: u16) -> Result<()> {
info!("Starting MCP server with {:?} transport", transport);
match transport {
Transport::Stdio => run_stdio_server().await,
Transport::Sse | Transport::Http => {
Err(GdeltError::Other(format!(
"HTTP/SSE transport not yet implemented. Use stdio transport."
)))
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum Transport {
Stdio,
Sse,
Http,
}
async fn run_stdio_server() -> Result<()> {
let stdin = std::io::stdin();
let mut stdout = std::io::stdout();
info!("MCP server listening on stdio");
for line in stdin.lock().lines() {
let line = line.map_err(|e| GdeltError::Io(e))?;
if line.is_empty() {
continue;
}
debug!("Received: {}", line);
let request: Value = match serde_json::from_str(&line) {
Ok(v) => v,
Err(e) => {
let error_response = json_rpc_error(None, -32700, &format!("Parse error: {}", e));
writeln!(stdout, "{}", serde_json::to_string(&error_response)?)?;
stdout.flush()?;
continue;
}
};
let response = handle_request(request).await;
let response_str = serde_json::to_string(&response)?;
debug!("Sending: {}", response_str);
writeln!(stdout, "{}", response_str)?;
stdout.flush()?;
}
Ok(())
}
async fn handle_request(request: Value) -> Value {
let id = request.get("id").cloned();
let method = request.get("method").and_then(|m| m.as_str());
match method {
Some("initialize") => handle_initialize(id),
Some("tools/list") => handle_tools_list(id),
Some("tools/call") => handle_tool_call(id, request.get("params")).await,
Some("notifications/initialized") => {
Value::Null
}
Some(m) => json_rpc_error(id, -32601, &format!("Method not found: {}", m)),
None => json_rpc_error(id, -32600, "Invalid request: missing method"),
}
}
fn handle_initialize(id: Option<Value>) -> Value {
serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "gdelt",
"version": env!("CARGO_PKG_VERSION")
}
}
})
}
fn handle_tools_list(id: Option<Value>) -> Value {
let tools = list_tools();
let tool_list: Vec<Value> = tools
.iter()
.map(|t| {
serde_json::json!({
"name": t.name,
"description": t.description,
"inputSchema": t.input_schema
})
})
.collect();
serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"result": {
"tools": tool_list
}
})
}
async fn handle_tool_call(id: Option<Value>, params: Option<&Value>) -> Value {
let params = match params {
Some(p) => p,
None => return json_rpc_error(id, -32602, "Missing params"),
};
let tool_name = params.get("name").and_then(|n| n.as_str());
let arguments = params.get("arguments");
match tool_name {
Some("gdelt_doc_search") => call_doc_search(id, arguments).await,
Some("gdelt_events_query") => call_events_query(id, arguments).await,
Some("gdelt_gkg_query") => call_gkg_query(id, arguments).await,
Some("gdelt_analytics_trends") => call_analytics_trends(id, arguments).await,
Some("gdelt_analytics_sentiment") => call_analytics_sentiment(id, arguments).await,
Some("gdelt_geo_search") => call_geo_search(id, arguments).await,
Some("gdelt_tv_search") => call_tv_search(id, arguments).await,
Some(name) => json_rpc_error(id, -32602, &format!("Unknown tool: {}", name)),
None => json_rpc_error(id, -32602, "Missing tool name"),
}
}
async fn call_doc_search(id: Option<Value>, args: Option<&Value>) -> Value {
let input: DocSearchInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
Some(i) => i,
None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_doc_search"),
};
match GdeltClient::new() {
Ok(client) => {
let api = DocApi::new(client);
match api.search(crate::api::doc::DocSearchParams {
query: input.query,
timespan: Some(input.timespan),
max_records: Some(input.max_records),
source_lang: input.lang,
source_country: input.country,
..Default::default()
}).await {
Ok(results) => tool_result(id, ToolResult::success(results)),
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
async fn call_events_query(id: Option<Value>, args: Option<&Value>) -> Value {
let input: EventsQueryInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
Some(i) => i,
None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_events_query"),
};
match AnalyticsDb::open() {
Ok(db) => {
let filters = crate::db::duckdb::EventFilters {
actor: input.actor,
event_code: input.event_code,
country: input.country,
start_date: input.start,
end_date: input.end,
limit: input.limit,
..Default::default()
};
match db.query_events(filters) {
Ok(results) => tool_result(id, ToolResult::success(results)),
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
async fn call_gkg_query(id: Option<Value>, args: Option<&Value>) -> Value {
let input: GkgQueryInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
Some(i) => i,
None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_gkg_query"),
};
match AnalyticsDb::open() {
Ok(db) => {
let mut conditions = Vec::new();
if let Some(ref theme) = input.theme {
conditions.push(format!("list_contains(themes, '{}')", theme));
}
if let Some(ref person) = input.person {
conditions.push(format!("list_contains(persons, '{}')", person));
}
if let Some(ref org) = input.organization {
conditions.push(format!("list_contains(organizations, '{}')", org));
}
let where_clause = if conditions.is_empty() {
String::new()
} else {
format!("WHERE {}", conditions.join(" AND "))
};
let sql = format!(
"SELECT * FROM gkg {} LIMIT {}",
where_clause, input.limit
);
match db.query(&sql) {
Ok(results) => tool_result(id, ToolResult::success(results)),
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
async fn call_analytics_trends(id: Option<Value>, args: Option<&Value>) -> Value {
let input: AnalyticsTrendsInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
Some(i) => i,
None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_analytics_trends"),
};
match AnalyticsDb::open() {
Ok(db) => {
let config = crate::analytics::trends::TrendsConfig {
topics: input.topics,
detect_anomalies: input.detect_anomalies,
..Default::default()
};
match crate::analytics::trends::analyze_trends(&db, &config) {
Ok(results) => tool_result(id, ToolResult::success(results)),
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
async fn call_analytics_sentiment(id: Option<Value>, args: Option<&Value>) -> Value {
let input: AnalyticsSentimentInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
Some(i) => i,
None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_analytics_sentiment"),
};
match AnalyticsDb::open() {
Ok(db) => {
let config = crate::analytics::sentiment::SentimentConfig {
topic: input.topic,
..Default::default()
};
match crate::analytics::sentiment::analyze_sentiment(&db, &config) {
Ok(results) => tool_result(id, ToolResult::success(results)),
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
async fn call_geo_search(id: Option<Value>, args: Option<&Value>) -> Value {
let input: GeoSearchInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
Some(i) => i,
None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_geo_search"),
};
match GdeltClient::new() {
Ok(client) => {
let api = GeoApi::new(client);
match api.search(crate::api::geo::GeoSearchParams {
query: input.query,
timespan: Some(input.timespan),
max_points: Some(input.max_points),
country: input.country,
..Default::default()
}).await {
Ok(results) => tool_result(id, ToolResult::success(results)),
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
async fn call_tv_search(id: Option<Value>, args: Option<&Value>) -> Value {
let input: TvSearchInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
Some(i) => i,
None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_tv_search"),
};
match GdeltClient::new() {
Ok(client) => {
let api = TvApi::new(client);
match api.search(crate::api::tv::TvSearchParams {
query: input.query,
timespan: Some(input.timespan),
station: input.station,
..Default::default()
}).await {
Ok(results) => tool_result(id, ToolResult::success(results)),
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
}
}
fn json_rpc_error(id: Option<Value>, code: i32, message: &str) -> Value {
serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"error": {
"code": code,
"message": message
}
})
}
fn tool_result<T: serde::Serialize>(id: Option<Value>, result: ToolResult<T>) -> Value {
let content = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string());
serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"result": {
"content": [{
"type": "text",
"text": content
}]
}
})
}