1use crate::events::EventManager;
11use crate::project::ProjectContext;
12use crate::report::ReportManager;
13use crate::tasks::TaskManager;
14use crate::workspace::WorkspaceManager;
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use std::io::{self, BufRead, Write};
18
19#[derive(Debug, Deserialize)]
20struct JsonRpcRequest {
21 jsonrpc: String,
22 id: Option<Value>,
23 method: String,
24 params: Option<Value>,
25}
26
27#[derive(Debug, Serialize)]
28struct JsonRpcResponse {
29 jsonrpc: String,
30 id: Option<Value>,
31 #[serde(skip_serializing_if = "Option::is_none")]
32 result: Option<Value>,
33 #[serde(skip_serializing_if = "Option::is_none")]
34 error: Option<JsonRpcError>,
35}
36
37#[derive(Debug, Serialize)]
38struct JsonRpcError {
39 code: i32,
40 message: String,
41}
42
43#[derive(Debug, Deserialize)]
44struct ToolCallParams {
45 name: String,
46 arguments: Value,
47}
48
49const MCP_TOOLS: &str = include_str!("../../mcp-server.json");
51
52pub async fn run() -> io::Result<()> {
55 let ctx = ProjectContext::load_or_init()
57 .await
58 .map_err(|e| io::Error::other(e.to_string()))?;
59
60 if let Err(e) = register_mcp_connection(&ctx.root) {
62 eprintln!("⚠ Failed to register MCP connection: {}", e);
63 }
64
65 let project_path = ctx.root.clone();
67 let heartbeat_handle = tokio::spawn(async move {
68 heartbeat_task(project_path).await;
69 });
70
71 let result = run_server().await;
73
74 if let Err(e) = unregister_mcp_connection(&ctx.root) {
76 eprintln!("⚠ Failed to unregister MCP connection: {}", e);
77 }
78
79 heartbeat_handle.abort();
81
82 result
83}
84
85async fn run_server() -> io::Result<()> {
86 let stdin = io::stdin();
87 let mut stdout = io::stdout();
88 let reader = stdin.lock();
89
90 for line in reader.lines() {
91 let line = line?;
92 if line.trim().is_empty() {
93 continue;
94 }
95
96 let response = match serde_json::from_str::<JsonRpcRequest>(&line) {
97 Ok(request) => {
98 if request.id.is_none() {
100 handle_notification(&request).await;
101 continue; }
103 handle_request(request).await
104 },
105 Err(e) => JsonRpcResponse {
106 jsonrpc: "2.0".to_string(),
107 id: None,
108 result: None,
109 error: Some(JsonRpcError {
110 code: -32700,
111 message: format!("Parse error: {}", e),
112 }),
113 },
114 };
115
116 let response_json = serde_json::to_string(&response)?;
117 writeln!(stdout, "{}", response_json)?;
118 stdout.flush()?;
119 }
120
121 Ok(())
122}
123
124async fn handle_notification(request: &JsonRpcRequest) {
125 match request.method.as_str() {
127 "initialized" => {
128 eprintln!("✓ MCP client initialized");
129 },
130 "notifications/cancelled" => {
131 eprintln!("⚠ Request cancelled");
132 },
133 _ => {
134 eprintln!("⚠ Unknown notification: {}", request.method);
135 },
136 }
137}
138
139async fn handle_request(request: JsonRpcRequest) -> JsonRpcResponse {
140 if request.jsonrpc != "2.0" {
142 return JsonRpcResponse {
143 jsonrpc: "2.0".to_string(),
144 id: request.id,
145 result: None,
146 error: Some(JsonRpcError {
147 code: -32600,
148 message: format!("Invalid JSON-RPC version: {}", request.jsonrpc),
149 }),
150 };
151 }
152
153 let result = match request.method.as_str() {
154 "initialize" => handle_initialize(request.params),
155 "ping" => Ok(json!({})), "tools/list" => handle_tools_list(),
157 "tools/call" => handle_tool_call(request.params).await,
158 _ => Err(format!("Method not found: {}", request.method)),
159 };
160
161 match result {
162 Ok(value) => JsonRpcResponse {
163 jsonrpc: "2.0".to_string(),
164 id: request.id,
165 result: Some(value),
166 error: None,
167 },
168 Err(message) => JsonRpcResponse {
169 jsonrpc: "2.0".to_string(),
170 id: request.id,
171 result: None,
172 error: Some(JsonRpcError {
173 code: -32000,
174 message,
175 }),
176 },
177 }
178}
179
180fn handle_initialize(_params: Option<Value>) -> Result<Value, String> {
181 Ok(json!({
184 "protocolVersion": "2024-11-05",
185 "capabilities": {
186 "tools": {
187 "listChanged": false }
189 },
190 "serverInfo": {
191 "name": "intent-engine",
192 "version": env!("CARGO_PKG_VERSION")
193 }
194 }))
195}
196
197fn handle_tools_list() -> Result<Value, String> {
198 let config: Value = serde_json::from_str(MCP_TOOLS)
199 .map_err(|e| format!("Failed to parse MCP tools schema: {}", e))?;
200
201 Ok(json!({
202 "tools": config.get("tools").unwrap_or(&json!([]))
203 }))
204}
205
206async fn handle_tool_call(params: Option<Value>) -> Result<Value, String> {
207 let params: ToolCallParams = serde_json::from_value(params.unwrap_or(json!({})))
208 .map_err(|e| format!("Invalid tool call parameters: {}", e))?;
209
210 let result = match params.name.as_str() {
211 "task_add" => handle_task_add(params.arguments).await,
212 "task_add_dependency" => handle_task_add_dependency(params.arguments).await,
213 "task_start" => handle_task_start(params.arguments).await,
214 "task_pick_next" => handle_task_pick_next(params.arguments).await,
215 "task_spawn_subtask" => handle_task_spawn_subtask(params.arguments).await,
216 "task_switch" => handle_task_switch(params.arguments).await,
217 "task_done" => handle_task_done(params.arguments).await,
218 "task_update" => handle_task_update(params.arguments).await,
219 "task_list" => handle_task_list(params.arguments).await,
220 "task_get" => handle_task_get(params.arguments).await,
221 "task_context" => handle_task_context(params.arguments).await,
222 "task_delete" => handle_task_delete(params.arguments).await,
223 "event_add" => handle_event_add(params.arguments).await,
224 "event_list" => handle_event_list(params.arguments).await,
225 "unified_search" => handle_unified_search(params.arguments).await,
226 "current_task_get" => handle_current_task_get(params.arguments).await,
227 "report_generate" => handle_report_generate(params.arguments).await,
228 _ => Err(format!("Unknown tool: {}", params.name)),
229 }?;
230
231 Ok(json!({
232 "content": [{
233 "type": "text",
234 "text": serde_json::to_string_pretty(&result)
235 .unwrap_or_else(|_| "{}".to_string())
236 }]
237 }))
238}
239
240async fn handle_task_add(args: Value) -> Result<Value, String> {
243 let name = match args.get("name") {
245 None => return Err("Missing required parameter: name".to_string()),
246 Some(value) => {
247 if value.is_null() {
248 return Err("Parameter 'name' cannot be null".to_string());
249 }
250 match value.as_str() {
251 Some(s) if s.trim().is_empty() => {
252 return Err("Parameter 'name' cannot be empty".to_string());
253 },
254 Some(s) => s,
255 None => return Err(format!("Parameter 'name' must be a string, got: {}", value)),
256 }
257 },
258 };
259
260 let spec = args.get("spec").and_then(|v| v.as_str());
261 let parent_id = args.get("parent_id").and_then(|v| v.as_i64());
262
263 let ctx = ProjectContext::load_or_init()
264 .await
265 .map_err(|e| format!("Failed to load project context: {}", e))?;
266
267 let task_mgr = TaskManager::new(&ctx.pool);
268 let task = task_mgr
269 .add_task(name, spec, parent_id)
270 .await
271 .map_err(|e| format!("Failed to add task: {}", e))?;
272
273 serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
274}
275
276async fn handle_task_add_dependency(args: Value) -> Result<Value, String> {
277 let blocked_task_id = args
278 .get("blocked_task_id")
279 .and_then(|v| v.as_i64())
280 .ok_or("Missing required parameter: blocked_task_id")?;
281
282 let blocking_task_id = args
283 .get("blocking_task_id")
284 .and_then(|v| v.as_i64())
285 .ok_or("Missing required parameter: blocking_task_id")?;
286
287 let ctx = ProjectContext::load_or_init()
288 .await
289 .map_err(|e| format!("Failed to load project context: {}", e))?;
290
291 let dependency =
292 crate::dependencies::add_dependency(&ctx.pool, blocking_task_id, blocked_task_id)
293 .await
294 .map_err(|e| format!("Failed to add dependency: {}", e))?;
295
296 serde_json::to_value(&dependency).map_err(|e| format!("Serialization error: {}", e))
297}
298
299async fn handle_task_start(args: Value) -> Result<Value, String> {
300 let task_id = args
301 .get("task_id")
302 .and_then(|v| v.as_i64())
303 .ok_or("Missing required parameter: task_id")?;
304
305 let with_events = args
306 .get("with_events")
307 .and_then(|v| v.as_bool())
308 .unwrap_or(true);
309
310 let ctx = ProjectContext::load_or_init()
311 .await
312 .map_err(|e| format!("Failed to load project context: {}", e))?;
313
314 let task_mgr = TaskManager::new(&ctx.pool);
315 let task = task_mgr
316 .start_task(task_id, with_events)
317 .await
318 .map_err(|e| format!("Failed to start task: {}", e))?;
319
320 serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
321}
322
323async fn handle_task_pick_next(args: Value) -> Result<Value, String> {
324 let _max_count = args.get("max_count").and_then(|v| v.as_i64());
325 let _capacity = args.get("capacity").and_then(|v| v.as_i64());
326
327 let ctx = ProjectContext::load_or_init()
328 .await
329 .map_err(|e| format!("Failed to load project context: {}", e))?;
330
331 let task_mgr = TaskManager::new(&ctx.pool);
332 let response = task_mgr
333 .pick_next()
334 .await
335 .map_err(|e| format!("Failed to pick next task: {}", e))?;
336
337 serde_json::to_value(&response).map_err(|e| format!("Serialization error: {}", e))
338}
339
340async fn handle_task_spawn_subtask(args: Value) -> Result<Value, String> {
341 let name = args
342 .get("name")
343 .and_then(|v| v.as_str())
344 .ok_or("Missing required parameter: name")?;
345
346 let spec = args.get("spec").and_then(|v| v.as_str());
347
348 let ctx = ProjectContext::load_or_init()
349 .await
350 .map_err(|e| format!("Failed to load project context: {}", e))?;
351
352 let task_mgr = TaskManager::new(&ctx.pool);
353 let subtask = task_mgr
354 .spawn_subtask(name, spec)
355 .await
356 .map_err(|e| format!("Failed to spawn subtask: {}", e))?;
357
358 serde_json::to_value(&subtask).map_err(|e| format!("Serialization error: {}", e))
359}
360
361async fn handle_task_switch(args: Value) -> Result<Value, String> {
362 let task_id = args
363 .get("task_id")
364 .and_then(|v| v.as_i64())
365 .ok_or("Missing required parameter: task_id")?;
366
367 let ctx = ProjectContext::load_or_init()
368 .await
369 .map_err(|e| format!("Failed to load project context: {}", e))?;
370
371 let task_mgr = TaskManager::new(&ctx.pool);
372 let task = task_mgr
373 .switch_to_task(task_id)
374 .await
375 .map_err(|e| format!("Failed to switch task: {}", e))?;
376
377 serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
378}
379
380async fn handle_task_done(args: Value) -> Result<Value, String> {
381 let task_id = args.get("task_id").and_then(|v| v.as_i64());
382
383 let ctx = ProjectContext::load_or_init()
384 .await
385 .map_err(|e| format!("Failed to load project context: {}", e))?;
386
387 let task_mgr = TaskManager::new(&ctx.pool);
388
389 if let Some(id) = task_id {
391 let workspace_mgr = WorkspaceManager::new(&ctx.pool);
392 workspace_mgr
393 .set_current_task(id)
394 .await
395 .map_err(|e| format!("Failed to set current task: {}", e))?;
396 }
397
398 let task = task_mgr
399 .done_task()
400 .await
401 .map_err(|e| format!("Failed to mark task as done: {}", e))?;
402
403 serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
404}
405
406async fn handle_task_update(args: Value) -> Result<Value, String> {
407 let task_id = args
408 .get("task_id")
409 .and_then(|v| v.as_i64())
410 .ok_or("Missing required parameter: task_id")?;
411
412 let name = args.get("name").and_then(|v| v.as_str());
413 let spec = args.get("spec").and_then(|v| v.as_str());
414 let status = args.get("status").and_then(|v| v.as_str());
415 let complexity = args
416 .get("complexity")
417 .and_then(|v| v.as_i64())
418 .map(|v| v as i32);
419 let priority = match args.get("priority").and_then(|v| v.as_str()) {
420 Some(p) => Some(
421 crate::priority::PriorityLevel::parse_to_int(p)
422 .map_err(|e| format!("Invalid priority: {}", e))?,
423 ),
424 None => None,
425 };
426 let parent_id = args.get("parent_id").and_then(|v| v.as_i64()).map(Some);
427
428 let ctx = ProjectContext::load_or_init()
429 .await
430 .map_err(|e| format!("Failed to load project context: {}", e))?;
431
432 let task_mgr = TaskManager::new(&ctx.pool);
433 let task = task_mgr
434 .update_task(task_id, name, spec, parent_id, status, complexity, priority)
435 .await
436 .map_err(|e| format!("Failed to update task: {}", e))?;
437
438 serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
439}
440
441async fn handle_task_list(args: Value) -> Result<Value, String> {
442 let status = args.get("status").and_then(|v| v.as_str());
443 let parent = args.get("parent").and_then(|v| v.as_str());
444
445 let parent_opt = parent.map(|p| {
446 if p == "null" {
447 None
448 } else {
449 p.parse::<i64>().ok()
450 }
451 });
452
453 let ctx = ProjectContext::load()
454 .await
455 .map_err(|e| format!("Failed to load project context: {}", e))?;
456
457 let task_mgr = TaskManager::new(&ctx.pool);
458 let tasks = task_mgr
459 .find_tasks(status, parent_opt)
460 .await
461 .map_err(|e| format!("Failed to list tasks: {}", e))?;
462
463 serde_json::to_value(&tasks).map_err(|e| format!("Serialization error: {}", e))
464}
465
466async fn handle_task_get(args: Value) -> Result<Value, String> {
467 let task_id = args
468 .get("task_id")
469 .and_then(|v| v.as_i64())
470 .ok_or("Missing required parameter: task_id")?;
471
472 let with_events = args
473 .get("with_events")
474 .and_then(|v| v.as_bool())
475 .unwrap_or(false);
476
477 let ctx = ProjectContext::load()
478 .await
479 .map_err(|e| format!("Failed to load project context: {}", e))?;
480
481 let task_mgr = TaskManager::new(&ctx.pool);
482
483 if with_events {
484 let task = task_mgr
485 .get_task_with_events(task_id)
486 .await
487 .map_err(|e| format!("Failed to get task: {}", e))?;
488 serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
489 } else {
490 let task = task_mgr
491 .get_task(task_id)
492 .await
493 .map_err(|e| format!("Failed to get task: {}", e))?;
494 serde_json::to_value(&task).map_err(|e| format!("Serialization error: {}", e))
495 }
496}
497
498async fn handle_task_context(args: Value) -> Result<Value, String> {
499 let task_id = if let Some(id) = args.get("task_id").and_then(|v| v.as_i64()) {
501 id
502 } else {
503 let ctx = ProjectContext::load()
505 .await
506 .map_err(|e| format!("Failed to load project context: {}", e))?;
507
508 let current_task_id: Option<String> =
509 sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'current_task_id'")
510 .fetch_optional(&ctx.pool)
511 .await
512 .map_err(|e| format!("Database error: {}", e))?;
513
514 current_task_id
515 .and_then(|s| s.parse::<i64>().ok())
516 .ok_or_else(|| {
517 "No current task is set and task_id was not provided. \
518 Use task_start or task_switch to set a task first, or provide task_id parameter."
519 .to_string()
520 })?
521 };
522
523 let ctx = ProjectContext::load()
524 .await
525 .map_err(|e| format!("Failed to load project context: {}", e))?;
526
527 let task_mgr = TaskManager::new(&ctx.pool);
528 let context = task_mgr
529 .get_task_context(task_id)
530 .await
531 .map_err(|e| format!("Failed to get task context: {}", e))?;
532
533 serde_json::to_value(&context).map_err(|e| format!("Serialization error: {}", e))
534}
535
536async fn handle_task_delete(args: Value) -> Result<Value, String> {
537 let task_id = args
538 .get("task_id")
539 .and_then(|v| v.as_i64())
540 .ok_or("Missing required parameter: task_id")?;
541
542 let ctx = ProjectContext::load()
543 .await
544 .map_err(|e| format!("Failed to load project context: {}", e))?;
545
546 let task_mgr = TaskManager::new(&ctx.pool);
547 task_mgr
548 .delete_task(task_id)
549 .await
550 .map_err(|e| format!("Failed to delete task: {}", e))?;
551
552 Ok(json!({"success": true, "deleted_task_id": task_id}))
553}
554
555async fn handle_event_add(args: Value) -> Result<Value, String> {
556 let task_id = args.get("task_id").and_then(|v| v.as_i64());
557
558 let event_type = args
559 .get("event_type")
560 .and_then(|v| v.as_str())
561 .ok_or("Missing required parameter: event_type")?;
562
563 let data = args
564 .get("data")
565 .and_then(|v| v.as_str())
566 .ok_or("Missing required parameter: data")?;
567
568 let ctx = ProjectContext::load_or_init()
569 .await
570 .map_err(|e| format!("Failed to load project context: {}", e))?;
571
572 let target_task_id = if let Some(id) = task_id {
574 id
575 } else {
576 let current_task_id: Option<String> =
578 sqlx::query_scalar("SELECT value FROM workspace_state WHERE key = 'current_task_id'")
579 .fetch_optional(&ctx.pool)
580 .await
581 .map_err(|e| format!("Database error: {}", e))?;
582
583 current_task_id
584 .and_then(|s| s.parse::<i64>().ok())
585 .ok_or_else(|| {
586 "No current task is set and task_id was not provided. \
587 Use task_start or task_switch to set a task first."
588 .to_string()
589 })?
590 };
591
592 let event_mgr = EventManager::new(&ctx.pool);
593 let event = event_mgr
594 .add_event(target_task_id, event_type, data)
595 .await
596 .map_err(|e| format!("Failed to add event: {}", e))?;
597
598 serde_json::to_value(&event).map_err(|e| format!("Serialization error: {}", e))
599}
600
601async fn handle_event_list(args: Value) -> Result<Value, String> {
602 let task_id = args.get("task_id").and_then(|v| v.as_i64());
603
604 let limit = args.get("limit").and_then(|v| v.as_i64());
605 let log_type = args
606 .get("type")
607 .and_then(|v| v.as_str())
608 .map(|s| s.to_string());
609 let since = args
610 .get("since")
611 .and_then(|v| v.as_str())
612 .map(|s| s.to_string());
613
614 let ctx = ProjectContext::load()
615 .await
616 .map_err(|e| format!("Failed to load project context: {}", e))?;
617
618 let event_mgr = EventManager::new(&ctx.pool);
619 let events = event_mgr
620 .list_events(task_id, limit, log_type, since)
621 .await
622 .map_err(|e| format!("Failed to list events: {}", e))?;
623
624 serde_json::to_value(&events).map_err(|e| format!("Serialization error: {}", e))
625}
626
627async fn handle_unified_search(args: Value) -> Result<Value, String> {
628 use crate::search::SearchManager;
629
630 let query = args
631 .get("query")
632 .and_then(|v| v.as_str())
633 .ok_or("Missing required parameter: query")?;
634
635 let include_tasks = args
636 .get("include_tasks")
637 .and_then(|v| v.as_bool())
638 .unwrap_or(true);
639
640 let include_events = args
641 .get("include_events")
642 .and_then(|v| v.as_bool())
643 .unwrap_or(true);
644
645 let limit = args.get("limit").and_then(|v| v.as_i64());
646
647 let ctx = ProjectContext::load()
648 .await
649 .map_err(|e| format!("Failed to load project context: {}", e))?;
650
651 let search_mgr = SearchManager::new(&ctx.pool);
652 let results = search_mgr
653 .unified_search(query, include_tasks, include_events, limit)
654 .await
655 .map_err(|e| format!("Failed to perform unified search: {}", e))?;
656
657 serde_json::to_value(&results).map_err(|e| format!("Serialization error: {}", e))
658}
659
660async fn handle_current_task_get(_args: Value) -> Result<Value, String> {
661 let ctx = ProjectContext::load()
662 .await
663 .map_err(|e| format!("Failed to load project context: {}", e))?;
664
665 let workspace_mgr = WorkspaceManager::new(&ctx.pool);
666 let response = workspace_mgr
667 .get_current_task()
668 .await
669 .map_err(|e| format!("Failed to get current task: {}", e))?;
670
671 serde_json::to_value(&response).map_err(|e| format!("Serialization error: {}", e))
672}
673
674async fn handle_report_generate(args: Value) -> Result<Value, String> {
675 let since = args.get("since").and_then(|v| v.as_str()).map(String::from);
676 let status = args
677 .get("status")
678 .and_then(|v| v.as_str())
679 .map(String::from);
680 let filter_name = args
681 .get("filter_name")
682 .and_then(|v| v.as_str())
683 .map(String::from);
684 let filter_spec = args
685 .get("filter_spec")
686 .and_then(|v| v.as_str())
687 .map(String::from);
688 let summary_only = args
689 .get("summary_only")
690 .and_then(|v| v.as_bool())
691 .unwrap_or(true);
692
693 let ctx = ProjectContext::load()
694 .await
695 .map_err(|e| format!("Failed to load project context: {}", e))?;
696
697 let report_mgr = ReportManager::new(&ctx.pool);
698 let report = report_mgr
699 .generate_report(since, status, filter_name, filter_spec, summary_only)
700 .await
701 .map_err(|e| format!("Failed to generate report: {}", e))?;
702
703 serde_json::to_value(&report).map_err(|e| format!("Serialization error: {}", e))
704}
705
706fn register_mcp_connection(project_path: &std::path::Path) -> anyhow::Result<()> {
712 use crate::dashboard::registry::ProjectRegistry;
713
714 let mut registry = ProjectRegistry::load()?;
715
716 let agent_name = detect_agent_type();
718
719 registry.register_mcp_connection(&project_path.to_path_buf(), agent_name)?;
720
721 eprintln!(
722 "✓ MCP connection registered for project: {}",
723 project_path.display()
724 );
725
726 Ok(())
727}
728
729fn unregister_mcp_connection(project_path: &std::path::Path) -> anyhow::Result<()> {
731 use crate::dashboard::registry::ProjectRegistry;
732
733 let mut registry = ProjectRegistry::load()?;
734 registry.unregister_mcp_connection(&project_path.to_path_buf())?;
735
736 eprintln!(
737 "✓ MCP connection unregistered for project: {}",
738 project_path.display()
739 );
740
741 Ok(())
742}
743
744async fn heartbeat_task(project_path: std::path::PathBuf) {
746 use crate::dashboard::registry::ProjectRegistry;
747
748 let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
749
750 loop {
751 interval.tick().await;
752
753 if let Ok(mut registry) = ProjectRegistry::load() {
755 if let Err(e) = registry.update_mcp_heartbeat(&project_path) {
756 eprintln!("⚠ Failed to update MCP heartbeat: {}", e);
757 }
758 }
759 }
760}
761
762fn detect_agent_type() -> Option<String> {
764 if std::env::var("CLAUDE_CODE_VERSION").is_ok() {
766 return Some("claude-code".to_string());
767 }
768
769 if std::env::var("CLAUDE_DESKTOP").is_ok() {
771 return Some("claude-desktop".to_string());
772 }
773
774 Some("mcp-client".to_string())
776}
777
778#[cfg(test)]
779#[path = "server_tests.rs"]
780mod tests;