Skip to main content

aether_cli/acp/
mappers.rs

1use acp_utils::notifications::{ContextClearedParams, ContextUsageParams, SubAgentProgressParams};
2use acp_utils::server::AcpServerError;
3use aether_core::events::{AgentMessage, SubAgentProgressPayload};
4use agent_client_protocol::schema::{
5    self as acp, Content, ContentBlock, ContentChunk, Diff, HttpHeader, McpServer, PlanEntry, PlanEntryPriority,
6    PlanEntryStatus, SessionId, SessionNotification, SessionUpdate, StopReason, TextContent, ToolCall, ToolCallContent,
7    ToolCallId, ToolCallStatus, ToolCallUpdate, ToolCallUpdateFields,
8};
9use agent_client_protocol::{Client, ConnectionTo};
10use llm::{ToolCallError, ToolCallRequest, ToolCallResult};
11use mcp_utils::client::{McpServerConfig, ServerConfig};
12use mcp_utils::display_meta::{PlanMetaStatus, ToolResultMeta};
13use rmcp::model::Prompt as McpPrompt;
14use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
15
16use aether_core::context::ext::{SessionEvent, UserEvent};
17
18/// Converts an MCP Prompt to an ACP `AvailableCommand`
19///
20/// Strips the MCP namespace from the prompt name (e.g., "`coding__web`" -> "web")
21/// and creates a slash command that clients can invoke.
22pub fn map_mcp_prompt_to_available_command(prompt: &McpPrompt) -> acp::AvailableCommand {
23    // Extract the base command name by removing the namespace prefix
24    let command_name = prompt.name.split("__").last().unwrap_or(prompt.name.as_ref()).to_string();
25
26    // Extract the input hint from the unified prompt format's ARGUMENTS parameter,
27    // falling back to a generic hint.
28    let hint = prompt
29        .arguments
30        .as_ref()
31        .and_then(|args| args.iter().find(|a| a.name.as_str() == "ARGUMENTS").and_then(|a| a.description.as_deref()))
32        .unwrap_or("optional arguments");
33    let input = Some(acp::AvailableCommandInput::Unstructured(acp::UnstructuredCommandInput::new(hint)));
34
35    let description = prompt.description.clone().unwrap_or_else(|| "No description available".to_string());
36
37    acp::AvailableCommand::new(command_name, description).input(input)
38}
39
40/// Maps ACP MCP server definitions to internal `McpServerConfig`, skipping unsupported transports.
41pub fn map_acp_mcp_servers(servers: Vec<McpServer>) -> Vec<McpServerConfig> {
42    servers
43        .into_iter()
44        .filter_map(|s| {
45            try_map_mcp_server(s).or_else(|| {
46                tracing::warn!("Unsupported ACP MCP server transport, skipping");
47                None
48            })
49        })
50        .collect()
51}
52
53fn try_map_mcp_server(server: McpServer) -> Option<McpServerConfig> {
54    use McpServer::{Http, Sse, Stdio};
55    match server {
56        Stdio(stdio) => Some(
57            ServerConfig::Stdio {
58                name: stdio.name,
59                command: stdio.command.to_string_lossy().into_owned(),
60                args: stdio.args,
61                env: stdio.env.into_iter().map(|e| (e.name, e.value)).collect(),
62            }
63            .into(),
64        ),
65
66        Http(http) => Some(ServerConfig::Http { name: http.name, config: http_config(http.url, &http.headers) }.into()),
67
68        Sse(sse) => Some(ServerConfig::Http { name: sse.name, config: http_config(sse.url, &sse.headers) }.into()),
69
70        _ => None,
71    }
72}
73
74fn http_config(url: String, headers: &[HttpHeader]) -> StreamableHttpClientTransportConfig {
75    let auth_header = headers.iter().find(|h| h.name.eq_ignore_ascii_case("authorization")).map(|h| h.value.clone());
76
77    let mut config = StreamableHttpClientTransportConfig::with_uri(url);
78    if let Some(auth) = auth_header {
79        // rmcp's `auth_header` wants the bare token; it adds the `Bearer ` prefix itself.
80        let token = auth
81            .split_once(' ')
82            .filter(|(scheme, _)| scheme.eq_ignore_ascii_case("Bearer"))
83            .map_or(auth.as_str(), |(_, rest)| rest);
84        config = config.auth_header(token.to_string());
85    }
86    config
87}
88
89/// Converts Aether `AgentMessage` to ACP `SessionUpdate`
90pub fn map_agent_message_to_session_notification(
91    session_id: SessionId,
92    msg: &AgentMessage,
93) -> Option<SessionNotification> {
94    map_agent_message_to_notification(session_id, msg, NotificationMode::Live)
95}
96
/// Selects how streamed text and thought chunks are turned into notifications.
///
/// The mode is only consulted by `map_chunk_to_notification`.
#[derive(Clone, Copy)]
enum NotificationMode {
    /// Incremental chunks are emitted; a chunk flagged `is_complete` is skipped.
    Live,
    /// Only chunks flagged `is_complete` are emitted; incremental ones are skipped.
    Replay,
}
102
103fn map_agent_message_to_notification(
104    session_id: SessionId,
105    msg: &AgentMessage,
106    mode: NotificationMode,
107) -> Option<SessionNotification> {
108    match msg {
109        AgentMessage::Text { chunk, is_complete, .. } => {
110            map_chunk_to_notification(session_id, chunk, *is_complete, mode, SessionUpdate::AgentMessageChunk)
111        }
112
113        AgentMessage::Thought { chunk, is_complete, .. } => {
114            map_chunk_to_notification(session_id, chunk, *is_complete, mode, SessionUpdate::AgentThoughtChunk)
115        }
116
117        AgentMessage::ToolCall { request, .. } => Some(map_tool_call_to_notification(session_id, request)),
118
119        AgentMessage::ToolCallUpdate { tool_call_id, chunk, .. } => {
120            Some(map_tool_call_update_to_notification(session_id, tool_call_id, chunk))
121        }
122
123        AgentMessage::ToolResult { result, result_meta, .. } => {
124            Some(map_tool_result_to_notification(session_id, result, result_meta.as_ref()))
125        }
126
127        AgentMessage::ToolError { error, .. } => Some(map_tool_error_to_notification(session_id, error)),
128
129        AgentMessage::ToolProgress { request, progress, total, message } => {
130            map_tool_progress_to_notification(session_id, request, *progress, *total, message.as_ref())
131        }
132
133        AgentMessage::Error { message } => Some(acp::SessionNotification::new(
134            session_id,
135            SessionUpdate::AgentMessageChunk(ContentChunk::new(ContentBlock::Text(TextContent::new(format!(
136                "[Error] {message}"
137            ))))),
138        )),
139
140        AgentMessage::ContextUsageUpdate { .. }
141        | AgentMessage::ContextCleared
142        | AgentMessage::Cancelled { .. }
143        | AgentMessage::Done
144        | AgentMessage::ContextCompactionStarted { .. }
145        | AgentMessage::ContextCompactionResult { .. }
146        | AgentMessage::AutoContinue { .. }
147        | AgentMessage::Retrying { .. }
148        | AgentMessage::ModelSwitched { .. } => None,
149    }
150}
151
/// Typed union of agent-side extension notifications that the relay forwards
/// to the client. Each variant serializes to its own `_aether/*` wire method
/// and is sent via [`ConnectionTo<Client>::send_notification`].
///
/// Values are produced from [`AgentMessage`]s by `try_into_agent_notification`.
pub enum AgentExtNotification {
    /// Token and context-window usage, built from `AgentMessage::ContextUsageUpdate`.
    ContextUsage(ContextUsageParams),
    /// Emitted for `AgentMessage::ContextCleared`; carries default parameters.
    ContextCleared(ContextClearedParams),
    /// Sub-agent progress parsed from the message text of an `AgentMessage::ToolProgress`.
    SubAgentProgress(SubAgentProgressParams),
}
160
161pub fn try_into_agent_notification(msg: &AgentMessage) -> Option<AgentExtNotification> {
162    match msg {
163        AgentMessage::ContextUsageUpdate {
164            usage_ratio,
165            context_limit,
166            input_tokens,
167            output_tokens,
168            cache_read_tokens,
169            cache_creation_tokens,
170            reasoning_tokens,
171            total_input_tokens,
172            total_output_tokens,
173            total_cache_read_tokens,
174            total_cache_creation_tokens,
175            total_reasoning_tokens,
176        } => Some(AgentExtNotification::ContextUsage(ContextUsageParams {
177            usage_ratio: *usage_ratio,
178            context_limit: *context_limit,
179            input_tokens: *input_tokens,
180            output_tokens: *output_tokens,
181            cache_read_tokens: *cache_read_tokens,
182            cache_creation_tokens: *cache_creation_tokens,
183            reasoning_tokens: *reasoning_tokens,
184            total_input_tokens: *total_input_tokens,
185            total_output_tokens: *total_output_tokens,
186            total_cache_read_tokens: *total_cache_read_tokens,
187            total_cache_creation_tokens: *total_cache_creation_tokens,
188            total_reasoning_tokens: *total_reasoning_tokens,
189        })),
190        AgentMessage::ToolProgress { request, message, .. } => {
191            let msg_str = message.as_ref()?;
192            let params = try_parse_sub_agent_progress(msg_str, request)?;
193            Some(AgentExtNotification::SubAgentProgress(params))
194        }
195        AgentMessage::ContextCleared => Some(AgentExtNotification::ContextCleared(ContextClearedParams::default())),
196        _ => None,
197    }
198}
199
200/// If the tool result carries plan metadata, build a `SessionUpdate::Plan` notification.
201pub fn try_extract_plan_notification(
202    session_id: SessionId,
203    result_meta: Option<&ToolResultMeta>,
204) -> Option<SessionNotification> {
205    let plan_meta = result_meta?.plan.as_ref()?;
206    let entries = plan_meta
207        .entries
208        .iter()
209        .map(|e| PlanEntry::new(e.content.clone(), PlanEntryPriority::Medium, plan_status_to_acp(e.status)))
210        .collect();
211    Some(SessionNotification::new(session_id, SessionUpdate::Plan(acp::Plan::new(entries))))
212}
213
214/// Convert internal plan status to ACP protocol status.
215fn plan_status_to_acp(status: PlanMetaStatus) -> PlanEntryStatus {
216    match status {
217        PlanMetaStatus::InProgress => PlanEntryStatus::InProgress,
218        PlanMetaStatus::Completed => PlanEntryStatus::Completed,
219        PlanMetaStatus::Pending => PlanEntryStatus::Pending,
220    }
221}
222
223/// Determines the stop reason from the final agent message
224pub fn map_agent_message_to_stop_reason(msg: &AgentMessage) -> acp::StopReason {
225    match msg {
226        AgentMessage::Cancelled { .. } => StopReason::Cancelled,
227        _ => StopReason::EndTurn,
228    }
229}
230
231fn map_chunk_to_notification(
232    session_id: SessionId,
233    chunk: &str,
234    is_complete: bool,
235    mode: NotificationMode,
236    wrap: fn(ContentChunk) -> SessionUpdate,
237) -> Option<SessionNotification> {
238    match mode {
239        // Skip the final completion message to avoid sending duplicate content.
240        // The client has already received all the chunks during streaming.
241        NotificationMode::Live if is_complete => return None,
242        NotificationMode::Replay if !is_complete => return None,
243        NotificationMode::Live | NotificationMode::Replay => {}
244    }
245
246    Some(acp::SessionNotification::new(
247        session_id,
248        wrap(ContentChunk::new(ContentBlock::Text(TextContent::new(chunk.to_owned())))),
249    ))
250}
251
252fn map_tool_call_to_notification(session_id: SessionId, request: &ToolCallRequest) -> SessionNotification {
253    let raw_input = serde_json::from_str(&request.arguments).ok();
254    SessionNotification::new(
255        session_id,
256        SessionUpdate::ToolCall(
257            ToolCall::new(ToolCallId::new(request.id.clone()), humanize_tool_name(&request.name))
258                .status(acp::ToolCallStatus::InProgress)
259                .raw_input(raw_input),
260        ),
261    )
262}
263
264fn parse_tool_call_chunk(chunk: &str) -> serde_json::Value {
265    serde_json::from_str(chunk).unwrap_or_else(|_| serde_json::Value::String(chunk.to_string()))
266}
267
268fn map_tool_call_update_to_notification(session_id: SessionId, tool_call_id: &str, chunk: &str) -> SessionNotification {
269    let fields = ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).raw_input(parse_tool_call_chunk(chunk));
270
271    SessionNotification::new(
272        session_id,
273        SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(ToolCallId::new(tool_call_id.to_string()), fields)),
274    )
275}
276
/// Produces the initial human-readable title for a tool call (e.g., "Read file").
/// This is sent when the tool call starts.
///
/// The namespace is dropped by keeping the last non-empty `__`-separated
/// segment, underscores become spaces, and a leading ASCII letter is
/// upper-cased (non-ASCII first characters are left untouched).
///
/// If the name has no non-empty segment at all, the name itself is used as the
/// base, so a name such as `coding__` yields "Coding" rather than an empty title.
fn humanize_tool_name(name: &str) -> String {
    // `split` always yields at least one item, so without the filter the
    // fallback could never trigger and a trailing `__` produced an empty title.
    let base = name.split("__").filter(|segment| !segment.is_empty()).last().unwrap_or(name);

    let mut title = base.replace('_', " ");
    // `get_mut(0..1)` is `None` for an empty string or a multi-byte first
    // character, so this can never panic or split a character.
    if let Some(first) = title.get_mut(0..1) {
        first.make_ascii_uppercase();
    }
    title
}
287
288fn map_tool_result_to_notification(
289    session_id: SessionId,
290    result: &ToolCallResult,
291    result_meta: Option<&ToolResultMeta>,
292) -> SessionNotification {
293    let mut content =
294        vec![ToolCallContent::Content(Content::new(ContentBlock::Text(TextContent::new(result.result.clone()))))];
295
296    if let Some(rm) = result_meta
297        && let Some(fd) = &rm.file_diff
298    {
299        let mut diff = Diff::new(&fd.path, &fd.new_text);
300        if let Some(old) = &fd.old_text {
301            diff = diff.old_text(old.clone());
302        }
303        content.push(ToolCallContent::Diff(diff));
304    }
305
306    let mut fields = ToolCallUpdateFields::new().status(ToolCallStatus::Completed).content(content);
307
308    if let Some(rm) = result_meta {
309        fields = fields.title(&rm.display.title);
310    }
311
312    let mut update = ToolCallUpdate::new(ToolCallId::new(result.id.clone()), fields);
313
314    if let Some(rm) = result_meta
315        && !rm.display.value.is_empty()
316    {
317        let mut meta_map = serde_json::Map::new();
318        meta_map.insert("display_value".into(), rm.display.value.clone().into());
319        update = update.meta(meta_map);
320    }
321
322    SessionNotification::new(session_id, SessionUpdate::ToolCallUpdate(update))
323}
324
325fn map_tool_error_to_notification(session_id: SessionId, error: &ToolCallError) -> SessionNotification {
326    SessionNotification::new(
327        session_id,
328        SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
329            ToolCallId::new(error.id.clone()),
330            ToolCallUpdateFields::new().status(ToolCallStatus::Failed).content(vec![ToolCallContent::Content(
331                Content::new(ContentBlock::Text(TextContent::new(error.error.clone()))),
332            )]),
333        )),
334    )
335}
336
337fn map_tool_progress_to_notification(
338    session_id: SessionId,
339    request: &ToolCallRequest,
340    progress: f64,
341    total: Option<f64>,
342    message: Option<&String>,
343) -> Option<SessionNotification> {
344    tracing::debug!("Tool progress: {message:?}");
345
346    if message.and_then(|msg_str| try_parse_sub_agent_progress(msg_str, request)).is_some() {
347        return None;
348    }
349
350    if let Some(result_meta) = message.and_then(|m| try_parse_display_meta(m)) {
351        let fields = ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).title(&result_meta.display.title);
352
353        let mut update = ToolCallUpdate::new(ToolCallId::new(request.id.clone()), fields);
354
355        if !result_meta.display.value.is_empty() {
356            let mut meta_map = serde_json::Map::new();
357            meta_map.insert("display_value".into(), result_meta.display.value.into());
358            update = update.meta(meta_map);
359        }
360
361        return Some(SessionNotification::new(session_id, SessionUpdate::ToolCallUpdate(update)));
362    }
363
364    let total_str = total.map_or_else(|| "?".to_string(), |t| t.to_string());
365    let progress_text = message
366        .map_or_else(|| format!("Progress: {progress}/{total_str}"), |msg| format!("{msg} ({progress}/{total_str})"));
367
368    Some(SessionNotification::new(
369        session_id,
370        SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
371            ToolCallId::new(request.id.clone()),
372            ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).content(vec![ToolCallContent::Content(
373                Content::new(ContentBlock::Text(TextContent::new(progress_text))),
374            )]),
375        )),
376    ))
377}
378
379/// Replay session events to the client as ACP notifications.
380pub async fn replay_to_client(events: &[SessionEvent], connection: &ConnectionTo<Client>, session_id: &SessionId) {
381    for notif in replay_events_to_notifications(events, session_id) {
382        if let Err(e) = connection.send_notification(notif).map_err(|e| AcpServerError::protocol("session/update", e)) {
383            tracing::error!("Failed to send replay notification: {e:?}");
384        }
385    }
386}
387
388pub fn replay_events_to_notifications(events: &[SessionEvent], session_id: &SessionId) -> Vec<SessionNotification> {
389    let mut out = Vec::new();
390    for event in events {
391        match event {
392            SessionEvent::User(UserEvent::Message { content }) => {
393                for block in content {
394                    out.push(SessionNotification::new(
395                        session_id.clone(),
396                        SessionUpdate::UserMessageChunk(ContentChunk::new(map_user_content_block(block))),
397                    ));
398                }
399            }
400            SessionEvent::Agent(message) => {
401                out.extend(map_agent_message_to_notification(session_id.clone(), message, NotificationMode::Replay));
402            }
403            SessionEvent::User(_) => {}
404        }
405    }
406    out
407}
408
409fn map_user_content_block(block: &llm::ContentBlock) -> ContentBlock {
410    match block {
411        llm::ContentBlock::Text { text } => ContentBlock::Text(TextContent::new(text.clone())),
412        llm::ContentBlock::Image { data, mime_type } => {
413            ContentBlock::Image(acp::ImageContent::new(data.clone(), mime_type.clone()))
414        }
415        llm::ContentBlock::Audio { data, mime_type } => {
416            ContentBlock::Audio(acp::AudioContent::new(data.clone(), mime_type.clone()))
417        }
418    }
419}
420
421fn try_parse_display_meta(message: &str) -> Option<ToolResultMeta> {
422    serde_json::from_str::<ToolResultMeta>(message).ok()
423}
424
425/// Attempt to parse a tool progress message as sub-agent progress.
426fn try_parse_sub_agent_progress(message: &str, request: &llm::ToolCallRequest) -> Option<SubAgentProgressParams> {
427    let payload: SubAgentProgressPayload = serde_json::from_str(message).ok()?;
428
429    Some(SubAgentProgressParams {
430        parent_tool_id: request.id.clone(),
431        task_id: payload.task_id,
432        agent_name: payload.agent_name,
433        event: (&payload.event).into(),
434    })
435}
436
437#[cfg(test)]
438mod tests {
439    use super::*;
440    use acp_utils::notifications::SubAgentEvent;
441    use llm::ToolCallRequest;
442
443    #[test]
444    fn test_tool_progress_with_sub_agent_payload_emits_ext_notification() {
445        let session_id = acp::SessionId::new("test-session");
446
447        let payload = SubAgentProgressPayload {
448            task_id: "task_1".to_string(),
449            agent_name: "sub-agent".to_string(),
450            event: AgentMessage::Text {
451                message_id: "msg_1".to_string(),
452                chunk: "Hello".to_string(),
453                is_complete: false,
454                model_name: "TestModel".to_string(),
455            },
456        };
457        let serialized_msg = serde_json::to_string(&payload).unwrap();
458
459        let tool_progress = AgentMessage::ToolProgress {
460            request: ToolCallRequest {
461                id: "call_123".to_string(),
462                name: "plugins__spawn_subagent".to_string(),
463                arguments: "{}".to_string(),
464            },
465            progress: 42.0,
466            total: Some(100.0),
467            message: Some(serialized_msg.clone()),
468        };
469
470        let notification = map_agent_message_to_session_notification(session_id.clone(), &tool_progress);
471
472        assert!(notification.is_none());
473
474        let agent_notif = try_into_agent_notification(&tool_progress).expect("agent notification");
475        match agent_notif {
476            AgentExtNotification::SubAgentProgress(params) => {
477                assert_eq!(params.parent_tool_id, "call_123");
478                assert_eq!(params.task_id, "task_1");
479                assert_eq!(params.agent_name, "sub-agent");
480                assert!(matches!(params.event, SubAgentEvent::Other));
481            }
482            _ => panic!("expected SubAgentProgress"),
483        }
484    }
485
486    #[test]
487    fn replay_emits_user_media_chunks_in_order() {
488        let session_id = acp::SessionId::new("test-session");
489        let events = vec![SessionEvent::User(UserEvent::Message {
490            content: vec![
491                llm::ContentBlock::text("hello"),
492                llm::ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() },
493                llm::ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() },
494            ],
495        })];
496
497        let notifications = replay_events_to_notifications(&events, &session_id);
498        let updates: Vec<_> = notifications.into_iter().map(|n| n.update).collect();
499        assert!(matches!(
500            &updates[0],
501            acp::SessionUpdate::UserMessageChunk(chunk)
502                if matches!(&chunk.content, acp::ContentBlock::Text(text) if text.text == "hello")
503        ));
504        assert!(matches!(
505            &updates[1],
506            acp::SessionUpdate::UserMessageChunk(chunk)
507                if matches!(&chunk.content, acp::ContentBlock::Image(_))
508        ));
509        assert!(matches!(
510            &updates[2],
511            acp::SessionUpdate::UserMessageChunk(chunk)
512                if matches!(&chunk.content, acp::ContentBlock::Audio(_))
513        ));
514    }
515
516    #[test]
517    fn test_thought_maps_to_agent_thought_chunk() {
518        let session_id = acp::SessionId::new("test-session");
519        let thought = AgentMessage::Thought {
520            message_id: "msg_1".to_string(),
521            chunk: "thinking...".to_string(),
522            is_complete: false,
523            model_name: "TestModel".to_string(),
524        };
525
526        let notification = map_agent_message_to_session_notification(session_id, &thought).expect("notification");
527
528        match notification.update {
529            acp::SessionUpdate::AgentThoughtChunk(chunk) => match chunk.content {
530                acp::ContentBlock::Text(text) => assert_eq!(text.text, "thinking..."),
531                other => panic!("Expected text content, got {other:?}"),
532            },
533            other => panic!("Expected AgentThoughtChunk, got {other:?}"),
534        }
535    }
536
537    #[test]
538    fn test_tool_call_maps_to_tool_call_notification() {
539        let session_id = acp::SessionId::new("test-session");
540        let message = AgentMessage::ToolCall {
541            request: ToolCallRequest {
542                id: "call_1".to_string(),
543                name: "coding__read_file".to_string(),
544                arguments: "{}".to_string(),
545            },
546            model_name: "TestModel".to_string(),
547        };
548
549        let notification = map_agent_message_to_session_notification(session_id, &message).expect("notification");
550
551        match notification.update {
552            acp::SessionUpdate::ToolCall(tool_call) => {
553                assert_eq!(tool_call.tool_call_id.0.as_ref(), "call_1");
554                assert_eq!(tool_call.title, "Read file");
555                assert_eq!(tool_call.status, acp::ToolCallStatus::InProgress);
556            }
557            other => panic!("Expected ToolCall, got {other:?}"),
558        }
559    }
560
561    #[test]
562    fn test_tool_call_update_maps_to_tool_call_update_notification() {
563        let session_id = acp::SessionId::new("test-session");
564        let message = AgentMessage::ToolCallUpdate {
565            tool_call_id: "call_1".to_string(),
566            chunk: r#"{"filePath":"Cargo.toml"}"#.to_string(),
567            model_name: "TestModel".to_string(),
568        };
569
570        let notification = map_agent_message_to_session_notification(session_id, &message).expect("notification");
571
572        match notification.update {
573            acp::SessionUpdate::ToolCallUpdate(update) => {
574                assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
575                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::InProgress));
576                assert_eq!(update.fields.raw_input, Some(serde_json::json!({ "filePath": "Cargo.toml" })));
577            }
578            other => panic!("Expected ToolCallUpdate, got {other:?}"),
579        }
580    }
581
582    #[test]
583    fn test_tool_call_update_has_same_live_and_replay_mapping() {
584        let session_id = acp::SessionId::new("test-session");
585        let message = AgentMessage::ToolCallUpdate {
586            tool_call_id: "call_1".to_string(),
587            chunk: r#"{"filePath":"Cargo.toml"}"#.to_string(),
588            model_name: "TestModel".to_string(),
589        };
590
591        let live = map_agent_message_to_notification(session_id.clone(), &message, NotificationMode::Live)
592            .expect("live notification");
593        let replay = map_agent_message_to_notification(session_id, &message, NotificationMode::Replay)
594            .expect("replay notification");
595
596        match (live.update, replay.update) {
597            (acp::SessionUpdate::ToolCallUpdate(live), acp::SessionUpdate::ToolCallUpdate(replay)) => {
598                assert_eq!(live.tool_call_id.0, replay.tool_call_id.0);
599                assert_eq!(live.fields.status, replay.fields.status);
600                assert_eq!(live.fields.raw_input, replay.fields.raw_input);
601            }
602            other => panic!("Expected ToolCallUpdate pair, got {other:?}"),
603        }
604    }
605
606    #[test]
607    fn test_live_mapping_skips_completed_chunks_but_replay_keeps_them() {
608        let cases: Vec<(AgentMessage, &str)> = vec![
609            (
610                AgentMessage::Text {
611                    message_id: "msg_1".to_string(),
612                    chunk: "done".to_string(),
613                    is_complete: true,
614                    model_name: "TestModel".to_string(),
615                },
616                "done",
617            ),
618            (
619                AgentMessage::Thought {
620                    message_id: "msg_1".to_string(),
621                    chunk: "final reasoning".to_string(),
622                    is_complete: true,
623                    model_name: "TestModel".to_string(),
624                },
625                "final reasoning",
626            ),
627        ];
628
629        for (message, expected_text) in cases {
630            let session_id = acp::SessionId::new("test-session");
631            assert!(
632                map_agent_message_to_notification(session_id.clone(), &message, NotificationMode::Live).is_none(),
633                "live mode should skip completed chunk"
634            );
635
636            let notification = map_agent_message_to_notification(session_id, &message, NotificationMode::Replay)
637                .expect("replay notification");
638
639            match notification.update {
640                acp::SessionUpdate::AgentMessageChunk(chunk) | acp::SessionUpdate::AgentThoughtChunk(chunk) => {
641                    match chunk.content {
642                        acp::ContentBlock::Text(text) => assert_eq!(text.text, expected_text),
643                        other => panic!("Expected text content, got {other:?}"),
644                    }
645                }
646                other => panic!("Expected chunk update, got {other:?}"),
647            }
648        }
649    }
650
651    #[test]
652    fn test_context_cleared_maps_to_agent_notification() {
653        let notif = try_into_agent_notification(&AgentMessage::ContextCleared)
654            .expect("context cleared should emit agent notification");
655        assert!(matches!(notif, AgentExtNotification::ContextCleared(_)));
656    }
657
658    #[test]
659    fn test_tool_progress_with_invalid_json_falls_back_to_simple_message() {
660        let session_id = acp::SessionId::new("test-session");
661
662        // Simulate a tool progress message with invalid JSON
663        let tool_progress = AgentMessage::ToolProgress {
664            request: ToolCallRequest {
665                id: "call_456".to_string(),
666                name: "some_tool".to_string(),
667                arguments: "{}".to_string(),
668            },
669            progress: 50.0,
670            total: None,
671            message: Some("not valid json".to_string()),
672        };
673
674        let notification = map_agent_message_to_session_notification(session_id.clone(), &tool_progress);
675
676        assert!(notification.is_some());
677
678        // Should still produce a notification with the message as-is
679        let notification = notification.unwrap();
680        match notification.update {
681            acp::SessionUpdate::ToolCallUpdate(update) => {
682                if let Some(content) = &update.fields.content
683                    && let acp::ToolCallContent::Content(c) = &content[0]
684                    && let acp::ContentBlock::Text(text) = &c.content
685                {
686                    // Should contain the original message
687                    assert!(text.text.contains("not valid json"));
688                }
689            }
690            _ => panic!("Expected ToolCallUpdate"),
691        }
692    }
693
694    #[test]
695    fn test_map_acp_stdio_server() {
696        let server = acp::McpServer::Stdio(
697            acp::McpServerStdio::new("my-server", "/usr/bin/server")
698                .args(vec!["--port".into(), "8080".into()])
699                .env(vec![acp::EnvVariable::new("FOO", "bar")]),
700        );
701
702        let configs = map_acp_mcp_servers(vec![server]);
703        assert_eq!(configs.len(), 1);
704
705        match &configs[0] {
706            McpServerConfig::Server(ServerConfig::Stdio { name, command, args, env }) => {
707                assert_eq!(name, "my-server");
708                assert_eq!(command, "/usr/bin/server");
709                assert_eq!(args, &["--port", "8080"]);
710                assert_eq!(env.get("FOO").unwrap(), "bar");
711            }
712            other => panic!("Expected Stdio, got {other:?}"),
713        }
714    }
715
716    #[test]
717    fn test_map_acp_http_server() {
718        let server = acp::McpServer::Http(
719            acp::McpServerHttp::new("http-server", "https://example.com/mcp")
720                .headers(vec![acp::HttpHeader::new("Authorization", "Bearer token123")]),
721        );
722
723        let configs = map_acp_mcp_servers(vec![server]);
724        assert_eq!(configs.len(), 1);
725
726        match &configs[0] {
727            McpServerConfig::Server(ServerConfig::Http { name, config }) => {
728                assert_eq!(name, "http-server");
729                assert_eq!(config.uri.as_ref(), "https://example.com/mcp");
730                assert_eq!(config.auth_header.as_deref(), Some("token123"));
731            }
732            other => panic!("Expected Http, got {other:?}"),
733        }
734    }
735
736    #[test]
737    fn test_http_auth_header_strips_bearer_case_insensitively() {
738        let cases = [
739            ("Bearer token123", "token123"),
740            ("bearer token123", "token123"),
741            ("BEARER token123", "token123"),
742            ("bEaReR token123", "token123"),
743            // Non-Bearer scheme: pass through verbatim. rmcp will then prefix
744            // with "Bearer ", which is the contract for non-bearer auth too.
745            ("Token foo", "Token foo"),
746            ("token123", "token123"),
747        ];
748
749        for (input, expected) in cases {
750            let server = acp::McpServer::Http(
751                acp::McpServerHttp::new("http-server", "https://example.com/mcp")
752                    .headers(vec![acp::HttpHeader::new("Authorization", input)]),
753            );
754            let configs = map_acp_mcp_servers(vec![server]);
755            match &configs[0] {
756                McpServerConfig::Server(ServerConfig::Http { config, .. }) => {
757                    assert_eq!(config.auth_header.as_deref(), Some(expected), "input was {input:?}");
758                }
759                other => panic!("Expected Http, got {other:?}"),
760            }
761        }
762    }
763
764    #[test]
765    fn test_map_acp_sse_server() {
766        let server = acp::McpServer::Sse(acp::McpServerSse::new("sse-server", "https://example.com/sse"));
767
768        let configs = map_acp_mcp_servers(vec![server]);
769        assert_eq!(configs.len(), 1);
770
771        match &configs[0] {
772            McpServerConfig::Server(ServerConfig::Http { name, config }) => {
773                assert_eq!(name, "sse-server");
774                assert_eq!(config.uri.as_ref(), "https://example.com/sse");
775                assert_eq!(config.auth_header, None);
776            }
777            other => panic!("Expected Http, got {other:?}"),
778        }
779    }
780
781    #[test]
782    fn test_humanize_tool_name() {
783        assert_eq!(humanize_tool_name("coding__read_file"), "Read file");
784        assert_eq!(humanize_tool_name("read_file"), "Read file");
785        assert_eq!(humanize_tool_name("bash"), "Bash");
786        assert_eq!(humanize_tool_name("plugins__coding__read_file"), "Read file");
787    }
788
789    #[test]
790    fn test_result_with_result_meta_sets_meta() {
791        use mcp_utils::display_meta::ToolDisplayMeta;
792
793        let session_id = acp::SessionId::new("test-session");
794        let result = ToolCallResult {
795            id: "call_1".to_string(),
796            name: "coding__read_file".to_string(),
797            arguments: "{}".to_string(),
798            result: "file contents".to_string(),
799        };
800        let rm: ToolResultMeta = ToolDisplayMeta::new("Read file", "Cargo.toml, 156 lines").into();
801
802        let notification = map_tool_result_to_notification(session_id, &result, Some(&rm));
803        match notification.update {
804            acp::SessionUpdate::ToolCallUpdate(update) => {
805                assert_eq!(update.fields.title.as_deref(), Some("Read file"), "native title should be set");
806                let meta = update.meta.expect("meta should be present");
807                assert_eq!(
808                    meta.get("display_value").and_then(|v| v.as_str()),
809                    Some("Cargo.toml, 156 lines"),
810                    "display_value should be a flat key in _meta"
811                );
812                assert!(meta.get("display").is_none(), "old nested display object should not be in _meta");
813            }
814            other => panic!("Expected ToolCallUpdate, got {other:?}"),
815        }
816    }
817
818    #[test]
819    fn test_result_without_result_meta() {
820        let session_id = acp::SessionId::new("test-session");
821        let result = ToolCallResult {
822            id: "call_1".to_string(),
823            name: "external__some_tool".to_string(),
824            arguments: "{}".to_string(),
825            result: "ok".to_string(),
826        };
827
828        let notification = map_tool_result_to_notification(session_id, &result, None);
829        match notification.update {
830            acp::SessionUpdate::ToolCallUpdate(update) => {
831                assert!(update.fields.title.is_none());
832                assert!(update.meta.is_none());
833            }
834            other => panic!("Expected ToolCallUpdate, got {other:?}"),
835        }
836    }
837
838    #[test]
839    fn test_plan_notification_extracted_from_result_meta() {
840        use mcp_utils::display_meta::{PlanMeta, PlanMetaEntry, PlanMetaStatus, ToolDisplayMeta};
841
842        let session_id = acp::SessionId::new("test-session");
843        let meta = ToolResultMeta::with_plan(
844            ToolDisplayMeta::new("Todo", "Research AI agents"),
845            PlanMeta {
846                entries: vec![
847                    PlanMetaEntry { content: "Research AI agents".to_string(), status: PlanMetaStatus::InProgress },
848                    PlanMetaEntry { content: "Write tests".to_string(), status: PlanMetaStatus::Pending },
849                ],
850            },
851        );
852
853        let notification = try_extract_plan_notification(session_id, Some(&meta)).expect("should produce plan");
854        match notification.update {
855            acp::SessionUpdate::Plan(plan) => {
856                assert_eq!(plan.entries.len(), 2);
857                assert_eq!(plan.entries[0].content, "Research AI agents");
858                assert_eq!(plan.entries[0].status, acp::PlanEntryStatus::InProgress);
859                assert_eq!(plan.entries[1].content, "Write tests");
860                assert_eq!(plan.entries[1].status, acp::PlanEntryStatus::Pending);
861            }
862            other => panic!("Expected Plan, got {other:?}"),
863        }
864    }
865
866    #[test]
867    fn test_plan_notification_none_when_no_plan_or_no_meta() {
868        use mcp_utils::display_meta::ToolDisplayMeta;
869
870        let sid = acp::SessionId::new("test-session");
871        let meta: ToolResultMeta = ToolDisplayMeta::new("Read file", "main.rs").into();
872        assert!(try_extract_plan_notification(sid.clone(), Some(&meta)).is_none());
873        assert!(try_extract_plan_notification(sid, None).is_none());
874    }
875
876    #[test]
877    fn test_tool_progress_with_display_meta_emits_meta_update() {
878        use mcp_utils::display_meta::ToolDisplayMeta;
879
880        let session_id = acp::SessionId::new("test-session");
881        let meta = ToolResultMeta::from(ToolDisplayMeta::new("Read file", "main.rs"));
882        let serialized = serde_json::to_string(&meta).unwrap();
883
884        let request = ToolCallRequest {
885            id: "call_789".to_string(),
886            name: "coding__read_file".to_string(),
887            arguments: "{}".to_string(),
888        };
889
890        let notification = map_tool_progress_to_notification(session_id, &request, 0.0, None, Some(&serialized))
891            .expect("should produce notification");
892
893        match notification.update {
894            acp::SessionUpdate::ToolCallUpdate(update) => {
895                assert_eq!(&*update.tool_call_id.0, "call_789");
896                assert_eq!(update.fields.title.as_deref(), Some("Read file"), "native title should be set");
897                let meta_map = update.meta.expect("meta should be present");
898                assert_eq!(
899                    meta_map.get("display_value").and_then(|v| v.as_str()),
900                    Some("main.rs"),
901                    "display_value should be a flat key in _meta"
902                );
903                assert!(meta_map.get("display").is_none(), "old nested display object should not be in _meta");
904                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::InProgress));
905                // Should NOT have content (no text progress fallback)
906                assert!(update.fields.content.is_none());
907            }
908            other => panic!("Expected ToolCallUpdate, got {other:?}"),
909        }
910    }
911
912    #[tokio::test(flavor = "current_thread")]
913    async fn replay_to_client_forwards_each_event_as_session_notification() {
914        use acp_utils::testing::test_connection;
915        use llm::ContentBlock as LlmContentBlock;
916        use tokio::task::LocalSet;
917
918        LocalSet::new()
919            .run_until(async {
920                let (cx, mut peer) = test_connection().await;
921                let session_id = acp::SessionId::new("test-session");
922                let events = vec![SessionEvent::User(UserEvent::Message {
923                    content: vec![LlmContentBlock::text("hello"), LlmContentBlock::text("world")],
924                })];
925
926                replay_to_client(&events, &cx, &session_id).await;
927
928                for _ in 0..2 {
929                    let notif = peer.next_session_notification().await;
930                    assert_eq!(notif.session_id, session_id);
931                    assert!(matches!(notif.update, SessionUpdate::UserMessageChunk(_)));
932                }
933            })
934            .await;
935    }
936}