Skip to main content

aether_cli/acp/
mappers.rs

1use acp_utils::notifications::{ContextClearedParams, ContextUsageParams, SubAgentProgressParams};
2use acp_utils::server::AcpServerError;
3use aether_core::events::{AgentMessage, SubAgentProgressPayload};
4use agent_client_protocol::schema::{
5    self as acp, Content, ContentBlock, ContentChunk, Diff, HttpHeader, McpServer, PlanEntry, PlanEntryPriority,
6    PlanEntryStatus, SessionId, SessionNotification, SessionUpdate, StopReason, TextContent, ToolCall, ToolCallContent,
7    ToolCallId, ToolCallStatus, ToolCallUpdate, ToolCallUpdateFields,
8};
9use agent_client_protocol::{Client, ConnectionTo};
10use llm::{ToolCallError, ToolCallRequest, ToolCallResult};
11use mcp_utils::client::{McpServer as RuntimeMcpServer, McpTransport};
12use mcp_utils::display_meta::{PlanMetaStatus, ToolResultMeta};
13use rmcp::model::Prompt as McpPrompt;
14use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
15
16use aether_core::context::ext::{SessionEvent, UserEvent};
17
18/// Converts an MCP Prompt to an ACP `AvailableCommand`
19///
20/// Strips the MCP namespace from the prompt name (e.g., "`coding__web`" -> "web")
21/// and creates a slash command that clients can invoke.
22pub fn map_mcp_prompt_to_available_command(prompt: &McpPrompt) -> acp::AvailableCommand {
23    // Extract the base command name by removing the namespace prefix
24    let command_name = prompt.name.split("__").last().unwrap_or(prompt.name.as_ref()).to_string();
25
26    // Extract the input hint from the unified prompt format's ARGUMENTS parameter,
27    // falling back to a generic hint.
28    let hint = prompt
29        .arguments
30        .as_ref()
31        .and_then(|args| args.iter().find(|a| a.name.as_str() == "ARGUMENTS").and_then(|a| a.description.as_deref()))
32        .unwrap_or("optional arguments");
33    let input = Some(acp::AvailableCommandInput::Unstructured(acp::UnstructuredCommandInput::new(hint)));
34
35    let description = prompt.description.clone().unwrap_or_else(|| "No description available".to_string());
36
37    acp::AvailableCommand::new(command_name, description).input(input)
38}
39
40/// Maps ACP MCP server definitions to internal MCP servers, skipping unsupported transports.
41pub fn map_acp_mcp_servers(servers: Vec<McpServer>) -> Vec<RuntimeMcpServer> {
42    servers
43        .into_iter()
44        .filter_map(|s| {
45            try_map_mcp_server(s).or_else(|| {
46                tracing::warn!("Unsupported ACP MCP server transport, skipping");
47                None
48            })
49        })
50        .collect()
51}
52
53fn try_map_mcp_server(server: McpServer) -> Option<RuntimeMcpServer> {
54    use McpServer::{Http, Sse, Stdio};
55    match server {
56        Stdio(stdio) => Some(RuntimeMcpServer::new(
57            stdio.name,
58            McpTransport::Stdio {
59                command: stdio.command.to_string_lossy().into_owned(),
60                args: stdio.args,
61                env: stdio.env.into_iter().map(|e| (e.name, e.value)).collect(),
62            },
63            false,
64        )),
65
66        Http(http) => Some(RuntimeMcpServer::new(
67            http.name,
68            McpTransport::Http { config: http_config(http.url, &http.headers) },
69            false,
70        )),
71
72        Sse(sse) => Some(RuntimeMcpServer::new(
73            sse.name,
74            McpTransport::Http { config: http_config(sse.url, &sse.headers) },
75            false,
76        )),
77
78        _ => None,
79    }
80}
81
82fn http_config(url: String, headers: &[HttpHeader]) -> StreamableHttpClientTransportConfig {
83    let auth_header = headers.iter().find(|h| h.name.eq_ignore_ascii_case("authorization")).map(|h| h.value.clone());
84
85    let mut config = StreamableHttpClientTransportConfig::with_uri(url);
86    if let Some(auth) = auth_header {
87        // rmcp's `auth_header` wants the bare token; it adds the `Bearer ` prefix itself.
88        let token = auth
89            .split_once(' ')
90            .filter(|(scheme, _)| scheme.eq_ignore_ascii_case("Bearer"))
91            .map_or(auth.as_str(), |(_, rest)| rest);
92        config = config.auth_header(token.to_string());
93    }
94    config
95}
96
97/// Converts Aether `AgentMessage` to ACP `SessionUpdate`
98pub fn map_agent_message_to_session_notification(
99    session_id: SessionId,
100    msg: &AgentMessage,
101) -> Option<SessionNotification> {
102    map_agent_message_to_notification(session_id, msg, NotificationMode::Live)
103}
104
/// Selects how text and thought chunks are turned into notifications.
///
/// The mode is consulted by `map_chunk_to_notification`.
#[derive(Clone, Copy)]
enum NotificationMode {
    /// Streaming mode: incremental chunks are forwarded and the chunk flagged
    /// `is_complete` is dropped.
    Live,
    /// Replay mode: only chunks flagged `is_complete` are forwarded and
    /// incremental chunks are dropped.
    Replay,
}
110
111fn map_agent_message_to_notification(
112    session_id: SessionId,
113    msg: &AgentMessage,
114    mode: NotificationMode,
115) -> Option<SessionNotification> {
116    match msg {
117        AgentMessage::Text { chunk, is_complete, .. } => {
118            map_chunk_to_notification(session_id, chunk, *is_complete, mode, SessionUpdate::AgentMessageChunk)
119        }
120
121        AgentMessage::Thought { chunk, is_complete, .. } => {
122            map_chunk_to_notification(session_id, chunk, *is_complete, mode, SessionUpdate::AgentThoughtChunk)
123        }
124
125        AgentMessage::ToolCall { request, .. } => Some(map_tool_call_to_notification(session_id, request)),
126
127        AgentMessage::ToolCallUpdate { tool_call_id, chunk, .. } => {
128            Some(map_tool_call_update_to_notification(session_id, tool_call_id, chunk))
129        }
130
131        AgentMessage::ToolResult { result, result_meta, .. } => {
132            Some(map_tool_result_to_notification(session_id, result, result_meta.as_ref()))
133        }
134
135        AgentMessage::ToolError { error, .. } => Some(map_tool_error_to_notification(session_id, error)),
136
137        AgentMessage::ToolProgress { request, progress, total, message } => {
138            map_tool_progress_to_notification(session_id, request, *progress, *total, message.as_ref())
139        }
140
141        AgentMessage::Error { message } => Some(acp::SessionNotification::new(
142            session_id,
143            SessionUpdate::AgentMessageChunk(ContentChunk::new(ContentBlock::Text(TextContent::new(format!(
144                "[Error] {message}"
145            ))))),
146        )),
147
148        AgentMessage::ContextUsageUpdate { .. }
149        | AgentMessage::ContextCleared
150        | AgentMessage::Cancelled { .. }
151        | AgentMessage::Done
152        | AgentMessage::ContextCompactionStarted { .. }
153        | AgentMessage::ContextCompactionResult { .. }
154        | AgentMessage::AutoContinue { .. }
155        | AgentMessage::Retrying { .. }
156        | AgentMessage::ModelSwitched { .. } => None,
157    }
158}
159
/// Typed union of agent-side extension notifications that the relay forwards
/// to the client. Each variant serializes to its own `_aether/*` wire method
/// and is sent via [`ConnectionTo<Client>::send_notification`].
///
/// Values are produced from `AgentMessage`s by `try_into_agent_notification`.
pub enum AgentExtNotification {
    /// Token and context-window usage figures, built from
    /// `AgentMessage::ContextUsageUpdate`.
    ContextUsage(ContextUsageParams),
    /// Emitted for `AgentMessage::ContextCleared`; carries default parameters.
    ContextCleared(ContextClearedParams),
    /// Progress of a sub-agent, decoded from the message of an
    /// `AgentMessage::ToolProgress`.
    SubAgentProgress(SubAgentProgressParams),
}
168
169pub fn try_into_agent_notification(msg: &AgentMessage) -> Option<AgentExtNotification> {
170    match msg {
171        AgentMessage::ContextUsageUpdate {
172            usage_ratio,
173            context_limit,
174            input_tokens,
175            output_tokens,
176            cache_read_tokens,
177            cache_creation_tokens,
178            reasoning_tokens,
179            total_input_tokens,
180            total_output_tokens,
181            total_cache_read_tokens,
182            total_cache_creation_tokens,
183            total_reasoning_tokens,
184        } => Some(AgentExtNotification::ContextUsage(ContextUsageParams {
185            usage_ratio: *usage_ratio,
186            context_limit: *context_limit,
187            input_tokens: *input_tokens,
188            output_tokens: *output_tokens,
189            cache_read_tokens: *cache_read_tokens,
190            cache_creation_tokens: *cache_creation_tokens,
191            reasoning_tokens: *reasoning_tokens,
192            total_input_tokens: *total_input_tokens,
193            total_output_tokens: *total_output_tokens,
194            total_cache_read_tokens: *total_cache_read_tokens,
195            total_cache_creation_tokens: *total_cache_creation_tokens,
196            total_reasoning_tokens: *total_reasoning_tokens,
197        })),
198        AgentMessage::ToolProgress { request, message, .. } => {
199            let msg_str = message.as_ref()?;
200            let params = try_parse_sub_agent_progress(msg_str, request)?;
201            Some(AgentExtNotification::SubAgentProgress(params))
202        }
203        AgentMessage::ContextCleared => Some(AgentExtNotification::ContextCleared(ContextClearedParams::default())),
204        _ => None,
205    }
206}
207
208/// If the tool result carries plan metadata, build a `SessionUpdate::Plan` notification.
209pub fn try_extract_plan_notification(
210    session_id: SessionId,
211    result_meta: Option<&ToolResultMeta>,
212) -> Option<SessionNotification> {
213    let plan_meta = result_meta?.plan.as_ref()?;
214    let entries = plan_meta
215        .entries
216        .iter()
217        .map(|e| PlanEntry::new(e.content.clone(), PlanEntryPriority::Medium, plan_status_to_acp(e.status)))
218        .collect();
219    Some(SessionNotification::new(session_id, SessionUpdate::Plan(acp::Plan::new(entries))))
220}
221
222/// Convert internal plan status to ACP protocol status.
223fn plan_status_to_acp(status: PlanMetaStatus) -> PlanEntryStatus {
224    match status {
225        PlanMetaStatus::InProgress => PlanEntryStatus::InProgress,
226        PlanMetaStatus::Completed => PlanEntryStatus::Completed,
227        PlanMetaStatus::Pending => PlanEntryStatus::Pending,
228    }
229}
230
231/// Determines the stop reason from the final agent message
232pub fn map_agent_message_to_stop_reason(msg: &AgentMessage) -> acp::StopReason {
233    match msg {
234        AgentMessage::Cancelled { .. } => StopReason::Cancelled,
235        _ => StopReason::EndTurn,
236    }
237}
238
239fn map_chunk_to_notification(
240    session_id: SessionId,
241    chunk: &str,
242    is_complete: bool,
243    mode: NotificationMode,
244    wrap: fn(ContentChunk) -> SessionUpdate,
245) -> Option<SessionNotification> {
246    match mode {
247        // Skip the final completion message to avoid sending duplicate content.
248        // The client has already received all the chunks during streaming.
249        NotificationMode::Live if is_complete => return None,
250        NotificationMode::Replay if !is_complete => return None,
251        NotificationMode::Live | NotificationMode::Replay => {}
252    }
253
254    Some(acp::SessionNotification::new(
255        session_id,
256        wrap(ContentChunk::new(ContentBlock::Text(TextContent::new(chunk.to_owned())))),
257    ))
258}
259
260fn map_tool_call_to_notification(session_id: SessionId, request: &ToolCallRequest) -> SessionNotification {
261    let raw_input = serde_json::from_str(&request.arguments).ok();
262    SessionNotification::new(
263        session_id,
264        SessionUpdate::ToolCall(
265            ToolCall::new(ToolCallId::new(request.id.clone()), humanize_tool_name(&request.name))
266                .status(acp::ToolCallStatus::InProgress)
267                .raw_input(raw_input),
268        ),
269    )
270}
271
272fn parse_tool_call_chunk(chunk: &str) -> serde_json::Value {
273    serde_json::from_str(chunk).unwrap_or_else(|_| serde_json::Value::String(chunk.to_string()))
274}
275
276fn map_tool_call_update_to_notification(session_id: SessionId, tool_call_id: &str, chunk: &str) -> SessionNotification {
277    let fields = ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).raw_input(parse_tool_call_chunk(chunk));
278
279    SessionNotification::new(
280        session_id,
281        SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(ToolCallId::new(tool_call_id.to_string()), fields)),
282    )
283}
284
/// Produces the initial human-readable title for a tool call (e.g. "Read file"),
/// sent when the tool call starts.
///
/// Takes the segment after the last `__`, turns underscores into spaces and
/// upper-cases the first character when it is ASCII. A non-ASCII first
/// character is left unchanged.
fn humanize_tool_name(name: &str) -> String {
    let unqualified = name.split("__").last().unwrap_or(name);
    let mut title: String = unqualified.chars().map(|c| if c == '_' { ' ' } else { c }).collect();

    // `get_mut(..1)` is `None` for an empty title or a multi-byte first character.
    if let Some(initial) = title.get_mut(..1) {
        initial.make_ascii_uppercase();
    }

    title
}
295
296fn map_tool_result_to_notification(
297    session_id: SessionId,
298    result: &ToolCallResult,
299    result_meta: Option<&ToolResultMeta>,
300) -> SessionNotification {
301    let mut content =
302        vec![ToolCallContent::Content(Content::new(ContentBlock::Text(TextContent::new(result.result.clone()))))];
303
304    if let Some(rm) = result_meta
305        && let Some(fd) = &rm.file_diff
306    {
307        let mut diff = Diff::new(&fd.path, &fd.new_text);
308        if let Some(old) = &fd.old_text {
309            diff = diff.old_text(old.clone());
310        }
311        content.push(ToolCallContent::Diff(diff));
312    }
313
314    let mut fields = ToolCallUpdateFields::new().status(ToolCallStatus::Completed).content(content);
315
316    if let Some(rm) = result_meta {
317        fields = fields.title(&rm.display.title);
318    }
319
320    let mut update = ToolCallUpdate::new(ToolCallId::new(result.id.clone()), fields);
321
322    if let Some(rm) = result_meta
323        && !rm.display.value.is_empty()
324    {
325        let mut meta_map = serde_json::Map::new();
326        meta_map.insert("display_value".into(), rm.display.value.clone().into());
327        update = update.meta(meta_map);
328    }
329
330    SessionNotification::new(session_id, SessionUpdate::ToolCallUpdate(update))
331}
332
333fn map_tool_error_to_notification(session_id: SessionId, error: &ToolCallError) -> SessionNotification {
334    SessionNotification::new(
335        session_id,
336        SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
337            ToolCallId::new(error.id.clone()),
338            ToolCallUpdateFields::new().status(ToolCallStatus::Failed).content(vec![ToolCallContent::Content(
339                Content::new(ContentBlock::Text(TextContent::new(error.error.clone()))),
340            )]),
341        )),
342    )
343}
344
345fn map_tool_progress_to_notification(
346    session_id: SessionId,
347    request: &ToolCallRequest,
348    progress: f64,
349    total: Option<f64>,
350    message: Option<&String>,
351) -> Option<SessionNotification> {
352    tracing::debug!("Tool progress: {message:?}");
353
354    if message.and_then(|msg_str| try_parse_sub_agent_progress(msg_str, request)).is_some() {
355        return None;
356    }
357
358    if let Some(result_meta) = message.and_then(|m| try_parse_display_meta(m)) {
359        let fields = ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).title(&result_meta.display.title);
360
361        let mut update = ToolCallUpdate::new(ToolCallId::new(request.id.clone()), fields);
362
363        if !result_meta.display.value.is_empty() {
364            let mut meta_map = serde_json::Map::new();
365            meta_map.insert("display_value".into(), result_meta.display.value.into());
366            update = update.meta(meta_map);
367        }
368
369        return Some(SessionNotification::new(session_id, SessionUpdate::ToolCallUpdate(update)));
370    }
371
372    let total_str = total.map_or_else(|| "?".to_string(), |t| t.to_string());
373    let progress_text = message
374        .map_or_else(|| format!("Progress: {progress}/{total_str}"), |msg| format!("{msg} ({progress}/{total_str})"));
375
376    Some(SessionNotification::new(
377        session_id,
378        SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
379            ToolCallId::new(request.id.clone()),
380            ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).content(vec![ToolCallContent::Content(
381                Content::new(ContentBlock::Text(TextContent::new(progress_text))),
382            )]),
383        )),
384    ))
385}
386
387/// Replay session events to the client as ACP notifications.
388pub async fn replay_to_client(events: &[SessionEvent], connection: &ConnectionTo<Client>, session_id: &SessionId) {
389    for notif in replay_events_to_notifications(events, session_id) {
390        if let Err(e) = connection.send_notification(notif).map_err(|e| AcpServerError::protocol("session/update", e)) {
391            tracing::error!("Failed to send replay notification: {e:?}");
392        }
393    }
394}
395
396pub fn replay_events_to_notifications(events: &[SessionEvent], session_id: &SessionId) -> Vec<SessionNotification> {
397    let mut out = Vec::new();
398    for event in events {
399        match event {
400            SessionEvent::User(UserEvent::Message { content }) => {
401                for block in content {
402                    out.push(SessionNotification::new(
403                        session_id.clone(),
404                        SessionUpdate::UserMessageChunk(ContentChunk::new(map_user_content_block(block))),
405                    ));
406                }
407            }
408            SessionEvent::Agent(message) => {
409                out.extend(map_agent_message_to_notification(session_id.clone(), message, NotificationMode::Replay));
410            }
411            SessionEvent::User(_) => {}
412        }
413    }
414    out
415}
416
417fn map_user_content_block(block: &llm::ContentBlock) -> ContentBlock {
418    match block {
419        llm::ContentBlock::Text { text } => ContentBlock::Text(TextContent::new(text.clone())),
420        llm::ContentBlock::Image { data, mime_type } => {
421            ContentBlock::Image(acp::ImageContent::new(data.clone(), mime_type.clone()))
422        }
423        llm::ContentBlock::Audio { data, mime_type } => {
424            ContentBlock::Audio(acp::AudioContent::new(data.clone(), mime_type.clone()))
425        }
426    }
427}
428
429fn try_parse_display_meta(message: &str) -> Option<ToolResultMeta> {
430    serde_json::from_str::<ToolResultMeta>(message).ok()
431}
432
433/// Attempt to parse a tool progress message as sub-agent progress.
434fn try_parse_sub_agent_progress(message: &str, request: &llm::ToolCallRequest) -> Option<SubAgentProgressParams> {
435    let payload: SubAgentProgressPayload = serde_json::from_str(message).ok()?;
436
437    Some(SubAgentProgressParams {
438        parent_tool_id: request.id.clone(),
439        task_id: payload.task_id,
440        agent_name: payload.agent_name,
441        event: (&payload.event).into(),
442    })
443}
444
445#[cfg(test)]
446mod tests {
447    use super::*;
448    use acp_utils::notifications::SubAgentEvent;
449    use llm::ToolCallRequest;
450
451    #[test]
452    fn test_tool_progress_with_sub_agent_payload_emits_ext_notification() {
453        let session_id = acp::SessionId::new("test-session");
454
455        let payload = SubAgentProgressPayload {
456            task_id: "task_1".to_string(),
457            agent_name: "sub-agent".to_string(),
458            event: AgentMessage::Text {
459                message_id: "msg_1".to_string(),
460                chunk: "Hello".to_string(),
461                is_complete: false,
462                model_name: "TestModel".to_string(),
463            },
464        };
465        let serialized_msg = serde_json::to_string(&payload).unwrap();
466
467        let tool_progress = AgentMessage::ToolProgress {
468            request: ToolCallRequest {
469                id: "call_123".to_string(),
470                name: "plugins__spawn_subagent".to_string(),
471                arguments: "{}".to_string(),
472            },
473            progress: 42.0,
474            total: Some(100.0),
475            message: Some(serialized_msg.clone()),
476        };
477
478        let notification = map_agent_message_to_session_notification(session_id.clone(), &tool_progress);
479
480        assert!(notification.is_none());
481
482        let agent_notif = try_into_agent_notification(&tool_progress).expect("agent notification");
483        match agent_notif {
484            AgentExtNotification::SubAgentProgress(params) => {
485                assert_eq!(params.parent_tool_id, "call_123");
486                assert_eq!(params.task_id, "task_1");
487                assert_eq!(params.agent_name, "sub-agent");
488                assert!(matches!(params.event, SubAgentEvent::Other));
489            }
490            _ => panic!("expected SubAgentProgress"),
491        }
492    }
493
494    #[test]
495    fn replay_emits_user_media_chunks_in_order() {
496        let session_id = acp::SessionId::new("test-session");
497        let events = vec![SessionEvent::User(UserEvent::Message {
498            content: vec![
499                llm::ContentBlock::text("hello"),
500                llm::ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() },
501                llm::ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() },
502            ],
503        })];
504
505        let notifications = replay_events_to_notifications(&events, &session_id);
506        let updates: Vec<_> = notifications.into_iter().map(|n| n.update).collect();
507        assert!(matches!(
508            &updates[0],
509            acp::SessionUpdate::UserMessageChunk(chunk)
510                if matches!(&chunk.content, acp::ContentBlock::Text(text) if text.text == "hello")
511        ));
512        assert!(matches!(
513            &updates[1],
514            acp::SessionUpdate::UserMessageChunk(chunk)
515                if matches!(&chunk.content, acp::ContentBlock::Image(_))
516        ));
517        assert!(matches!(
518            &updates[2],
519            acp::SessionUpdate::UserMessageChunk(chunk)
520                if matches!(&chunk.content, acp::ContentBlock::Audio(_))
521        ));
522    }
523
524    #[test]
525    fn test_thought_maps_to_agent_thought_chunk() {
526        let session_id = acp::SessionId::new("test-session");
527        let thought = AgentMessage::Thought {
528            message_id: "msg_1".to_string(),
529            chunk: "thinking...".to_string(),
530            is_complete: false,
531            model_name: "TestModel".to_string(),
532        };
533
534        let notification = map_agent_message_to_session_notification(session_id, &thought).expect("notification");
535
536        match notification.update {
537            acp::SessionUpdate::AgentThoughtChunk(chunk) => match chunk.content {
538                acp::ContentBlock::Text(text) => assert_eq!(text.text, "thinking..."),
539                other => panic!("Expected text content, got {other:?}"),
540            },
541            other => panic!("Expected AgentThoughtChunk, got {other:?}"),
542        }
543    }
544
545    #[test]
546    fn test_tool_call_maps_to_tool_call_notification() {
547        let session_id = acp::SessionId::new("test-session");
548        let message = AgentMessage::ToolCall {
549            request: ToolCallRequest {
550                id: "call_1".to_string(),
551                name: "coding__read_file".to_string(),
552                arguments: "{}".to_string(),
553            },
554            model_name: "TestModel".to_string(),
555        };
556
557        let notification = map_agent_message_to_session_notification(session_id, &message).expect("notification");
558
559        match notification.update {
560            acp::SessionUpdate::ToolCall(tool_call) => {
561                assert_eq!(tool_call.tool_call_id.0.as_ref(), "call_1");
562                assert_eq!(tool_call.title, "Read file");
563                assert_eq!(tool_call.status, acp::ToolCallStatus::InProgress);
564            }
565            other => panic!("Expected ToolCall, got {other:?}"),
566        }
567    }
568
569    #[test]
570    fn test_tool_call_update_maps_to_tool_call_update_notification() {
571        let session_id = acp::SessionId::new("test-session");
572        let message = AgentMessage::ToolCallUpdate {
573            tool_call_id: "call_1".to_string(),
574            chunk: r#"{"filePath":"Cargo.toml"}"#.to_string(),
575            model_name: "TestModel".to_string(),
576        };
577
578        let notification = map_agent_message_to_session_notification(session_id, &message).expect("notification");
579
580        match notification.update {
581            acp::SessionUpdate::ToolCallUpdate(update) => {
582                assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
583                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::InProgress));
584                assert_eq!(update.fields.raw_input, Some(serde_json::json!({ "filePath": "Cargo.toml" })));
585            }
586            other => panic!("Expected ToolCallUpdate, got {other:?}"),
587        }
588    }
589
590    #[test]
591    fn test_tool_call_update_has_same_live_and_replay_mapping() {
592        let session_id = acp::SessionId::new("test-session");
593        let message = AgentMessage::ToolCallUpdate {
594            tool_call_id: "call_1".to_string(),
595            chunk: r#"{"filePath":"Cargo.toml"}"#.to_string(),
596            model_name: "TestModel".to_string(),
597        };
598
599        let live = map_agent_message_to_notification(session_id.clone(), &message, NotificationMode::Live)
600            .expect("live notification");
601        let replay = map_agent_message_to_notification(session_id, &message, NotificationMode::Replay)
602            .expect("replay notification");
603
604        match (live.update, replay.update) {
605            (acp::SessionUpdate::ToolCallUpdate(live), acp::SessionUpdate::ToolCallUpdate(replay)) => {
606                assert_eq!(live.tool_call_id.0, replay.tool_call_id.0);
607                assert_eq!(live.fields.status, replay.fields.status);
608                assert_eq!(live.fields.raw_input, replay.fields.raw_input);
609            }
610            other => panic!("Expected ToolCallUpdate pair, got {other:?}"),
611        }
612    }
613
614    #[test]
615    fn test_live_mapping_skips_completed_chunks_but_replay_keeps_them() {
616        let cases: Vec<(AgentMessage, &str)> = vec![
617            (
618                AgentMessage::Text {
619                    message_id: "msg_1".to_string(),
620                    chunk: "done".to_string(),
621                    is_complete: true,
622                    model_name: "TestModel".to_string(),
623                },
624                "done",
625            ),
626            (
627                AgentMessage::Thought {
628                    message_id: "msg_1".to_string(),
629                    chunk: "final reasoning".to_string(),
630                    is_complete: true,
631                    model_name: "TestModel".to_string(),
632                },
633                "final reasoning",
634            ),
635        ];
636
637        for (message, expected_text) in cases {
638            let session_id = acp::SessionId::new("test-session");
639            assert!(
640                map_agent_message_to_notification(session_id.clone(), &message, NotificationMode::Live).is_none(),
641                "live mode should skip completed chunk"
642            );
643
644            let notification = map_agent_message_to_notification(session_id, &message, NotificationMode::Replay)
645                .expect("replay notification");
646
647            match notification.update {
648                acp::SessionUpdate::AgentMessageChunk(chunk) | acp::SessionUpdate::AgentThoughtChunk(chunk) => {
649                    match chunk.content {
650                        acp::ContentBlock::Text(text) => assert_eq!(text.text, expected_text),
651                        other => panic!("Expected text content, got {other:?}"),
652                    }
653                }
654                other => panic!("Expected chunk update, got {other:?}"),
655            }
656        }
657    }
658
659    #[test]
660    fn test_context_cleared_maps_to_agent_notification() {
661        let notif = try_into_agent_notification(&AgentMessage::ContextCleared)
662            .expect("context cleared should emit agent notification");
663        assert!(matches!(notif, AgentExtNotification::ContextCleared(_)));
664    }
665
666    #[test]
667    fn test_tool_progress_with_invalid_json_falls_back_to_simple_message() {
668        let session_id = acp::SessionId::new("test-session");
669
670        // Simulate a tool progress message with invalid JSON
671        let tool_progress = AgentMessage::ToolProgress {
672            request: ToolCallRequest {
673                id: "call_456".to_string(),
674                name: "some_tool".to_string(),
675                arguments: "{}".to_string(),
676            },
677            progress: 50.0,
678            total: None,
679            message: Some("not valid json".to_string()),
680        };
681
682        let notification = map_agent_message_to_session_notification(session_id.clone(), &tool_progress);
683
684        assert!(notification.is_some());
685
686        // Should still produce a notification with the message as-is
687        let notification = notification.unwrap();
688        match notification.update {
689            acp::SessionUpdate::ToolCallUpdate(update) => {
690                if let Some(content) = &update.fields.content
691                    && let acp::ToolCallContent::Content(c) = &content[0]
692                    && let acp::ContentBlock::Text(text) = &c.content
693                {
694                    // Should contain the original message
695                    assert!(text.text.contains("not valid json"));
696                }
697            }
698            _ => panic!("Expected ToolCallUpdate"),
699        }
700    }
701
702    #[test]
703    fn test_map_acp_stdio_server() {
704        let server = acp::McpServer::Stdio(
705            acp::McpServerStdio::new("my-server", "/usr/bin/server")
706                .args(vec!["--port".into(), "8080".into()])
707                .env(vec![acp::EnvVariable::new("FOO", "bar")]),
708        );
709
710        let configs = map_acp_mcp_servers(vec![server]);
711        assert_eq!(configs.len(), 1);
712
713        match &configs[0].transport {
714            McpTransport::Stdio { command, args, env } => {
715                assert_eq!(configs[0].name, "my-server");
716                assert_eq!(command, "/usr/bin/server");
717                assert_eq!(args, &["--port", "8080"]);
718                assert_eq!(env.get("FOO").unwrap(), "bar");
719            }
720            other => panic!("Expected Stdio, got {other:?}"),
721        }
722    }
723
724    #[test]
725    fn test_map_acp_http_server() {
726        let server = acp::McpServer::Http(
727            acp::McpServerHttp::new("http-server", "https://example.com/mcp")
728                .headers(vec![acp::HttpHeader::new("Authorization", "Bearer token123")]),
729        );
730
731        let configs = map_acp_mcp_servers(vec![server]);
732        assert_eq!(configs.len(), 1);
733
734        match &configs[0].transport {
735            McpTransport::Http { config } => {
736                assert_eq!(configs[0].name, "http-server");
737                assert_eq!(config.uri.as_ref(), "https://example.com/mcp");
738                assert_eq!(config.auth_header.as_deref(), Some("token123"));
739            }
740            other => panic!("Expected Http, got {other:?}"),
741        }
742    }
743
744    #[test]
745    fn test_http_auth_header_strips_bearer_case_insensitively() {
746        let cases = [
747            ("Bearer token123", "token123"),
748            ("bearer token123", "token123"),
749            ("BEARER token123", "token123"),
750            ("bEaReR token123", "token123"),
751            // Non-Bearer scheme: pass through verbatim. rmcp will then prefix
752            // with "Bearer ", which is the contract for non-bearer auth too.
753            ("Token foo", "Token foo"),
754            ("token123", "token123"),
755        ];
756
757        for (input, expected) in cases {
758            let server = acp::McpServer::Http(
759                acp::McpServerHttp::new("http-server", "https://example.com/mcp")
760                    .headers(vec![acp::HttpHeader::new("Authorization", input)]),
761            );
762            let configs = map_acp_mcp_servers(vec![server]);
763            match &configs[0].transport {
764                McpTransport::Http { config } => {
765                    assert_eq!(config.auth_header.as_deref(), Some(expected), "input was {input:?}");
766                }
767                other => panic!("Expected Http, got {other:?}"),
768            }
769        }
770    }
771
772    #[test]
773    fn test_map_acp_sse_server() {
774        let server = acp::McpServer::Sse(acp::McpServerSse::new("sse-server", "https://example.com/sse"));
775
776        let configs = map_acp_mcp_servers(vec![server]);
777        assert_eq!(configs.len(), 1);
778
779        match &configs[0].transport {
780            McpTransport::Http { config } => {
781                assert_eq!(configs[0].name, "sse-server");
782                assert_eq!(config.uri.as_ref(), "https://example.com/sse");
783                assert_eq!(config.auth_header, None);
784            }
785            other => panic!("Expected Http, got {other:?}"),
786        }
787    }
788
789    #[test]
790    fn test_humanize_tool_name() {
791        assert_eq!(humanize_tool_name("coding__read_file"), "Read file");
792        assert_eq!(humanize_tool_name("read_file"), "Read file");
793        assert_eq!(humanize_tool_name("bash"), "Bash");
794        assert_eq!(humanize_tool_name("plugins__coding__read_file"), "Read file");
795    }
796
797    #[test]
798    fn test_result_with_result_meta_sets_meta() {
799        use mcp_utils::display_meta::ToolDisplayMeta;
800
801        let session_id = acp::SessionId::new("test-session");
802        let result = ToolCallResult {
803            id: "call_1".to_string(),
804            name: "coding__read_file".to_string(),
805            arguments: "{}".to_string(),
806            result: "file contents".to_string(),
807        };
808        let rm: ToolResultMeta = ToolDisplayMeta::new("Read file", "Cargo.toml, 156 lines").into();
809
810        let notification = map_tool_result_to_notification(session_id, &result, Some(&rm));
811        match notification.update {
812            acp::SessionUpdate::ToolCallUpdate(update) => {
813                assert_eq!(update.fields.title.as_deref(), Some("Read file"), "native title should be set");
814                let meta = update.meta.expect("meta should be present");
815                assert_eq!(
816                    meta.get("display_value").and_then(|v| v.as_str()),
817                    Some("Cargo.toml, 156 lines"),
818                    "display_value should be a flat key in _meta"
819                );
820                assert!(meta.get("display").is_none(), "old nested display object should not be in _meta");
821            }
822            other => panic!("Expected ToolCallUpdate, got {other:?}"),
823        }
824    }
825
826    #[test]
827    fn test_result_without_result_meta() {
828        let session_id = acp::SessionId::new("test-session");
829        let result = ToolCallResult {
830            id: "call_1".to_string(),
831            name: "external__some_tool".to_string(),
832            arguments: "{}".to_string(),
833            result: "ok".to_string(),
834        };
835
836        let notification = map_tool_result_to_notification(session_id, &result, None);
837        match notification.update {
838            acp::SessionUpdate::ToolCallUpdate(update) => {
839                assert!(update.fields.title.is_none());
840                assert!(update.meta.is_none());
841            }
842            other => panic!("Expected ToolCallUpdate, got {other:?}"),
843        }
844    }
845
846    #[test]
847    fn test_plan_notification_extracted_from_result_meta() {
848        use mcp_utils::display_meta::{PlanMeta, PlanMetaEntry, PlanMetaStatus, ToolDisplayMeta};
849
850        let session_id = acp::SessionId::new("test-session");
851        let meta = ToolResultMeta::with_plan(
852            ToolDisplayMeta::new("Todo", "Research AI agents"),
853            PlanMeta {
854                entries: vec![
855                    PlanMetaEntry { content: "Research AI agents".to_string(), status: PlanMetaStatus::InProgress },
856                    PlanMetaEntry { content: "Write tests".to_string(), status: PlanMetaStatus::Pending },
857                ],
858            },
859        );
860
861        let notification = try_extract_plan_notification(session_id, Some(&meta)).expect("should produce plan");
862        match notification.update {
863            acp::SessionUpdate::Plan(plan) => {
864                assert_eq!(plan.entries.len(), 2);
865                assert_eq!(plan.entries[0].content, "Research AI agents");
866                assert_eq!(plan.entries[0].status, acp::PlanEntryStatus::InProgress);
867                assert_eq!(plan.entries[1].content, "Write tests");
868                assert_eq!(plan.entries[1].status, acp::PlanEntryStatus::Pending);
869            }
870            other => panic!("Expected Plan, got {other:?}"),
871        }
872    }
873
874    #[test]
875    fn test_plan_notification_none_when_no_plan_or_no_meta() {
876        use mcp_utils::display_meta::ToolDisplayMeta;
877
878        let sid = acp::SessionId::new("test-session");
879        let meta: ToolResultMeta = ToolDisplayMeta::new("Read file", "main.rs").into();
880        assert!(try_extract_plan_notification(sid.clone(), Some(&meta)).is_none());
881        assert!(try_extract_plan_notification(sid, None).is_none());
882    }
883
884    #[test]
885    fn test_tool_progress_with_display_meta_emits_meta_update() {
886        use mcp_utils::display_meta::ToolDisplayMeta;
887
888        let session_id = acp::SessionId::new("test-session");
889        let meta = ToolResultMeta::from(ToolDisplayMeta::new("Read file", "main.rs"));
890        let serialized = serde_json::to_string(&meta).unwrap();
891
892        let request = ToolCallRequest {
893            id: "call_789".to_string(),
894            name: "coding__read_file".to_string(),
895            arguments: "{}".to_string(),
896        };
897
898        let notification = map_tool_progress_to_notification(session_id, &request, 0.0, None, Some(&serialized))
899            .expect("should produce notification");
900
901        match notification.update {
902            acp::SessionUpdate::ToolCallUpdate(update) => {
903                assert_eq!(&*update.tool_call_id.0, "call_789");
904                assert_eq!(update.fields.title.as_deref(), Some("Read file"), "native title should be set");
905                let meta_map = update.meta.expect("meta should be present");
906                assert_eq!(
907                    meta_map.get("display_value").and_then(|v| v.as_str()),
908                    Some("main.rs"),
909                    "display_value should be a flat key in _meta"
910                );
911                assert!(meta_map.get("display").is_none(), "old nested display object should not be in _meta");
912                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::InProgress));
913                // Should NOT have content (no text progress fallback)
914                assert!(update.fields.content.is_none());
915            }
916            other => panic!("Expected ToolCallUpdate, got {other:?}"),
917        }
918    }
919
920    #[tokio::test(flavor = "current_thread")]
921    async fn replay_to_client_forwards_each_event_as_session_notification() {
922        use acp_utils::testing::test_connection;
923        use llm::ContentBlock as LlmContentBlock;
924        use tokio::task::LocalSet;
925
926        LocalSet::new()
927            .run_until(async {
928                let (cx, mut peer) = test_connection().await;
929                let session_id = acp::SessionId::new("test-session");
930                let events = vec![SessionEvent::User(UserEvent::Message {
931                    content: vec![LlmContentBlock::text("hello"), LlmContentBlock::text("world")],
932                })];
933
934                replay_to_client(&events, &cx, &session_id).await;
935
936                for _ in 0..2 {
937                    let notif = peer.next_session_notification().await;
938                    assert_eq!(notif.session_id, session_id);
939                    assert!(matches!(notif.update, SessionUpdate::UserMessageChunk(_)));
940                }
941            })
942            .await;
943    }
944}