intent_engine/mcp/
server.rs

1//! Intent-Engine MCP Server (Rust Implementation)
2//!
3//! This is a native Rust implementation of the MCP (Model Context Protocol) server
4//! that provides a JSON-RPC 2.0 interface for AI assistants to interact with
5//! intent-engine's task management capabilities.
6//!
7//! Unlike the Python wrapper (mcp-server.py), this implementation directly uses
8//! the Rust library functions, avoiding subprocess overhead and improving performance.
9
10use crate::events::EventManager;
11use crate::project::ProjectContext;
12use crate::report::ReportManager;
13use crate::tasks::TaskManager;
14use crate::workspace::WorkspaceManager;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use std::io::{self, BufRead, Write};
18
19#[derive(Debug, Deserialize)]
20struct JsonRpcRequest {
21    jsonrpc: String,
22    id: Option<Value>,
23    method: String,
24    params: Option<Value>,
25}
26
27#[derive(Debug, Serialize)]
28struct JsonRpcResponse {
29    jsonrpc: String,
30    id: Option<Value>,
31    #[serde(skip_serializing_if = "Option::is_none")]
32    result: Option<Value>,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    error: Option<JsonRpcError>,
35}
36
37#[derive(Debug, Serialize)]
38struct JsonRpcError {
39    code: i32,
40    message: String,
41}
42
43#[derive(Debug, Deserialize)]
44struct ToolCallParams {
45    name: String,
46    arguments: Value,
47}
48
49/// MCP Tool Schema
50const MCP_TOOLS: &str = include_str!("../../mcp-server.json");
51
52/// Run the MCP server
53/// This is the main entry point for MCP server mode
54pub async fn run() -> io::Result<()> {
55    run_server().await
56}
57
58async fn run_server() -> io::Result<()> {
59    let stdin = io::stdin();
60    let mut stdout = io::stdout();
61    let reader = stdin.lock();
62
63    for line in reader.lines() {
64        let line = line?;
65        if line.trim().is_empty() {
66            continue;
67        }
68
69        let response = match serde_json::from_str::<JsonRpcRequest>(&line) {
70            Ok(request) => {
71                // Handle notifications (no id = no response needed)
72                if request.id.is_none() {
73                    handle_notification(&request).await;
74                    continue; // Skip sending response for notifications
75                }
76                handle_request(request).await
77            },
78            Err(e) => JsonRpcResponse {
79                jsonrpc: "2.0".to_string(),
80                id: None,
81                result: None,
82                error: Some(JsonRpcError {
83                    code: -32700,
84                    message: format!("Parse error: {}", e),
85                }),
86            },
87        };
88
89        let response_json = serde_json::to_string(&response)?;
90        writeln!(stdout, "{}", response_json)?;
91        stdout.flush()?;
92    }
93
94    Ok(())
95}
96
97async fn handle_notification(request: &JsonRpcRequest) {
98    // Handle MCP notifications (no response required)
99    match request.method.as_str() {
100        "initialized" => {
101            eprintln!("✓ MCP client initialized");
102        },
103        "notifications/cancelled" => {
104            eprintln!("⚠ Request cancelled");
105        },
106        _ => {
107            eprintln!("⚠ Unknown notification: {}", request.method);
108        },
109    }
110}
111
112async fn handle_request(request: JsonRpcRequest) -> JsonRpcResponse {
113    // Validate JSON-RPC version
114    if request.jsonrpc != "2.0" {
115        return JsonRpcResponse {
116            jsonrpc: "2.0".to_string(),
117            id: request.id,
118            result: None,
119            error: Some(JsonRpcError {
120                code: -32600,
121                message: format!("Invalid JSON-RPC version: {}", request.jsonrpc),
122            }),
123        };
124    }
125
126    let result = match request.method.as_str() {
127        "initialize" => handle_initialize(request.params),
128        "ping" => Ok(json!({})), // Ping response for connection keep-alive
129        "tools/list" => handle_tools_list(),
130        "tools/call" => handle_tool_call(request.params).await,
131        _ => Err(format!("Method not found: {}", request.method)),
132    };
133
134    match result {
135        Ok(value) => JsonRpcResponse {
136            jsonrpc: "2.0".to_string(),
137            id: request.id,
138            result: Some(value),
139            error: None,
140        },
141        Err(message) => JsonRpcResponse {
142            jsonrpc: "2.0".to_string(),
143            id: request.id,
144            result: None,
145            error: Some(JsonRpcError {
146                code: -32000,
147                message,
148            }),
149        },
150    }
151}
152
153fn handle_initialize(_params: Option<Value>) -> Result<Value, String> {
154    // MCP initialize handshake
155    // Return server capabilities and info per MCP specification
156    Ok(json!({
157        "protocolVersion": "2024-11-05",
158        "capabilities": {
159            "tools": {
160                "listChanged": false  // Static tool list, no dynamic changes
161            }
162        },
163        "serverInfo": {
164            "name": "intent-engine",
165            "version": env!("CARGO_PKG_VERSION")
166        }
167    }))
168}
169
170fn handle_tools_list() -> Result<Value, String> {
171    let config: Value = serde_json::from_str(MCP_TOOLS)
172        .map_err(|e| format!("Failed to parse MCP tools schema: {}", e))?;
173
174    Ok(json!({
175        "tools": config.get("tools").unwrap_or(&json!([]))
176    }))
177}
178
179async fn handle_tool_call(params: Option<Value>) -> Result<Value, String> {
180    let params: ToolCallParams = serde_json::from_value(params.unwrap_or(json!({})))
181        .map_err(|e| format!("Invalid tool call parameters: {}", e))?;
182
183    let result = match params.name.as_str() {
184        "task_add" => handle_task_add(params.arguments).await,
185        "task_start" => handle_task_start(params.arguments).await,
186        "task_pick_next" => handle_task_pick_next(params.arguments).await,
187        "task_spawn_subtask" => handle_task_spawn_subtask(params.arguments).await,
188        "task_switch" => handle_task_switch(params.arguments).await,
189        "task_done" => handle_task_done(params.arguments).await,
190        "task_update" => handle_task_update(params.arguments).await,
191        "task_find" => handle_task_find(params.arguments).await,
192        "task_search" => handle_task_search(params.arguments).await,
193        "task_get" => handle_task_get(params.arguments).await,
194        "task_context" => handle_task_context(params.arguments).await,
195        "task_delete" => handle_task_delete(params.arguments).await,
196        "event_add" => handle_event_add(params.arguments).await,
197        "event_list" => handle_event_list(params.arguments).await,
198        "current_task_get" => handle_current_task_get(params.arguments).await,
199        "report_generate" => handle_report_generate(params.arguments).await,
200        _ => Err(format!("Unknown tool: {}", params.name)),
201    }?;
202
203    Ok(json!({
204        "content": [{
205            "type": "text",
206            "text": serde_json::to_string_pretty(&result)
207                .unwrap_or_else(|_| "{}".to_string())
208        }]
209    }))
210}
211
212// Tool Handlers
213
214async fn handle_task_add(args: Value) -> Result<Value, String> {
215    let name = args
216        .get("name")
217        .and_then(|v| v.as_str())
218        .ok_or("Missing required parameter: name")?;
219
220    let spec = args.get("spec").and_then(|v| v.as_str());
221    let parent_id = args.get("parent_id").and_then(|v| v.as_i64());
222
223    let ctx = ProjectContext::load_or_init()
224        .await
225        .map_err(|e| format!("Failed to load project context: {}", e))?;
226
227    let task_mgr = TaskManager::new(&ctx.pool);
228    let task = task_mgr
229        .add_task(name, spec, parent_id)
230        .await
231        .map_err(|e| format!("Failed to add task: {}", e))?;
232
233    serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
234}
235
236async fn handle_task_start(args: Value) -> Result<Value, String> {
237    let task_id = args
238        .get("task_id")
239        .and_then(|v| v.as_i64())
240        .ok_or("Missing required parameter: task_id")?;
241
242    let with_events = args
243        .get("with_events")
244        .and_then(|v| v.as_bool())
245        .unwrap_or(true);
246
247    let ctx = ProjectContext::load_or_init()
248        .await
249        .map_err(|e| format!("Failed to load project context: {}", e))?;
250
251    let task_mgr = TaskManager::new(&ctx.pool);
252    let task = task_mgr
253        .start_task(task_id, with_events)
254        .await
255        .map_err(|e| format!("Failed to start task: {}", e))?;
256
257    serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
258}
259
260async fn handle_task_pick_next(args: Value) -> Result<Value, String> {
261    let _max_count = args.get("max_count").and_then(|v| v.as_i64());
262    let _capacity = args.get("capacity").and_then(|v| v.as_i64());
263
264    let ctx = ProjectContext::load_or_init()
265        .await
266        .map_err(|e| format!("Failed to load project context: {}", e))?;
267
268    let task_mgr = TaskManager::new(&ctx.pool);
269    let response = task_mgr
270        .pick_next()
271        .await
272        .map_err(|e| format!("Failed to pick next task: {}", e))?;
273
274    serde_json::to_value(&response).map_err(|e| format!("Serialization error: {}", e))
275}
276
277async fn handle_task_spawn_subtask(args: Value) -> Result<Value, String> {
278    let name = args
279        .get("name")
280        .and_then(|v| v.as_str())
281        .ok_or("Missing required parameter: name")?;
282
283    let spec = args.get("spec").and_then(|v| v.as_str());
284
285    let ctx = ProjectContext::load_or_init()
286        .await
287        .map_err(|e| format!("Failed to load project context: {}", e))?;
288
289    let task_mgr = TaskManager::new(&ctx.pool);
290    let subtask = task_mgr
291        .spawn_subtask(name, spec)
292        .await
293        .map_err(|e| format!("Failed to spawn subtask: {}", e))?;
294
295    serde_json::to_value(&subtask).map_err(|e| format!("Serialization error: {}", e))
296}
297
298async fn handle_task_switch(args: Value) -> Result<Value, String> {
299    let task_id = args
300        .get("task_id")
301        .and_then(|v| v.as_i64())
302        .ok_or("Missing required parameter: task_id")?;
303
304    let ctx = ProjectContext::load_or_init()
305        .await
306        .map_err(|e| format!("Failed to load project context: {}", e))?;
307
308    let task_mgr = TaskManager::new(&ctx.pool);
309    let task = task_mgr
310        .switch_to_task(task_id)
311        .await
312        .map_err(|e| format!("Failed to switch task: {}", e))?;
313
314    serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
315}
316
317async fn handle_task_done(args: Value) -> Result<Value, String> {
318    let task_id = args.get("task_id").and_then(|v| v.as_i64());
319
320    let ctx = ProjectContext::load_or_init()
321        .await
322        .map_err(|e| format!("Failed to load project context: {}", e))?;
323
324    let task_mgr = TaskManager::new(&ctx.pool);
325
326    // If task_id is provided, set it as current first
327    if let Some(id) = task_id {
328        let workspace_mgr = WorkspaceManager::new(&ctx.pool);
329        workspace_mgr
330            .set_current_task(id)
331            .await
332            .map_err(|e| format!("Failed to set current task: {}", e))?;
333    }
334
335    let task = task_mgr
336        .done_task()
337        .await
338        .map_err(|e| format!("Failed to mark task as done: {}", e))?;
339
340    serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
341}
342
343async fn handle_task_update(args: Value) -> Result<Value, String> {
344    let task_id = args
345        .get("task_id")
346        .and_then(|v| v.as_i64())
347        .ok_or("Missing required parameter: task_id")?;
348
349    let name = args.get("name").and_then(|v| v.as_str());
350    let spec = args.get("spec").and_then(|v| v.as_str());
351    let status = args.get("status").and_then(|v| v.as_str());
352    let complexity = args
353        .get("complexity")
354        .and_then(|v| v.as_i64())
355        .map(|v| v as i32);
356    let priority = args
357        .get("priority")
358        .and_then(|v| v.as_i64())
359        .map(|v| v as i32);
360    let parent_id = args.get("parent_id").and_then(|v| v.as_i64()).map(Some);
361
362    let ctx = ProjectContext::load_or_init()
363        .await
364        .map_err(|e| format!("Failed to load project context: {}", e))?;
365
366    let task_mgr = TaskManager::new(&ctx.pool);
367    let task = task_mgr
368        .update_task(task_id, name, spec, parent_id, status, complexity, priority)
369        .await
370        .map_err(|e| format!("Failed to update task: {}", e))?;
371
372    serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
373}
374
375async fn handle_task_find(args: Value) -> Result<Value, String> {
376    let status = args.get("status").and_then(|v| v.as_str());
377    let parent = args.get("parent").and_then(|v| v.as_str());
378
379    let parent_opt = parent.map(|p| {
380        if p == "null" {
381            None
382        } else {
383            p.parse::<i64>().ok()
384        }
385    });
386
387    let ctx = ProjectContext::load()
388        .await
389        .map_err(|e| format!("Failed to load project context: {}", e))?;
390
391    let task_mgr = TaskManager::new(&ctx.pool);
392    let tasks = task_mgr
393        .find_tasks(status, parent_opt)
394        .await
395        .map_err(|e| format!("Failed to find tasks: {}", e))?;
396
397    serde_json::to_value(&tasks).map_err(|e| format!("Serialization error: {}", e))
398}
399
400async fn handle_task_search(args: Value) -> Result<Value, String> {
401    let query = args
402        .get("query")
403        .and_then(|v| v.as_str())
404        .ok_or("Missing required parameter: query")?;
405
406    let ctx = ProjectContext::load()
407        .await
408        .map_err(|e| format!("Failed to load project context: {}", e))?;
409
410    let task_mgr = TaskManager::new(&ctx.pool);
411    let results = task_mgr
412        .search_tasks(query)
413        .await
414        .map_err(|e| format!("Failed to search tasks: {}", e))?;
415
416    serde_json::to_value(&results).map_err(|e| format!("Serialization error: {}", e))
417}
418
419async fn handle_task_get(args: Value) -> Result<Value, String> {
420    let task_id = args
421        .get("task_id")
422        .and_then(|v| v.as_i64())
423        .ok_or("Missing required parameter: task_id")?;
424
425    let with_events = args
426        .get("with_events")
427        .and_then(|v| v.as_bool())
428        .unwrap_or(false);
429
430    let ctx = ProjectContext::load()
431        .await
432        .map_err(|e| format!("Failed to load project context: {}", e))?;
433
434    let task_mgr = TaskManager::new(&ctx.pool);
435
436    if with_events {
437        let task = task_mgr
438            .get_task_with_events(task_id)
439            .await
440            .map_err(|e| format!("Failed to get task: {}", e))?;
441        serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
442    } else {
443        let task = task_mgr
444            .get_task(task_id)
445            .await
446            .map_err(|e| format!("Failed to get task: {}", e))?;
447        serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
448    }
449}
450
451async fn handle_task_context(args: Value) -> Result<Value, String> {
452    // Get task_id from args, or fall back to current task
453    let task_id = if let Some(id) = args.get("task_id").and_then(|v| v.as_i64()) {
454        id
455    } else {
456        // Fall back to current_task_id if no task_id provided
457        let ctx = ProjectContext::load()
458            .await
459            .map_err(|e| format!("Failed to load project context: {}", e))?;
460
461        let current_task_id: Option<String> =
462            sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'current_task_id'")
463                .fetch_optional(&ctx.pool)
464                .await
465                .map_err(|e| format!("Database error: {}", e))?;
466
467        current_task_id
468            .and_then(|s| s.parse::<i64>().ok())
469            .ok_or_else(|| {
470                "No current task is set and task_id was not provided. \
471                 Use task_start or task_switch to set a task first, or provide task_id parameter."
472                    .to_string()
473            })?
474    };
475
476    let ctx = ProjectContext::load()
477        .await
478        .map_err(|e| format!("Failed to load project context: {}", e))?;
479
480    let task_mgr = TaskManager::new(&ctx.pool);
481    let context = task_mgr
482        .get_task_context(task_id)
483        .await
484        .map_err(|e| format!("Failed to get task context: {}", e))?;
485
486    serde_json::to_value(&context).map_err(|e| format!("Serialization error: {}", e))
487}
488
489async fn handle_task_delete(args: Value) -> Result<Value, String> {
490    let task_id = args
491        .get("task_id")
492        .and_then(|v| v.as_i64())
493        .ok_or("Missing required parameter: task_id")?;
494
495    let ctx = ProjectContext::load()
496        .await
497        .map_err(|e| format!("Failed to load project context: {}", e))?;
498
499    let task_mgr = TaskManager::new(&ctx.pool);
500    task_mgr
501        .delete_task(task_id)
502        .await
503        .map_err(|e| format!("Failed to delete task: {}", e))?;
504
505    Ok(json!({"success": true, "deleted_task_id": task_id}))
506}
507
508async fn handle_event_add(args: Value) -> Result<Value, String> {
509    let task_id = args.get("task_id").and_then(|v| v.as_i64());
510
511    let event_type = args
512        .get("event_type")
513        .and_then(|v| v.as_str())
514        .ok_or("Missing required parameter: event_type")?;
515
516    let data = args
517        .get("data")
518        .and_then(|v| v.as_str())
519        .ok_or("Missing required parameter: data")?;
520
521    let ctx = ProjectContext::load_or_init()
522        .await
523        .map_err(|e| format!("Failed to load project context: {}", e))?;
524
525    // Determine the target task ID
526    let target_task_id = if let Some(id) = task_id {
527        id
528    } else {
529        // Fall back to current_task_id
530        let current_task_id: Option<String> =
531            sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'current_task_id'")
532                .fetch_optional(&ctx.pool)
533                .await
534                .map_err(|e| format!("Database error: {}", e))?;
535
536        current_task_id
537            .and_then(|s| s.parse::<i64>().ok())
538            .ok_or_else(|| {
539                "No current task is set and task_id was not provided. \
540                 Use task_start or task_switch to set a task first."
541                    .to_string()
542            })?
543    };
544
545    let event_mgr = EventManager::new(&ctx.pool);
546    let event = event_mgr
547        .add_event(target_task_id, event_type, data)
548        .await
549        .map_err(|e| format!("Failed to add event: {}", e))?;
550
551    serde_json::to_value(&event).map_err(|e| format!("Serialization error: {}", e))
552}
553
554async fn handle_event_list(args: Value) -> Result<Value, String> {
555    let task_id = args
556        .get("task_id")
557        .and_then(|v| v.as_i64())
558        .ok_or("Missing required parameter: task_id")?;
559
560    let limit = args.get("limit").and_then(|v| v.as_i64());
561
562    let ctx = ProjectContext::load()
563        .await
564        .map_err(|e| format!("Failed to load project context: {}", e))?;
565
566    let event_mgr = EventManager::new(&ctx.pool);
567    let events = event_mgr
568        .list_events(task_id, limit)
569        .await
570        .map_err(|e| format!("Failed to list events: {}", e))?;
571
572    serde_json::to_value(&events).map_err(|e| format!("Serialization error: {}", e))
573}
574
575async fn handle_current_task_get(_args: Value) -> Result<Value, String> {
576    let ctx = ProjectContext::load()
577        .await
578        .map_err(|e| format!("Failed to load project context: {}", e))?;
579
580    let workspace_mgr = WorkspaceManager::new(&ctx.pool);
581    let response = workspace_mgr
582        .get_current_task()
583        .await
584        .map_err(|e| format!("Failed to get current task: {}", e))?;
585
586    serde_json::to_value(&response).map_err(|e| format!("Serialization error: {}", e))
587}
588
589async fn handle_report_generate(args: Value) -> Result<Value, String> {
590    let since = args.get("since").and_then(|v| v.as_str()).map(String::from);
591    let status = args
592        .get("status")
593        .and_then(|v| v.as_str())
594        .map(String::from);
595    let filter_name = args
596        .get("filter_name")
597        .and_then(|v| v.as_str())
598        .map(String::from);
599    let filter_spec = args
600        .get("filter_spec")
601        .and_then(|v| v.as_str())
602        .map(String::from);
603    let summary_only = args
604        .get("summary_only")
605        .and_then(|v| v.as_bool())
606        .unwrap_or(true);
607
608    let ctx = ProjectContext::load()
609        .await
610        .map_err(|e| format!("Failed to load project context: {}", e))?;
611
612    let report_mgr = ReportManager::new(&ctx.pool);
613    let report = report_mgr
614        .generate_report(since, status, filter_name, filter_spec, summary_only)
615        .await
616        .map_err(|e| format!("Failed to generate report: {}", e))?;
617
618    serde_json::to_value(&report).map_err(|e| format!("Serialization error: {}", e))
619}