1pub mod handlers;
9pub mod tools;
10
11use crate::tools::{
12 EstimateInput, ExpandInput, FetchInput, HackerNewsSearchInput, RedditSearchInput,
13 ResearchInput, SearchInput, SocialResearchInput, YouTubeAnalyzeInput, YouTubeSearchInput,
14 YouTubeTranscriptInput, YouTubeWatchInput,
15};
16use axum::{
17 extract::State,
18 http::StatusCode,
19 routing::{get, post},
20 Json, Router,
21};
22use fetchium_core::cache::MemoryCache;
23use fetchium_core::config::FetchiumConfig;
24use fetchium_core::http::client::HttpClient;
25use fetchium_core::summarize::SummarizeConfig;
26use serde::{Deserialize, Serialize};
27use serde_json::{json, Value};
28use std::io::{self, BufRead, Write};
29use std::sync::Arc;
30use tower_http::trace::{DefaultOnFailure, DefaultOnRequest, DefaultOnResponse, TraceLayer};
31
/// An incoming JSON-RPC message, deserialized either from one stdio line or from an
/// HTTP request body.
#[derive(Debug, Deserialize)]
struct JsonRpcRequest {
    // The field must be present for deserialization to succeed, but none of the code
    // visible in this file reads it back — hence the lint allowance.
    #[allow(dead_code)]
    jsonrpc: String,
    /// Request identifier; `handle_request` echoes it back, substituting JSON `null`
    /// when it is absent.
    id: Option<Value>,
    /// Method name that `handle_request` dispatches on.
    method: String,
    /// Method parameters; `handle_request` treats a missing value as JSON `null`.
    params: Option<Value>,
}
42
/// An outgoing JSON-RPC response.
///
/// `result` and `error` are each left out of the serialized JSON when they are `None`.
#[derive(Debug, Serialize)]
struct JsonRpcResponse {
    /// Protocol version marker; the constructors in this file always set it to `"2.0"`.
    jsonrpc: &'static str,
    /// Identifier copied from the request this response answers (`null` when unknown).
    id: Value,
    /// Success payload; populated by `JsonRpcResponse::ok`.
    #[serde(skip_serializing_if = "Option::is_none")]
    result: Option<Value>,
    /// Failure payload; populated by `JsonRpcResponse::err`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<JsonRpcError>,
}
52
/// The `error` member of a JSON-RPC response.
#[derive(Debug, Serialize)]
struct JsonRpcError {
    /// Numeric error code; this file uses -32700, -32601 and -32603.
    code: i32,
    /// Human-readable description of the failure.
    message: String,
}
58
/// State shared by the HTTP transport's handlers.
///
/// `run_mcp_http` builds one instance, wraps it in an `Arc` and attaches it to the
/// router; `mcp_http_handler` borrows the three fields for every request.
#[derive(Clone)]
struct McpHttpState {
    /// Server configuration passed through to request handling.
    config: FetchiumConfig,
    /// HTTP client passed through to request handling.
    http: HttpClient,
    /// In-memory cache passed through to request handling.
    cache: MemoryCache,
}
65
66impl JsonRpcResponse {
67 fn ok(id: Value, result: Value) -> Self {
68 Self {
69 jsonrpc: "2.0",
70 id,
71 result: Some(result),
72 error: None,
73 }
74 }
75
76 fn err(id: Value, code: i32, message: String) -> Self {
77 Self {
78 jsonrpc: "2.0",
79 id,
80 result: None,
81 error: Some(JsonRpcError { code, message }),
82 }
83 }
84}
85
86pub async fn run_mcp_stdio(config: FetchiumConfig) -> anyhow::Result<()> {
91 eprintln!("[fetchium-mcp] Fetchium MCP server starting (stdio transport)");
92
93 let http = HttpClient::new(&config)?;
94 let cache = MemoryCache::new(50, 3600, true);
95
96 let stdin = io::stdin();
97 let stdout = io::stdout();
98
99 for line in stdin.lock().lines() {
100 let line = match line {
101 Ok(l) if l.trim().is_empty() => continue,
102 Ok(l) => l,
103 Err(e) => {
104 eprintln!("[fetchium-mcp] stdin read error: {e}");
105 break;
106 }
107 };
108
109 let response = handle_message(&line, &config, &http, &cache).await;
110
111 let json_out = serde_json::to_string(&response).unwrap_or_else(|e| {
112 format!(
113 r#"{{"jsonrpc":"2.0","id":null,"error":{{"code":-32603,"message":"{}"}}}}"#,
114 e
115 )
116 });
117
118 let mut out = stdout.lock();
119 let _ = writeln!(out, "{json_out}");
120 let _ = out.flush();
121 }
122
123 eprintln!("[fetchium-mcp] Server shutting down.");
124 Ok(())
125}
126
127pub async fn run_mcp_http(config: FetchiumConfig, port: u16) -> anyhow::Result<()> {
129 let http = HttpClient::new(&config)?;
130 let cache = MemoryCache::new(50, 3600, true);
131 let state = Arc::new(McpHttpState {
132 config,
133 http,
134 cache,
135 });
136
137 let app = Router::new()
138 .route("/health", get(mcp_health))
139 .route("/mcp", post(mcp_http_handler))
140 .layer(
141 TraceLayer::new_for_http()
142 .on_request(DefaultOnRequest::new().level(tracing::Level::INFO))
143 .on_response(DefaultOnResponse::new().level(tracing::Level::INFO))
144 .on_failure(DefaultOnFailure::new().level(tracing::Level::ERROR)),
145 )
146 .with_state(state);
147
148 let addr = format!("0.0.0.0:{port}");
149 let listener = tokio::net::TcpListener::bind(&addr).await?;
150 eprintln!("[fetchium-mcp] Fetchium MCP server starting (http transport) on http://{addr}/mcp");
151 axum::serve(listener, app).await?;
152 Ok(())
153}
154
155async fn handle_message(
157 line: &str,
158 config: &FetchiumConfig,
159 http: &HttpClient,
160 cache: &MemoryCache,
161) -> JsonRpcResponse {
162 let req: JsonRpcRequest = match serde_json::from_str(line) {
163 Ok(r) => r,
164 Err(e) => {
165 return JsonRpcResponse::err(Value::Null, -32700, format!("Parse error: {e}"));
166 }
167 };
168
169 handle_request(req, config, http, cache).await
170}
171
172async fn handle_request(
173 req: JsonRpcRequest,
174 config: &FetchiumConfig,
175 http: &HttpClient,
176 cache: &MemoryCache,
177) -> JsonRpcResponse {
178 let id = req.id.clone().unwrap_or(Value::Null);
179 let params = req.params.unwrap_or(Value::Null);
180
181 match req.method.as_str() {
182 "initialize" => {
184 eprintln!("[fetchium-mcp] initialize");
185 JsonRpcResponse::ok(
186 id,
187 json!({
188 "protocolVersion": "2024-11-05",
189 "capabilities": {
190 "tools": {}
191 },
192 "serverInfo": {
193 "name": "fetchium",
194 "version": env!("CARGO_PKG_VERSION"),
195 }
196 }),
197 )
198 }
199
200 "notifications/initialized" => {
201 eprintln!("[fetchium-mcp] initialized");
203 JsonRpcResponse::ok(id, Value::Null)
204 }
205
206 "tools/list" => JsonRpcResponse::ok(id, json!({ "tools": tools::tool_definitions() })),
208
209 "tools/call" => {
211 let tool_name = params.get("name").and_then(|v| v.as_str()).unwrap_or("");
212 let arguments = params
213 .get("arguments")
214 .cloned()
215 .unwrap_or(Value::Object(Default::default()));
216
217 let result = dispatch_tool(tool_name, arguments, config, http, cache).await;
218 JsonRpcResponse::ok(
219 id,
220 json!({
221 "content": [{ "type": "text", "text": serde_json::to_string(&result).unwrap_or_default() }]
222 }),
223 )
224 }
225
226 "ping" => JsonRpcResponse::ok(id, json!({})),
228
229 other => JsonRpcResponse::err(id, -32601, format!("Method not found: {other}")),
230 }
231}
232
233async fn mcp_health() -> Json<Value> {
234 Json(json!({
235 "status": "ok",
236 "service": "fetchium-mcp",
237 "transport": "http",
238 "version": env!("CARGO_PKG_VERSION"),
239 }))
240}
241
242async fn mcp_http_handler(
243 State(state): State<Arc<McpHttpState>>,
244 Json(req): Json<JsonRpcRequest>,
245) -> (StatusCode, Json<JsonRpcResponse>) {
246 let response = handle_request(req, &state.config, &state.http, &state.cache).await;
247 (StatusCode::OK, Json(response))
248}
249
250async fn dispatch_tool(
252 name: &str,
253 args: Value,
254 config: &FetchiumConfig,
255 http: &HttpClient,
256 cache: &MemoryCache,
257) -> Value {
258 match name {
259 "fetchium_search" => match serde_json::from_value::<SearchInput>(args) {
260 Ok(input) => handlers::handle_search(input, config, http, Some(cache)).await,
261 Err(e) => json!({ "error": format!("Invalid input: {e}") }),
262 },
263 "fetchium_fetch" => match serde_json::from_value::<FetchInput>(args) {
264 Ok(input) => handlers::handle_fetch(input, config, http, Some(cache)).await,
265 Err(e) => json!({ "error": format!("Invalid input: {e}") }),
266 },
267 "fetchium_research" => match serde_json::from_value::<ResearchInput>(args) {
268 Ok(input) => handlers::handle_research(input, config, http, Some(cache)).await,
269 Err(e) => json!({ "error": format!("Invalid input: {e}") }),
270 },
271 "fetchium_estimate" => match serde_json::from_value::<EstimateInput>(args) {
272 Ok(input) => handlers::handle_estimate(input, config, http, Some(cache)).await,
273 Err(e) => json!({ "error": format!("Invalid input: {e}") }),
274 },
275 "fetchium_expand" => match serde_json::from_value::<ExpandInput>(args) {
276 Ok(input) => handlers::handle_expand(input, config, http, Some(cache)).await,
277 Err(e) => json!({ "error": format!("Invalid input: {e}") }),
278 },
279 "youtube_search" => match serde_json::from_value::<YouTubeSearchInput>(args) {
280 Ok(input) => {
281 let pipeline_config = fetchium_core::youtube::types::YouTubePipelineConfig {
282 query: input.query,
283 max_videos: input.max_results.unwrap_or(5),
284 fetch_transcript: false,
285 fetch_comments: false,
286 fact_check: input.fact_check.unwrap_or(false),
287 ..Default::default()
288 };
289 match fetchium_core::youtube::pipeline::run_youtube_pipeline(
290 &pipeline_config,
291 config,
292 http,
293 )
294 .await
295 {
296 Ok(result) => {
297 serde_json::to_value(&result).unwrap_or(json!({"error": "serialization"}))
298 }
299 Err(e) => json!({ "error": e.to_string() }),
300 }
301 }
302 Err(e) => json!({ "error": format!("Invalid input: {e}") }),
303 },
304 "youtube_analyze" => match serde_json::from_value::<YouTubeAnalyzeInput>(args) {
305 Ok(input) => {
306 match fetchium_core::youtube::pipeline::analyze_single_video(
307 &input.url,
308 config,
309 http,
310 input.comments.unwrap_or(true),
311 input.transcript.unwrap_or(true),
312 input.teaching.unwrap_or(false),
313 )
314 .await
315 {
316 Ok(result) => {
317 serde_json::to_value(&result).unwrap_or(json!({"error": "serialization"}))
318 }
319 Err(e) => json!({ "error": e.to_string() }),
320 }
321 }
322 Err(e) => json!({ "error": format!("Invalid input: {e}") }),
323 },
324 "youtube_watch" => match serde_json::from_value::<YouTubeWatchInput>(args) {
325 Ok(input) => {
326 match fetchium_core::youtube::pipeline::analyze_single_video(
327 &input.url,
328 config,
329 http,
330 input.comments.unwrap_or(true),
331 input.transcript.unwrap_or(true),
332 false,
333 )
334 .await
335 {
336 Ok(result) => {
337 let summary = fetchium_core::summarize::summarize(
338 &input.url,
339 &SummarizeConfig::default(),
340 config,
341 )
342 .await
343 .ok()
344 .map(|s| s.summary);
345 let highlights = result
346 .videos
347 .first()
348 .and_then(|v| v.transcript.as_ref())
349 .map(|t| {
350 let mut moments = t.key_moments.clone();
351 moments.sort_by(|a, b| {
352 b.importance
353 .partial_cmp(&a.importance)
354 .unwrap_or(std::cmp::Ordering::Equal)
355 });
356 moments
357 .into_iter()
358 .take(input.highlights.unwrap_or(5))
359 .collect::<Vec<_>>()
360 })
361 .unwrap_or_default();
362 json!({
363 "analysis": result,
364 "summary": summary,
365 "highlights": highlights
366 })
367 }
368 Err(e) => json!({ "error": e.to_string() }),
369 }
370 }
371 Err(e) => json!({ "error": format!("Invalid input: {e}") }),
372 },
373 "youtube_transcript" => match serde_json::from_value::<YouTubeTranscriptInput>(args) {
374 Ok(input) => {
375 match fetchium_core::youtube::universal::fetch_universal_transcript(
376 &input.url, http, config,
377 )
378 .await
379 {
380 Ok(transcript) => {
381 let mut highlights = transcript.key_moments.clone();
382 highlights.sort_by(|a, b| {
383 b.importance
384 .partial_cmp(&a.importance)
385 .unwrap_or(std::cmp::Ordering::Equal)
386 });
387 json!({
388 "transcript": transcript,
389 "highlights": highlights.into_iter().take(input.highlights.unwrap_or(5)).collect::<Vec<_>>()
390 })
391 }
392 Err(e) => json!({ "error": e.to_string() }),
393 }
394 }
395 Err(e) => json!({ "error": format!("Invalid input: {e}") }),
396 },
397 "social_research" => match serde_json::from_value::<SocialResearchInput>(args) {
398 Ok(input) => {
399 use fetchium_core::social::types::{SocialPipelineConfig, SocialPlatform};
400 let platforms = input
401 .platforms
402 .as_deref()
403 .map(|ps| {
404 ps.iter()
405 .filter_map(|p| match p.as_str() {
406 "twitter" => Some(SocialPlatform::Twitter),
407 "reddit" => Some(SocialPlatform::Reddit),
408 "tiktok" => Some(SocialPlatform::TikTok),
409 "hackernews" | "hn" => Some(SocialPlatform::HackerNews),
410 "youtube" => Some(SocialPlatform::YouTube),
411 _ => None,
412 })
413 .collect()
414 })
415 .unwrap_or_else(|| {
416 vec![
417 SocialPlatform::Twitter,
418 SocialPlatform::Reddit,
419 SocialPlatform::TikTok,
420 SocialPlatform::HackerNews,
421 SocialPlatform::YouTube,
422 ]
423 });
424 let cfg = SocialPipelineConfig {
425 query: input.query,
426 platforms,
427 max_posts_per_platform: input.max_per_platform.unwrap_or(20),
428 include_trends: true,
429 generate_ideas: input.generate_ideas.unwrap_or(true),
430 deep_analysis: false,
431 timeout_secs: 30,
432 };
433 match fetchium_core::social::unified::engine::run_social_pipeline(
434 &cfg, config, http,
435 )
436 .await
437 {
438 Ok(result) => {
439 serde_json::to_value(&result).unwrap_or(json!({"error": "serialization"}))
440 }
441 Err(e) => json!({ "error": e.to_string() }),
442 }
443 }
444 Err(e) => json!({ "error": format!("Invalid input: {e}") }),
445 },
446 "reddit_search" => match serde_json::from_value::<RedditSearchInput>(args) {
447 Ok(input) => {
448 use fetchium_core::social::reddit::{
449 pipeline as rd_pipeline, types::RedditPipelineConfig,
450 };
451 let cfg = RedditPipelineConfig {
452 query: input.query,
453 subreddits: input.subreddits.unwrap_or_default(),
454 max_posts: input.max_posts.unwrap_or(25),
455 ..Default::default()
456 };
457 match rd_pipeline::run_reddit_pipeline(&cfg, config, http).await {
458 Ok(result) => {
459 serde_json::to_value(&result).unwrap_or(json!({"error": "serialization"}))
460 }
461 Err(e) => json!({ "error": e.to_string() }),
462 }
463 }
464 Err(e) => json!({ "error": format!("Invalid input: {e}") }),
465 },
466 "hackernews_search" => match serde_json::from_value::<HackerNewsSearchInput>(args) {
467 Ok(input) => {
468 match fetchium_core::social::hackernews::search_stories(
469 &input.query,
470 input.max_results.unwrap_or(20),
471 http,
472 15,
473 )
474 .await
475 {
476 Ok(stories) => {
477 serde_json::to_value(&stories).unwrap_or(json!({"error": "serialization"}))
478 }
479 Err(e) => json!({ "error": e.to_string() }),
480 }
481 }
482 Err(e) => json!({ "error": format!("Invalid input: {e}") }),
483 },
484 other => json!({ "error": format!("Unknown tool: {other}") }),
485 }
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use fetchium_core::config::FetchiumConfig;

    /// The tool catalogue is expected to list exactly twelve tools.
    #[test]
    fn tool_definitions_has_correct_count() {
        assert_eq!(tools::tool_definitions().len(), 12);
    }

    /// The five core `fetchium_*` tools must be present in the catalogue.
    #[test]
    fn tool_names_are_correct() {
        let definitions = tools::tool_definitions();
        let advertised: Vec<&str> = definitions
            .iter()
            .filter_map(|tool| tool.get("name").and_then(|n| n.as_str()))
            .collect();

        for expected in [
            "fetchium_search",
            "fetchium_fetch",
            "fetchium_research",
            "fetchium_estimate",
            "fetchium_expand",
        ] {
            assert!(advertised.contains(&expected));
        }
    }

    /// `initialize` must identify the server as "fetchium".
    #[tokio::test]
    async fn initialize_request_returns_fetchium_server_info() {
        let config = FetchiumConfig::default();
        let http = HttpClient::new(&config).unwrap();
        let cache = MemoryCache::new(10, 60, true);

        let request = JsonRpcRequest {
            jsonrpc: "2.0".into(),
            id: Some(json!(1)),
            method: "initialize".into(),
            params: Some(json!({})),
        };
        let response = handle_request(request, &config, &http, &cache).await;

        let result = response.result.unwrap();
        assert_eq!(result["serverInfo"]["name"], "fetchium");
    }
}