Skip to main content

fetchium_mcp/
lib.rs

1//! Fetchium MCP Server — Model Context Protocol integration (PRD §30).
2//!
3//! Implements the MCP protocol as JSON-RPC 2.0 over stdio.
4//! All log output goes to stderr; all MCP protocol output goes to stdout.
5//!
6//! Provides Fetchium-branded composite tools over MCP.
7
8pub mod handlers;
9pub mod tools;
10
11use crate::tools::{
12    EstimateInput, ExpandInput, FetchInput, HackerNewsSearchInput, RedditSearchInput,
13    ResearchInput, SearchInput, SocialResearchInput, YouTubeAnalyzeInput, YouTubeSearchInput,
14    YouTubeTranscriptInput, YouTubeWatchInput,
15};
16use axum::{
17    extract::State,
18    http::StatusCode,
19    routing::{get, post},
20    Json, Router,
21};
22use fetchium_core::cache::MemoryCache;
23use fetchium_core::config::FetchiumConfig;
24use fetchium_core::http::client::HttpClient;
25use fetchium_core::summarize::SummarizeConfig;
26use serde::{Deserialize, Serialize};
27use serde_json::{json, Value};
28use std::io::{self, BufRead, Write};
29use std::sync::Arc;
30use tower_http::trace::{DefaultOnFailure, DefaultOnRequest, DefaultOnResponse, TraceLayer};
31
32// ─── JSON-RPC 2.0 types ──────────────────────────────────────────
33
34#[derive(Debug, Deserialize)]
35struct JsonRpcRequest {
36    #[allow(dead_code)]
37    jsonrpc: String,
38    id: Option<Value>,
39    method: String,
40    params: Option<Value>,
41}
42
43#[derive(Debug, Serialize)]
44struct JsonRpcResponse {
45    jsonrpc: &'static str,
46    id: Value,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    result: Option<Value>,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    error: Option<JsonRpcError>,
51}
52
53#[derive(Debug, Serialize)]
54struct JsonRpcError {
55    code: i32,
56    message: String,
57}
58
59#[derive(Clone)]
60struct McpHttpState {
61    config: FetchiumConfig,
62    http: HttpClient,
63    cache: MemoryCache,
64}
65
66impl JsonRpcResponse {
67    fn ok(id: Value, result: Value) -> Self {
68        Self {
69            jsonrpc: "2.0",
70            id,
71            result: Some(result),
72            error: None,
73        }
74    }
75
76    fn err(id: Value, code: i32, message: String) -> Self {
77        Self {
78            jsonrpc: "2.0",
79            id,
80            result: None,
81            error: Some(JsonRpcError { code, message }),
82        }
83    }
84}
85
86/// Run the MCP server in stdio mode.
87///
88/// Reads JSON-RPC requests from stdin line by line, dispatches to handlers,
89/// and writes JSON-RPC responses to stdout. All diagnostics go to stderr.
90pub async fn run_mcp_stdio(config: FetchiumConfig) -> anyhow::Result<()> {
91    eprintln!("[fetchium-mcp] Fetchium MCP server starting (stdio transport)");
92
93    let http = HttpClient::new(&config)?;
94    let cache = MemoryCache::new(50, 3600, true);
95
96    let stdin = io::stdin();
97    let stdout = io::stdout();
98
99    for line in stdin.lock().lines() {
100        let line = match line {
101            Ok(l) if l.trim().is_empty() => continue,
102            Ok(l) => l,
103            Err(e) => {
104                eprintln!("[fetchium-mcp] stdin read error: {e}");
105                break;
106            }
107        };
108
109        let response = handle_message(&line, &config, &http, &cache).await;
110
111        let json_out = serde_json::to_string(&response).unwrap_or_else(|e| {
112            format!(
113                r#"{{"jsonrpc":"2.0","id":null,"error":{{"code":-32603,"message":"{}"}}}}"#,
114                e
115            )
116        });
117
118        let mut out = stdout.lock();
119        let _ = writeln!(out, "{json_out}");
120        let _ = out.flush();
121    }
122
123    eprintln!("[fetchium-mcp] Server shutting down.");
124    Ok(())
125}
126
127/// Run the MCP server over HTTP JSON-RPC on `/mcp`.
128pub async fn run_mcp_http(config: FetchiumConfig, port: u16) -> anyhow::Result<()> {
129    let http = HttpClient::new(&config)?;
130    let cache = MemoryCache::new(50, 3600, true);
131    let state = Arc::new(McpHttpState {
132        config,
133        http,
134        cache,
135    });
136
137    let app = Router::new()
138        .route("/health", get(mcp_health))
139        .route("/mcp", post(mcp_http_handler))
140        .layer(
141            TraceLayer::new_for_http()
142                .on_request(DefaultOnRequest::new().level(tracing::Level::INFO))
143                .on_response(DefaultOnResponse::new().level(tracing::Level::INFO))
144                .on_failure(DefaultOnFailure::new().level(tracing::Level::ERROR)),
145        )
146        .with_state(state);
147
148    let addr = format!("0.0.0.0:{port}");
149    let listener = tokio::net::TcpListener::bind(&addr).await?;
150    eprintln!("[fetchium-mcp] Fetchium MCP server starting (http transport) on http://{addr}/mcp");
151    axum::serve(listener, app).await?;
152    Ok(())
153}
154
155/// Dispatch a single JSON-RPC message line and return the response.
156async fn handle_message(
157    line: &str,
158    config: &FetchiumConfig,
159    http: &HttpClient,
160    cache: &MemoryCache,
161) -> JsonRpcResponse {
162    let req: JsonRpcRequest = match serde_json::from_str(line) {
163        Ok(r) => r,
164        Err(e) => {
165            return JsonRpcResponse::err(Value::Null, -32700, format!("Parse error: {e}"));
166        }
167    };
168
169    handle_request(req, config, http, cache).await
170}
171
172async fn handle_request(
173    req: JsonRpcRequest,
174    config: &FetchiumConfig,
175    http: &HttpClient,
176    cache: &MemoryCache,
177) -> JsonRpcResponse {
178    let id = req.id.clone().unwrap_or(Value::Null);
179    let params = req.params.unwrap_or(Value::Null);
180
181    match req.method.as_str() {
182        // MCP protocol lifecycle
183        "initialize" => {
184            eprintln!("[fetchium-mcp] initialize");
185            JsonRpcResponse::ok(
186                id,
187                json!({
188                    "protocolVersion": "2024-11-05",
189                    "capabilities": {
190                        "tools": {}
191                    },
192                    "serverInfo": {
193                        "name": "fetchium",
194                        "version": env!("CARGO_PKG_VERSION"),
195                    }
196                }),
197            )
198        }
199
200        "notifications/initialized" => {
201            // Notification — no response needed; return empty result
202            eprintln!("[fetchium-mcp] initialized");
203            JsonRpcResponse::ok(id, Value::Null)
204        }
205
206        // List available tools
207        "tools/list" => JsonRpcResponse::ok(id, json!({ "tools": tools::tool_definitions() })),
208
209        // Tool call dispatch
210        "tools/call" => {
211            let tool_name = params.get("name").and_then(|v| v.as_str()).unwrap_or("");
212            let arguments = params
213                .get("arguments")
214                .cloned()
215                .unwrap_or(Value::Object(Default::default()));
216
217            let result = dispatch_tool(tool_name, arguments, config, http, cache).await;
218            JsonRpcResponse::ok(
219                id,
220                json!({
221                    "content": [{ "type": "text", "text": serde_json::to_string(&result).unwrap_or_default() }]
222                }),
223            )
224        }
225
226        // Ping
227        "ping" => JsonRpcResponse::ok(id, json!({})),
228
229        other => JsonRpcResponse::err(id, -32601, format!("Method not found: {other}")),
230    }
231}
232
233async fn mcp_health() -> Json<Value> {
234    Json(json!({
235        "status": "ok",
236        "service": "fetchium-mcp",
237        "transport": "http",
238        "version": env!("CARGO_PKG_VERSION"),
239    }))
240}
241
242async fn mcp_http_handler(
243    State(state): State<Arc<McpHttpState>>,
244    Json(req): Json<JsonRpcRequest>,
245) -> (StatusCode, Json<JsonRpcResponse>) {
246    let response = handle_request(req, &state.config, &state.http, &state.cache).await;
247    (StatusCode::OK, Json(response))
248}
249
250/// Dispatch a `tools/call` to the appropriate handler.
251async fn dispatch_tool(
252    name: &str,
253    args: Value,
254    config: &FetchiumConfig,
255    http: &HttpClient,
256    cache: &MemoryCache,
257) -> Value {
258    match name {
259        "fetchium_search" => match serde_json::from_value::<SearchInput>(args) {
260            Ok(input) => handlers::handle_search(input, config, http, Some(cache)).await,
261            Err(e) => json!({ "error": format!("Invalid input: {e}") }),
262        },
263        "fetchium_fetch" => match serde_json::from_value::<FetchInput>(args) {
264            Ok(input) => handlers::handle_fetch(input, config, http, Some(cache)).await,
265            Err(e) => json!({ "error": format!("Invalid input: {e}") }),
266        },
267        "fetchium_research" => match serde_json::from_value::<ResearchInput>(args) {
268            Ok(input) => handlers::handle_research(input, config, http, Some(cache)).await,
269            Err(e) => json!({ "error": format!("Invalid input: {e}") }),
270        },
271        "fetchium_estimate" => match serde_json::from_value::<EstimateInput>(args) {
272            Ok(input) => handlers::handle_estimate(input, config, http, Some(cache)).await,
273            Err(e) => json!({ "error": format!("Invalid input: {e}") }),
274        },
275        "fetchium_expand" => match serde_json::from_value::<ExpandInput>(args) {
276            Ok(input) => handlers::handle_expand(input, config, http, Some(cache)).await,
277            Err(e) => json!({ "error": format!("Invalid input: {e}") }),
278        },
279        "youtube_search" => match serde_json::from_value::<YouTubeSearchInput>(args) {
280            Ok(input) => {
281                let pipeline_config = fetchium_core::youtube::types::YouTubePipelineConfig {
282                    query: input.query,
283                    max_videos: input.max_results.unwrap_or(5),
284                    fetch_transcript: false,
285                    fetch_comments: false,
286                    fact_check: input.fact_check.unwrap_or(false),
287                    ..Default::default()
288                };
289                match fetchium_core::youtube::pipeline::run_youtube_pipeline(
290                    &pipeline_config,
291                    config,
292                    http,
293                )
294                .await
295                {
296                    Ok(result) => {
297                        serde_json::to_value(&result).unwrap_or(json!({"error": "serialization"}))
298                    }
299                    Err(e) => json!({ "error": e.to_string() }),
300                }
301            }
302            Err(e) => json!({ "error": format!("Invalid input: {e}") }),
303        },
304        "youtube_analyze" => match serde_json::from_value::<YouTubeAnalyzeInput>(args) {
305            Ok(input) => {
306                match fetchium_core::youtube::pipeline::analyze_single_video(
307                    &input.url,
308                    config,
309                    http,
310                    input.comments.unwrap_or(true),
311                    input.transcript.unwrap_or(true),
312                    input.teaching.unwrap_or(false),
313                )
314                .await
315                {
316                    Ok(result) => {
317                        serde_json::to_value(&result).unwrap_or(json!({"error": "serialization"}))
318                    }
319                    Err(e) => json!({ "error": e.to_string() }),
320                }
321            }
322            Err(e) => json!({ "error": format!("Invalid input: {e}") }),
323        },
324        "youtube_watch" => match serde_json::from_value::<YouTubeWatchInput>(args) {
325            Ok(input) => {
326                match fetchium_core::youtube::pipeline::analyze_single_video(
327                    &input.url,
328                    config,
329                    http,
330                    input.comments.unwrap_or(true),
331                    input.transcript.unwrap_or(true),
332                    false,
333                )
334                .await
335                {
336                    Ok(result) => {
337                        let summary = fetchium_core::summarize::summarize(
338                            &input.url,
339                            &SummarizeConfig::default(),
340                            config,
341                        )
342                        .await
343                        .ok()
344                        .map(|s| s.summary);
345                        let highlights = result
346                            .videos
347                            .first()
348                            .and_then(|v| v.transcript.as_ref())
349                            .map(|t| {
350                                let mut moments = t.key_moments.clone();
351                                moments.sort_by(|a, b| {
352                                    b.importance
353                                        .partial_cmp(&a.importance)
354                                        .unwrap_or(std::cmp::Ordering::Equal)
355                                });
356                                moments
357                                    .into_iter()
358                                    .take(input.highlights.unwrap_or(5))
359                                    .collect::<Vec<_>>()
360                            })
361                            .unwrap_or_default();
362                        json!({
363                            "analysis": result,
364                            "summary": summary,
365                            "highlights": highlights
366                        })
367                    }
368                    Err(e) => json!({ "error": e.to_string() }),
369                }
370            }
371            Err(e) => json!({ "error": format!("Invalid input: {e}") }),
372        },
373        "youtube_transcript" => match serde_json::from_value::<YouTubeTranscriptInput>(args) {
374            Ok(input) => {
375                match fetchium_core::youtube::universal::fetch_universal_transcript(
376                    &input.url, http, config,
377                )
378                .await
379                {
380                    Ok(transcript) => {
381                        let mut highlights = transcript.key_moments.clone();
382                        highlights.sort_by(|a, b| {
383                            b.importance
384                                .partial_cmp(&a.importance)
385                                .unwrap_or(std::cmp::Ordering::Equal)
386                        });
387                        json!({
388                            "transcript": transcript,
389                            "highlights": highlights.into_iter().take(input.highlights.unwrap_or(5)).collect::<Vec<_>>()
390                        })
391                    }
392                    Err(e) => json!({ "error": e.to_string() }),
393                }
394            }
395            Err(e) => json!({ "error": format!("Invalid input: {e}") }),
396        },
397        "social_research" => match serde_json::from_value::<SocialResearchInput>(args) {
398            Ok(input) => {
399                use fetchium_core::social::types::{SocialPipelineConfig, SocialPlatform};
400                let platforms = input
401                    .platforms
402                    .as_deref()
403                    .map(|ps| {
404                        ps.iter()
405                            .filter_map(|p| match p.as_str() {
406                                "twitter" => Some(SocialPlatform::Twitter),
407                                "reddit" => Some(SocialPlatform::Reddit),
408                                "tiktok" => Some(SocialPlatform::TikTok),
409                                "hackernews" | "hn" => Some(SocialPlatform::HackerNews),
410                                "youtube" => Some(SocialPlatform::YouTube),
411                                _ => None,
412                            })
413                            .collect()
414                    })
415                    .unwrap_or_else(|| {
416                        vec![
417                            SocialPlatform::Twitter,
418                            SocialPlatform::Reddit,
419                            SocialPlatform::TikTok,
420                            SocialPlatform::HackerNews,
421                            SocialPlatform::YouTube,
422                        ]
423                    });
424                let cfg = SocialPipelineConfig {
425                    query: input.query,
426                    platforms,
427                    max_posts_per_platform: input.max_per_platform.unwrap_or(20),
428                    include_trends: true,
429                    generate_ideas: input.generate_ideas.unwrap_or(true),
430                    deep_analysis: false,
431                    timeout_secs: 30,
432                };
433                match fetchium_core::social::unified::engine::run_social_pipeline(
434                    &cfg, config, http,
435                )
436                .await
437                {
438                    Ok(result) => {
439                        serde_json::to_value(&result).unwrap_or(json!({"error": "serialization"}))
440                    }
441                    Err(e) => json!({ "error": e.to_string() }),
442                }
443            }
444            Err(e) => json!({ "error": format!("Invalid input: {e}") }),
445        },
446        "reddit_search" => match serde_json::from_value::<RedditSearchInput>(args) {
447            Ok(input) => {
448                use fetchium_core::social::reddit::{
449                    pipeline as rd_pipeline, types::RedditPipelineConfig,
450                };
451                let cfg = RedditPipelineConfig {
452                    query: input.query,
453                    subreddits: input.subreddits.unwrap_or_default(),
454                    max_posts: input.max_posts.unwrap_or(25),
455                    ..Default::default()
456                };
457                match rd_pipeline::run_reddit_pipeline(&cfg, config, http).await {
458                    Ok(result) => {
459                        serde_json::to_value(&result).unwrap_or(json!({"error": "serialization"}))
460                    }
461                    Err(e) => json!({ "error": e.to_string() }),
462                }
463            }
464            Err(e) => json!({ "error": format!("Invalid input: {e}") }),
465        },
466        "hackernews_search" => match serde_json::from_value::<HackerNewsSearchInput>(args) {
467            Ok(input) => {
468                match fetchium_core::social::hackernews::search_stories(
469                    &input.query,
470                    input.max_results.unwrap_or(20),
471                    http,
472                    15,
473                )
474                .await
475                {
476                    Ok(stories) => {
477                        serde_json::to_value(&stories).unwrap_or(json!({"error": "serialization"}))
478                    }
479                    Err(e) => json!({ "error": e.to_string() }),
480                }
481            }
482            Err(e) => json!({ "error": format!("Invalid input: {e}") }),
483        },
484        other => json!({ "error": format!("Unknown tool: {other}") }),
485    }
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use fetchium_core::config::FetchiumConfig;

    /// The tool registry should expose exactly the expected number of tools.
    #[test]
    fn tool_definitions_has_correct_count() {
        // 5 fetchium core + 4 YouTube + 3 social tools
        const EXPECTED_TOOL_COUNT: usize = 12;
        assert_eq!(tools::tool_definitions().len(), EXPECTED_TOOL_COUNT);
    }

    /// Every core `fetchium_*` tool must be present in the registry.
    #[test]
    fn tool_names_are_correct() {
        let definitions = tools::tool_definitions();

        let mut names: Vec<&str> = Vec::new();
        for definition in definitions.iter() {
            if let Some(name) = definition.get("name").and_then(|n| n.as_str()) {
                names.push(name);
            }
        }

        let expected_core_tools = [
            "fetchium_search",
            "fetchium_fetch",
            "fetchium_research",
            "fetchium_estimate",
            "fetchium_expand",
        ];
        for expected in expected_core_tools.iter() {
            assert!(names.contains(expected));
        }
    }

    /// `initialize` must identify the server as "fetchium".
    #[tokio::test]
    async fn initialize_request_returns_fetchium_server_info() {
        let config = FetchiumConfig::default();
        let http = HttpClient::new(&config).unwrap();
        let cache = MemoryCache::new(10, 60, true);

        let request = JsonRpcRequest {
            jsonrpc: "2.0".into(),
            id: Some(json!(1)),
            method: "initialize".into(),
            params: Some(json!({})),
        };
        let response = handle_request(request, &config, &http, &cache).await;

        let result = response.result.unwrap();
        assert_eq!(result["serverInfo"]["name"], "fetchium");
    }
}
535}