gdelt 0.1.0

CLI for GDELT Project - optimized for agentic usage with local data caching
//! MCP server implementation.

#![cfg(feature = "mcp")]

use crate::api::client::GdeltClient;
use crate::api::doc::DocApi;
use crate::api::geo::GeoApi;
use crate::api::tv::TvApi;
use crate::db::AnalyticsDb;
use crate::error::{GdeltError, Result};
use serde_json::Value;
use std::io::{BufRead, Write};
use tracing::{debug, error, info};

use super::tools::*;

/// Launch the MCP server on the requested transport.
///
/// `Transport::Stdio` hands control to the stdio loop and returns its result.
/// `Transport::Sse` and `Transport::Http` are rejected with a
/// `GdeltError::Other`. `host` and `port` are accepted but not read by the
/// current implementation.
pub async fn start_mcp_server(transport: Transport, host: &str, port: u16) -> Result<()> {
    info!("Starting MCP server with {:?} transport", transport);

    let unsupported = match transport {
        Transport::Stdio => return run_stdio_server().await,
        // HTTP/SSE transport would require hyper/axum
        Transport::Sse | Transport::Http => {
            "HTTP/SSE transport not yet implemented. Use stdio transport."
        }
    };

    Err(GdeltError::Other(unsupported.to_string()))
}

/// Transport type for MCP
///
/// Selects how `start_mcp_server` exchanges messages with the client.
#[derive(Debug, Clone, Copy)]
pub enum Transport {
    /// JSON-RPC messages over stdin/stdout; the only transport that
    /// `start_mcp_server` currently serves.
    Stdio,
    /// Presumably Server-Sent Events; currently rejected by
    /// `start_mcp_server` with a "not yet implemented" error.
    Sse,
    /// HTTP transport; currently rejected by `start_mcp_server` with a
    /// "not yet implemented" error.
    Http,
}

/// Run the MCP server over stdio
async fn run_stdio_server() -> Result<()> {
    let stdin = std::io::stdin();
    let mut stdout = std::io::stdout();

    info!("MCP server listening on stdio");

    for line in stdin.lock().lines() {
        let line = line.map_err(|e| GdeltError::Io(e))?;

        if line.is_empty() {
            continue;
        }

        debug!("Received: {}", line);

        // Parse JSON-RPC request
        let request: Value = match serde_json::from_str(&line) {
            Ok(v) => v,
            Err(e) => {
                let error_response = json_rpc_error(None, -32700, &format!("Parse error: {}", e));
                writeln!(stdout, "{}", serde_json::to_string(&error_response)?)?;
                stdout.flush()?;
                continue;
            }
        };

        let response = handle_request(request).await;
        let response_str = serde_json::to_string(&response)?;
        debug!("Sending: {}", response_str);
        writeln!(stdout, "{}", response_str)?;
        stdout.flush()?;
    }

    Ok(())
}

/// Handle a JSON-RPC request
async fn handle_request(request: Value) -> Value {
    let id = request.get("id").cloned();
    let method = request.get("method").and_then(|m| m.as_str());

    match method {
        Some("initialize") => handle_initialize(id),
        Some("tools/list") => handle_tools_list(id),
        Some("tools/call") => handle_tool_call(id, request.get("params")).await,
        Some("notifications/initialized") => {
            // No response needed for notifications
            Value::Null
        }
        Some(m) => json_rpc_error(id, -32601, &format!("Method not found: {}", m)),
        None => json_rpc_error(id, -32600, "Invalid request: missing method"),
    }
}

/// Build the JSON-RPC response to an `initialize` request.
///
/// Advertises protocol version `2024-11-05`, an empty `tools` capability, and
/// server info made of the name `gdelt` plus the crate version taken from
/// `CARGO_PKG_VERSION` at compile time.
fn handle_initialize(id: Option<Value>) -> Value {
    let capabilities = serde_json::json!({ "tools": {} });
    let server_info = serde_json::json!({
        "name": "gdelt",
        "version": env!("CARGO_PKG_VERSION")
    });
    let result = serde_json::json!({
        "protocolVersion": "2024-11-05",
        "capabilities": capabilities,
        "serverInfo": server_info
    });

    serde_json::json!({
        "jsonrpc": "2.0",
        "id": id,
        "result": result
    })
}

/// Build the JSON-RPC response to a `tools/list` request.
///
/// Each entry returned by `list_tools` is rendered as an object with `name`,
/// `description`, and `inputSchema` members.
fn handle_tools_list(id: Option<Value>) -> Value {
    let tools = list_tools();

    let mut descriptors: Vec<Value> = Vec::new();
    for tool in tools.iter() {
        descriptors.push(serde_json::json!({
            "name": tool.name,
            "description": tool.description,
            "inputSchema": tool.input_schema
        }));
    }

    let result = serde_json::json!({ "tools": descriptors });
    serde_json::json!({
        "jsonrpc": "2.0",
        "id": id,
        "result": result
    })
}

async fn handle_tool_call(id: Option<Value>, params: Option<&Value>) -> Value {
    let params = match params {
        Some(p) => p,
        None => return json_rpc_error(id, -32602, "Missing params"),
    };

    let tool_name = params.get("name").and_then(|n| n.as_str());
    let arguments = params.get("arguments");

    match tool_name {
        Some("gdelt_doc_search") => call_doc_search(id, arguments).await,
        Some("gdelt_events_query") => call_events_query(id, arguments).await,
        Some("gdelt_gkg_query") => call_gkg_query(id, arguments).await,
        Some("gdelt_analytics_trends") => call_analytics_trends(id, arguments).await,
        Some("gdelt_analytics_sentiment") => call_analytics_sentiment(id, arguments).await,
        Some("gdelt_geo_search") => call_geo_search(id, arguments).await,
        Some("gdelt_tv_search") => call_tv_search(id, arguments).await,
        Some(name) => json_rpc_error(id, -32602, &format!("Unknown tool: {}", name)),
        None => json_rpc_error(id, -32602, "Missing tool name"),
    }
}

async fn call_doc_search(id: Option<Value>, args: Option<&Value>) -> Value {
    let input: DocSearchInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
        Some(i) => i,
        None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_doc_search"),
    };

    match GdeltClient::new() {
        Ok(client) => {
            let api = DocApi::new(client);
            match api.search(crate::api::doc::DocSearchParams {
                query: input.query,
                timespan: Some(input.timespan),
                max_records: Some(input.max_records),
                source_lang: input.lang,
                source_country: input.country,
                ..Default::default()
            }).await {
                Ok(results) => tool_result(id, ToolResult::success(results)),
                Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
            }
        }
        Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
    }
}

async fn call_events_query(id: Option<Value>, args: Option<&Value>) -> Value {
    let input: EventsQueryInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
        Some(i) => i,
        None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_events_query"),
    };

    match AnalyticsDb::open() {
        Ok(db) => {
            let filters = crate::db::duckdb::EventFilters {
                actor: input.actor,
                event_code: input.event_code,
                country: input.country,
                start_date: input.start,
                end_date: input.end,
                limit: input.limit,
                ..Default::default()
            };
            match db.query_events(filters) {
                Ok(results) => tool_result(id, ToolResult::success(results)),
                Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
            }
        }
        Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
    }
}

async fn call_gkg_query(id: Option<Value>, args: Option<&Value>) -> Value {
    let input: GkgQueryInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
        Some(i) => i,
        None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_gkg_query"),
    };

    match AnalyticsDb::open() {
        Ok(db) => {
            // Build SQL query based on filters
            let mut conditions = Vec::new();
            if let Some(ref theme) = input.theme {
                conditions.push(format!("list_contains(themes, '{}')", theme));
            }
            if let Some(ref person) = input.person {
                conditions.push(format!("list_contains(persons, '{}')", person));
            }
            if let Some(ref org) = input.organization {
                conditions.push(format!("list_contains(organizations, '{}')", org));
            }

            let where_clause = if conditions.is_empty() {
                String::new()
            } else {
                format!("WHERE {}", conditions.join(" AND "))
            };

            let sql = format!(
                "SELECT * FROM gkg {} LIMIT {}",
                where_clause, input.limit
            );

            match db.query(&sql) {
                Ok(results) => tool_result(id, ToolResult::success(results)),
                Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
            }
        }
        Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
    }
}

async fn call_analytics_trends(id: Option<Value>, args: Option<&Value>) -> Value {
    let input: AnalyticsTrendsInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
        Some(i) => i,
        None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_analytics_trends"),
    };

    match AnalyticsDb::open() {
        Ok(db) => {
            let config = crate::analytics::trends::TrendsConfig {
                topics: input.topics,
                detect_anomalies: input.detect_anomalies,
                ..Default::default()
            };
            match crate::analytics::trends::analyze_trends(&db, &config) {
                Ok(results) => tool_result(id, ToolResult::success(results)),
                Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
            }
        }
        Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
    }
}

async fn call_analytics_sentiment(id: Option<Value>, args: Option<&Value>) -> Value {
    let input: AnalyticsSentimentInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
        Some(i) => i,
        None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_analytics_sentiment"),
    };

    match AnalyticsDb::open() {
        Ok(db) => {
            let config = crate::analytics::sentiment::SentimentConfig {
                topic: input.topic,
                ..Default::default()
            };
            match crate::analytics::sentiment::analyze_sentiment(&db, &config) {
                Ok(results) => tool_result(id, ToolResult::success(results)),
                Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
            }
        }
        Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
    }
}

async fn call_geo_search(id: Option<Value>, args: Option<&Value>) -> Value {
    let input: GeoSearchInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
        Some(i) => i,
        None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_geo_search"),
    };

    match GdeltClient::new() {
        Ok(client) => {
            let api = GeoApi::new(client);
            match api.search(crate::api::geo::GeoSearchParams {
                query: input.query,
                timespan: Some(input.timespan),
                max_points: Some(input.max_points),
                country: input.country,
                ..Default::default()
            }).await {
                Ok(results) => tool_result(id, ToolResult::success(results)),
                Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
            }
        }
        Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
    }
}

async fn call_tv_search(id: Option<Value>, args: Option<&Value>) -> Value {
    let input: TvSearchInput = match args.and_then(|a| serde_json::from_value(a.clone()).ok()) {
        Some(i) => i,
        None => return json_rpc_error(id, -32602, "Invalid arguments for gdelt_tv_search"),
    };

    match GdeltClient::new() {
        Ok(client) => {
            let api = TvApi::new(client);
            match api.search(crate::api::tv::TvSearchParams {
                query: input.query,
                timespan: Some(input.timespan),
                station: input.station,
                ..Default::default()
            }).await {
                Ok(results) => tool_result(id, ToolResult::success(results)),
                Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
            }
        }
        Err(e) => tool_result(id, ToolResult::<()>::error(e.to_string())),
    }
}

fn json_rpc_error(id: Option<Value>, code: i32, message: &str) -> Value {
    serde_json::json!({
        "jsonrpc": "2.0",
        "id": id,
        "error": {
            "code": code,
            "message": message
        }
    })
}

/// Wrap a tool result in a JSON-RPC success response.
///
/// `result` is serialized to a JSON string and placed in a single `text`
/// content item; if serialization fails, the text falls back to `"{}"`.
fn tool_result<T>(id: Option<Value>, result: ToolResult<T>) -> Value
where
    T: serde::Serialize,
{
    let text = match serde_json::to_string(&result) {
        Ok(serialized) => serialized,
        Err(_) => String::from("{}"),
    };
    let item = serde_json::json!({
        "type": "text",
        "text": text
    });

    serde_json::json!({
        "jsonrpc": "2.0",
        "id": id,
        "result": {
            "content": [item]
        }
    })
}