Skip to main content

aether_cli/acp/
mappers.rs

1use acp_utils::notifications::{ContextClearedParams, ContextUsageParams, SubAgentProgressParams};
2use acp_utils::server::AcpServerError;
3use aether_core::events::{AgentMessage, SubAgentProgressPayload};
4use agent_client_protocol::schema::{
5    self as acp, Content, ContentBlock, ContentChunk, Diff, HttpHeader, McpServer, PlanEntry, PlanEntryPriority,
6    PlanEntryStatus, SessionId, SessionNotification, SessionUpdate, StopReason, TextContent, ToolCall, ToolCallContent,
7    ToolCallId, ToolCallStatus, ToolCallUpdate, ToolCallUpdateFields,
8};
9use agent_client_protocol::{Client, ConnectionTo};
10use llm::{ToolCallError, ToolCallRequest, ToolCallResult};
11use mcp_utils::client::{McpServerConfig, ServerConfig};
12use mcp_utils::display_meta::{PlanMetaStatus, ToolResultMeta};
13use rmcp::model::Prompt as McpPrompt;
14use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
15
16use aether_core::context::ext::{SessionEvent, UserEvent};
17
18/// Converts an MCP Prompt to an ACP `AvailableCommand`
19///
20/// Strips the MCP namespace from the prompt name (e.g., "`coding__web`" -> "web")
21/// and creates a slash command that clients can invoke.
22pub fn map_mcp_prompt_to_available_command(prompt: &McpPrompt) -> acp::AvailableCommand {
23    // Extract the base command name by removing the namespace prefix
24    let command_name = prompt.name.split("__").last().unwrap_or(prompt.name.as_ref()).to_string();
25
26    // Extract the input hint from the unified prompt format's ARGUMENTS parameter,
27    // falling back to a generic hint.
28    let hint = prompt
29        .arguments
30        .as_ref()
31        .and_then(|args| args.iter().find(|a| a.name.as_str() == "ARGUMENTS").and_then(|a| a.description.as_deref()))
32        .unwrap_or("optional arguments");
33    let input = Some(acp::AvailableCommandInput::Unstructured(acp::UnstructuredCommandInput::new(hint)));
34
35    let description = prompt.description.clone().unwrap_or_else(|| "No description available".to_string());
36
37    acp::AvailableCommand::new(command_name, description).input(input)
38}
39
40/// Maps ACP MCP server definitions to internal `McpServerConfig`, skipping unsupported transports.
41pub fn map_acp_mcp_servers(servers: Vec<McpServer>) -> Vec<McpServerConfig> {
42    servers
43        .into_iter()
44        .filter_map(|s| {
45            try_map_mcp_server(s).or_else(|| {
46                tracing::warn!("Unsupported ACP MCP server transport, skipping");
47                None
48            })
49        })
50        .collect()
51}
52
53fn try_map_mcp_server(server: McpServer) -> Option<McpServerConfig> {
54    use McpServer::{Http, Sse, Stdio};
55    match server {
56        Stdio(stdio) => Some(
57            ServerConfig::Stdio {
58                name: stdio.name,
59                command: stdio.command.to_string_lossy().into_owned(),
60                args: stdio.args,
61                env: stdio.env.into_iter().map(|e| (e.name, e.value)).collect(),
62            }
63            .into(),
64        ),
65
66        Http(http) => Some(ServerConfig::Http { name: http.name, config: http_config(http.url, &http.headers) }.into()),
67
68        Sse(sse) => Some(ServerConfig::Http { name: sse.name, config: http_config(sse.url, &sse.headers) }.into()),
69
70        _ => None,
71    }
72}
73
74fn http_config(url: String, headers: &[HttpHeader]) -> StreamableHttpClientTransportConfig {
75    let auth_header = headers.iter().find(|h| h.name.eq_ignore_ascii_case("authorization")).map(|h| h.value.clone());
76
77    let mut config = StreamableHttpClientTransportConfig::with_uri(url);
78    if let Some(auth) = auth_header {
79        // rmcp's `auth_header` wants the bare token; it adds the `Bearer ` prefix itself.
80        let token = auth
81            .split_once(' ')
82            .filter(|(scheme, _)| scheme.eq_ignore_ascii_case("Bearer"))
83            .map_or(auth.as_str(), |(_, rest)| rest);
84        config = config.auth_header(token.to_string());
85    }
86    config
87}
88
89/// Converts Aether `AgentMessage` to ACP `SessionUpdate`
90pub fn map_agent_message_to_session_notification(
91    session_id: SessionId,
92    msg: &AgentMessage,
93) -> Option<SessionNotification> {
94    map_agent_message_to_notification(session_id, msg, NotificationMode::Live)
95}
96
/// Selects how text/thought chunks are filtered when mapping agent messages.
#[derive(Clone, Copy)]
enum NotificationMode {
    /// Live streaming: chunks marked complete are skipped, incremental chunks are sent.
    Live,
    /// Session replay: only chunks marked complete are sent, incremental chunks are skipped.
    Replay,
}
102
103fn map_agent_message_to_notification(
104    session_id: SessionId,
105    msg: &AgentMessage,
106    mode: NotificationMode,
107) -> Option<SessionNotification> {
108    match msg {
109        AgentMessage::Text { chunk, is_complete, .. } => {
110            map_chunk_to_notification(session_id, chunk, *is_complete, mode, SessionUpdate::AgentMessageChunk)
111        }
112
113        AgentMessage::Thought { chunk, is_complete, .. } => {
114            map_chunk_to_notification(session_id, chunk, *is_complete, mode, SessionUpdate::AgentThoughtChunk)
115        }
116
117        AgentMessage::ToolCall { request, .. } => Some(map_tool_call_to_notification(session_id, request)),
118
119        AgentMessage::ToolCallUpdate { tool_call_id, chunk, .. } => {
120            Some(map_tool_call_update_to_notification(session_id, tool_call_id, chunk))
121        }
122
123        AgentMessage::ToolResult { result, result_meta, .. } => {
124            Some(map_tool_result_to_notification(session_id, result, result_meta.as_ref()))
125        }
126
127        AgentMessage::ToolError { error, .. } => Some(map_tool_error_to_notification(session_id, error)),
128
129        AgentMessage::ToolProgress { request, progress, total, message } => {
130            map_tool_progress_to_notification(session_id, request, *progress, *total, message.as_ref())
131        }
132
133        AgentMessage::Error { message } => Some(acp::SessionNotification::new(
134            session_id,
135            SessionUpdate::AgentMessageChunk(ContentChunk::new(ContentBlock::Text(TextContent::new(format!(
136                "[Error] {message}"
137            ))))),
138        )),
139
140        AgentMessage::ContextUsageUpdate { .. }
141        | AgentMessage::ContextCleared
142        | AgentMessage::Cancelled { .. }
143        | AgentMessage::Done
144        | AgentMessage::ContextCompactionStarted { .. }
145        | AgentMessage::ContextCompactionResult { .. }
146        | AgentMessage::AutoContinue { .. }
147        | AgentMessage::ModelSwitched { .. } => None,
148    }
149}
150
/// Typed union of agent-side extension notifications that the relay forwards
/// to the client. Each variant serializes to its own `_aether/*` wire method
/// and is sent via [`ConnectionTo<Client>::send_notification`].
///
/// Values are produced from `AgentMessage`s by `try_into_agent_notification`.
pub enum AgentExtNotification {
    /// Token/context usage figures, built from `AgentMessage::ContextUsageUpdate`.
    ContextUsage(ContextUsageParams),
    /// Emitted for `AgentMessage::ContextCleared`; carries default params.
    ContextCleared(ContextClearedParams),
    /// Sub-agent progress, built from an `AgentMessage::ToolProgress` whose
    /// message parses as a sub-agent progress payload.
    SubAgentProgress(SubAgentProgressParams),
}
159
160pub fn try_into_agent_notification(msg: &AgentMessage) -> Option<AgentExtNotification> {
161    match msg {
162        AgentMessage::ContextUsageUpdate {
163            usage_ratio,
164            context_limit,
165            input_tokens,
166            output_tokens,
167            cache_read_tokens,
168            cache_creation_tokens,
169            reasoning_tokens,
170            total_input_tokens,
171            total_output_tokens,
172            total_cache_read_tokens,
173            total_cache_creation_tokens,
174            total_reasoning_tokens,
175        } => Some(AgentExtNotification::ContextUsage(ContextUsageParams {
176            usage_ratio: *usage_ratio,
177            context_limit: *context_limit,
178            input_tokens: *input_tokens,
179            output_tokens: *output_tokens,
180            cache_read_tokens: *cache_read_tokens,
181            cache_creation_tokens: *cache_creation_tokens,
182            reasoning_tokens: *reasoning_tokens,
183            total_input_tokens: *total_input_tokens,
184            total_output_tokens: *total_output_tokens,
185            total_cache_read_tokens: *total_cache_read_tokens,
186            total_cache_creation_tokens: *total_cache_creation_tokens,
187            total_reasoning_tokens: *total_reasoning_tokens,
188        })),
189        AgentMessage::ToolProgress { request, message, .. } => {
190            let msg_str = message.as_ref()?;
191            let params = try_parse_sub_agent_progress(msg_str, request)?;
192            Some(AgentExtNotification::SubAgentProgress(params))
193        }
194        AgentMessage::ContextCleared => Some(AgentExtNotification::ContextCleared(ContextClearedParams::default())),
195        _ => None,
196    }
197}
198
199/// If the tool result carries plan metadata, build a `SessionUpdate::Plan` notification.
200pub fn try_extract_plan_notification(
201    session_id: SessionId,
202    result_meta: Option<&ToolResultMeta>,
203) -> Option<SessionNotification> {
204    let plan_meta = result_meta?.plan.as_ref()?;
205    let entries = plan_meta
206        .entries
207        .iter()
208        .map(|e| PlanEntry::new(e.content.clone(), PlanEntryPriority::Medium, plan_status_to_acp(e.status)))
209        .collect();
210    Some(SessionNotification::new(session_id, SessionUpdate::Plan(acp::Plan::new(entries))))
211}
212
213/// Convert internal plan status to ACP protocol status.
214fn plan_status_to_acp(status: PlanMetaStatus) -> PlanEntryStatus {
215    match status {
216        PlanMetaStatus::InProgress => PlanEntryStatus::InProgress,
217        PlanMetaStatus::Completed => PlanEntryStatus::Completed,
218        PlanMetaStatus::Pending => PlanEntryStatus::Pending,
219    }
220}
221
222/// Determines the stop reason from the final agent message
223pub fn map_agent_message_to_stop_reason(msg: &AgentMessage) -> acp::StopReason {
224    match msg {
225        AgentMessage::Cancelled { .. } => StopReason::Cancelled,
226        _ => StopReason::EndTurn,
227    }
228}
229
230fn map_chunk_to_notification(
231    session_id: SessionId,
232    chunk: &str,
233    is_complete: bool,
234    mode: NotificationMode,
235    wrap: fn(ContentChunk) -> SessionUpdate,
236) -> Option<SessionNotification> {
237    match mode {
238        // Skip the final completion message to avoid sending duplicate content.
239        // The client has already received all the chunks during streaming.
240        NotificationMode::Live if is_complete => return None,
241        NotificationMode::Replay if !is_complete => return None,
242        NotificationMode::Live | NotificationMode::Replay => {}
243    }
244
245    Some(acp::SessionNotification::new(
246        session_id,
247        wrap(ContentChunk::new(ContentBlock::Text(TextContent::new(chunk.to_owned())))),
248    ))
249}
250
251fn map_tool_call_to_notification(session_id: SessionId, request: &ToolCallRequest) -> SessionNotification {
252    let raw_input = serde_json::from_str(&request.arguments).ok();
253    SessionNotification::new(
254        session_id,
255        SessionUpdate::ToolCall(
256            ToolCall::new(ToolCallId::new(request.id.clone()), humanize_tool_name(&request.name))
257                .status(acp::ToolCallStatus::InProgress)
258                .raw_input(raw_input),
259        ),
260    )
261}
262
263fn parse_tool_call_chunk(chunk: &str) -> serde_json::Value {
264    serde_json::from_str(chunk).unwrap_or_else(|_| serde_json::Value::String(chunk.to_string()))
265}
266
267fn map_tool_call_update_to_notification(session_id: SessionId, tool_call_id: &str, chunk: &str) -> SessionNotification {
268    let fields = ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).raw_input(parse_tool_call_chunk(chunk));
269
270    SessionNotification::new(
271        session_id,
272        SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(ToolCallId::new(tool_call_id.to_string()), fields)),
273    )
274}
275
/// Derives the initial human-readable title shown when a tool call starts
/// (e.g., "coding__read_file" -> "Read file").
///
/// Takes the segment after the last `__`, turns underscores into spaces and
/// upper-cases the first character when it is ASCII.
fn humanize_tool_name(name: &str) -> String {
    let unqualified = name.split("__").last().unwrap_or(name);
    let spaced = unqualified.replace('_', " ");

    let mut title = String::with_capacity(spaced.len());
    let mut rest = spaced.chars();
    if let Some(initial) = rest.next() {
        // `to_ascii_uppercase` leaves non-ASCII characters untouched.
        title.push(initial.to_ascii_uppercase());
        title.push_str(rest.as_str());
    }
    title
}
286
287fn map_tool_result_to_notification(
288    session_id: SessionId,
289    result: &ToolCallResult,
290    result_meta: Option<&ToolResultMeta>,
291) -> SessionNotification {
292    let mut content =
293        vec![ToolCallContent::Content(Content::new(ContentBlock::Text(TextContent::new(result.result.clone()))))];
294
295    if let Some(rm) = result_meta
296        && let Some(fd) = &rm.file_diff
297    {
298        let mut diff = Diff::new(&fd.path, &fd.new_text);
299        if let Some(old) = &fd.old_text {
300            diff = diff.old_text(old.clone());
301        }
302        content.push(ToolCallContent::Diff(diff));
303    }
304
305    let mut fields = ToolCallUpdateFields::new().status(ToolCallStatus::Completed).content(content);
306
307    if let Some(rm) = result_meta {
308        fields = fields.title(&rm.display.title);
309    }
310
311    let mut update = ToolCallUpdate::new(ToolCallId::new(result.id.clone()), fields);
312
313    if let Some(rm) = result_meta
314        && !rm.display.value.is_empty()
315    {
316        let mut meta_map = serde_json::Map::new();
317        meta_map.insert("display_value".into(), rm.display.value.clone().into());
318        update = update.meta(meta_map);
319    }
320
321    SessionNotification::new(session_id, SessionUpdate::ToolCallUpdate(update))
322}
323
324fn map_tool_error_to_notification(session_id: SessionId, error: &ToolCallError) -> SessionNotification {
325    SessionNotification::new(
326        session_id,
327        SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
328            ToolCallId::new(error.id.clone()),
329            ToolCallUpdateFields::new().status(ToolCallStatus::Failed).content(vec![ToolCallContent::Content(
330                Content::new(ContentBlock::Text(TextContent::new(error.error.clone()))),
331            )]),
332        )),
333    )
334}
335
336fn map_tool_progress_to_notification(
337    session_id: SessionId,
338    request: &ToolCallRequest,
339    progress: f64,
340    total: Option<f64>,
341    message: Option<&String>,
342) -> Option<SessionNotification> {
343    tracing::info!("Tool progress: {message:?}");
344
345    if message.and_then(|msg_str| try_parse_sub_agent_progress(msg_str, request)).is_some() {
346        return None;
347    }
348
349    if let Some(result_meta) = message.and_then(|m| try_parse_display_meta(m)) {
350        let fields = ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).title(&result_meta.display.title);
351
352        let mut update = ToolCallUpdate::new(ToolCallId::new(request.id.clone()), fields);
353
354        if !result_meta.display.value.is_empty() {
355            let mut meta_map = serde_json::Map::new();
356            meta_map.insert("display_value".into(), result_meta.display.value.into());
357            update = update.meta(meta_map);
358        }
359
360        return Some(SessionNotification::new(session_id, SessionUpdate::ToolCallUpdate(update)));
361    }
362
363    let total_str = total.map_or_else(|| "?".to_string(), |t| t.to_string());
364    let progress_text = message
365        .map_or_else(|| format!("Progress: {progress}/{total_str}"), |msg| format!("{msg} ({progress}/{total_str})"));
366
367    Some(SessionNotification::new(
368        session_id,
369        SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
370            ToolCallId::new(request.id.clone()),
371            ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).content(vec![ToolCallContent::Content(
372                Content::new(ContentBlock::Text(TextContent::new(progress_text))),
373            )]),
374        )),
375    ))
376}
377
378/// Replay session events to the client as ACP notifications.
379pub async fn replay_to_client(events: &[SessionEvent], connection: &ConnectionTo<Client>, session_id: &SessionId) {
380    for notif in replay_events_to_notifications(events, session_id) {
381        if let Err(e) = connection.send_notification(notif).map_err(|e| AcpServerError::protocol("session/update", e)) {
382            tracing::error!("Failed to send replay notification: {e:?}");
383        }
384    }
385}
386
387pub fn replay_events_to_notifications(events: &[SessionEvent], session_id: &SessionId) -> Vec<SessionNotification> {
388    let mut out = Vec::new();
389    for event in events {
390        match event {
391            SessionEvent::User(UserEvent::Message { content }) => {
392                for block in content {
393                    out.push(SessionNotification::new(
394                        session_id.clone(),
395                        SessionUpdate::UserMessageChunk(ContentChunk::new(map_user_content_block(block))),
396                    ));
397                }
398            }
399            SessionEvent::Agent(message) => {
400                out.extend(map_agent_message_to_notification(session_id.clone(), message, NotificationMode::Replay));
401            }
402            SessionEvent::User(_) => {}
403        }
404    }
405    out
406}
407
408fn map_user_content_block(block: &llm::ContentBlock) -> ContentBlock {
409    match block {
410        llm::ContentBlock::Text { text } => ContentBlock::Text(TextContent::new(text.clone())),
411        llm::ContentBlock::Image { data, mime_type } => {
412            ContentBlock::Image(acp::ImageContent::new(data.clone(), mime_type.clone()))
413        }
414        llm::ContentBlock::Audio { data, mime_type } => {
415            ContentBlock::Audio(acp::AudioContent::new(data.clone(), mime_type.clone()))
416        }
417    }
418}
419
420fn try_parse_display_meta(message: &str) -> Option<ToolResultMeta> {
421    serde_json::from_str::<ToolResultMeta>(message).ok()
422}
423
424/// Attempt to parse a tool progress message as sub-agent progress.
425fn try_parse_sub_agent_progress(message: &str, request: &llm::ToolCallRequest) -> Option<SubAgentProgressParams> {
426    let payload: SubAgentProgressPayload = serde_json::from_str(message).ok()?;
427
428    Some(SubAgentProgressParams {
429        parent_tool_id: request.id.clone(),
430        task_id: payload.task_id,
431        agent_name: payload.agent_name,
432        event: (&payload.event).into(),
433    })
434}
435
436#[cfg(test)]
437mod tests {
438    use super::*;
439    use acp_utils::notifications::SubAgentEvent;
440    use llm::ToolCallRequest;
441
442    #[test]
443    fn test_tool_progress_with_sub_agent_payload_emits_ext_notification() {
444        let session_id = acp::SessionId::new("test-session");
445
446        let payload = SubAgentProgressPayload {
447            task_id: "task_1".to_string(),
448            agent_name: "sub-agent".to_string(),
449            event: AgentMessage::Text {
450                message_id: "msg_1".to_string(),
451                chunk: "Hello".to_string(),
452                is_complete: false,
453                model_name: "TestModel".to_string(),
454            },
455        };
456        let serialized_msg = serde_json::to_string(&payload).unwrap();
457
458        let tool_progress = AgentMessage::ToolProgress {
459            request: ToolCallRequest {
460                id: "call_123".to_string(),
461                name: "plugins__spawn_subagent".to_string(),
462                arguments: "{}".to_string(),
463            },
464            progress: 42.0,
465            total: Some(100.0),
466            message: Some(serialized_msg.clone()),
467        };
468
469        let notification = map_agent_message_to_session_notification(session_id.clone(), &tool_progress);
470
471        assert!(notification.is_none());
472
473        let agent_notif = try_into_agent_notification(&tool_progress).expect("agent notification");
474        match agent_notif {
475            AgentExtNotification::SubAgentProgress(params) => {
476                assert_eq!(params.parent_tool_id, "call_123");
477                assert_eq!(params.task_id, "task_1");
478                assert_eq!(params.agent_name, "sub-agent");
479                assert!(matches!(params.event, SubAgentEvent::Other));
480            }
481            _ => panic!("expected SubAgentProgress"),
482        }
483    }
484
485    #[test]
486    fn replay_emits_user_media_chunks_in_order() {
487        let session_id = acp::SessionId::new("test-session");
488        let events = vec![SessionEvent::User(UserEvent::Message {
489            content: vec![
490                llm::ContentBlock::text("hello"),
491                llm::ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() },
492                llm::ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() },
493            ],
494        })];
495
496        let notifications = replay_events_to_notifications(&events, &session_id);
497        let updates: Vec<_> = notifications.into_iter().map(|n| n.update).collect();
498        assert!(matches!(
499            &updates[0],
500            acp::SessionUpdate::UserMessageChunk(chunk)
501                if matches!(&chunk.content, acp::ContentBlock::Text(text) if text.text == "hello")
502        ));
503        assert!(matches!(
504            &updates[1],
505            acp::SessionUpdate::UserMessageChunk(chunk)
506                if matches!(&chunk.content, acp::ContentBlock::Image(_))
507        ));
508        assert!(matches!(
509            &updates[2],
510            acp::SessionUpdate::UserMessageChunk(chunk)
511                if matches!(&chunk.content, acp::ContentBlock::Audio(_))
512        ));
513    }
514
515    #[test]
516    fn test_thought_maps_to_agent_thought_chunk() {
517        let session_id = acp::SessionId::new("test-session");
518        let thought = AgentMessage::Thought {
519            message_id: "msg_1".to_string(),
520            chunk: "thinking...".to_string(),
521            is_complete: false,
522            model_name: "TestModel".to_string(),
523        };
524
525        let notification = map_agent_message_to_session_notification(session_id, &thought).expect("notification");
526
527        match notification.update {
528            acp::SessionUpdate::AgentThoughtChunk(chunk) => match chunk.content {
529                acp::ContentBlock::Text(text) => assert_eq!(text.text, "thinking..."),
530                other => panic!("Expected text content, got {other:?}"),
531            },
532            other => panic!("Expected AgentThoughtChunk, got {other:?}"),
533        }
534    }
535
536    #[test]
537    fn test_tool_call_maps_to_tool_call_notification() {
538        let session_id = acp::SessionId::new("test-session");
539        let message = AgentMessage::ToolCall {
540            request: ToolCallRequest {
541                id: "call_1".to_string(),
542                name: "coding__read_file".to_string(),
543                arguments: "{}".to_string(),
544            },
545            model_name: "TestModel".to_string(),
546        };
547
548        let notification = map_agent_message_to_session_notification(session_id, &message).expect("notification");
549
550        match notification.update {
551            acp::SessionUpdate::ToolCall(tool_call) => {
552                assert_eq!(tool_call.tool_call_id.0.as_ref(), "call_1");
553                assert_eq!(tool_call.title, "Read file");
554                assert_eq!(tool_call.status, acp::ToolCallStatus::InProgress);
555            }
556            other => panic!("Expected ToolCall, got {other:?}"),
557        }
558    }
559
560    #[test]
561    fn test_tool_call_update_maps_to_tool_call_update_notification() {
562        let session_id = acp::SessionId::new("test-session");
563        let message = AgentMessage::ToolCallUpdate {
564            tool_call_id: "call_1".to_string(),
565            chunk: r#"{"filePath":"Cargo.toml"}"#.to_string(),
566            model_name: "TestModel".to_string(),
567        };
568
569        let notification = map_agent_message_to_session_notification(session_id, &message).expect("notification");
570
571        match notification.update {
572            acp::SessionUpdate::ToolCallUpdate(update) => {
573                assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
574                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::InProgress));
575                assert_eq!(update.fields.raw_input, Some(serde_json::json!({ "filePath": "Cargo.toml" })));
576            }
577            other => panic!("Expected ToolCallUpdate, got {other:?}"),
578        }
579    }
580
581    #[test]
582    fn test_tool_call_update_has_same_live_and_replay_mapping() {
583        let session_id = acp::SessionId::new("test-session");
584        let message = AgentMessage::ToolCallUpdate {
585            tool_call_id: "call_1".to_string(),
586            chunk: r#"{"filePath":"Cargo.toml"}"#.to_string(),
587            model_name: "TestModel".to_string(),
588        };
589
590        let live = map_agent_message_to_notification(session_id.clone(), &message, NotificationMode::Live)
591            .expect("live notification");
592        let replay = map_agent_message_to_notification(session_id, &message, NotificationMode::Replay)
593            .expect("replay notification");
594
595        match (live.update, replay.update) {
596            (acp::SessionUpdate::ToolCallUpdate(live), acp::SessionUpdate::ToolCallUpdate(replay)) => {
597                assert_eq!(live.tool_call_id.0, replay.tool_call_id.0);
598                assert_eq!(live.fields.status, replay.fields.status);
599                assert_eq!(live.fields.raw_input, replay.fields.raw_input);
600            }
601            other => panic!("Expected ToolCallUpdate pair, got {other:?}"),
602        }
603    }
604
605    #[test]
606    fn test_live_mapping_skips_completed_chunks_but_replay_keeps_them() {
607        let cases: Vec<(AgentMessage, &str)> = vec![
608            (
609                AgentMessage::Text {
610                    message_id: "msg_1".to_string(),
611                    chunk: "done".to_string(),
612                    is_complete: true,
613                    model_name: "TestModel".to_string(),
614                },
615                "done",
616            ),
617            (
618                AgentMessage::Thought {
619                    message_id: "msg_1".to_string(),
620                    chunk: "final reasoning".to_string(),
621                    is_complete: true,
622                    model_name: "TestModel".to_string(),
623                },
624                "final reasoning",
625            ),
626        ];
627
628        for (message, expected_text) in cases {
629            let session_id = acp::SessionId::new("test-session");
630            assert!(
631                map_agent_message_to_notification(session_id.clone(), &message, NotificationMode::Live).is_none(),
632                "live mode should skip completed chunk"
633            );
634
635            let notification = map_agent_message_to_notification(session_id, &message, NotificationMode::Replay)
636                .expect("replay notification");
637
638            match notification.update {
639                acp::SessionUpdate::AgentMessageChunk(chunk) | acp::SessionUpdate::AgentThoughtChunk(chunk) => {
640                    match chunk.content {
641                        acp::ContentBlock::Text(text) => assert_eq!(text.text, expected_text),
642                        other => panic!("Expected text content, got {other:?}"),
643                    }
644                }
645                other => panic!("Expected chunk update, got {other:?}"),
646            }
647        }
648    }
649
650    #[test]
651    fn test_context_cleared_maps_to_agent_notification() {
652        let notif = try_into_agent_notification(&AgentMessage::ContextCleared)
653            .expect("context cleared should emit agent notification");
654        assert!(matches!(notif, AgentExtNotification::ContextCleared(_)));
655    }
656
657    #[test]
658    fn test_tool_progress_with_invalid_json_falls_back_to_simple_message() {
659        let session_id = acp::SessionId::new("test-session");
660
661        // Simulate a tool progress message with invalid JSON
662        let tool_progress = AgentMessage::ToolProgress {
663            request: ToolCallRequest {
664                id: "call_456".to_string(),
665                name: "some_tool".to_string(),
666                arguments: "{}".to_string(),
667            },
668            progress: 50.0,
669            total: None,
670            message: Some("not valid json".to_string()),
671        };
672
673        let notification = map_agent_message_to_session_notification(session_id.clone(), &tool_progress);
674
675        assert!(notification.is_some());
676
677        // Should still produce a notification with the message as-is
678        let notification = notification.unwrap();
679        match notification.update {
680            acp::SessionUpdate::ToolCallUpdate(update) => {
681                if let Some(content) = &update.fields.content
682                    && let acp::ToolCallContent::Content(c) = &content[0]
683                    && let acp::ContentBlock::Text(text) = &c.content
684                {
685                    // Should contain the original message
686                    assert!(text.text.contains("not valid json"));
687                }
688            }
689            _ => panic!("Expected ToolCallUpdate"),
690        }
691    }
692
693    #[test]
694    fn test_map_acp_stdio_server() {
695        let server = acp::McpServer::Stdio(
696            acp::McpServerStdio::new("my-server", "/usr/bin/server")
697                .args(vec!["--port".into(), "8080".into()])
698                .env(vec![acp::EnvVariable::new("FOO", "bar")]),
699        );
700
701        let configs = map_acp_mcp_servers(vec![server]);
702        assert_eq!(configs.len(), 1);
703
704        match &configs[0] {
705            McpServerConfig::Server(ServerConfig::Stdio { name, command, args, env }) => {
706                assert_eq!(name, "my-server");
707                assert_eq!(command, "/usr/bin/server");
708                assert_eq!(args, &["--port", "8080"]);
709                assert_eq!(env.get("FOO").unwrap(), "bar");
710            }
711            other => panic!("Expected Stdio, got {other:?}"),
712        }
713    }
714
715    #[test]
716    fn test_map_acp_http_server() {
717        let server = acp::McpServer::Http(
718            acp::McpServerHttp::new("http-server", "https://example.com/mcp")
719                .headers(vec![acp::HttpHeader::new("Authorization", "Bearer token123")]),
720        );
721
722        let configs = map_acp_mcp_servers(vec![server]);
723        assert_eq!(configs.len(), 1);
724
725        match &configs[0] {
726            McpServerConfig::Server(ServerConfig::Http { name, config }) => {
727                assert_eq!(name, "http-server");
728                assert_eq!(config.uri.as_ref(), "https://example.com/mcp");
729                assert_eq!(config.auth_header.as_deref(), Some("token123"));
730            }
731            other => panic!("Expected Http, got {other:?}"),
732        }
733    }
734
/// The `Bearer` scheme is stripped from the Authorization header regardless of letter case,
/// while any other value is kept exactly as supplied.
#[test]
fn test_http_auth_header_strips_bearer_case_insensitively() {
    // Each row is (Authorization value from the client, auth_header expected after mapping).
    const CASES: [(&str, &str); 6] = [
        ("Bearer token123", "token123"),
        ("bearer token123", "token123"),
        ("BEARER token123", "token123"),
        ("bEaReR token123", "token123"),
        // Non-Bearer scheme: pass through verbatim. rmcp will then prefix
        // with "Bearer ", which is the contract for non-bearer auth too.
        ("Token foo", "Token foo"),
        ("token123", "token123"),
    ];

    for (input, expected) in CASES {
        let authorization = acp::HttpHeader::new("Authorization", input);
        let http = acp::McpServerHttp::new("http-server", "https://example.com/mcp").headers(vec![authorization]);
        let mapped = map_acp_mcp_servers(vec![acp::McpServer::Http(http)]);

        let config = match &mapped[0] {
            McpServerConfig::Server(ServerConfig::Http { config, .. }) => config,
            other => panic!("Expected Http, got {other:?}"),
        };
        assert_eq!(config.auth_header.as_deref(), Some(expected), "input was {input:?}");
    }
}
762
/// An ACP SSE server entry is expected to come out as `ServerConfig::Http` with the same
/// name and URI and no auth header.
#[test]
fn test_map_acp_sse_server() {
    let sse = acp::McpServerSse::new("sse-server", "https://example.com/sse");

    let mapped = map_acp_mcp_servers(vec![acp::McpServer::Sse(sse)]);
    assert_eq!(mapped.len(), 1);

    let (name, config) = match &mapped[0] {
        McpServerConfig::Server(ServerConfig::Http { name, config }) => (name, config),
        other => panic!("Expected Http, got {other:?}"),
    };

    assert_eq!(name, "sse-server");
    assert_eq!(config.uri.as_ref(), "https://example.com/sse");
    assert_eq!(config.auth_header, None);
}
779
/// Checks the human-readable title produced for namespaced and bare tool names.
#[test]
fn test_humanize_tool_name() {
    // Each row is (raw tool name, expected humanized title).
    let expectations = [
        ("coding__read_file", "Read file"),
        ("read_file", "Read file"),
        ("bash", "Bash"),
        ("plugins__coding__read_file", "Read file"),
    ];

    for (raw, title) in expectations {
        assert_eq!(humanize_tool_name(raw), title);
    }
}
787
/// When display metadata accompanies a tool result, the update should carry the title
/// natively and expose `display_value` as a flat `_meta` key (no nested `display` object).
#[test]
fn test_result_with_result_meta_sets_meta() {
    use mcp_utils::display_meta::ToolDisplayMeta;

    let rm = ToolResultMeta::from(ToolDisplayMeta::new("Read file", "Cargo.toml, 156 lines"));
    let result = ToolCallResult {
        id: "call_1".to_string(),
        name: "coding__read_file".to_string(),
        arguments: "{}".to_string(),
        result: "file contents".to_string(),
    };

    let notification = map_tool_result_to_notification(acp::SessionId::new("test-session"), &result, Some(&rm));
    let update = match notification.update {
        acp::SessionUpdate::ToolCallUpdate(update) => update,
        other => panic!("Expected ToolCallUpdate, got {other:?}"),
    };

    assert_eq!(update.fields.title.as_deref(), Some("Read file"), "native title should be set");

    let meta = update.meta.expect("meta should be present");
    let display_value = meta.get("display_value").and_then(|v| v.as_str());
    assert_eq!(display_value, Some("Cargo.toml, 156 lines"), "display_value should be a flat key in _meta");
    assert!(meta.get("display").is_none(), "old nested display object should not be in _meta");
}
816
/// Without display metadata, the tool-result update should have neither a title nor `_meta`.
#[test]
fn test_result_without_result_meta() {
    let result = ToolCallResult {
        id: "call_1".to_string(),
        name: "external__some_tool".to_string(),
        arguments: "{}".to_string(),
        result: "ok".to_string(),
    };

    let notification = map_tool_result_to_notification(acp::SessionId::new("test-session"), &result, None);
    let update = match notification.update {
        acp::SessionUpdate::ToolCallUpdate(update) => update,
        other => panic!("Expected ToolCallUpdate, got {other:?}"),
    };

    assert!(update.fields.title.is_none());
    assert!(update.meta.is_none());
}
836
/// Result metadata that carries a plan should yield a `Plan` session update whose entries
/// keep their order, content and status.
#[test]
fn test_plan_notification_extracted_from_result_meta() {
    use mcp_utils::display_meta::{PlanMeta, PlanMetaEntry, PlanMetaStatus, ToolDisplayMeta};

    let entries = vec![
        PlanMetaEntry { content: "Research AI agents".to_string(), status: PlanMetaStatus::InProgress },
        PlanMetaEntry { content: "Write tests".to_string(), status: PlanMetaStatus::Pending },
    ];
    let display = ToolDisplayMeta::new("Todo", "Research AI agents");
    let meta = ToolResultMeta::with_plan(display, PlanMeta { entries });

    let notification =
        try_extract_plan_notification(acp::SessionId::new("test-session"), Some(&meta)).expect("should produce plan");
    let plan = match notification.update {
        acp::SessionUpdate::Plan(plan) => plan,
        other => panic!("Expected Plan, got {other:?}"),
    };

    // The length is asserted first so the pairwise walk below covers every entry.
    assert_eq!(plan.entries.len(), 2);
    let expected = [
        ("Research AI agents", acp::PlanEntryStatus::InProgress),
        ("Write tests", acp::PlanEntryStatus::Pending),
    ];
    for (entry, (content, status)) in plan.entries.iter().zip(expected) {
        assert_eq!(entry.content, content);
        assert_eq!(entry.status, status);
    }
}
864
/// No plan notification is produced when the metadata has no plan or when there is no
/// metadata at all.
#[test]
fn test_plan_notification_none_when_no_plan_or_no_meta() {
    use mcp_utils::display_meta::ToolDisplayMeta;

    let display_only = ToolResultMeta::from(ToolDisplayMeta::new("Read file", "main.rs"));
    let session = acp::SessionId::new("test-session");

    // Metadata present, but it carries display info only.
    let from_display_only = try_extract_plan_notification(session.clone(), Some(&display_only));
    assert!(from_display_only.is_none());

    // No metadata supplied.
    let from_nothing = try_extract_plan_notification(session, None);
    assert!(from_nothing.is_none());
}
874
/// A progress event whose payload is serialized display metadata should become an
/// in-progress `ToolCallUpdate` with a native title and a flat `display_value` in `_meta`,
/// and without any content.
#[test]
fn test_tool_progress_with_display_meta_emits_meta_update() {
    use mcp_utils::display_meta::ToolDisplayMeta;

    let request = ToolCallRequest {
        id: "call_789".to_string(),
        name: "coding__read_file".to_string(),
        arguments: "{}".to_string(),
    };
    let display: ToolResultMeta = ToolDisplayMeta::new("Read file", "main.rs").into();
    let serialized = serde_json::to_string(&display).unwrap();

    let notification = map_tool_progress_to_notification(
        acp::SessionId::new("test-session"),
        &request,
        0.0,
        None,
        Some(&serialized),
    )
    .expect("should produce notification");

    let update = match notification.update {
        acp::SessionUpdate::ToolCallUpdate(update) => update,
        other => panic!("Expected ToolCallUpdate, got {other:?}"),
    };

    assert_eq!(&*update.tool_call_id.0, "call_789");
    assert_eq!(update.fields.title.as_deref(), Some("Read file"), "native title should be set");

    let meta_map = update.meta.expect("meta should be present");
    let display_value = meta_map.get("display_value").and_then(|v| v.as_str());
    assert_eq!(display_value, Some("main.rs"), "display_value should be a flat key in _meta");
    assert!(meta_map.get("display").is_none(), "old nested display object should not be in _meta");

    assert_eq!(update.fields.status, Some(acp::ToolCallStatus::InProgress));
    // Should NOT have content (no text progress fallback)
    assert!(update.fields.content.is_none());
}
910
911    #[tokio::test(flavor = "current_thread")]
912    async fn replay_to_client_forwards_each_event_as_session_notification() {
913        use acp_utils::testing::test_connection;
914        use llm::ContentBlock as LlmContentBlock;
915        use tokio::task::LocalSet;
916
917        LocalSet::new()
918            .run_until(async {
919                let (cx, mut peer) = test_connection().await;
920                let session_id = acp::SessionId::new("test-session");
921                let events = vec![SessionEvent::User(UserEvent::Message {
922                    content: vec![LlmContentBlock::text("hello"), LlmContentBlock::text("world")],
923                })];
924
925                replay_to_client(&events, &cx, &session_id).await;
926
927                for _ in 0..2 {
928                    let notif = peer.next_session_notification().await;
929                    assert_eq!(notif.session_id, session_id);
930                    assert!(matches!(notif.update, SessionUpdate::UserMessageChunk(_)));
931                }
932            })
933            .await;
934    }
935}