1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
2use std::fmt::Write as _;
3use std::sync::Arc;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use axum::extract::State;
8use axum::http::{HeaderMap, Method, Uri};
9use axum::response::Response;
10use axum::routing::get;
11use axum::{Extension, Router};
12use facet::Facet;
13use moire_trace_types::{BacktraceId, FrameId};
14use moire_types::{
15 BacktraceFrameResolved, BacktraceFrameUnresolved, CutId, EdgeKind, Entity, EntityBody,
16 EntityId, ProcessId, ProcessSnapshotView, SnapshotBacktrace, SnapshotBacktraceFrame,
17 SnapshotCutResponse, TriggerCutResponse,
18};
19use moire_wire::{ServerMessage, encode_server_message_default};
20use rust_mcp_sdk::id_generator::{FastIdGenerator, UuidGenerator};
21use rust_mcp_sdk::macros::{JsonSchema, mcp_tool};
22use rust_mcp_sdk::mcp_http::{GenericBody, McpAppState, McpHttpHandler};
23use rust_mcp_sdk::mcp_server::error::TransportServerError;
24use rust_mcp_sdk::mcp_server::{ServerHandler, ToMcpServerHandler};
25use rust_mcp_sdk::schema::{
26 CallToolError, CallToolRequestParams, CallToolResult, Implementation, InitializeResult,
27 LATEST_PROTOCOL_VERSION, ListToolsResult, PaginatedRequestParams, RpcError, ServerCapabilities,
28 ServerCapabilitiesTools,
29};
30use rust_mcp_sdk::session_store::InMemorySessionStore;
31use rust_mcp_sdk::{TransportOptions, tool_box};
32use serde::{Deserialize, Serialize};
33use serde_json::{Map as JsonMap, Value as JsonValue};
34use tokio::net::TcpListener;
35use tracing::{info, warn};
36
37use crate::api::snapshot::take_snapshot_internal;
38use crate::api::source::lookup_source_text_location_in_db;
39use crate::app::{AppState, CutState, remember_snapshot};
40use crate::db::persist_cut_request;
41use crate::snapshot::table::{
42 is_pending_frame, load_snapshot_backtrace_table, lookup_frame_source_by_raw,
43};
44use crate::symbolication::symbolicate_pending_frames_for_backtraces;
45use crate::util::time::now_nanos;
46use moire_source_context::{cut_source_compact, extract_enclosing_fn, extract_target_statement};
47
48const DEFAULT_MCP_ENDPOINT: &str = "/mcp";
49const DEFAULT_MCP_PING_INTERVAL: Duration = Duration::from_secs(12);
50const DEFAULT_WAIT_CHAIN_MAX_DEPTH: usize = 16;
51const DEFAULT_WAIT_CHAIN_MAX_RESULTS: usize = 200;
52const DEFAULT_SYMBOLICATION_WAIT_TIMEOUT: Duration = Duration::from_secs(3);
53const DEFAULT_SYMBOLICATION_WAIT_TICK: Duration = Duration::from_millis(100);
54const MAX_RENDERED_SOURCE_LINES: usize = 24;
55const SOURCE_FRAMES_PER_ITEM: usize = 3;
56const SYSTEM_CRATES: &[&str] = &[
57 "std",
58 "core",
59 "alloc",
60 "tokio",
61 "tokio_util",
62 "futures",
63 "futures_core",
64 "futures_util",
65 "moire",
66 "moire_trace_capture",
67 "moire_runtime",
68 "moire_tokio",
69];
70
71#[mcp_tool(
72 name = "moire_help",
73 description = "Read this first. Explains deadlock workflow, entity kinds, common hang patterns, and how to use all moire MCP tools effectively."
74)]
75#[derive(Debug, Deserialize, Serialize, JsonSchema)]
76pub struct HelpTool {}
77
78#[mcp_tool(
79 name = "moire_cut_fresh",
80 description = "Trigger a coordinated cut and capture a fresh snapshot. Returns cut_id + snapshot metadata."
81)]
82#[derive(Debug, Deserialize, Serialize, JsonSchema)]
83pub struct CutFreshTool {}
84
85#[mcp_tool(
86 name = "moire_wait_edges",
87 description = "Return waiting_on edges for one snapshot with embedded text/plain source context on nodes."
88)]
89#[derive(Debug, Deserialize, Serialize, JsonSchema)]
90pub struct WaitEdgesTool {
91 #[serde(default)]
92 pub snapshot_id: Option<i64>,
93}
94
95#[mcp_tool(
96 name = "moire_wait_chains",
97 description = "Return precomputed wait chains over waiting_on edges, including cycle detection and embedded source context."
98)]
99#[derive(Debug, Deserialize, Serialize, JsonSchema)]
100pub struct WaitChainsTool {
101 #[serde(default)]
102 pub snapshot_id: Option<i64>,
103 #[serde(default)]
104 pub roots: Option<Vec<String>>,
105 #[serde(default)]
106 pub max_depth: Option<u32>,
107}
108
109#[mcp_tool(
110 name = "moire_deadlock_candidates",
111 description = "Return SCC/cycle-based deadlock candidates with confidence and reason tags."
112)]
113#[derive(Debug, Deserialize, Serialize, JsonSchema)]
114pub struct DeadlockCandidatesTool {
115 #[serde(default)]
116 pub snapshot_id: Option<i64>,
117}
118
119#[mcp_tool(
120 name = "moire_entity",
121 description = "Return one entity with incoming/outgoing wait edges, scopes, and embedded source context."
122)]
123#[derive(Debug, Deserialize, Serialize, JsonSchema)]
124pub struct EntityTool {
125 #[serde(default)]
126 pub snapshot_id: Option<i64>,
127 pub entity_id: String,
128}
129
130#[mcp_tool(
131 name = "moire_channel_state",
132 description = "Return channel-oriented state for one channel entity or all channels, including waiter counts and source context."
133)]
134#[derive(Debug, Deserialize, Serialize, JsonSchema)]
135pub struct ChannelStateTool {
136 #[serde(default)]
137 pub snapshot_id: Option<i64>,
138 #[serde(default)]
139 pub entity_id: Option<String>,
140}
141
142#[mcp_tool(
143 name = "moire_task_state",
144 description = "Return future/task-oriented state, including awaiting target, scopes, and source context."
145)]
146#[derive(Debug, Deserialize, Serialize, JsonSchema)]
147pub struct TaskStateTool {
148 #[serde(default)]
149 pub snapshot_id: Option<i64>,
150 #[serde(default)]
151 pub entity_id: Option<String>,
152}
153
154#[mcp_tool(
155 name = "moire_source_context",
156 description = "Lookup frame source context in text/plain format (statement/enclosing fn/compact scope)."
157)]
158#[derive(Debug, Deserialize, Serialize, JsonSchema)]
159pub struct SourceContextTool {
160 #[serde(default)]
161 pub snapshot_id: Option<i64>,
162 pub frame_ids: Vec<u64>,
163 pub format: String,
164}
165
166#[mcp_tool(
167 name = "moire_backtrace",
168 description = "Expand a backtrace from one snapshot, optionally embedding source context per frame."
169)]
170#[derive(Debug, Deserialize, Serialize, JsonSchema)]
171pub struct BacktraceTool {
172 #[serde(default)]
173 pub snapshot_id: Option<i64>,
174 pub backtrace_id: u64,
175 #[serde(default)]
176 pub with_source: Option<bool>,
177}
178
179#[mcp_tool(
180 name = "moire_diff_snapshots",
181 description = "Return entity/edge/channel/task deltas between two snapshot ids."
182)]
183#[derive(Debug, Deserialize, Serialize, JsonSchema)]
184pub struct DiffSnapshotsTool {
185 pub from_snapshot_id: i64,
186 pub to_snapshot_id: i64,
187}
188
189tool_box!(
190 MoireTools,
191 [
192 HelpTool,
193 CutFreshTool,
194 WaitEdgesTool,
195 WaitChainsTool,
196 DeadlockCandidatesTool,
197 EntityTool,
198 ChannelStateTool,
199 TaskStateTool,
200 SourceContextTool,
201 BacktraceTool,
202 DiffSnapshotsTool
203 ]
204);
205
206#[derive(Facet)]
207struct McpCutFreshResponse {
208 pub cut_id: CutId,
209 pub requested_at_ns: i64,
210 pub requested_connections: usize,
211 pub snapshot_id: i64,
212 pub captured_at_unix_ms: i64,
213 pub process_count: usize,
214 pub timed_out_count: usize,
215}
216
217#[derive(Facet)]
218struct McpHelpResponse {
219 pub read_this_first: String,
220 pub first_steps: Vec<String>,
221 pub tool_guide: Vec<McpHelpToolGuide>,
222 pub entity_kinds: Vec<McpHelpEntityKind>,
223 pub hang_patterns: Vec<McpHelpHangPattern>,
224 pub interpretation_notes: Vec<String>,
225}
226
227#[derive(Facet)]
228struct McpHelpToolGuide {
229 pub tool: String,
230 pub purpose: String,
231 pub when_to_use: String,
232 pub typical_args: String,
233}
234
235#[derive(Facet)]
236struct McpHelpEntityKind {
237 pub kind: String,
238 pub means: String,
239 pub hang_signal: String,
240}
241
242#[derive(Facet)]
243struct McpHelpHangPattern {
244 pub name: String,
245 pub signature: String,
246 pub likely_cause: String,
247 pub next_calls: Vec<String>,
248}
249
250#[derive(Facet)]
251struct McpWaitEdgesResponse {
252 pub snapshot_id: i64,
253 pub row_count: usize,
254 pub wait_edges: Vec<McpWaitEdge>,
255}
256
257#[derive(Facet)]
258struct McpWaitEdge {
259 pub process_id: String,
260 pub waiter_id: String,
261 pub waiter_name: String,
262 pub waiter_kind: String,
263 pub blocked_on_id: String,
264 pub blocked_on_name: String,
265 pub blocked_on_kind: String,
266 pub waiter_birth_ms: u64,
267 pub blocked_birth_ms: u64,
268 pub edge_kind: String,
269 #[facet(skip_unless_truthy)]
270 pub waiter_source: Option<McpSourceContext>,
271 #[facet(skip_unless_truthy)]
272 pub waiter_sources: Vec<McpSourceContext>,
273 #[facet(skip_unless_truthy)]
274 pub blocked_on_source: Option<McpSourceContext>,
275 #[facet(skip_unless_truthy)]
276 pub blocked_on_sources: Vec<McpSourceContext>,
277 #[facet(skip_unless_truthy)]
278 pub wait_site_source: Option<McpSourceContext>,
279 #[facet(skip_unless_truthy)]
280 pub wait_site_sources: Vec<McpSourceContext>,
281}
282
283#[derive(Facet)]
284struct McpWaitChainsResponse {
285 pub snapshot_id: i64,
286 pub chain_count: usize,
287 pub chains: Vec<McpWaitChain>,
288}
289
290#[derive(Facet)]
291struct McpWaitChain {
292 pub chain_id: String,
293 pub is_cycle: bool,
294 pub has_external_wake_source: bool,
295 pub summary: String,
296 pub node_ids: Vec<String>,
297 pub edges: Vec<McpChainEdge>,
298 pub nodes: Vec<McpNodeSummary>,
299}
300
301#[derive(Facet)]
302struct McpChainEdge {
303 pub src_entity_id: String,
304 pub dst_entity_id: String,
305 #[facet(skip_unless_truthy)]
306 pub wait_site_source: Option<McpSourceContext>,
307 #[facet(skip_unless_truthy)]
308 pub wait_site_sources: Vec<McpSourceContext>,
309}
310
311#[derive(Facet)]
312struct McpNodeSummary {
313 pub process_id: String,
314 pub entity_id: String,
315 pub name: String,
316 pub kind: String,
317 #[facet(skip_unless_truthy)]
318 pub source: Option<McpSourceContext>,
319 #[facet(skip_unless_truthy)]
320 pub sources: Vec<McpSourceContext>,
321}
322
323#[derive(Facet)]
324struct McpDeadlockCandidatesResponse {
325 pub snapshot_id: i64,
326 pub candidate_count: usize,
327 pub candidates: Vec<McpDeadlockCandidate>,
328}
329
330#[derive(Facet)]
331struct McpDeadlockCandidate {
332 pub candidate_id: String,
333 pub confidence: String,
334 pub reasons: Vec<String>,
335 pub entity_ids: Vec<String>,
336 #[facet(skip_unless_truthy)]
337 pub blocked_duration_hint_ms: Option<u64>,
338 pub cycle_nodes: Vec<McpNodeSummary>,
339}
340
341#[derive(Facet)]
342struct McpEntityResponse {
343 pub snapshot_id: i64,
344 pub process_id: String,
345 pub process_name: String,
346 pub pid: u32,
347 pub entity_id: String,
348 pub entity_name: String,
349 pub entity_kind: String,
350 pub entity_body_json: String,
351 pub incoming_wait_edges: Vec<McpChainEdge>,
352 pub outgoing_wait_edges: Vec<McpChainEdge>,
353 pub scope_ids: Vec<String>,
354 #[facet(skip_unless_truthy)]
355 pub source: Option<McpSourceContext>,
356 #[facet(skip_unless_truthy)]
357 pub sources: Vec<McpSourceContext>,
358}
359
360#[derive(Facet)]
361struct McpChannelStateResponse {
362 pub snapshot_id: i64,
363 pub channels: Vec<McpChannelState>,
364}
365
366#[derive(Facet)]
367struct McpChannelState {
368 pub process_id: String,
369 pub entity_id: String,
370 pub name: String,
371 pub channel_kind: String,
372 #[facet(skip_unless_truthy)]
373 pub capacity: Option<u32>,
374 #[facet(skip_unless_truthy)]
375 pub occupancy: Option<u32>,
376 pub sender_waiters: u32,
377 pub receiver_waiters: u32,
378 #[facet(skip_unless_truthy)]
379 pub lifecycle_hints: Option<String>,
380 #[facet(skip_unless_truthy)]
381 pub source: Option<McpSourceContext>,
382 #[facet(skip_unless_truthy)]
383 pub sources: Vec<McpSourceContext>,
384}
385
386#[derive(Facet)]
387struct McpTaskStateResponse {
388 pub snapshot_id: i64,
389 pub tasks: Vec<McpTaskState>,
390}
391
392#[derive(Facet)]
393struct McpTaskState {
394 pub process_id: String,
395 pub entity_id: String,
396 pub name: String,
397 pub entry_backtrace_id: u64,
398 #[facet(skip_unless_truthy)]
399 pub entry_frame_id: Option<u64>,
400 pub entry_frame_ids: Vec<u64>,
401 #[facet(skip_unless_truthy)]
402 pub awaiting_on_entity_id: Option<String>,
403 pub scope_ids: Vec<String>,
404 #[facet(skip_unless_truthy)]
405 pub source: Option<McpSourceContext>,
406 #[facet(skip_unless_truthy)]
407 pub sources: Vec<McpSourceContext>,
408}
409
410#[derive(Facet)]
411struct McpSourceContextResponse {
412 pub snapshot_id: i64,
413 pub format: String,
414 pub previews: Vec<McpSourceContext>,
415 pub unavailable_frame_ids: Vec<u64>,
416}
417
418#[derive(Facet)]
419struct McpBacktraceResponse {
420 pub snapshot_id: i64,
421 pub backtrace_id: u64,
422 pub frame_count: usize,
423 pub frames: Vec<McpBacktraceFrame>,
424}
425
426#[derive(Facet)]
427struct McpBacktraceFrame {
428 pub frame_id: u64,
429 pub status: String,
430 pub module_path: String,
431 #[facet(skip_unless_truthy)]
432 pub function_name: Option<String>,
433 #[facet(skip_unless_truthy)]
434 pub source_file: Option<String>,
435 #[facet(skip_unless_truthy)]
436 pub line: Option<u32>,
437 #[facet(skip_unless_truthy)]
438 pub rel_pc: Option<u64>,
439 #[facet(skip_unless_truthy)]
440 pub reason: Option<String>,
441 #[facet(skip_unless_truthy)]
442 pub source: Option<McpSourceContext>,
443}
444
445#[derive(Facet)]
446struct McpDiffSnapshotsResponse {
447 pub from_snapshot_id: i64,
448 pub to_snapshot_id: i64,
449 pub entity_added: Vec<String>,
450 pub entity_removed: Vec<String>,
451 pub waiting_on_added: Vec<String>,
452 pub waiting_on_removed: Vec<String>,
453 pub channel_changes: Vec<McpChannelDiff>,
454 pub task_changes: Vec<McpTaskDiff>,
455}
456
457#[derive(Facet)]
458struct McpChannelDiff {
459 pub entity_id: String,
460 pub before: String,
461 pub after: String,
462}
463
464#[derive(Facet)]
465struct McpTaskDiff {
466 pub entity_id: String,
467 #[facet(skip_unless_truthy)]
468 pub awaiting_before: Option<String>,
469 #[facet(skip_unless_truthy)]
470 pub awaiting_after: Option<String>,
471}
472
473#[derive(Facet, Clone)]
474struct McpSourceContext {
475 pub format: String,
476 pub frame_id: u64,
477 pub source_file: String,
478 pub target_line: u32,
479 #[facet(skip_unless_truthy)]
480 pub target_col: Option<u32>,
481 pub total_lines: u32,
482 #[facet(skip_unless_truthy)]
483 pub statement_text: Option<String>,
484 #[facet(skip_unless_truthy)]
485 pub enclosing_fn_text: Option<String>,
486 #[facet(skip_unless_truthy)]
487 pub compact_scope_text: Option<String>,
488 #[facet(skip_unless_truthy)]
489 pub compact_scope_range: Option<McpLineRange>,
490}
491
492#[derive(Facet, Clone)]
493struct McpLineRange {
494 pub start: u32,
495 pub end: u32,
496}
497
498#[derive(Clone)]
499struct WaitNode {
500 process_id: String,
501 ptime_now_ms: u64,
502 entity_id: String,
503 name: String,
504 kind: String,
505 birth_ms: u64,
506 frame_ids: Vec<FrameId>,
507}
508
509#[derive(Clone)]
510struct WaitEdgeRuntime {
511 process_id: String,
512 src_key: String,
513 dst_key: String,
514 dst_entity_id: String,
515 edge_frame_ids: Vec<FrameId>,
516}
517
518type WaitGraph = (
519 HashMap<String, WaitNode>,
520 Vec<WaitEdgeRuntime>,
521 HashMap<String, Vec<String>>,
522 HashMap<String, usize>,
523);
524
525#[derive(Clone)]
526struct MoireMcpHandler {
527 state: AppState,
528}
529
530impl MoireMcpHandler {
531 fn new(state: AppState) -> Self {
532 Self { state }
533 }
534
535 async fn dispatch_tool(
536 &self,
537 tool_name: &str,
538 args: &JsonMap<String, JsonValue>,
539 ) -> Result<String, String> {
540 match tool_name {
541 "moire_help" => self.tool_help().await,
542 "moire_cut_fresh" => self.tool_cut_fresh().await,
543 "moire_wait_edges" => {
544 let snapshot_id = optional_i64(args, "snapshot_id")?;
545 self.tool_wait_edges(snapshot_id).await
546 }
547 "moire_wait_chains" => {
548 let snapshot_id = optional_i64(args, "snapshot_id")?;
549 let roots = optional_string_list(args, "roots")?;
550 let max_depth = optional_u32(args, "max_depth")?;
551 self.tool_wait_chains(snapshot_id, roots, max_depth).await
552 }
553 "moire_deadlock_candidates" => {
554 let snapshot_id = optional_i64(args, "snapshot_id")?;
555 self.tool_deadlock_candidates(snapshot_id).await
556 }
557 "moire_entity" => {
558 let snapshot_id = optional_i64(args, "snapshot_id")?;
559 let entity_id = required_non_empty_string(args, "entity_id")?;
560 self.tool_entity(snapshot_id, entity_id).await
561 }
562 "moire_channel_state" => {
563 let snapshot_id = optional_i64(args, "snapshot_id")?;
564 let entity_id = optional_non_empty_string(args, "entity_id")?;
565 self.tool_channel_state(snapshot_id, entity_id).await
566 }
567 "moire_task_state" => {
568 let snapshot_id = optional_i64(args, "snapshot_id")?;
569 let entity_id = optional_non_empty_string(args, "entity_id")?;
570 self.tool_task_state(snapshot_id, entity_id).await
571 }
572 "moire_source_context" => {
573 let snapshot_id = optional_i64(args, "snapshot_id")?;
574 let frame_ids = required_u64_list(args, "frame_ids")?;
575 let format = required_non_empty_string(args, "format")?;
576 self.tool_source_context(snapshot_id, frame_ids, format)
577 .await
578 }
579 "moire_backtrace" => {
580 let snapshot_id = optional_i64(args, "snapshot_id")?;
581 let backtrace_id = required_u64(args, "backtrace_id")?;
582 let with_source = optional_bool(args, "with_source")?.unwrap_or(false);
583 self.tool_backtrace(snapshot_id, backtrace_id, with_source)
584 .await
585 }
586 "moire_diff_snapshots" => {
587 let from_snapshot_id = required_i64(args, "from_snapshot_id")?;
588 let to_snapshot_id = required_i64(args, "to_snapshot_id")?;
589 self.tool_diff_snapshots(from_snapshot_id, to_snapshot_id)
590 .await
591 }
592 other => Err(format!("unknown tool: {other}")),
593 }
594 }
595
596 async fn tool_help(&self) -> Result<String, String> {
597 let response = McpHelpResponse {
598 read_this_first: String::from(
599 "Run moire_help first in every new session, then run moire_cut_fresh. \
600Use the returned snapshot_id for all follow-up calls to stay on one coherent cut.",
601 ),
602 first_steps: vec![
603 String::from("1) moire_help"),
604 String::from("2) moire_cut_fresh"),
605 String::from("3) moire_wait_chains { snapshot_id }"),
606 String::from("4) moire_deadlock_candidates { snapshot_id }"),
607 String::from(
608 "5) moire_entity / moire_channel_state / moire_task_state on interesting nodes",
609 ),
610 String::from(
611 "6) moire_diff_snapshots { from_snapshot_id, to_snapshot_id } if you need to prove no progress",
612 ),
613 ],
614 tool_guide: vec![
615 McpHelpToolGuide {
616 tool: String::from("moire_cut_fresh"),
617 purpose: String::from("Capture a new coordinated cut and snapshot anchor."),
618 when_to_use: String::from("Always first for live debugging."),
619 typical_args: String::from("{}"),
620 },
621 McpHelpToolGuide {
622 tool: String::from("moire_wait_edges"),
623 purpose: String::from("Flat waiting_on edges with node + wait-site source."),
624 when_to_use: String::from("Need low-level raw wait graph facts."),
625 typical_args: String::from("{ snapshot_id }"),
626 },
627 McpHelpToolGuide {
628 tool: String::from("moire_wait_chains"),
629 purpose: String::from("Precomputed dependency chains with cycle detection."),
630 when_to_use: String::from("Primary traversal view for hangs."),
631 typical_args: String::from("{ snapshot_id, roots?, max_depth? }"),
632 },
633 McpHelpToolGuide {
634 tool: String::from("moire_deadlock_candidates"),
635 purpose: String::from("SCC-based deadlock candidates with confidence/reasons."),
636 when_to_use: String::from("Need probable root-cause candidates quickly."),
637 typical_args: String::from("{ snapshot_id }"),
638 },
639 McpHelpToolGuide {
640 tool: String::from("moire_entity"),
641 purpose: String::from(
642 "Inspect one entity with incoming/outgoing waits + scopes.",
643 ),
644 when_to_use: String::from("Drilling into one suspicious node."),
645 typical_args: String::from("{ snapshot_id, entity_id }"),
646 },
647 McpHelpToolGuide {
648 tool: String::from("moire_channel_state"),
649 purpose: String::from(
650 "Inspect channel occupancy/capacity and waiter pressure.",
651 ),
652 when_to_use: String::from("Suspected producer/consumer stall."),
653 typical_args: String::from("{ snapshot_id, entity_id? }"),
654 },
655 McpHelpToolGuide {
656 tool: String::from("moire_task_state"),
657 purpose: String::from("Inspect task/future await target + scope context."),
658 when_to_use: String::from("Suspected task/future parking issue."),
659 typical_args: String::from("{ snapshot_id, entity_id? }"),
660 },
661 McpHelpToolGuide {
662 tool: String::from("moire_source_context"),
663 purpose: String::from("Direct frame source lookup in text/plain."),
664 when_to_use: String::from("Need ad-hoc source for specific frame_ids."),
665 typical_args: String::from(
666 "{ snapshot_id?, frame_ids, format: \"text/plain\" }",
667 ),
668 },
669 McpHelpToolGuide {
670 tool: String::from("moire_backtrace"),
671 purpose: String::from("Expand one backtrace, optionally with source snippets."),
672 when_to_use: String::from("Need full call stack context."),
673 typical_args: String::from("{ snapshot_id, backtrace_id, with_source? }"),
674 },
675 McpHelpToolGuide {
676 tool: String::from("moire_diff_snapshots"),
677 purpose: String::from("Show progress/no-progress across two cuts."),
678 when_to_use: String::from("Need to prove stasis or identify transitions."),
679 typical_args: String::from("{ from_snapshot_id, to_snapshot_id }"),
680 },
681 ],
682 entity_kinds: vec![
683 McpHelpEntityKind {
684 kind: String::from("future"),
685 means: String::from("A task/future execution state."),
686 hang_signal: String::from(
687 "Long wait chain roots; waiting_on edges that never clear.",
688 ),
689 },
690 McpHelpEntityKind {
691 kind: String::from("mpsc_tx / mpsc_rx"),
692 means: String::from("Bounded/unbounded MPSC channel endpoints."),
693 hang_signal: String::from(
694 "tx waits with full buffer or rx waits with no producer progress.",
695 ),
696 },
697 McpHelpEntityKind {
698 kind: String::from("broadcast_tx / broadcast_rx"),
699 means: String::from("Broadcast channel endpoints."),
700 hang_signal: String::from(
701 "Receivers lagging or waiting while sender path is blocked.",
702 ),
703 },
704 McpHelpEntityKind {
705 kind: String::from("watch_tx / watch_rx"),
706 means: String::from("Watch channel update/read endpoints."),
707 hang_signal: String::from("rx waiting with no tx updates."),
708 },
709 McpHelpEntityKind {
710 kind: String::from("oneshot_tx / oneshot_rx"),
711 means: String::from("Single-message synchronization."),
712 hang_signal: String::from("rx waiting and tx never reaches send."),
713 },
714 McpHelpEntityKind {
715 kind: String::from("lock / semaphore / notify / once_cell"),
716 means: String::from("Synchronization primitives."),
717 hang_signal: String::from(
718 "Cycles through holders/waiters or no external wake source.",
719 ),
720 },
721 McpHelpEntityKind {
722 kind: String::from("net_* / request / response"),
723 means: String::from("I/O and RPC boundary operations."),
724 hang_signal: String::from(
725 "Can be real external wait; confirm with snapshot diff before calling deadlock.",
726 ),
727 },
728 McpHelpEntityKind {
729 kind: String::from("custom / aether"),
730 means: String::from("User-defined or synthetic placeholder entities."),
731 hang_signal: String::from(
732 "Use source snippets + neighboring edges for interpretation.",
733 ),
734 },
735 ],
736 hang_patterns: vec![
737 McpHelpHangPattern {
738 name: String::from("Pure wait cycle"),
739 signature: String::from(
740 "SCC with >=2 nodes and no clear external wake source.",
741 ),
742 likely_cause: String::from("Logical deadlock or handshake ordering bug."),
743 next_calls: vec![
744 String::from("moire_deadlock_candidates { snapshot_id }"),
745 String::from("moire_wait_chains { snapshot_id }"),
746 String::from("moire_entity { snapshot_id, entity_id }"),
747 ],
748 },
749 McpHelpHangPattern {
750 name: String::from("Producer starvation"),
751 signature: String::from(
752 "Receivers waiting on channel while upstream producer chain is blocked.",
753 ),
754 likely_cause: String::from(
755 "Missed spawn, gated branch, or upstream await cycle.",
756 ),
757 next_calls: vec![
758 String::from("moire_channel_state { snapshot_id }"),
759 String::from("moire_wait_chains { snapshot_id }"),
760 String::from("moire_task_state { snapshot_id }"),
761 ],
762 },
763 McpHelpHangPattern {
764 name: String::from("Backpressure stall"),
765 signature: String::from(
766 "Senders blocked with high/at-capacity channel occupancy.",
767 ),
768 likely_cause: String::from(
769 "Consumer slow or consumer blocked on unrelated wait.",
770 ),
771 next_calls: vec![
772 String::from("moire_channel_state { snapshot_id }"),
773 String::from("moire_wait_edges { snapshot_id }"),
774 String::from("moire_task_state { snapshot_id }"),
775 ],
776 },
777 McpHelpHangPattern {
778 name: String::from("Looks deadlocked but is external wait"),
779 signature: String::from(
780 "Chains terminate in net/request/response-style boundary nodes.",
781 ),
782 likely_cause: String::from(
783 "Remote dependency or I/O latency rather than internal cycle.",
784 ),
785 next_calls: vec![
786 String::from(
787 "moire_backtrace { snapshot_id, backtrace_id, with_source: true }",
788 ),
789 String::from("moire_diff_snapshots { from_snapshot_id, to_snapshot_id }"),
790 ],
791 },
792 McpHelpHangPattern {
793 name: String::from("No progress across cuts"),
794 signature: String::from(
795 "Repeated snapshots show same waiting_on graph and same hot entities.",
796 ),
797 likely_cause: String::from("Stable deadlock or starvation."),
798 next_calls: vec![
799 String::from("moire_cut_fresh"),
800 String::from("moire_diff_snapshots { from_snapshot_id, to_snapshot_id }"),
801 String::from("moire_deadlock_candidates { snapshot_id }"),
802 ],
803 },
804 ],
805 interpretation_notes: vec![
806 String::from(
807 "Prefer snapshot_id-pinned queries. Avoid mixing latest and pinned data in one diagnosis.",
808 ),
809 String::from(
810 "Waiting_on is the primary deadlock edge. Pairing/ownership edges are contextual but non-blocking by themselves.",
811 ),
812 String::from(
813 "Source snippets are best-effort from symbolication + tree-sitter extraction; missing snippets are explicit, not fabricated.",
814 ),
815 String::from(
816 "Do not conclude root cause from graph shape alone. Verify concrete caller code around highlighted wait sites.",
817 ),
818 String::from(
819 "Treat single-cut deadlock conclusions as provisional; confirm with moire_diff_snapshots when possible.",
820 ),
821 ],
822 };
823 Ok(render_help_markdown(&response))
824 }
825
826 async fn tool_cut_fresh(&self) -> Result<String, String> {
827 let cut = self.trigger_cut().await?;
828 let snapshot = take_snapshot_internal(&self.state).await;
829 let response = McpCutFreshResponse {
830 cut_id: cut.cut_id,
831 requested_at_ns: cut.requested_at_ns,
832 requested_connections: cut.requested_connections,
833 snapshot_id: snapshot.snapshot_id,
834 captured_at_unix_ms: snapshot.captured_at_unix_ms,
835 process_count: snapshot.processes.len(),
836 timed_out_count: snapshot.timed_out_processes.len(),
837 };
838 Ok(render_cut_fresh_markdown(&response))
839 }
840
841 async fn tool_wait_edges(&self, snapshot_id: Option<i64>) -> Result<String, String> {
842 let snapshot = self
843 .ensure_symbolication_ready(self.load_snapshot(snapshot_id).await?)
844 .await?;
845 let (nodes, edges, _, _) = self.build_wait_graph(&snapshot)?;
846 let sources = self
847 .load_source_for_graph(&snapshot, nodes.values(), &edges)
848 .await?;
849
850 let mut wait_edges = Vec::with_capacity(edges.len());
851 for edge in edges {
852 let src = nodes
853 .get(&edge.src_key)
854 .ok_or_else(|| format!("invariant violated: missing src node {}", edge.src_key))?;
855 let dst = nodes
856 .get(&edge.dst_key)
857 .ok_or_else(|| format!("invariant violated: missing dst node {}", edge.dst_key))?;
858 wait_edges.push(McpWaitEdge {
859 process_id: edge.process_id,
860 waiter_id: src.entity_id.clone(),
861 waiter_name: src.name.clone(),
862 waiter_kind: src.kind.clone(),
863 blocked_on_id: dst.entity_id.clone(),
864 blocked_on_name: dst.name.clone(),
865 blocked_on_kind: dst.kind.clone(),
866 waiter_birth_ms: src.birth_ms,
867 blocked_birth_ms: dst.birth_ms,
868 edge_kind: String::from("waiting_on"),
869 waiter_source: source_for_node(src, &sources),
870 waiter_sources: sources_for_node(src, &sources),
871 blocked_on_source: source_for_node(dst, &sources),
872 blocked_on_sources: sources_for_node(dst, &sources),
873 wait_site_source: edge
874 .edge_frame_ids
875 .first()
876 .and_then(|id| sources.get(&id.as_u64()).cloned()),
877 wait_site_sources: sources_for_frame_ids(&edge.edge_frame_ids, &sources),
878 });
879 }
880
881 let response = McpWaitEdgesResponse {
882 snapshot_id: snapshot.snapshot_id,
883 row_count: wait_edges.len(),
884 wait_edges,
885 };
886 Ok(render_wait_edges_markdown(&response))
887 }
888
889 async fn tool_wait_chains(
890 &self,
891 snapshot_id: Option<i64>,
892 roots: Option<Vec<String>>,
893 max_depth: Option<u32>,
894 ) -> Result<String, String> {
895 let snapshot = self
896 .ensure_symbolication_ready(self.load_snapshot(snapshot_id).await?)
897 .await?;
898 let (nodes, edges, adjacency, indegree) = self.build_wait_graph(&snapshot)?;
899 let sources = self
900 .load_source_for_graph(&snapshot, nodes.values(), &edges)
901 .await?;
902
903 let mut edge_wait_source: HashMap<(String, String), Option<McpSourceContext>> =
904 HashMap::new();
905 let mut edge_wait_sources: HashMap<(String, String), Vec<McpSourceContext>> =
906 HashMap::new();
907 for edge in &edges {
908 let wait_sources = sources_for_frame_ids(&edge.edge_frame_ids, &sources);
909 edge_wait_source.insert(
910 (edge.src_key.clone(), edge.dst_key.clone()),
911 wait_sources.first().cloned(),
912 );
913 edge_wait_sources.insert((edge.src_key.clone(), edge.dst_key.clone()), wait_sources);
914 }
915
916 let max_depth = max_depth
917 .map(|v| v as usize)
918 .unwrap_or(DEFAULT_WAIT_CHAIN_MAX_DEPTH)
919 .max(1);
920
921 let mut start_keys = if let Some(root_ids) = roots {
922 self.resolve_roots(&nodes, &root_ids)
923 } else {
924 Vec::new()
925 };
926 if start_keys.is_empty() {
927 for (key, next) in &adjacency {
928 if !next.is_empty() && *indegree.get(key).unwrap_or(&0) == 0 {
929 start_keys.push(key.clone());
930 }
931 }
932 }
933 if start_keys.is_empty() {
934 start_keys.extend(adjacency.keys().cloned());
935 }
936 start_keys.sort();
937 start_keys.dedup();
938
939 let mut chains: Vec<McpWaitChain> = Vec::new();
940 let mut chain_count = 0usize;
941 for start in start_keys {
942 if chains.len() >= DEFAULT_WAIT_CHAIN_MAX_RESULTS {
943 break;
944 }
945 let mut path: Vec<String> = vec![start.clone()];
946 self.walk_wait_paths(
947 &adjacency,
948 &start,
949 max_depth,
950 &mut path,
951 &mut chains,
952 &mut chain_count,
953 &nodes,
954 &sources,
955 &edge_wait_source,
956 &edge_wait_sources,
957 );
958 }
959
960 let response = McpWaitChainsResponse {
961 snapshot_id: snapshot.snapshot_id,
962 chain_count: chains.len(),
963 chains,
964 };
965 Ok(render_wait_chains_markdown(&response))
966 }
967
968 async fn tool_deadlock_candidates(&self, snapshot_id: Option<i64>) -> Result<String, String> {
969 let snapshot = self
970 .ensure_symbolication_ready(self.load_snapshot(snapshot_id).await?)
971 .await?;
972 let (nodes, _edges, adjacency, _indegree) = self.build_wait_graph(&snapshot)?;
973 let sources = self
974 .load_source_for_nodes(&snapshot, nodes.values())
975 .await?;
976
977 let mut scc_input = adjacency.keys().cloned().collect::<Vec<_>>();
978 scc_input.sort();
979 let sccs = strongly_connected_components(scc_input, &adjacency);
980 let mut candidates = Vec::new();
981 for (idx, scc) in sccs.into_iter().enumerate() {
982 if scc.len() <= 1 {
983 let Some(node_id) = scc.first() else {
984 continue;
985 };
986 let self_loop = adjacency
987 .get(node_id)
988 .is_some_and(|outs| outs.iter().any(|dst| dst == node_id));
989 if !self_loop {
990 continue;
991 }
992 }
993
994 let mut reasons = vec![String::from("strongly_connected_wait_cycle")];
995 let has_external_wake_source = scc
996 .iter()
997 .filter_map(|id| nodes.get(id))
998 .any(|node| node_has_external_wake_source(node.kind.as_str()));
999 if !has_external_wake_source {
1000 reasons.push(String::from("no_obvious_external_wake_source"));
1001 }
1002 let confidence = if !has_external_wake_source {
1003 String::from("high")
1004 } else {
1005 String::from("medium")
1006 };
1007
1008 let mut entity_ids = Vec::with_capacity(scc.len());
1009 let mut cycle_nodes = Vec::with_capacity(scc.len());
1010 let mut min_age_hint: Option<u64> = None;
1011 for key in &scc {
1012 let Some(node) = nodes.get(key) else {
1013 continue;
1014 };
1015 entity_ids.push(node.entity_id.clone());
1016 let age_hint = node.ptime_now_ms.saturating_sub(node.birth_ms);
1017 min_age_hint = Some(min_age_hint.map_or(age_hint, |curr| curr.min(age_hint)));
1018 cycle_nodes.push(McpNodeSummary {
1019 process_id: node.process_id.clone(),
1020 entity_id: node.entity_id.clone(),
1021 name: node.name.clone(),
1022 kind: node.kind.clone(),
1023 source: source_for_node(node, &sources),
1024 sources: sources_for_node(node, &sources),
1025 });
1026 }
1027
1028 candidates.push(McpDeadlockCandidate {
1029 candidate_id: format!("candidate-{}", idx + 1),
1030 confidence,
1031 reasons,
1032 entity_ids,
1033 blocked_duration_hint_ms: min_age_hint,
1034 cycle_nodes,
1035 });
1036 }
1037
1038 let response = McpDeadlockCandidatesResponse {
1039 snapshot_id: snapshot.snapshot_id,
1040 candidate_count: candidates.len(),
1041 candidates,
1042 };
1043 Ok(render_deadlock_candidates_markdown(&response))
1044 }
1045
1046 async fn tool_entity(
1047 &self,
1048 snapshot_id: Option<i64>,
1049 entity_id: String,
1050 ) -> Result<String, String> {
1051 let snapshot = self
1052 .ensure_symbolication_ready(self.load_snapshot(snapshot_id).await?)
1053 .await?;
1054 let located = self.find_entity(&snapshot, &entity_id)?;
1055 let backtrace_index = backtrace_index(&snapshot);
1056 let frame_catalog = frame_catalog(&snapshot);
1057 let local_entities: HashMap<&str, &Entity> = located
1058 .0
1059 .snapshot
1060 .entities
1061 .iter()
1062 .map(|entity| (entity.id.as_str(), entity))
1063 .collect();
1064 let entity_frame_ids = selected_frames_for_entity(
1065 located.1,
1066 &backtrace_index,
1067 &frame_catalog,
1068 SOURCE_FRAMES_PER_ITEM,
1069 );
1070
1071 let mut frame_ids = BTreeSet::new();
1072 for frame_id in &entity_frame_ids {
1073 frame_ids.insert(frame_id.as_u64());
1074 }
1075 for edge in &located.0.snapshot.edges {
1076 if edge.kind != EdgeKind::WaitingOn {
1077 continue;
1078 }
1079 if edge.dst.as_str() == entity_id || edge.src.as_str() == entity_id {
1080 for frame_id in selected_frames_for_backtrace_id(
1081 edge.backtrace.as_u64(),
1082 &backtrace_index,
1083 &frame_catalog,
1084 local_entities
1085 .get(edge.src.as_str())
1086 .map(|entity| frame_start_index_for_entity(entity))
1087 .unwrap_or(0),
1088 SOURCE_FRAMES_PER_ITEM,
1089 ) {
1090 frame_ids.insert(frame_id.as_u64());
1091 }
1092 }
1093 }
1094 let source_by_frame = if frame_ids.is_empty() {
1095 HashMap::new()
1096 } else {
1097 self.resolve_source_contexts(frame_ids)
1098 .await?
1099 .0
1100 .into_iter()
1101 .map(|ctx| (ctx.frame_id, ctx))
1102 .collect::<HashMap<_, _>>()
1103 };
1104 let source = entity_frame_ids
1105 .first()
1106 .and_then(|frame_id| source_by_frame.get(&frame_id.as_u64()).cloned());
1107 let sources = entity_frame_ids
1108 .iter()
1109 .filter_map(|frame_id| source_by_frame.get(&frame_id.as_u64()).cloned())
1110 .collect::<Vec<_>>();
1111
1112 let mut incoming = Vec::new();
1113 let mut outgoing = Vec::new();
1114 for edge in &located.0.snapshot.edges {
1115 if edge.kind != EdgeKind::WaitingOn {
1116 continue;
1117 }
1118 let wait_site_frame_ids = selected_frames_for_backtrace_id(
1119 edge.backtrace.as_u64(),
1120 &backtrace_index,
1121 &frame_catalog,
1122 local_entities
1123 .get(edge.src.as_str())
1124 .map(|entity| frame_start_index_for_entity(entity))
1125 .unwrap_or(0),
1126 SOURCE_FRAMES_PER_ITEM,
1127 );
1128 if edge.dst.as_str() == entity_id {
1129 let wait_site_source = wait_site_frame_ids
1130 .first()
1131 .and_then(|frame_id| source_by_frame.get(&frame_id.as_u64()).cloned());
1132 let wait_site_sources = wait_site_frame_ids
1133 .iter()
1134 .filter_map(|frame_id| source_by_frame.get(&frame_id.as_u64()).cloned())
1135 .collect::<Vec<_>>();
1136 incoming.push(McpChainEdge {
1137 src_entity_id: edge.src.as_str().to_owned(),
1138 dst_entity_id: edge.dst.as_str().to_owned(),
1139 wait_site_source,
1140 wait_site_sources,
1141 });
1142 }
1143 if edge.src.as_str() == entity_id {
1144 let wait_site_source = wait_site_frame_ids
1145 .first()
1146 .and_then(|frame_id| source_by_frame.get(&frame_id.as_u64()).cloned());
1147 let wait_site_sources = wait_site_frame_ids
1148 .iter()
1149 .filter_map(|frame_id| source_by_frame.get(&frame_id.as_u64()).cloned())
1150 .collect::<Vec<_>>();
1151 outgoing.push(McpChainEdge {
1152 src_entity_id: edge.src.as_str().to_owned(),
1153 dst_entity_id: edge.dst.as_str().to_owned(),
1154 wait_site_source,
1155 wait_site_sources,
1156 });
1157 }
1158 }
1159 incoming.sort_by(|a, b| {
1160 a.src_entity_id
1161 .cmp(&b.src_entity_id)
1162 .then_with(|| a.dst_entity_id.cmp(&b.dst_entity_id))
1163 });
1164 outgoing.sort_by(|a, b| {
1165 a.src_entity_id
1166 .cmp(&b.src_entity_id)
1167 .then_with(|| a.dst_entity_id.cmp(&b.dst_entity_id))
1168 });
1169
1170 let scope_ids = located
1171 .0
1172 .scope_entity_links
1173 .iter()
1174 .filter(|link| link.entity_id == entity_id)
1175 .map(|link| link.scope_id.clone())
1176 .collect();
1177
1178 let response = McpEntityResponse {
1179 snapshot_id: snapshot.snapshot_id,
1180 process_id: located.0.process_id.as_str().to_owned(),
1181 process_name: located.0.process_name.clone(),
1182 pid: located.0.pid,
1183 entity_id: located.1.id.as_str().to_owned(),
1184 entity_name: located.1.name.clone(),
1185 entity_kind: entity_kind_name(&located.1.body).to_owned(),
1186 entity_body_json: facet_json::to_string(&located.1.body)
1187 .map_err(|error| format!("encode entity body json: {error}"))?,
1188 incoming_wait_edges: incoming,
1189 outgoing_wait_edges: outgoing,
1190 scope_ids,
1191 source,
1192 sources,
1193 };
1194 Ok(render_entity_markdown(&response))
1195 }
1196
1197 async fn tool_channel_state(
1198 &self,
1199 snapshot_id: Option<i64>,
1200 entity_id: Option<String>,
1201 ) -> Result<String, String> {
1202 let snapshot = self
1203 .ensure_symbolication_ready(self.load_snapshot(snapshot_id).await?)
1204 .await?;
1205 let (nodes, edges, _adjacency, _indegree) = self.build_wait_graph(&snapshot)?;
1206 let sources = self
1207 .load_source_for_nodes(&snapshot, nodes.values())
1208 .await?;
1209
1210 let mut channels = Vec::new();
1211 for process in &snapshot.processes {
1212 for entity in &process.snapshot.entities {
1213 if !is_channel_entity(&entity.body) {
1214 continue;
1215 }
1216 if let Some(ref wanted) = entity_id
1217 && entity.id.as_str() != wanted
1218 {
1219 continue;
1220 }
1221
1222 let (capacity, occupancy, lifecycle_hints, channel_kind) =
1223 channel_metrics(&entity.body);
1224 let (sender_waiters, receiver_waiters) = count_waiters(&edges, &nodes, entity);
1225 let node_key = compose_node_key(&process.process_id, &entity.id);
1226 let node = nodes.get(&node_key);
1227 channels.push(McpChannelState {
1228 process_id: process.process_id.as_str().to_owned(),
1229 entity_id: entity.id.as_str().to_owned(),
1230 name: entity.name.clone(),
1231 channel_kind: channel_kind.to_owned(),
1232 capacity,
1233 occupancy,
1234 sender_waiters,
1235 receiver_waiters,
1236 lifecycle_hints,
1237 source: node.and_then(|n| source_for_node(n, &sources)),
1238 sources: node
1239 .map(|n| sources_for_node(n, &sources))
1240 .unwrap_or_default(),
1241 });
1242 }
1243 }
1244
1245 if let Some(wanted) = entity_id
1246 && channels.is_empty()
1247 {
1248 return Err(format!("unknown or non-channel entity_id `{wanted}`"));
1249 }
1250 channels.sort_by(|a, b| {
1251 a.process_id
1252 .cmp(&b.process_id)
1253 .then_with(|| a.name.cmp(&b.name))
1254 .then_with(|| a.entity_id.cmp(&b.entity_id))
1255 });
1256
1257 let response = McpChannelStateResponse {
1258 snapshot_id: snapshot.snapshot_id,
1259 channels,
1260 };
1261 Ok(render_channel_state_markdown(&response))
1262 }
1263
1264 async fn tool_task_state(
1265 &self,
1266 snapshot_id: Option<i64>,
1267 entity_id: Option<String>,
1268 ) -> Result<String, String> {
1269 let snapshot = self
1270 .ensure_symbolication_ready(self.load_snapshot(snapshot_id).await?)
1271 .await?;
1272 let (nodes, _edges, _adjacency, _indegree) = self.build_wait_graph(&snapshot)?;
1273 let backtrace_index = backtrace_index(&snapshot);
1274 let frame_catalog = frame_catalog(&snapshot);
1275 let sources = self
1276 .load_source_for_nodes(&snapshot, nodes.values())
1277 .await?;
1278
1279 let mut tasks = Vec::new();
1280 for process in &snapshot.processes {
1281 for entity in &process.snapshot.entities {
1282 if !is_task_entity(&entity.body) {
1283 continue;
1284 }
1285 if let Some(ref wanted) = entity_id
1286 && entity.id.as_str() != wanted
1287 {
1288 continue;
1289 }
1290
1291 let awaiting = process
1292 .snapshot
1293 .edges
1294 .iter()
1295 .find(|edge| {
1296 edge.kind == EdgeKind::WaitingOn && edge.src.as_str() == entity.id.as_str()
1297 })
1298 .map(|edge| edge.dst.as_str().to_owned());
1299
1300 let scope_ids = process
1301 .scope_entity_links
1302 .iter()
1303 .filter(|link| link.entity_id == entity.id.as_str())
1304 .map(|link| link.scope_id.clone())
1305 .collect::<Vec<_>>();
1306
1307 let node_key = compose_node_key(&process.process_id, &entity.id);
1308 let node = nodes.get(&node_key);
1309 let entry_frame_ids = selected_frames_for_entity(
1310 entity,
1311 &backtrace_index,
1312 &frame_catalog,
1313 SOURCE_FRAMES_PER_ITEM,
1314 );
1315 tasks.push(McpTaskState {
1316 process_id: process.process_id.as_str().to_owned(),
1317 entity_id: entity.id.as_str().to_owned(),
1318 name: entity.name.clone(),
1319 entry_backtrace_id: entity.backtrace.as_u64(),
1320 entry_frame_id: entry_frame_ids.first().map(|frame_id| frame_id.as_u64()),
1321 entry_frame_ids: entry_frame_ids
1322 .iter()
1323 .map(|frame_id| frame_id.as_u64())
1324 .collect(),
1325 awaiting_on_entity_id: awaiting,
1326 scope_ids,
1327 source: node.and_then(|n| source_for_node(n, &sources)),
1328 sources: node
1329 .map(|n| sources_for_node(n, &sources))
1330 .unwrap_or_default(),
1331 });
1332 }
1333 }
1334
1335 if let Some(wanted) = entity_id
1336 && tasks.is_empty()
1337 {
1338 return Err(format!("unknown or non-task entity_id `{wanted}`"));
1339 }
1340 tasks.sort_by(|a, b| {
1341 a.process_id
1342 .cmp(&b.process_id)
1343 .then_with(|| a.name.cmp(&b.name))
1344 .then_with(|| a.entity_id.cmp(&b.entity_id))
1345 });
1346
1347 let response = McpTaskStateResponse {
1348 snapshot_id: snapshot.snapshot_id,
1349 tasks,
1350 };
1351 Ok(render_task_state_markdown(&response))
1352 }
1353
1354 async fn tool_source_context(
1355 &self,
1356 snapshot_id: Option<i64>,
1357 frame_ids: Vec<u64>,
1358 format: String,
1359 ) -> Result<String, String> {
1360 let snapshot = self
1361 .ensure_symbolication_ready(self.load_snapshot(snapshot_id).await?)
1362 .await?;
1363 if frame_ids.is_empty() {
1364 return Err("frame_ids must be non-empty".to_string());
1365 }
1366 let format = if format == "text/plain" || format == "text" {
1367 String::from("text/plain")
1368 } else {
1369 return Err(format!(
1370 "unsupported format `{format}`; supported values: text/plain"
1371 ));
1372 };
1373
1374 let (previews, unavailable_frame_ids) = self
1375 .resolve_source_contexts(frame_ids.into_iter().collect::<BTreeSet<_>>())
1376 .await?;
1377
1378 if previews.is_empty() && !unavailable_frame_ids.is_empty() {
1379 let backtrace_ids = snapshot
1380 .backtraces
1381 .iter()
1382 .map(|bt| bt.backtrace_id.as_u64())
1383 .collect::<HashSet<_>>();
1384 let all_look_like_backtrace_ids = unavailable_frame_ids
1385 .iter()
1386 .all(|id| backtrace_ids.contains(id));
1387 if all_look_like_backtrace_ids {
1388 return Err(
1389 "frame_ids expects FRAME ids, but received values look like BACKTRACE ids. \
1390Call moire_backtrace first to list frame_ids for a backtrace."
1391 .to_string(),
1392 );
1393 }
1394 }
1395
1396 let response = McpSourceContextResponse {
1397 snapshot_id: snapshot.snapshot_id,
1398 format,
1399 previews,
1400 unavailable_frame_ids,
1401 };
1402 Ok(render_source_context_markdown(&response))
1403 }
1404
1405 async fn tool_backtrace(
1406 &self,
1407 snapshot_id: Option<i64>,
1408 backtrace_id_raw: u64,
1409 with_source: bool,
1410 ) -> Result<String, String> {
1411 let snapshot = self
1412 .ensure_symbolication_ready(self.load_snapshot(snapshot_id).await?)
1413 .await?;
1414 let Some(backtrace) = snapshot
1415 .backtraces
1416 .iter()
1417 .find(|bt| bt.backtrace_id.as_u64() == backtrace_id_raw)
1418 else {
1419 return Err(format!("unknown backtrace_id {backtrace_id_raw}"));
1420 };
1421
1422 let frame_map: HashMap<u64, &SnapshotBacktraceFrame> = snapshot
1423 .frames
1424 .iter()
1425 .map(|record| (record.frame_id.as_u64(), &record.frame))
1426 .collect();
1427
1428 let source_by_frame = if with_source {
1429 let frame_ids: BTreeSet<u64> =
1430 backtrace.frame_ids.iter().map(|id| id.as_u64()).collect();
1431 self.resolve_source_contexts(frame_ids).await?.0
1432 } else {
1433 Vec::new()
1434 };
1435 let source_by_frame_map: HashMap<u64, McpSourceContext> = source_by_frame
1436 .into_iter()
1437 .map(|src| (src.frame_id, src))
1438 .collect();
1439
1440 let mut frames = Vec::with_capacity(backtrace.frame_ids.len());
1441 for frame_id in &backtrace.frame_ids {
1442 let raw = frame_id.as_u64();
1443 let Some(frame) = frame_map.get(&raw) else {
1444 return Err(format!(
1445 "invariant violated: frame {} referenced by backtrace {} is missing",
1446 raw, backtrace_id_raw
1447 ));
1448 };
1449
1450 let frame_out = match frame {
1451 SnapshotBacktraceFrame::Resolved(BacktraceFrameResolved {
1452 module_path,
1453 function_name,
1454 source_file,
1455 line,
1456 }) => McpBacktraceFrame {
1457 frame_id: raw,
1458 status: String::from("resolved"),
1459 module_path: module_path.clone(),
1460 function_name: Some(function_name.clone()),
1461 source_file: Some(source_file.clone()),
1462 line: *line,
1463 rel_pc: None,
1464 reason: None,
1465 source: source_by_frame_map.get(&raw).cloned(),
1466 },
1467 SnapshotBacktraceFrame::Unresolved(BacktraceFrameUnresolved {
1468 module_path,
1469 rel_pc,
1470 reason,
1471 }) => McpBacktraceFrame {
1472 frame_id: raw,
1473 status: String::from("unresolved"),
1474 module_path: module_path.clone(),
1475 function_name: None,
1476 source_file: None,
1477 line: None,
1478 rel_pc: Some(rel_pc.get()),
1479 reason: Some(reason.clone()),
1480 source: source_by_frame_map.get(&raw).cloned(),
1481 },
1482 };
1483 frames.push(frame_out);
1484 }
1485
1486 let response = McpBacktraceResponse {
1487 snapshot_id: snapshot.snapshot_id,
1488 backtrace_id: backtrace_id_raw,
1489 frame_count: frames.len(),
1490 frames,
1491 };
1492 Ok(render_backtrace_markdown(&response))
1493 }
1494
1495 async fn tool_diff_snapshots(
1496 &self,
1497 from_snapshot_id: i64,
1498 to_snapshot_id: i64,
1499 ) -> Result<String, String> {
1500 let from = self.load_snapshot(Some(from_snapshot_id)).await?;
1501 let to = self.load_snapshot(Some(to_snapshot_id)).await?;
1502
1503 let from_entities = snapshot_entity_keys(&from);
1504 let to_entities = snapshot_entity_keys(&to);
1505
1506 let entity_added = to_entities
1507 .difference(&from_entities)
1508 .cloned()
1509 .collect::<Vec<_>>();
1510 let entity_removed = from_entities
1511 .difference(&to_entities)
1512 .cloned()
1513 .collect::<Vec<_>>();
1514
1515 let from_waiting = snapshot_waiting_edges(&from);
1516 let to_waiting = snapshot_waiting_edges(&to);
1517
1518 let waiting_on_added = to_waiting
1519 .difference(&from_waiting)
1520 .cloned()
1521 .collect::<Vec<_>>();
1522 let waiting_on_removed = from_waiting
1523 .difference(&to_waiting)
1524 .cloned()
1525 .collect::<Vec<_>>();
1526
1527 let from_channel = snapshot_channel_fingerprint(&from);
1528 let to_channel = snapshot_channel_fingerprint(&to);
1529 let mut channel_changes = Vec::new();
1530 for (entity_id, after) in &to_channel {
1531 if let Some(before) = from_channel.get(entity_id)
1532 && before != after
1533 {
1534 channel_changes.push(McpChannelDiff {
1535 entity_id: entity_id.clone(),
1536 before: before.clone(),
1537 after: after.clone(),
1538 });
1539 }
1540 }
1541
1542 let from_tasks = snapshot_task_wait_target(&from);
1543 let to_tasks = snapshot_task_wait_target(&to);
1544 let mut task_changes = Vec::new();
1545 for (entity_id, awaiting_after) in &to_tasks {
1546 let awaiting_before = from_tasks.get(entity_id).cloned().unwrap_or(None);
1547 if awaiting_before != *awaiting_after {
1548 task_changes.push(McpTaskDiff {
1549 entity_id: entity_id.clone(),
1550 awaiting_before,
1551 awaiting_after: awaiting_after.clone(),
1552 });
1553 }
1554 }
1555
1556 let response = McpDiffSnapshotsResponse {
1557 from_snapshot_id,
1558 to_snapshot_id,
1559 entity_added,
1560 entity_removed,
1561 waiting_on_added,
1562 waiting_on_removed,
1563 channel_changes,
1564 task_changes,
1565 };
1566 Ok(render_diff_snapshots_markdown(&response))
1567 }
1568
1569 async fn trigger_cut(&self) -> Result<TriggerCutResponse, String> {
1570 let (cut_id, cut_id_string, now_ns, requested_connections, outbound) = {
1571 let mut guard = self.state.inner.lock().await;
1572 let cut_num = guard.next_cut_id;
1573 guard.next_cut_id = guard.next_cut_id.next();
1574 let cut_id = cut_num.to_cut_id();
1575 let cut_id_string = cut_id.as_str().to_owned();
1576 let now_ns = now_nanos();
1577 let mut pending_conn_ids = BTreeSet::new();
1578 let mut outbound = Vec::new();
1579 for (conn_id, conn) in &guard.connections {
1580 pending_conn_ids.insert(*conn_id);
1581 outbound.push((*conn_id, conn.tx.clone()));
1582 }
1583
1584 guard.cuts.insert(
1585 cut_id.clone(),
1586 CutState {
1587 requested_at_ns: now_ns,
1588 pending_conn_ids,
1589 acks: BTreeMap::new(),
1590 },
1591 );
1592
1593 (cut_id, cut_id_string, now_ns, outbound.len(), outbound)
1594 };
1595
1596 let request = ServerMessage::CutRequest(moire_types::CutRequest {
1597 cut_id: cut_id.clone(),
1598 });
1599 if let Err(error) =
1600 persist_cut_request(self.state.db.clone(), cut_id_string.clone(), now_ns).await
1601 {
1602 warn!(
1603 %error,
1604 cut_id = %cut_id_string,
1605 "failed to persist cut request"
1606 );
1607 }
1608 let payload = encode_server_message_default(&request)
1609 .map_err(|error| format!("failed to encode cut request: {error}"))?;
1610 for (conn_id, tx) in outbound {
1611 if let Err(error) = tx.try_send(payload.clone()) {
1612 warn!(
1613 conn_id = %conn_id,
1614 %error,
1615 "failed to enqueue cut request"
1616 );
1617 }
1618 }
1619
1620 Ok(TriggerCutResponse {
1621 cut_id,
1622 requested_at_ns: now_ns,
1623 requested_connections,
1624 })
1625 }
1626
1627 async fn load_snapshot(
1628 &self,
1629 requested_snapshot_id: Option<i64>,
1630 ) -> Result<SnapshotCutResponse, String> {
1631 let snapshot_json = {
1632 let guard = self.state.inner.lock().await;
1633 match requested_snapshot_id {
1634 Some(snapshot_id) => guard.snapshot_history_json.get(&snapshot_id).cloned(),
1635 None => guard.last_snapshot_json.clone(),
1636 }
1637 };
1638
1639 let Some(snapshot_json) = snapshot_json else {
1640 return match requested_snapshot_id {
1641 Some(snapshot_id) => Err(format!("unknown snapshot_id {snapshot_id}")),
1642 None => Err("no snapshot available".to_string()),
1643 };
1644 };
1645
1646 facet_json::from_str::<SnapshotCutResponse>(&snapshot_json)
1647 .map_err(|error| format!("decode cached snapshot json: {error}"))
1648 }
1649
1650 async fn ensure_symbolication_ready(
1651 &self,
1652 mut snapshot: SnapshotCutResponse,
1653 ) -> Result<SnapshotCutResponse, String> {
1654 if snapshot.backtraces.is_empty() || snapshot.frames.is_empty() {
1655 return Ok(snapshot);
1656 }
1657
1658 if snapshot
1659 .frames
1660 .iter()
1661 .all(|record| !is_pending_frame(&record.frame))
1662 {
1663 return Ok(snapshot);
1664 }
1665
1666 let backtrace_ids: Vec<BacktraceId> = snapshot
1667 .backtraces
1668 .iter()
1669 .map(|bt| bt.backtrace_id)
1670 .collect();
1671 let deadline = tokio::time::Instant::now() + DEFAULT_SYMBOLICATION_WAIT_TIMEOUT;
1672
1673 loop {
1674 if let Err(error) =
1675 symbolicate_pending_frames_for_backtraces(self.state.db.clone(), &backtrace_ids)
1676 .await
1677 {
1678 warn!(
1679 snapshot_id = snapshot.snapshot_id,
1680 %error,
1681 "symbolication pass failed for MCP"
1682 );
1683 }
1684
1685 let table = load_snapshot_backtrace_table(self.state.db.clone(), &backtrace_ids).await;
1686 snapshot.backtraces = table.backtraces;
1687 snapshot.frames = table.frames;
1688 remember_snapshot(&self.state, &snapshot).await;
1689
1690 if snapshot
1691 .frames
1692 .iter()
1693 .all(|record| !is_pending_frame(&record.frame))
1694 {
1695 break;
1696 }
1697 if tokio::time::Instant::now() >= deadline {
1698 break;
1699 }
1700 tokio::time::sleep(DEFAULT_SYMBOLICATION_WAIT_TICK).await;
1701 }
1702
1703 Ok(snapshot)
1704 }
1705
1706 fn build_wait_graph(&self, snapshot: &SnapshotCutResponse) -> Result<WaitGraph, String> {
1707 let backtrace_index = backtrace_index(snapshot);
1708 let frame_catalog = frame_catalog(snapshot);
1709
1710 let mut nodes: HashMap<String, WaitNode> = HashMap::new();
1711 let mut edges: Vec<WaitEdgeRuntime> = Vec::new();
1712 let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
1713 let mut indegree: HashMap<String, usize> = HashMap::new();
1714 let mut seen_edges: HashSet<(String, String)> = HashSet::new();
1715
1716 for process in &snapshot.processes {
1717 let local_entities: HashMap<String, &Entity> = process
1718 .snapshot
1719 .entities
1720 .iter()
1721 .map(|entity| (entity.id.as_str().to_owned(), entity))
1722 .collect();
1723
1724 for edge in &process.snapshot.edges {
1725 if edge.kind != EdgeKind::WaitingOn {
1726 continue;
1727 }
1728
1729 let Some(src) = local_entities.get(edge.src.as_str()) else {
1730 return Err(format!(
1731 "invariant violated: missing src entity {} for waiting_on edge in process {}",
1732 edge.src.as_str(),
1733 process.process_id.as_str()
1734 ));
1735 };
1736 let Some(dst) = local_entities.get(edge.dst.as_str()) else {
1737 return Err(format!(
1738 "invariant violated: missing dst entity {} for waiting_on edge in process {}",
1739 edge.dst.as_str(),
1740 process.process_id.as_str()
1741 ));
1742 };
1743
1744 let src_key = compose_node_key(&process.process_id, &src.id);
1745 let dst_key = compose_node_key(&process.process_id, &dst.id);
1746
1747 nodes
1748 .entry(src_key.clone())
1749 .or_insert_with(|| wait_node(process, src, &backtrace_index, &frame_catalog));
1750 nodes
1751 .entry(dst_key.clone())
1752 .or_insert_with(|| wait_node(process, dst, &backtrace_index, &frame_catalog));
1753
1754 if seen_edges.insert((src_key.clone(), dst_key.clone())) {
1755 edges.push(WaitEdgeRuntime {
1756 process_id: process.process_id.as_str().to_owned(),
1757 src_key: src_key.clone(),
1758 dst_key: dst_key.clone(),
1759 dst_entity_id: dst.id.as_str().to_owned(),
1760 edge_frame_ids: selected_frames_for_backtrace_id(
1761 edge.backtrace.as_u64(),
1762 &backtrace_index,
1763 &frame_catalog,
1764 frame_start_index_for_entity(src),
1765 SOURCE_FRAMES_PER_ITEM,
1766 ),
1767 });
1768 adjacency
1769 .entry(src_key.clone())
1770 .or_default()
1771 .push(dst_key.clone());
1772 *indegree.entry(dst_key).or_insert(0) += 1;
1773 indegree.entry(src_key).or_insert(0);
1774 }
1775 }
1776 }
1777
1778 for outs in adjacency.values_mut() {
1779 outs.sort();
1780 outs.dedup();
1781 }
1782
1783 Ok((nodes, edges, adjacency, indegree))
1784 }
1785
1786 async fn load_source_for_nodes<'a>(
1787 &self,
1788 snapshot: &SnapshotCutResponse,
1789 nodes: impl Iterator<Item = &'a WaitNode>,
1790 ) -> Result<HashMap<u64, McpSourceContext>, String> {
1791 let mut frame_ids = BTreeSet::new();
1792 for node in nodes {
1793 for frame_id in &node.frame_ids {
1794 frame_ids.insert(frame_id.as_u64());
1795 }
1796 }
1797 if frame_ids.is_empty() {
1798 return Ok(HashMap::new());
1799 }
1800
1801 let (contexts, _unavailable) = self.resolve_source_contexts(frame_ids).await?;
1802 let by_frame = contexts
1803 .into_iter()
1804 .map(|ctx| (ctx.frame_id, ctx))
1805 .collect::<HashMap<_, _>>();
1806
1807 let _ = snapshot;
1808 Ok(by_frame)
1809 }
1810
1811 async fn load_source_for_graph<'a>(
1812 &self,
1813 snapshot: &SnapshotCutResponse,
1814 nodes: impl Iterator<Item = &'a WaitNode>,
1815 edges: &[WaitEdgeRuntime],
1816 ) -> Result<HashMap<u64, McpSourceContext>, String> {
1817 let mut frame_ids = BTreeSet::new();
1818 for node in nodes {
1819 for frame_id in &node.frame_ids {
1820 frame_ids.insert(frame_id.as_u64());
1821 }
1822 }
1823 for edge in edges {
1824 for frame_id in &edge.edge_frame_ids {
1825 frame_ids.insert(frame_id.as_u64());
1826 }
1827 }
1828 if frame_ids.is_empty() {
1829 return Ok(HashMap::new());
1830 }
1831
1832 let (contexts, _unavailable) = self.resolve_source_contexts(frame_ids).await?;
1833 let by_frame = contexts
1834 .into_iter()
1835 .map(|ctx| (ctx.frame_id, ctx))
1836 .collect::<HashMap<_, _>>();
1837 let _ = snapshot;
1838 Ok(by_frame)
1839 }
1840
1841 async fn resolve_source_contexts(
1842 &self,
1843 frame_ids: BTreeSet<u64>,
1844 ) -> Result<(Vec<McpSourceContext>, Vec<u64>), String> {
1845 let db = self.state.db.clone();
1846 tokio::task::spawn_blocking(move || {
1847 let mut previews = Vec::new();
1848 let mut unavailable_frame_ids = Vec::new();
1849
1850 for frame_id_raw in frame_ids {
1851 let Some((frame_id, module_identity, rel_pc)) =
1852 lookup_frame_source_by_raw(frame_id_raw)
1853 else {
1854 unavailable_frame_ids.push(frame_id_raw);
1855 continue;
1856 };
1857
1858 let Some(location) =
1859 lookup_source_text_location_in_db(&db, module_identity, rel_pc)?
1860 else {
1861 unavailable_frame_ids.push(frame_id_raw);
1862 continue;
1863 };
1864
1865 let statement_text = location.language.and_then(|lang| {
1866 extract_target_statement(
1867 &location.content,
1868 lang,
1869 location.target_line,
1870 location.target_col,
1871 )
1872 });
1873
1874 let enclosing_fn_text = location.language.and_then(|lang| {
1875 extract_enclosing_fn(
1876 &location.content,
1877 lang,
1878 location.target_line,
1879 location.target_col,
1880 )
1881 });
1882
1883 let (compact_scope_text, compact_scope_range) = location
1884 .language
1885 .and_then(|lang| {
1886 cut_source_compact(
1887 &location.content,
1888 lang,
1889 location.target_line,
1890 location.target_col,
1891 )
1892 })
1893 .map(|cut| {
1894 (
1895 Some(cut.cut_source),
1896 Some(McpLineRange {
1897 start: cut.scope_range.start,
1898 end: cut.scope_range.end,
1899 }),
1900 )
1901 })
1902 .unwrap_or((None, None));
1903
1904 previews.push(McpSourceContext {
1905 format: String::from("text/plain"),
1906 frame_id: frame_id.as_u64(),
1907 source_file: location.source_file,
1908 target_line: location.target_line,
1909 target_col: location.target_col,
1910 total_lines: location.total_lines,
1911 statement_text,
1912 enclosing_fn_text,
1913 compact_scope_text,
1914 compact_scope_range,
1915 });
1916 }
1917
1918 Ok::<(Vec<McpSourceContext>, Vec<u64>), String>((previews, unavailable_frame_ids))
1919 })
1920 .await
1921 .map_err(|error| format!("source context worker join error: {error}"))?
1922 }
1923
1924 fn resolve_roots(&self, nodes: &HashMap<String, WaitNode>, roots: &[String]) -> Vec<String> {
1925 let mut out = Vec::new();
1926 for root in roots {
1927 for (key, node) in nodes {
1928 if node.entity_id == *root {
1929 out.push(key.clone());
1930 }
1931 }
1932 }
1933 out.sort();
1934 out.dedup();
1935 out
1936 }
1937
1938 #[allow(clippy::too_many_arguments)]
1939 fn walk_wait_paths(
1940 &self,
1941 adjacency: &HashMap<String, Vec<String>>,
1942 _start: &str,
1943 max_depth: usize,
1944 path: &mut Vec<String>,
1945 chains: &mut Vec<McpWaitChain>,
1946 chain_count: &mut usize,
1947 nodes: &HashMap<String, WaitNode>,
1948 sources: &HashMap<u64, McpSourceContext>,
1949 edge_wait_source: &HashMap<(String, String), Option<McpSourceContext>>,
1950 edge_wait_sources: &HashMap<(String, String), Vec<McpSourceContext>>,
1951 ) {
1952 if chains.len() >= DEFAULT_WAIT_CHAIN_MAX_RESULTS {
1953 return;
1954 }
1955
1956 let Some(current) = path.last().cloned() else {
1957 return;
1958 };
1959 let nexts = adjacency.get(¤t).cloned().unwrap_or_default();
1960
1961 if nexts.is_empty() || path.len() >= max_depth {
1962 *chain_count += 1;
1963 chains.push(make_chain(
1964 *chain_count,
1965 path,
1966 false,
1967 nodes,
1968 sources,
1969 edge_wait_source,
1970 edge_wait_sources,
1971 path.len() >= max_depth,
1972 ));
1973 return;
1974 }
1975
1976 for next in nexts {
1977 if let Some(cycle_start) = path.iter().position(|id| id == &next) {
1978 let mut cycle_path = path[cycle_start..].to_vec();
1979 cycle_path.push(next.clone());
1980 *chain_count += 1;
1981 chains.push(make_chain(
1982 *chain_count,
1983 &cycle_path,
1984 true,
1985 nodes,
1986 sources,
1987 edge_wait_source,
1988 edge_wait_sources,
1989 false,
1990 ));
1991 if chains.len() >= DEFAULT_WAIT_CHAIN_MAX_RESULTS {
1992 return;
1993 }
1994 continue;
1995 }
1996
1997 path.push(next);
1998 self.walk_wait_paths(
1999 adjacency,
2000 ¤t,
2001 max_depth,
2002 path,
2003 chains,
2004 chain_count,
2005 nodes,
2006 sources,
2007 edge_wait_source,
2008 edge_wait_sources,
2009 );
2010 path.pop();
2011
2012 if chains.len() >= DEFAULT_WAIT_CHAIN_MAX_RESULTS {
2013 return;
2014 }
2015 }
2016 }
2017
2018 fn find_entity<'a>(
2019 &self,
2020 snapshot: &'a SnapshotCutResponse,
2021 entity_id: &str,
2022 ) -> Result<(&'a ProcessSnapshotView, &'a Entity), String> {
2023 let mut matches: Vec<(&ProcessSnapshotView, &Entity)> = Vec::new();
2024 for process in &snapshot.processes {
2025 for entity in &process.snapshot.entities {
2026 if entity.id.as_str() == entity_id {
2027 matches.push((process, entity));
2028 }
2029 }
2030 }
2031
2032 match matches.len() {
2033 0 => Err(format!("unknown entity_id `{entity_id}`")),
2034 1 => Ok(matches.remove(0)),
2035 n => Err(format!(
2036 "entity_id `{entity_id}` is ambiguous across {n} processes; expected a unique id"
2037 )),
2038 }
2039 }
2040}
2041
2042#[async_trait]
2043impl ServerHandler for MoireMcpHandler {
2044 async fn handle_list_tools_request(
2045 &self,
2046 _params: Option<PaginatedRequestParams>,
2047 _runtime: Arc<dyn rust_mcp_sdk::McpServer>,
2048 ) -> Result<ListToolsResult, RpcError> {
2049 Ok(ListToolsResult {
2050 tools: MoireTools::tools(),
2051 meta: None,
2052 next_cursor: None,
2053 })
2054 }
2055
2056 async fn handle_call_tool_request(
2057 &self,
2058 params: CallToolRequestParams,
2059 _runtime: Arc<dyn rust_mcp_sdk::McpServer>,
2060 ) -> Result<CallToolResult, CallToolError> {
2061 let tool_name = params.name.clone();
2062 let args = params.arguments.unwrap_or_default();
2063 let response = match self.dispatch_tool(tool_name.as_str(), &args).await {
2064 Ok(body) => body,
2065 Err(error) => format!("Error: {error}"),
2066 };
2067 Ok(CallToolResult::text_content(vec![response.into()]))
2068 }
2069}
2070
2071pub async fn run_mcp_server(listener: TcpListener, state: AppState) -> Result<(), String> {
2072 let local_addr = listener
2073 .local_addr()
2074 .map_err(|error| format!("resolve mcp listener addr: {error}"))?;
2075 let handler = MoireMcpHandler::new(state);
2076 let app_state = Arc::new(McpAppState {
2077 session_store: Arc::new(InMemorySessionStore::new()),
2078 id_generator: Arc::new(UuidGenerator {}),
2079 stream_id_gen: Arc::new(FastIdGenerator::new(Some("s_"))),
2080 server_details: Arc::new(server_details()),
2081 handler: handler.to_mcp_server_handler(),
2082 ping_interval: DEFAULT_MCP_PING_INTERVAL,
2083 transport_options: Arc::new(TransportOptions::default()),
2084 enable_json_response: false,
2085 event_store: None,
2086 task_store: None,
2087 client_task_store: None,
2088 });
2089
2090 let http_handler = Arc::new(McpHttpHandler::new(vec![]));
2091
2092 let app = Router::new()
2093 .route(
2094 DEFAULT_MCP_ENDPOINT,
2095 get(handle_streamable_http_get)
2096 .post(handle_streamable_http_post)
2097 .delete(handle_streamable_http_delete),
2098 )
2099 .with_state(app_state)
2100 .layer(Extension(http_handler));
2101
2102 info!(
2103 endpoint = %DEFAULT_MCP_ENDPOINT,
2104 addr = %local_addr,
2105 "moire-web MCP Streamable HTTP ready"
2106 );
2107
2108 axum::serve(listener, app)
2109 .await
2110 .map_err(|error| format!("MCP server failed: {error}"))
2111}
2112
2113fn server_details() -> InitializeResult {
2114 InitializeResult {
2115 server_info: Implementation {
2116 name: "moire-web".into(),
2117 version: env!("CARGO_PKG_VERSION").into(),
2118 description: Some(
2119 "Moire runtime graph server with deadlock-focused MCP tools. Run moire_help first."
2120 .into(),
2121 ),
2122 title: Some("moire-web MCP".into()),
2123 icons: vec![],
2124 website_url: Some("https://github.com/bearcove/moire".into()),
2125 },
2126 capabilities: ServerCapabilities {
2127 tools: Some(ServerCapabilitiesTools { list_changed: None }),
2128 ..Default::default()
2129 },
2130 protocol_version: LATEST_PROTOCOL_VERSION.into(),
2131 instructions: Some(
2132 "Run moire_help first. It defines the recommended workflow, entity semantics, and hang patterns. \
2133Then run moire_cut_fresh and keep using its snapshot_id for all follow-up calls."
2134 .into(),
2135 ),
2136 meta: None,
2137 }
2138}
2139
2140fn render_help_markdown(response: &McpHelpResponse) -> String {
2141 let mut out = String::new();
2142 let _ = writeln!(out, "Read this first");
2143 let _ = writeln!(out, "{}", response.read_this_first);
2144 let _ = writeln!(out);
2145
2146 let _ = writeln!(out, "Recommended workflow");
2147 for step in &response.first_steps {
2148 let _ = writeln!(out, "- {step}");
2149 }
2150 let _ = writeln!(out);
2151
2152 let _ = writeln!(out, "Tool guide");
2153 for guide in &response.tool_guide {
2154 let _ = writeln!(out, "- {}: {}", guide.tool, guide.purpose);
2155 let _ = writeln!(out, " when: {}", guide.when_to_use);
2156 let _ = writeln!(out, " args: {}", guide.typical_args);
2157 }
2158 let _ = writeln!(out);
2159
2160 let _ = writeln!(out, "Entity kinds");
2161 for kind in &response.entity_kinds {
2162 let _ = writeln!(
2163 out,
2164 "- {}: {} (hang signal: {})",
2165 kind.kind, kind.means, kind.hang_signal
2166 );
2167 }
2168 let _ = writeln!(out);
2169
2170 let _ = writeln!(out, "Typical hang patterns");
2171 for pattern in &response.hang_patterns {
2172 let _ = writeln!(out, "- {}", pattern.name);
2173 let _ = writeln!(out, " signature: {}", pattern.signature);
2174 let _ = writeln!(out, " likely cause: {}", pattern.likely_cause);
2175 let _ = writeln!(out, " next calls:");
2176 for next in &pattern.next_calls {
2177 let _ = writeln!(out, " - {next}");
2178 }
2179 }
2180 let _ = writeln!(out);
2181
2182 let _ = writeln!(out, "Interpretation notes");
2183 for note in &response.interpretation_notes {
2184 let _ = writeln!(out, "- {note}");
2185 }
2186 out.trim_end().to_string()
2187}
2188
2189fn render_cut_fresh_markdown(response: &McpCutFreshResponse) -> String {
2190 format!(
2191 "cut_id: {}\nsnapshot_id: {}\ncaptured_at_unix_ms: {}\nrequested_connections: {}\nprocess_count: {}\ntimed_out_count: {}",
2192 response.cut_id.as_str(),
2193 response.snapshot_id,
2194 response.captured_at_unix_ms,
2195 response.requested_connections,
2196 response.process_count,
2197 response.timed_out_count
2198 )
2199}
2200
2201fn render_wait_edges_markdown(response: &McpWaitEdgesResponse) -> String {
2202 let mut out = String::new();
2203 let _ = writeln!(out, "snapshot_id: {}", response.snapshot_id);
2204 let _ = writeln!(out, "wait_edges: {}", response.row_count);
2205
2206 for (idx, edge) in response.wait_edges.iter().enumerate() {
2207 let _ = writeln!(
2208 out,
2209 "\n{}. {} [{}] -> {} [{}]",
2210 idx + 1,
2211 edge.waiter_name,
2212 edge.waiter_kind,
2213 edge.blocked_on_name,
2214 edge.blocked_on_kind
2215 );
2216 let _ = writeln!(
2217 out,
2218 " ids: {} -> {} (process {})",
2219 edge.waiter_id, edge.blocked_on_id, edge.process_id
2220 );
2221 let _ = writeln!(
2222 out,
2223 " births_ms: waiter={} blocked={}",
2224 edge.waiter_birth_ms, edge.blocked_birth_ms
2225 );
2226 append_source_set(
2227 &mut out,
2228 " waiter sources",
2229 edge.waiter_source.as_ref(),
2230 &edge.waiter_sources,
2231 " ",
2232 );
2233 append_source_set(
2234 &mut out,
2235 " blocked_on sources",
2236 edge.blocked_on_source.as_ref(),
2237 &edge.blocked_on_sources,
2238 " ",
2239 );
2240 append_source_set(
2241 &mut out,
2242 " wait-site sources",
2243 edge.wait_site_source.as_ref(),
2244 &edge.wait_site_sources,
2245 " ",
2246 );
2247 }
2248 out.trim_end().to_string()
2249}
2250
2251fn render_wait_chains_markdown(response: &McpWaitChainsResponse) -> String {
2252 let mut out = String::new();
2253 let _ = writeln!(out, "snapshot_id: {}", response.snapshot_id);
2254 let _ = writeln!(out, "chains: {}", response.chain_count);
2255
2256 for chain in &response.chains {
2257 let cycle = if chain.is_cycle { "cycle" } else { "acyclic" };
2258 let wake = if chain.has_external_wake_source {
2259 "has external wake source"
2260 } else {
2261 "no external wake source"
2262 };
2263 let _ = writeln!(
2264 out,
2265 "\n{} ({}, {}): {}",
2266 chain.chain_id, cycle, wake, chain.summary
2267 );
2268 if !chain.nodes.is_empty() {
2269 let path = chain
2270 .nodes
2271 .iter()
2272 .map(|node| node.name.as_str())
2273 .collect::<Vec<_>>()
2274 .join(" -> ");
2275 let _ = writeln!(out, "path: {path}");
2276 }
2277
2278 let _ = writeln!(out, "nodes:");
2279 for node in &chain.nodes {
2280 let _ = writeln!(
2281 out,
2282 "- {} [{}] id={} process={}",
2283 node.name, node.kind, node.entity_id, node.process_id
2284 );
2285 append_source_set(
2286 &mut out,
2287 " sources",
2288 node.source.as_ref(),
2289 &node.sources,
2290 " ",
2291 );
2292 }
2293
2294 let _ = writeln!(out, "edges:");
2295 for edge in &chain.edges {
2296 let _ = writeln!(out, "- {} -> {}", edge.src_entity_id, edge.dst_entity_id);
2297 append_source_set(
2298 &mut out,
2299 " wait-site sources",
2300 edge.wait_site_source.as_ref(),
2301 &edge.wait_site_sources,
2302 " ",
2303 );
2304 }
2305 }
2306
2307 out.trim_end().to_string()
2308}
2309
2310fn render_deadlock_candidates_markdown(response: &McpDeadlockCandidatesResponse) -> String {
2311 let mut out = String::new();
2312 let _ = writeln!(out, "snapshot_id: {}", response.snapshot_id);
2313 let _ = writeln!(out, "candidates: {}", response.candidate_count);
2314
2315 for candidate in &response.candidates {
2316 let _ = writeln!(
2317 out,
2318 "\n{}: confidence={}",
2319 candidate.candidate_id, candidate.confidence
2320 );
2321 let _ = writeln!(out, "reasons: {}", candidate.reasons.join(", "));
2322 let _ = writeln!(out, "entity_ids: {}", candidate.entity_ids.join(", "));
2323 if let Some(duration) = candidate.blocked_duration_hint_ms {
2324 let _ = writeln!(out, "blocked_duration_hint_ms: {duration}");
2325 }
2326 for node in &candidate.cycle_nodes {
2327 let _ = writeln!(out, "- {} [{}] id={}", node.name, node.kind, node.entity_id);
2328 append_source_set(
2329 &mut out,
2330 " sources",
2331 node.source.as_ref(),
2332 &node.sources,
2333 " ",
2334 );
2335 }
2336 }
2337
2338 out.trim_end().to_string()
2339}
2340
2341fn render_entity_markdown(response: &McpEntityResponse) -> String {
2342 let mut out = String::new();
2343 let _ = writeln!(out, "snapshot_id: {}", response.snapshot_id);
2344 let _ = writeln!(
2345 out,
2346 "entity: {} [{}]",
2347 response.entity_name, response.entity_kind
2348 );
2349 let _ = writeln!(out, "entity_id: {}", response.entity_id);
2350 let _ = writeln!(
2351 out,
2352 "process: {} (pid {}, id {})",
2353 response.process_name, response.pid, response.process_id
2354 );
2355 let _ = writeln!(out, "scope_ids: {}", response.scope_ids.join(", "));
2356 let _ = writeln!(out, "entity_body: {}", response.entity_body_json);
2357 append_source_set(
2358 &mut out,
2359 "sources",
2360 response.source.as_ref(),
2361 &response.sources,
2362 "",
2363 );
2364
2365 let _ = writeln!(out, "\nincoming_wait_edges:");
2366 for edge in &response.incoming_wait_edges {
2367 let _ = writeln!(out, "- {} -> {}", edge.src_entity_id, edge.dst_entity_id);
2368 append_source_set(
2369 &mut out,
2370 " wait-site sources",
2371 edge.wait_site_source.as_ref(),
2372 &edge.wait_site_sources,
2373 " ",
2374 );
2375 }
2376
2377 let _ = writeln!(out, "\noutgoing_wait_edges:");
2378 for edge in &response.outgoing_wait_edges {
2379 let _ = writeln!(out, "- {} -> {}", edge.src_entity_id, edge.dst_entity_id);
2380 append_source_set(
2381 &mut out,
2382 " wait-site sources",
2383 edge.wait_site_source.as_ref(),
2384 &edge.wait_site_sources,
2385 " ",
2386 );
2387 }
2388
2389 out.trim_end().to_string()
2390}
2391
2392fn render_channel_state_markdown(response: &McpChannelStateResponse) -> String {
2393 let mut out = String::new();
2394 let _ = writeln!(out, "snapshot_id: {}", response.snapshot_id);
2395 let _ = writeln!(out, "channels: {}", response.channels.len());
2396
2397 for channel in &response.channels {
2398 let _ = writeln!(
2399 out,
2400 "\n- {} [{}] id={} process={}",
2401 channel.name, channel.channel_kind, channel.entity_id, channel.process_id
2402 );
2403 if let Some(capacity) = channel.capacity {
2404 let _ = writeln!(out, " capacity: {capacity}");
2405 }
2406 if let Some(occupancy) = channel.occupancy {
2407 let _ = writeln!(out, " occupancy: {occupancy}");
2408 }
2409 let _ = writeln!(
2410 out,
2411 " waiters: senders={} receivers={}",
2412 channel.sender_waiters, channel.receiver_waiters
2413 );
2414 if let Some(hints) = channel.lifecycle_hints.as_ref() {
2415 let _ = writeln!(out, " lifecycle: {hints}");
2416 }
2417 append_source_set(
2418 &mut out,
2419 " sources",
2420 channel.source.as_ref(),
2421 &channel.sources,
2422 " ",
2423 );
2424 }
2425 out.trim_end().to_string()
2426}
2427
2428fn render_task_state_markdown(response: &McpTaskStateResponse) -> String {
2429 let mut out = String::new();
2430 let _ = writeln!(out, "snapshot_id: {}", response.snapshot_id);
2431 let _ = writeln!(out, "tasks: {}", response.tasks.len());
2432
2433 for task in &response.tasks {
2434 let _ = writeln!(
2435 out,
2436 "\n- {} id={} process={}",
2437 task.name, task.entity_id, task.process_id
2438 );
2439 let _ = writeln!(out, " entry_backtrace_id: {}", task.entry_backtrace_id);
2440 if !task.entry_frame_ids.is_empty() {
2441 let frame_ids = task
2442 .entry_frame_ids
2443 .iter()
2444 .map(std::string::ToString::to_string)
2445 .collect::<Vec<_>>()
2446 .join(", ");
2447 let _ = writeln!(out, " entry_frame_ids: {frame_ids}");
2448 } else if let Some(frame_id) = task.entry_frame_id {
2449 let _ = writeln!(out, " entry_frame_id: {frame_id}");
2450 }
2451 if let Some(awaiting) = task.awaiting_on_entity_id.as_ref() {
2452 let _ = writeln!(out, " awaiting_on_entity_id: {awaiting}");
2453 }
2454 let _ = writeln!(out, " scope_ids: {}", task.scope_ids.join(", "));
2455 append_source_set(
2456 &mut out,
2457 " sources",
2458 task.source.as_ref(),
2459 &task.sources,
2460 " ",
2461 );
2462 }
2463
2464 out.trim_end().to_string()
2465}
2466
2467fn render_source_context_markdown(response: &McpSourceContextResponse) -> String {
2468 let mut out = String::new();
2469 let _ = writeln!(out, "snapshot_id: {}", response.snapshot_id);
2470 let _ = writeln!(out, "format: {}", response.format);
2471 let _ = writeln!(out, "previews: {}", response.previews.len());
2472
2473 for preview in &response.previews {
2474 let _ = writeln!(out, "\nframe_id: {}", preview.frame_id);
2475 append_rendered_block(&mut out, &render_source_snippet(preview), "");
2476 }
2477
2478 if !response.unavailable_frame_ids.is_empty() {
2479 let ids = response
2480 .unavailable_frame_ids
2481 .iter()
2482 .map(std::string::ToString::to_string)
2483 .collect::<Vec<_>>()
2484 .join(", ");
2485 let _ = writeln!(out, "\nunavailable_frame_ids: [{ids}]");
2486 }
2487 out.trim_end().to_string()
2488}
2489
2490fn render_backtrace_markdown(response: &McpBacktraceResponse) -> String {
2491 let mut out = String::new();
2492 let _ = writeln!(out, "snapshot_id: {}", response.snapshot_id);
2493 let _ = writeln!(out, "backtrace_id: {}", response.backtrace_id);
2494 let _ = writeln!(out, "frame_count: {}", response.frame_count);
2495
2496 for frame in &response.frames {
2497 let _ = writeln!(out, "\n- frame {} [{}]", frame.frame_id, frame.status);
2498 if let Some(function_name) = frame.function_name.as_ref() {
2499 let _ = writeln!(out, " function: {function_name}");
2500 }
2501 if let Some(source_file) = frame.source_file.as_ref() {
2502 if let Some(line) = frame.line {
2503 let _ = writeln!(out, " source: {source_file}:{line}");
2504 } else {
2505 let _ = writeln!(out, " source: {source_file}");
2506 }
2507 } else {
2508 let _ = writeln!(out, " module: {}", frame.module_path);
2509 }
2510 if let Some(reason) = frame.reason.as_ref() {
2511 let _ = writeln!(out, " unresolved_reason: {reason}");
2512 }
2513 if let Some(source) = frame.source.as_ref() {
2514 append_source_set(&mut out, " context", Some(source), &[], " ");
2515 }
2516 }
2517 out.trim_end().to_string()
2518}
2519
2520fn render_diff_snapshots_markdown(response: &McpDiffSnapshotsResponse) -> String {
2521 let mut out = String::new();
2522 let _ = writeln!(
2523 out,
2524 "from_snapshot_id: {}\nto_snapshot_id: {}",
2525 response.from_snapshot_id, response.to_snapshot_id
2526 );
2527 let _ = writeln!(out, "entity_added: {}", response.entity_added.join(", "));
2528 let _ = writeln!(
2529 out,
2530 "entity_removed: {}",
2531 response.entity_removed.join(", ")
2532 );
2533 let _ = writeln!(
2534 out,
2535 "waiting_on_added: {}",
2536 response.waiting_on_added.join(", ")
2537 );
2538 let _ = writeln!(
2539 out,
2540 "waiting_on_removed: {}",
2541 response.waiting_on_removed.join(", ")
2542 );
2543
2544 let _ = writeln!(out, "\nchannel_changes:");
2545 for change in &response.channel_changes {
2546 let _ = writeln!(
2547 out,
2548 "- {}: {} -> {}",
2549 change.entity_id, change.before, change.after
2550 );
2551 }
2552
2553 let _ = writeln!(out, "\ntask_changes:");
2554 for change in &response.task_changes {
2555 let before = change.awaiting_before.as_deref().unwrap_or("<none>");
2556 let after = change.awaiting_after.as_deref().unwrap_or("<none>");
2557 let _ = writeln!(out, "- {}: {} -> {}", change.entity_id, before, after);
2558 }
2559
2560 out.trim_end().to_string()
2561}
2562
2563fn append_source_set(
2564 out: &mut String,
2565 label: &str,
2566 source: Option<&McpSourceContext>,
2567 sources: &[McpSourceContext],
2568 indent: &str,
2569) {
2570 let mut all = Vec::<McpSourceContext>::new();
2571 if let Some(source) = source {
2572 all.push(source.clone());
2573 }
2574 for source in sources {
2575 if all.iter().any(|seen| seen.frame_id == source.frame_id) {
2576 continue;
2577 }
2578 all.push(source.clone());
2579 }
2580
2581 let _ = writeln!(out, "{indent}{label}:");
2582 if all.is_empty() {
2583 let _ = writeln!(out, "{indent} <source unavailable>");
2584 return;
2585 }
2586 for (idx, source) in all.iter().enumerate() {
2587 let _ = writeln!(out, "{indent} [{}] frame_id={}", idx + 1, source.frame_id);
2588 append_rendered_block(
2589 out,
2590 &render_source_snippet(source),
2591 &format!("{indent} "),
2592 );
2593 }
2594}
2595
2596fn append_rendered_block(out: &mut String, block: &str, indent: &str) {
2597 for line in block.lines() {
2598 let _ = writeln!(out, "{indent}{line}");
2599 }
2600}
2601
2602fn render_source_snippet(source: &McpSourceContext) -> String {
2603 let mut out = String::new();
2604 let _ = writeln!(out, "in {}", source.source_file);
2605
2606 let (start_line, body) = source_body_for_render(source);
2607 let mut rendered = 0usize;
2608 let mut truncated = false;
2609 for (index, raw_line) in body.lines().enumerate() {
2610 if rendered >= MAX_RENDERED_SOURCE_LINES {
2611 truncated = true;
2612 break;
2613 }
2614 let line = raw_line.trim_end();
2615 if line.is_empty() {
2616 continue;
2617 }
2618 let line_no = start_line.saturating_add(index as u32);
2619 let marker = if line_no == source.target_line {
2620 '>'
2621 } else {
2622 ' '
2623 };
2624 let _ = writeln!(out, "{marker}{line_no:>5} | {line}");
2625 rendered = rendered.saturating_add(1);
2626 }
2627
2628 if rendered == 0 {
2629 let _ = writeln!(
2630 out,
2631 ">{:>5} | <source text unavailable>",
2632 source.target_line
2633 );
2634 }
2635 if truncated {
2636 let _ = writeln!(out, " ... | ...");
2637 }
2638
2639 out.trim_end().to_string()
2640}
2641
2642fn source_body_for_render(source: &McpSourceContext) -> (u32, String) {
2643 if let (Some(compact_scope_text), Some(range)) = (
2644 source.compact_scope_text.as_ref(),
2645 source.compact_scope_range.as_ref(),
2646 ) && compact_target_line_visible(compact_scope_text, range, source.target_line)
2647 {
2648 return (range.start, compact_scope_text.clone());
2649 }
2650 if let Some(statement_text) = source.statement_text.as_ref()
2651 && !statement_text.trim().is_empty()
2652 {
2653 return (source.target_line, statement_text.clone());
2654 }
2655 if let Some(enclosing_fn_text) = source.enclosing_fn_text.as_ref()
2656 && !enclosing_fn_text.trim().is_empty()
2657 {
2658 return (source.target_line, enclosing_fn_text.clone());
2659 }
2660 (source.target_line, String::new())
2661}
2662
2663fn compact_target_line_visible(text: &str, range: &McpLineRange, target_line: u32) -> bool {
2664 if target_line < range.start || target_line > range.end {
2665 return false;
2666 }
2667 let index = target_line.saturating_sub(range.start) as usize;
2668 let Some(line) = text.lines().nth(index) else {
2669 return false;
2670 };
2671 let trimmed = line.trim();
2672 !trimmed.is_empty() && trimmed != "/* ... */"
2673}
2674
2675fn source_for_node(
2676 node: &WaitNode,
2677 sources: &HashMap<u64, McpSourceContext>,
2678) -> Option<McpSourceContext> {
2679 sources_for_node(node, sources).into_iter().next()
2680}
2681
2682fn sources_for_node(
2683 node: &WaitNode,
2684 sources: &HashMap<u64, McpSourceContext>,
2685) -> Vec<McpSourceContext> {
2686 sources_for_frame_ids(&node.frame_ids, sources)
2687}
2688
2689fn sources_for_frame_ids(
2690 frame_ids: &[FrameId],
2691 sources: &HashMap<u64, McpSourceContext>,
2692) -> Vec<McpSourceContext> {
2693 frame_ids
2694 .iter()
2695 .filter_map(|id| sources.get(&id.as_u64()).cloned())
2696 .collect()
2697}
2698
2699fn wait_node(
2700 process: &ProcessSnapshotView,
2701 entity: &Entity,
2702 backtrace_index: &HashMap<u64, &SnapshotBacktrace>,
2703 frame_catalog: &HashMap<u64, &SnapshotBacktraceFrame>,
2704) -> WaitNode {
2705 let frame_ids = selected_frames_for_entity(
2706 entity,
2707 backtrace_index,
2708 frame_catalog,
2709 SOURCE_FRAMES_PER_ITEM,
2710 );
2711 WaitNode {
2712 process_id: process.process_id.as_str().to_owned(),
2713 ptime_now_ms: process.ptime_now_ms,
2714 entity_id: entity.id.as_str().to_owned(),
2715 name: entity.name.clone(),
2716 kind: entity_kind_name(&entity.body).to_owned(),
2717 birth_ms: entity.birth.as_millis(),
2718 frame_ids,
2719 }
2720}
2721
2722fn compose_node_key(process_id: &ProcessId, entity_id: &EntityId) -> String {
2723 format!("{}::{}", process_id.as_str(), entity_id.as_str())
2724}
2725
2726fn backtrace_index(snapshot: &SnapshotCutResponse) -> HashMap<u64, &SnapshotBacktrace> {
2727 snapshot
2728 .backtraces
2729 .iter()
2730 .map(|bt| (bt.backtrace_id.as_u64(), bt))
2731 .collect()
2732}
2733
2734fn frame_catalog(snapshot: &SnapshotCutResponse) -> HashMap<u64, &SnapshotBacktraceFrame> {
2735 snapshot
2736 .frames
2737 .iter()
2738 .map(|record| (record.frame_id.as_u64(), &record.frame))
2739 .collect()
2740}
2741
2742fn frame_start_index_for_entity(entity: &Entity) -> usize {
2743 match &entity.body {
2744 EntityBody::Future(fut) => fut.skip_entry_frames.unwrap_or(0) as usize,
2745 _ => 0,
2746 }
2747}
2748
2749fn selected_frames_for_entity(
2750 entity: &Entity,
2751 backtrace_index: &HashMap<u64, &SnapshotBacktrace>,
2752 frame_catalog: &HashMap<u64, &SnapshotBacktraceFrame>,
2753 frame_count: usize,
2754) -> Vec<FrameId> {
2755 let Some(backtrace) = backtrace_index.get(&entity.backtrace.as_u64()) else {
2756 return Vec::new();
2757 };
2758 select_frames_for_backtrace(
2759 backtrace,
2760 frame_catalog,
2761 frame_start_index_for_entity(entity),
2762 frame_count,
2763 )
2764}
2765
2766fn selected_frames_for_backtrace_id(
2767 backtrace_id: u64,
2768 backtrace_index: &HashMap<u64, &SnapshotBacktrace>,
2769 frame_catalog: &HashMap<u64, &SnapshotBacktraceFrame>,
2770 app_skip_count: usize,
2771 frame_count: usize,
2772) -> Vec<FrameId> {
2773 let Some(backtrace) = backtrace_index.get(&backtrace_id) else {
2774 return Vec::new();
2775 };
2776 select_frames_for_backtrace(backtrace, frame_catalog, app_skip_count, frame_count)
2777}
2778
2779fn select_frames_for_backtrace(
2780 backtrace: &SnapshotBacktrace,
2781 frame_catalog: &HashMap<u64, &SnapshotBacktraceFrame>,
2782 app_skip_count: usize,
2783 frame_count: usize,
2784) -> Vec<FrameId> {
2785 if backtrace.frame_ids.is_empty() || frame_count == 0 {
2786 return Vec::new();
2787 }
2788
2789 let mut application_resolved = Vec::<FrameId>::new();
2790 let mut resolved_all = Vec::<FrameId>::new();
2791 for frame_id in &backtrace.frame_ids {
2792 let Some(frame) = frame_catalog.get(&frame_id.as_u64()) else {
2793 continue;
2794 };
2795 let SnapshotBacktraceFrame::Resolved(resolved) = frame else {
2796 continue;
2797 };
2798 resolved_all.push(*frame_id);
2799 let is_system =
2800 crate_from_function_name(resolved.function_name.as_str()).is_some_and(is_system_crate);
2801 if !is_system {
2802 application_resolved.push(*frame_id);
2803 }
2804 }
2805
2806 let mut out = Vec::<FrameId>::new();
2807 for frame_id in application_resolved.into_iter().skip(app_skip_count) {
2808 if !out.contains(&frame_id) {
2809 out.push(frame_id);
2810 }
2811 if out.len() >= frame_count {
2812 return out;
2813 }
2814 }
2815 for frame_id in resolved_all {
2816 if !out.contains(&frame_id) {
2817 out.push(frame_id);
2818 }
2819 if out.len() >= frame_count {
2820 return out;
2821 }
2822 }
2823 for frame_id in &backtrace.frame_ids {
2824 if !out.contains(frame_id) {
2825 out.push(*frame_id);
2826 }
2827 if out.len() >= frame_count {
2828 return out;
2829 }
2830 }
2831 out
2832}
2833
2834fn crate_from_function_name(function_name: &str) -> Option<&str> {
2835 let trimmed = function_name.trim();
2836 if trimmed.is_empty() {
2837 return None;
2838 }
2839 let bytes = trimmed.as_bytes();
2840 let mut index = 0usize;
2841 while index < bytes.len() {
2842 let ch = bytes[index] as char;
2843 if ch == '<' || ch == ' ' || ch == '&' || ch == '*' {
2844 index += 1;
2845 continue;
2846 }
2847 break;
2848 }
2849
2850 let rest = &trimmed[index..];
2851 let mut chars = rest.char_indices();
2852 let (_, first) = chars.next()?;
2853 if !(first == '_' || first.is_ascii_alphabetic()) {
2854 return None;
2855 }
2856 let mut end = first.len_utf8();
2857 for (idx, ch) in chars {
2858 if ch == '_' || ch.is_ascii_alphanumeric() {
2859 end = idx + ch.len_utf8();
2860 continue;
2861 }
2862 break;
2863 }
2864 Some(&rest[..end])
2865}
2866
2867fn is_system_crate(krate: &str) -> bool {
2868 SYSTEM_CRATES.contains(&krate)
2869}
2870
2871#[allow(clippy::too_many_arguments)]
2872fn make_chain(
2873 chain_num: usize,
2874 path: &[String],
2875 is_cycle: bool,
2876 nodes: &HashMap<String, WaitNode>,
2877 sources: &HashMap<u64, McpSourceContext>,
2878 edge_wait_source: &HashMap<(String, String), Option<McpSourceContext>>,
2879 edge_wait_sources: &HashMap<(String, String), Vec<McpSourceContext>>,
2880 truncated: bool,
2881) -> McpWaitChain {
2882 let mut chain_nodes = Vec::new();
2883 let mut node_ids = Vec::new();
2884 for key in path {
2885 if let Some(node) = nodes.get(key) {
2886 node_ids.push(node.entity_id.clone());
2887 chain_nodes.push(McpNodeSummary {
2888 process_id: node.process_id.clone(),
2889 entity_id: node.entity_id.clone(),
2890 name: node.name.clone(),
2891 kind: node.kind.clone(),
2892 source: source_for_node(node, sources),
2893 sources: sources_for_node(node, sources),
2894 });
2895 }
2896 }
2897
2898 let mut edges = Vec::new();
2899 for pair in path.windows(2) {
2900 let Some(src) = nodes.get(&pair[0]) else {
2901 continue;
2902 };
2903 let Some(dst) = nodes.get(&pair[1]) else {
2904 continue;
2905 };
2906 edges.push(McpChainEdge {
2907 src_entity_id: src.entity_id.clone(),
2908 dst_entity_id: dst.entity_id.clone(),
2909 wait_site_source: edge_wait_source
2910 .get(&(pair[0].clone(), pair[1].clone()))
2911 .cloned()
2912 .flatten(),
2913 wait_site_sources: edge_wait_sources
2914 .get(&(pair[0].clone(), pair[1].clone()))
2915 .cloned()
2916 .unwrap_or_default(),
2917 });
2918 }
2919
2920 let has_external_wake_source = chain_nodes
2921 .iter()
2922 .any(|node| node_has_external_wake_source(node.kind.as_str()));
2923
2924 let summary = if is_cycle {
2925 format!("cycle of {} nodes", chain_nodes.len())
2926 } else if truncated {
2927 format!(
2928 "chain truncated at {} nodes (max depth reached)",
2929 chain_nodes.len()
2930 )
2931 } else {
2932 format!("chain of {} nodes", chain_nodes.len())
2933 };
2934
2935 McpWaitChain {
2936 chain_id: format!("chain-{chain_num}"),
2937 is_cycle,
2938 has_external_wake_source,
2939 summary,
2940 node_ids,
2941 edges,
2942 nodes: chain_nodes,
2943 }
2944}
2945
2946fn node_has_external_wake_source(kind: &str) -> bool {
2947 matches!(
2948 kind,
2949 "mpsc_rx"
2950 | "broadcast_rx"
2951 | "watch_rx"
2952 | "oneshot_rx"
2953 | "notify"
2954 | "semaphore"
2955 | "net_accept"
2956 | "net_read"
2957 | "request"
2958 | "response"
2959 )
2960}
2961
2962fn count_waiters(
2963 edges: &[WaitEdgeRuntime],
2964 nodes: &HashMap<String, WaitNode>,
2965 channel_entity: &Entity,
2966) -> (u32, u32) {
2967 let mut sender_waiters = 0u32;
2968 let mut receiver_waiters = 0u32;
2969
2970 for edge in edges {
2971 if edge.dst_entity_id != channel_entity.id.as_str() {
2972 continue;
2973 }
2974 let Some(waiter) = nodes.get(&edge.src_key) else {
2975 continue;
2976 };
2977 if waiter.name.contains(".send") {
2978 sender_waiters = sender_waiters.saturating_add(1);
2979 } else {
2980 receiver_waiters = receiver_waiters.saturating_add(1);
2981 }
2982 }
2983
2984 (sender_waiters, receiver_waiters)
2985}
2986
2987fn entity_kind_name(body: &EntityBody) -> &'static str {
2988 match body {
2989 EntityBody::Future(_) => "future",
2990 EntityBody::Lock(_) => "lock",
2991 EntityBody::MpscTx(_) => "mpsc_tx",
2992 EntityBody::MpscRx(_) => "mpsc_rx",
2993 EntityBody::BroadcastTx(_) => "broadcast_tx",
2994 EntityBody::BroadcastRx(_) => "broadcast_rx",
2995 EntityBody::WatchTx(_) => "watch_tx",
2996 EntityBody::WatchRx(_) => "watch_rx",
2997 EntityBody::OneshotTx(_) => "oneshot_tx",
2998 EntityBody::OneshotRx(_) => "oneshot_rx",
2999 EntityBody::Semaphore(_) => "semaphore",
3000 EntityBody::Notify(_) => "notify",
3001 EntityBody::OnceCell(_) => "once_cell",
3002 EntityBody::Command(_) => "command",
3003 EntityBody::FileOp(_) => "file_op",
3004 EntityBody::NetConnect(_) => "net_connect",
3005 EntityBody::NetAccept(_) => "net_accept",
3006 EntityBody::NetRead(_) => "net_read",
3007 EntityBody::NetWrite(_) => "net_write",
3008 EntityBody::Request(_) => "request",
3009 EntityBody::Response(_) => "response",
3010 EntityBody::Custom(_) => "custom",
3011 EntityBody::Aether(_) => "aether",
3012 }
3013}
3014
3015fn is_channel_entity(body: &EntityBody) -> bool {
3016 matches!(
3017 body,
3018 EntityBody::MpscTx(_)
3019 | EntityBody::MpscRx(_)
3020 | EntityBody::BroadcastTx(_)
3021 | EntityBody::BroadcastRx(_)
3022 | EntityBody::WatchTx(_)
3023 | EntityBody::WatchRx(_)
3024 | EntityBody::OneshotTx(_)
3025 | EntityBody::OneshotRx(_)
3026 )
3027}
3028
3029fn is_task_entity(body: &EntityBody) -> bool {
3030 matches!(body, EntityBody::Future(_))
3031}
3032
3033fn channel_metrics(body: &EntityBody) -> (Option<u32>, Option<u32>, Option<String>, &'static str) {
3034 match body {
3035 EntityBody::MpscTx(tx) => (
3036 tx.capacity,
3037 Some(tx.queue_len),
3038 Some(format!(
3039 "queue_len={} capacity={:?}",
3040 tx.queue_len, tx.capacity
3041 )),
3042 "mpsc",
3043 ),
3044 EntityBody::MpscRx(_) => (None, None, None, "mpsc"),
3045 EntityBody::BroadcastTx(tx) => (
3046 Some(tx.capacity),
3047 None,
3048 Some(format!("capacity={}", tx.capacity)),
3049 "broadcast",
3050 ),
3051 EntityBody::BroadcastRx(rx) => (
3052 None,
3053 Some(rx.lag),
3054 Some(format!("lag={}", rx.lag)),
3055 "broadcast",
3056 ),
3057 EntityBody::WatchTx(tx) => (
3058 None,
3059 None,
3060 Some(format!(
3061 "last_update_at_ms={:?}",
3062 tx.last_update_at.map(|t| t.as_millis())
3063 )),
3064 "watch",
3065 ),
3066 EntityBody::WatchRx(_) => (None, None, None, "watch"),
3067 EntityBody::OneshotTx(tx) => (
3068 Some(1),
3069 Some(if tx.sent { 1 } else { 0 }),
3070 Some(format!("sent={}", tx.sent)),
3071 "oneshot",
3072 ),
3073 EntityBody::OneshotRx(_) => (Some(1), None, None, "oneshot"),
3074 _ => (None, None, None, "unknown"),
3075 }
3076}
3077
3078fn snapshot_entity_keys(snapshot: &SnapshotCutResponse) -> HashSet<String> {
3079 let mut out = HashSet::new();
3080 for process in &snapshot.processes {
3081 for entity in &process.snapshot.entities {
3082 out.insert(compose_node_key(&process.process_id, &entity.id));
3083 }
3084 }
3085 out
3086}
3087
3088fn snapshot_waiting_edges(snapshot: &SnapshotCutResponse) -> HashSet<String> {
3089 let mut out = HashSet::new();
3090 for process in &snapshot.processes {
3091 for edge in &process.snapshot.edges {
3092 if edge.kind == EdgeKind::WaitingOn {
3093 out.insert(format!(
3094 "{}::{}->{}",
3095 process.process_id.as_str(),
3096 edge.src.as_str(),
3097 edge.dst.as_str()
3098 ));
3099 }
3100 }
3101 }
3102 out
3103}
3104
3105fn snapshot_channel_fingerprint(snapshot: &SnapshotCutResponse) -> HashMap<String, String> {
3106 let mut out = HashMap::new();
3107 for process in &snapshot.processes {
3108 for entity in &process.snapshot.entities {
3109 if !is_channel_entity(&entity.body) {
3110 continue;
3111 }
3112 let key = compose_node_key(&process.process_id, &entity.id);
3113 let value = facet_json::to_string(&entity.body)
3114 .unwrap_or_else(|_| String::from("<encode_error>"));
3115 out.insert(key, value);
3116 }
3117 }
3118 out
3119}
3120
3121fn snapshot_task_wait_target(snapshot: &SnapshotCutResponse) -> HashMap<String, Option<String>> {
3122 let mut out = HashMap::new();
3123 for process in &snapshot.processes {
3124 let mut waiting_by_src: HashMap<String, String> = HashMap::new();
3125 for edge in &process.snapshot.edges {
3126 if edge.kind == EdgeKind::WaitingOn {
3127 waiting_by_src.insert(edge.src.as_str().to_owned(), edge.dst.as_str().to_owned());
3128 }
3129 }
3130 for entity in &process.snapshot.entities {
3131 if !is_task_entity(&entity.body) {
3132 continue;
3133 }
3134 let key = compose_node_key(&process.process_id, &entity.id);
3135 out.insert(key, waiting_by_src.get(entity.id.as_str()).cloned());
3136 }
3137 }
3138 out
3139}
3140
3141fn strongly_connected_components(
3142 keys: Vec<String>,
3143 adjacency: &HashMap<String, Vec<String>>,
3144) -> Vec<Vec<String>> {
3145 struct TarjanState {
3146 index: usize,
3147 stack: Vec<String>,
3148 on_stack: HashSet<String>,
3149 index_map: HashMap<String, usize>,
3150 lowlink_map: HashMap<String, usize>,
3151 components: Vec<Vec<String>>,
3152 }
3153
3154 fn strongconnect(node: String, adjacency: &HashMap<String, Vec<String>>, st: &mut TarjanState) {
3155 st.index_map.insert(node.clone(), st.index);
3156 st.lowlink_map.insert(node.clone(), st.index);
3157 st.index += 1;
3158 st.stack.push(node.clone());
3159 st.on_stack.insert(node.clone());
3160
3161 if let Some(neighbors) = adjacency.get(&node) {
3162 for next in neighbors {
3163 if !st.index_map.contains_key(next) {
3164 strongconnect(next.clone(), adjacency, st);
3165 let next_low = st.lowlink_map.get(next).copied().unwrap_or(usize::MAX);
3166 if let Some(node_low) = st.lowlink_map.get_mut(&node) {
3167 *node_low = (*node_low).min(next_low);
3168 }
3169 } else if st.on_stack.contains(next) {
3170 let next_idx = st.index_map.get(next).copied().unwrap_or(usize::MAX);
3171 if let Some(node_low) = st.lowlink_map.get_mut(&node) {
3172 *node_low = (*node_low).min(next_idx);
3173 }
3174 }
3175 }
3176 }
3177
3178 let node_idx = st.index_map.get(&node).copied().unwrap_or(usize::MAX);
3179 let node_low = st.lowlink_map.get(&node).copied().unwrap_or(usize::MAX);
3180 if node_low == node_idx {
3181 let mut component = Vec::new();
3182 while let Some(w) = st.stack.pop() {
3183 st.on_stack.remove(&w);
3184 component.push(w.clone());
3185 if w == node {
3186 break;
3187 }
3188 }
3189 st.components.push(component);
3190 }
3191 }
3192
3193 let mut state = TarjanState {
3194 index: 0,
3195 stack: Vec::new(),
3196 on_stack: HashSet::new(),
3197 index_map: HashMap::new(),
3198 lowlink_map: HashMap::new(),
3199 components: Vec::new(),
3200 };
3201
3202 for node in keys {
3203 if !state.index_map.contains_key(&node) {
3204 strongconnect(node, adjacency, &mut state);
3205 }
3206 }
3207
3208 state.components
3209}
3210
3211fn required_non_empty_string(
3212 args: &JsonMap<String, JsonValue>,
3213 field: &str,
3214) -> Result<String, String> {
3215 let value = args
3216 .get(field)
3217 .and_then(JsonValue::as_str)
3218 .ok_or_else(|| format!("missing required `{field}` string argument"))?
3219 .trim()
3220 .to_string();
3221 if value.is_empty() {
3222 return Err(format!("`{field}` must not be empty"));
3223 }
3224 Ok(value)
3225}
3226
3227fn optional_non_empty_string(
3228 args: &JsonMap<String, JsonValue>,
3229 field: &str,
3230) -> Result<Option<String>, String> {
3231 let Some(value) = args.get(field) else {
3232 return Ok(None);
3233 };
3234 let Some(value) = value.as_str() else {
3235 return Err(format!("`{field}` must be a string"));
3236 };
3237 let value = value.trim();
3238 if value.is_empty() {
3239 return Err(format!("`{field}` must not be empty when provided"));
3240 }
3241 Ok(Some(value.to_owned()))
3242}
3243
3244fn optional_string_list(
3245 args: &JsonMap<String, JsonValue>,
3246 field: &str,
3247) -> Result<Option<Vec<String>>, String> {
3248 let Some(raw) = args.get(field) else {
3249 return Ok(None);
3250 };
3251 let values = raw
3252 .as_array()
3253 .ok_or_else(|| format!("`{field}` must be an array of strings"))?;
3254 let mut out = Vec::with_capacity(values.len());
3255 for (index, value) in values.iter().enumerate() {
3256 let Some(text) = value.as_str() else {
3257 return Err(format!("`{field}[{index}]` must be a string"));
3258 };
3259 let text = text.trim();
3260 if text.is_empty() {
3261 return Err(format!("`{field}[{index}]` must not be empty"));
3262 }
3263 out.push(text.to_owned());
3264 }
3265 Ok(Some(out))
3266}
3267
3268fn optional_u32(args: &JsonMap<String, JsonValue>, field: &str) -> Result<Option<u32>, String> {
3269 let Some(raw) = args.get(field) else {
3270 return Ok(None);
3271 };
3272 let raw = raw
3273 .as_u64()
3274 .ok_or_else(|| format!("`{field}` must be an unsigned integer"))?;
3275 u32::try_from(raw)
3276 .map(Some)
3277 .map_err(|_| format!("`{field}` value {raw} exceeds u32::MAX"))
3278}
3279
3280fn optional_bool(args: &JsonMap<String, JsonValue>, field: &str) -> Result<Option<bool>, String> {
3281 let Some(raw) = args.get(field) else {
3282 return Ok(None);
3283 };
3284 raw.as_bool()
3285 .map(Some)
3286 .ok_or_else(|| format!("`{field}` must be a boolean"))
3287}
3288
3289fn optional_i64(args: &JsonMap<String, JsonValue>, field: &str) -> Result<Option<i64>, String> {
3290 let Some(raw) = args.get(field) else {
3291 return Ok(None);
3292 };
3293 raw.as_i64()
3294 .map(Some)
3295 .ok_or_else(|| format!("`{field}` must be an integer"))
3296}
3297
3298fn required_i64(args: &JsonMap<String, JsonValue>, field: &str) -> Result<i64, String> {
3299 args.get(field)
3300 .and_then(JsonValue::as_i64)
3301 .ok_or_else(|| format!("missing required `{field}` integer argument"))
3302}
3303
3304fn required_u64(args: &JsonMap<String, JsonValue>, field: &str) -> Result<u64, String> {
3305 let value = args
3306 .get(field)
3307 .and_then(JsonValue::as_u64)
3308 .ok_or_else(|| format!("missing required `{field}` unsigned integer argument"))?;
3309 Ok(value)
3310}
3311
3312fn required_u64_list(args: &JsonMap<String, JsonValue>, field: &str) -> Result<Vec<u64>, String> {
3313 let values = args
3314 .get(field)
3315 .and_then(JsonValue::as_array)
3316 .ok_or_else(|| format!("missing required `{field}` array argument"))?;
3317 let mut out = Vec::with_capacity(values.len());
3318 for (index, value) in values.iter().enumerate() {
3319 let numeric = value
3320 .as_u64()
3321 .ok_or_else(|| format!("`{field}[{index}]` must be an unsigned integer"))?;
3322 out.push(numeric);
3323 }
3324 Ok(out)
3325}
3326
3327async fn handle_streamable_http_get(
3328 headers: HeaderMap,
3329 uri: Uri,
3330 State(state): State<Arc<McpAppState>>,
3331 Extension(http_handler): Extension<Arc<McpHttpHandler>>,
3332) -> Result<Response, TransportServerError> {
3333 let request = McpHttpHandler::create_request(Method::GET, uri, headers, None);
3334 let generic_response = http_handler.handle_streamable_http(request, state).await?;
3335 Ok(convert_response(generic_response))
3336}
3337
3338async fn handle_streamable_http_post(
3339 headers: HeaderMap,
3340 uri: Uri,
3341 State(state): State<Arc<McpAppState>>,
3342 Extension(http_handler): Extension<Arc<McpHttpHandler>>,
3343 payload: String,
3344) -> Result<Response, TransportServerError> {
3345 let request =
3346 McpHttpHandler::create_request(Method::POST, uri, headers, Some(payload.as_str()));
3347 let generic_response = http_handler.handle_streamable_http(request, state).await?;
3348 Ok(convert_response(generic_response))
3349}
3350
3351async fn handle_streamable_http_delete(
3352 headers: HeaderMap,
3353 uri: Uri,
3354 State(state): State<Arc<McpAppState>>,
3355 Extension(http_handler): Extension<Arc<McpHttpHandler>>,
3356) -> Result<Response, TransportServerError> {
3357 let request = McpHttpHandler::create_request(Method::DELETE, uri, headers, None);
3358 let generic_response = http_handler.handle_streamable_http(request, state).await?;
3359 Ok(convert_response(generic_response))
3360}
3361
3362fn convert_response(response: axum::http::Response<GenericBody>) -> Response {
3363 let (parts, body) = response.into_parts();
3364 Response::from_parts(parts, axum::body::Body::new(body))
3365}
3366
3367#[cfg(test)]
3368mod tests {
3369 use super::*;
3370
3371 #[test]
3372 fn strongly_connected_components_finds_cycle_cluster() {
3373 let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
3374 adjacency.insert(String::from("a"), vec![String::from("b")]);
3375 adjacency.insert(String::from("b"), vec![String::from("c")]);
3376 adjacency.insert(String::from("c"), vec![String::from("a")]);
3377 adjacency.insert(String::from("d"), vec![String::from("e")]);
3378 adjacency.insert(String::from("e"), vec![]);
3379 let keys = vec![
3380 String::from("a"),
3381 String::from("b"),
3382 String::from("c"),
3383 String::from("d"),
3384 String::from("e"),
3385 ];
3386
3387 let mut components = strongly_connected_components(keys, &adjacency);
3388 components.iter_mut().for_each(|c| c.sort());
3389 components.sort_by_key(|c| c.first().cloned().unwrap_or_default());
3390
3391 assert_eq!(components.len(), 3);
3392 assert_eq!(
3393 components[0],
3394 vec![String::from("a"), String::from("b"), String::from("c")]
3395 );
3396 assert_eq!(components[1], vec![String::from("d")]);
3397 assert_eq!(components[2], vec![String::from("e")]);
3398 }
3399
3400 #[test]
3401 fn external_wake_source_kind_classification_is_strict() {
3402 assert!(node_has_external_wake_source("mpsc_rx"));
3403 assert!(node_has_external_wake_source("net_read"));
3404 assert!(!node_has_external_wake_source("future"));
3405 assert!(!node_has_external_wake_source("mpsc_tx"));
3406 }
3407
3408 #[test]
3409 fn crate_parser_handles_trait_impl_style_names() {
3410 assert_eq!(
3411 crate_from_function_name("<alloc::vec::Vec<u8> as core::fmt::Debug>::fmt"),
3412 Some("alloc")
3413 );
3414 assert_eq!(
3415 crate_from_function_name("tokio::runtime::context::enter"),
3416 Some("tokio")
3417 );
3418 }
3419
3420 #[test]
3421 fn source_snippet_renders_line_prefixed_text() {
3422 let rendered = render_source_snippet(&McpSourceContext {
3423 format: String::from("text/plain"),
3424 frame_id: 42,
3425 source_file: String::from("/tmp/example.rs"),
3426 target_line: 15,
3427 target_col: Some(9),
3428 total_lines: 200,
3429 statement_text: None,
3430 enclosing_fn_text: None,
3431 compact_scope_text: Some(String::from(
3432 "fn demo() {\n\n let x = 1;\n do_work(x).await?;\n\n}",
3433 )),
3434 compact_scope_range: Some(McpLineRange { start: 12, end: 17 }),
3435 });
3436 assert!(rendered.contains("in /tmp/example.rs"));
3437 assert!(rendered.contains(" 12 | fn demo() {"));
3438 assert!(rendered.contains("> 15 | do_work(x).await?;"));
3439 assert!(!rendered.contains("\n\n\n"));
3440 }
3441}