1use acp_utils::notifications::{ContextClearedParams, ContextUsageParams, SubAgentProgressParams};
2use acp_utils::server::AcpServerError;
3use aether_core::events::{AgentMessage, SubAgentProgressPayload};
4use agent_client_protocol::schema::{
5 self as acp, Content, ContentBlock, ContentChunk, Diff, HttpHeader, McpServer, PlanEntry, PlanEntryPriority,
6 PlanEntryStatus, SessionId, SessionNotification, SessionUpdate, StopReason, TextContent, ToolCall, ToolCallContent,
7 ToolCallId, ToolCallStatus, ToolCallUpdate, ToolCallUpdateFields,
8};
9use agent_client_protocol::{Client, ConnectionTo};
10use llm::{ToolCallError, ToolCallRequest, ToolCallResult};
11use mcp_utils::client::{McpServerConfig, ServerConfig};
12use mcp_utils::display_meta::{PlanMetaStatus, ToolResultMeta};
13use rmcp::model::Prompt as McpPrompt;
14use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
15
16use aether_core::context::ext::{SessionEvent, UserEvent};
17
18pub fn map_mcp_prompt_to_available_command(prompt: &McpPrompt) -> acp::AvailableCommand {
23 let command_name = prompt.name.split("__").last().unwrap_or(prompt.name.as_ref()).to_string();
25
26 let hint = prompt
29 .arguments
30 .as_ref()
31 .and_then(|args| args.iter().find(|a| a.name.as_str() == "ARGUMENTS").and_then(|a| a.description.as_deref()))
32 .unwrap_or("optional arguments");
33 let input = Some(acp::AvailableCommandInput::Unstructured(acp::UnstructuredCommandInput::new(hint)));
34
35 let description = prompt.description.clone().unwrap_or_else(|| "No description available".to_string());
36
37 acp::AvailableCommand::new(command_name, description).input(input)
38}
39
40pub fn map_acp_mcp_servers(servers: Vec<McpServer>) -> Vec<McpServerConfig> {
42 servers
43 .into_iter()
44 .filter_map(|s| {
45 try_map_mcp_server(s).or_else(|| {
46 tracing::warn!("Unsupported ACP MCP server transport, skipping");
47 None
48 })
49 })
50 .collect()
51}
52
53fn try_map_mcp_server(server: McpServer) -> Option<McpServerConfig> {
54 use McpServer::{Http, Sse, Stdio};
55 match server {
56 Stdio(stdio) => Some(
57 ServerConfig::Stdio {
58 name: stdio.name,
59 command: stdio.command.to_string_lossy().into_owned(),
60 args: stdio.args,
61 env: stdio.env.into_iter().map(|e| (e.name, e.value)).collect(),
62 }
63 .into(),
64 ),
65
66 Http(http) => Some(ServerConfig::Http { name: http.name, config: http_config(http.url, &http.headers) }.into()),
67
68 Sse(sse) => Some(ServerConfig::Http { name: sse.name, config: http_config(sse.url, &sse.headers) }.into()),
69
70 _ => None,
71 }
72}
73
74fn http_config(url: String, headers: &[HttpHeader]) -> StreamableHttpClientTransportConfig {
75 let auth_header = headers.iter().find(|h| h.name.eq_ignore_ascii_case("authorization")).map(|h| h.value.clone());
76
77 let mut config = StreamableHttpClientTransportConfig::with_uri(url);
78 if let Some(auth) = auth_header {
79 let token = auth
81 .split_once(' ')
82 .filter(|(scheme, _)| scheme.eq_ignore_ascii_case("Bearer"))
83 .map_or(auth.as_str(), |(_, rest)| rest);
84 config = config.auth_header(token.to_string());
85 }
86 config
87}
88
89pub fn map_agent_message_to_session_notification(
91 session_id: SessionId,
92 msg: &AgentMessage,
93) -> Option<SessionNotification> {
94 map_agent_message_to_notification(session_id, msg, NotificationMode::Live)
95}
96
/// Selects which text/thought chunks are forwarded when mapping agent
/// messages to session notifications.
#[derive(Clone, Copy)]
enum NotificationMode {
    /// Live streaming: chunks flagged `is_complete` are dropped.
    Live,
    /// Session replay: only chunks flagged `is_complete` are forwarded.
    Replay,
}
102
103fn map_agent_message_to_notification(
104 session_id: SessionId,
105 msg: &AgentMessage,
106 mode: NotificationMode,
107) -> Option<SessionNotification> {
108 match msg {
109 AgentMessage::Text { chunk, is_complete, .. } => {
110 map_chunk_to_notification(session_id, chunk, *is_complete, mode, SessionUpdate::AgentMessageChunk)
111 }
112
113 AgentMessage::Thought { chunk, is_complete, .. } => {
114 map_chunk_to_notification(session_id, chunk, *is_complete, mode, SessionUpdate::AgentThoughtChunk)
115 }
116
117 AgentMessage::ToolCall { request, .. } => Some(map_tool_call_to_notification(session_id, request)),
118
119 AgentMessage::ToolCallUpdate { tool_call_id, chunk, .. } => {
120 Some(map_tool_call_update_to_notification(session_id, tool_call_id, chunk))
121 }
122
123 AgentMessage::ToolResult { result, result_meta, .. } => {
124 Some(map_tool_result_to_notification(session_id, result, result_meta.as_ref()))
125 }
126
127 AgentMessage::ToolError { error, .. } => Some(map_tool_error_to_notification(session_id, error)),
128
129 AgentMessage::ToolProgress { request, progress, total, message } => {
130 map_tool_progress_to_notification(session_id, request, *progress, *total, message.as_ref())
131 }
132
133 AgentMessage::Error { message } => Some(acp::SessionNotification::new(
134 session_id,
135 SessionUpdate::AgentMessageChunk(ContentChunk::new(ContentBlock::Text(TextContent::new(format!(
136 "[Error] {message}"
137 ))))),
138 )),
139
140 AgentMessage::ContextUsageUpdate { .. }
141 | AgentMessage::ContextCleared
142 | AgentMessage::Cancelled { .. }
143 | AgentMessage::Done
144 | AgentMessage::ContextCompactionStarted { .. }
145 | AgentMessage::ContextCompactionResult { .. }
146 | AgentMessage::AutoContinue { .. }
147 | AgentMessage::ModelSwitched { .. } => None,
148 }
149}
150
/// Extension notification payloads produced by `try_into_agent_notification`.
pub enum AgentExtNotification {
    /// Token/context usage figures copied from `AgentMessage::ContextUsageUpdate`.
    ContextUsage(ContextUsageParams),
    /// Emitted for `AgentMessage::ContextCleared`.
    ContextCleared(ContextClearedParams),
    /// Sub-agent progress parsed from the message of an `AgentMessage::ToolProgress`.
    SubAgentProgress(SubAgentProgressParams),
}
159
160pub fn try_into_agent_notification(msg: &AgentMessage) -> Option<AgentExtNotification> {
161 match msg {
162 AgentMessage::ContextUsageUpdate {
163 usage_ratio,
164 context_limit,
165 input_tokens,
166 output_tokens,
167 cache_read_tokens,
168 cache_creation_tokens,
169 reasoning_tokens,
170 total_input_tokens,
171 total_output_tokens,
172 total_cache_read_tokens,
173 total_cache_creation_tokens,
174 total_reasoning_tokens,
175 } => Some(AgentExtNotification::ContextUsage(ContextUsageParams {
176 usage_ratio: *usage_ratio,
177 context_limit: *context_limit,
178 input_tokens: *input_tokens,
179 output_tokens: *output_tokens,
180 cache_read_tokens: *cache_read_tokens,
181 cache_creation_tokens: *cache_creation_tokens,
182 reasoning_tokens: *reasoning_tokens,
183 total_input_tokens: *total_input_tokens,
184 total_output_tokens: *total_output_tokens,
185 total_cache_read_tokens: *total_cache_read_tokens,
186 total_cache_creation_tokens: *total_cache_creation_tokens,
187 total_reasoning_tokens: *total_reasoning_tokens,
188 })),
189 AgentMessage::ToolProgress { request, message, .. } => {
190 let msg_str = message.as_ref()?;
191 let params = try_parse_sub_agent_progress(msg_str, request)?;
192 Some(AgentExtNotification::SubAgentProgress(params))
193 }
194 AgentMessage::ContextCleared => Some(AgentExtNotification::ContextCleared(ContextClearedParams::default())),
195 _ => None,
196 }
197}
198
199pub fn try_extract_plan_notification(
201 session_id: SessionId,
202 result_meta: Option<&ToolResultMeta>,
203) -> Option<SessionNotification> {
204 let plan_meta = result_meta?.plan.as_ref()?;
205 let entries = plan_meta
206 .entries
207 .iter()
208 .map(|e| PlanEntry::new(e.content.clone(), PlanEntryPriority::Medium, plan_status_to_acp(e.status)))
209 .collect();
210 Some(SessionNotification::new(session_id, SessionUpdate::Plan(acp::Plan::new(entries))))
211}
212
213fn plan_status_to_acp(status: PlanMetaStatus) -> PlanEntryStatus {
215 match status {
216 PlanMetaStatus::InProgress => PlanEntryStatus::InProgress,
217 PlanMetaStatus::Completed => PlanEntryStatus::Completed,
218 PlanMetaStatus::Pending => PlanEntryStatus::Pending,
219 }
220}
221
222pub fn map_agent_message_to_stop_reason(msg: &AgentMessage) -> acp::StopReason {
224 match msg {
225 AgentMessage::Cancelled { .. } => StopReason::Cancelled,
226 _ => StopReason::EndTurn,
227 }
228}
229
230fn map_chunk_to_notification(
231 session_id: SessionId,
232 chunk: &str,
233 is_complete: bool,
234 mode: NotificationMode,
235 wrap: fn(ContentChunk) -> SessionUpdate,
236) -> Option<SessionNotification> {
237 match mode {
238 NotificationMode::Live if is_complete => return None,
241 NotificationMode::Replay if !is_complete => return None,
242 NotificationMode::Live | NotificationMode::Replay => {}
243 }
244
245 Some(acp::SessionNotification::new(
246 session_id,
247 wrap(ContentChunk::new(ContentBlock::Text(TextContent::new(chunk.to_owned())))),
248 ))
249}
250
251fn map_tool_call_to_notification(session_id: SessionId, request: &ToolCallRequest) -> SessionNotification {
252 let raw_input = serde_json::from_str(&request.arguments).ok();
253 SessionNotification::new(
254 session_id,
255 SessionUpdate::ToolCall(
256 ToolCall::new(ToolCallId::new(request.id.clone()), humanize_tool_name(&request.name))
257 .status(acp::ToolCallStatus::InProgress)
258 .raw_input(raw_input),
259 ),
260 )
261}
262
263fn parse_tool_call_chunk(chunk: &str) -> serde_json::Value {
264 serde_json::from_str(chunk).unwrap_or_else(|_| serde_json::Value::String(chunk.to_string()))
265}
266
267fn map_tool_call_update_to_notification(session_id: SessionId, tool_call_id: &str, chunk: &str) -> SessionNotification {
268 let fields = ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).raw_input(parse_tool_call_chunk(chunk));
269
270 SessionNotification::new(
271 session_id,
272 SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(ToolCallId::new(tool_call_id.to_string()), fields)),
273 )
274}
275
/// Turns a tool identifier into a display title.
///
/// Takes the segment after the last `__`, replaces every `_` with a space and
/// upper-cases the first character when it is ASCII
/// (e.g. `coding__read_file` becomes `Read file`).
fn humanize_tool_name(name: &str) -> String {
    let short = name.split("__").last().unwrap_or(name);

    let mut letters = short.chars().map(|c| if c == '_' { ' ' } else { c });
    let mut title = String::with_capacity(short.len());
    if let Some(initial) = letters.next() {
        // Non-ASCII characters are returned unchanged by `to_ascii_uppercase`.
        title.push(initial.to_ascii_uppercase());
    }
    title.extend(letters);
    title
}
286
287fn map_tool_result_to_notification(
288 session_id: SessionId,
289 result: &ToolCallResult,
290 result_meta: Option<&ToolResultMeta>,
291) -> SessionNotification {
292 let mut content =
293 vec![ToolCallContent::Content(Content::new(ContentBlock::Text(TextContent::new(result.result.clone()))))];
294
295 if let Some(rm) = result_meta
296 && let Some(fd) = &rm.file_diff
297 {
298 let mut diff = Diff::new(&fd.path, &fd.new_text);
299 if let Some(old) = &fd.old_text {
300 diff = diff.old_text(old.clone());
301 }
302 content.push(ToolCallContent::Diff(diff));
303 }
304
305 let mut fields = ToolCallUpdateFields::new().status(ToolCallStatus::Completed).content(content);
306
307 if let Some(rm) = result_meta {
308 fields = fields.title(&rm.display.title);
309 }
310
311 let mut update = ToolCallUpdate::new(ToolCallId::new(result.id.clone()), fields);
312
313 if let Some(rm) = result_meta
314 && !rm.display.value.is_empty()
315 {
316 let mut meta_map = serde_json::Map::new();
317 meta_map.insert("display_value".into(), rm.display.value.clone().into());
318 update = update.meta(meta_map);
319 }
320
321 SessionNotification::new(session_id, SessionUpdate::ToolCallUpdate(update))
322}
323
324fn map_tool_error_to_notification(session_id: SessionId, error: &ToolCallError) -> SessionNotification {
325 SessionNotification::new(
326 session_id,
327 SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
328 ToolCallId::new(error.id.clone()),
329 ToolCallUpdateFields::new().status(ToolCallStatus::Failed).content(vec![ToolCallContent::Content(
330 Content::new(ContentBlock::Text(TextContent::new(error.error.clone()))),
331 )]),
332 )),
333 )
334}
335
336fn map_tool_progress_to_notification(
337 session_id: SessionId,
338 request: &ToolCallRequest,
339 progress: f64,
340 total: Option<f64>,
341 message: Option<&String>,
342) -> Option<SessionNotification> {
343 tracing::info!("Tool progress: {message:?}");
344
345 if message.and_then(|msg_str| try_parse_sub_agent_progress(msg_str, request)).is_some() {
346 return None;
347 }
348
349 if let Some(result_meta) = message.and_then(|m| try_parse_display_meta(m)) {
350 let fields = ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).title(&result_meta.display.title);
351
352 let mut update = ToolCallUpdate::new(ToolCallId::new(request.id.clone()), fields);
353
354 if !result_meta.display.value.is_empty() {
355 let mut meta_map = serde_json::Map::new();
356 meta_map.insert("display_value".into(), result_meta.display.value.into());
357 update = update.meta(meta_map);
358 }
359
360 return Some(SessionNotification::new(session_id, SessionUpdate::ToolCallUpdate(update)));
361 }
362
363 let total_str = total.map_or_else(|| "?".to_string(), |t| t.to_string());
364 let progress_text = message
365 .map_or_else(|| format!("Progress: {progress}/{total_str}"), |msg| format!("{msg} ({progress}/{total_str})"));
366
367 Some(SessionNotification::new(
368 session_id,
369 SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
370 ToolCallId::new(request.id.clone()),
371 ToolCallUpdateFields::new().status(ToolCallStatus::InProgress).content(vec![ToolCallContent::Content(
372 Content::new(ContentBlock::Text(TextContent::new(progress_text))),
373 )]),
374 )),
375 ))
376}
377
378pub async fn replay_to_client(events: &[SessionEvent], connection: &ConnectionTo<Client>, session_id: &SessionId) {
380 for notif in replay_events_to_notifications(events, session_id) {
381 if let Err(e) = connection.send_notification(notif).map_err(|e| AcpServerError::protocol("session/update", e)) {
382 tracing::error!("Failed to send replay notification: {e:?}");
383 }
384 }
385}
386
387pub fn replay_events_to_notifications(events: &[SessionEvent], session_id: &SessionId) -> Vec<SessionNotification> {
388 let mut out = Vec::new();
389 for event in events {
390 match event {
391 SessionEvent::User(UserEvent::Message { content }) => {
392 for block in content {
393 out.push(SessionNotification::new(
394 session_id.clone(),
395 SessionUpdate::UserMessageChunk(ContentChunk::new(map_user_content_block(block))),
396 ));
397 }
398 }
399 SessionEvent::Agent(message) => {
400 out.extend(map_agent_message_to_notification(session_id.clone(), message, NotificationMode::Replay));
401 }
402 SessionEvent::User(_) => {}
403 }
404 }
405 out
406}
407
408fn map_user_content_block(block: &llm::ContentBlock) -> ContentBlock {
409 match block {
410 llm::ContentBlock::Text { text } => ContentBlock::Text(TextContent::new(text.clone())),
411 llm::ContentBlock::Image { data, mime_type } => {
412 ContentBlock::Image(acp::ImageContent::new(data.clone(), mime_type.clone()))
413 }
414 llm::ContentBlock::Audio { data, mime_type } => {
415 ContentBlock::Audio(acp::AudioContent::new(data.clone(), mime_type.clone()))
416 }
417 }
418}
419
420fn try_parse_display_meta(message: &str) -> Option<ToolResultMeta> {
421 serde_json::from_str::<ToolResultMeta>(message).ok()
422}
423
424fn try_parse_sub_agent_progress(message: &str, request: &llm::ToolCallRequest) -> Option<SubAgentProgressParams> {
426 let payload: SubAgentProgressPayload = serde_json::from_str(message).ok()?;
427
428 Some(SubAgentProgressParams {
429 parent_tool_id: request.id.clone(),
430 task_id: payload.task_id,
431 agent_name: payload.agent_name,
432 event: (&payload.event).into(),
433 })
434}
435
#[cfg(test)]
mod tests {
    use super::*;
    use acp_utils::notifications::SubAgentEvent;
    use llm::ToolCallRequest;

    /// A sub-agent payload in a progress message is suppressed from the
    /// regular session stream and surfaced as an extension notification.
    #[test]
    fn test_tool_progress_with_sub_agent_payload_emits_ext_notification() {
        let session_id = acp::SessionId::new("test-session");

        let payload = SubAgentProgressPayload {
            task_id: "task_1".to_string(),
            agent_name: "sub-agent".to_string(),
            event: AgentMessage::Text {
                message_id: "msg_1".to_string(),
                chunk: "Hello".to_string(),
                is_complete: false,
                model_name: "TestModel".to_string(),
            },
        };
        let serialized_msg = serde_json::to_string(&payload).unwrap();

        let tool_progress = AgentMessage::ToolProgress {
            request: ToolCallRequest {
                id: "call_123".to_string(),
                name: "plugins__spawn_subagent".to_string(),
                arguments: "{}".to_string(),
            },
            progress: 42.0,
            total: Some(100.0),
            message: Some(serialized_msg),
        };

        let notification = map_agent_message_to_session_notification(session_id, &tool_progress);

        assert!(notification.is_none());

        let agent_notif = try_into_agent_notification(&tool_progress).expect("agent notification");
        match agent_notif {
            AgentExtNotification::SubAgentProgress(params) => {
                assert_eq!(params.parent_tool_id, "call_123");
                assert_eq!(params.task_id, "task_1");
                assert_eq!(params.agent_name, "sub-agent");
                assert!(matches!(params.event, SubAgentEvent::Other));
            }
            _ => panic!("expected SubAgentProgress"),
        }
    }

    /// Replaying a user message yields one chunk per content block, in order.
    #[test]
    fn replay_emits_user_media_chunks_in_order() {
        let session_id = acp::SessionId::new("test-session");
        let events = vec![SessionEvent::User(UserEvent::Message {
            content: vec![
                llm::ContentBlock::text("hello"),
                llm::ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() },
                llm::ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() },
            ],
        })];

        let notifications = replay_events_to_notifications(&events, &session_id);
        let updates: Vec<_> = notifications.into_iter().map(|n| n.update).collect();
        assert!(matches!(
            &updates[0],
            acp::SessionUpdate::UserMessageChunk(chunk)
                if matches!(&chunk.content, acp::ContentBlock::Text(text) if text.text == "hello")
        ));
        assert!(matches!(
            &updates[1],
            acp::SessionUpdate::UserMessageChunk(chunk)
                if matches!(&chunk.content, acp::ContentBlock::Image(_))
        ));
        assert!(matches!(
            &updates[2],
            acp::SessionUpdate::UserMessageChunk(chunk)
                if matches!(&chunk.content, acp::ContentBlock::Audio(_))
        ));
    }

    /// Streaming thought chunks map to `AgentThoughtChunk`.
    #[test]
    fn test_thought_maps_to_agent_thought_chunk() {
        let session_id = acp::SessionId::new("test-session");
        let thought = AgentMessage::Thought {
            message_id: "msg_1".to_string(),
            chunk: "thinking...".to_string(),
            is_complete: false,
            model_name: "TestModel".to_string(),
        };

        let notification = map_agent_message_to_session_notification(session_id, &thought).expect("notification");

        match notification.update {
            acp::SessionUpdate::AgentThoughtChunk(chunk) => match chunk.content {
                acp::ContentBlock::Text(text) => assert_eq!(text.text, "thinking..."),
                other => panic!("Expected text content, got {other:?}"),
            },
            other => panic!("Expected AgentThoughtChunk, got {other:?}"),
        }
    }

    /// A tool call is announced with a humanized title and in-progress status.
    #[test]
    fn test_tool_call_maps_to_tool_call_notification() {
        let session_id = acp::SessionId::new("test-session");
        let message = AgentMessage::ToolCall {
            request: ToolCallRequest {
                id: "call_1".to_string(),
                name: "coding__read_file".to_string(),
                arguments: "{}".to_string(),
            },
            model_name: "TestModel".to_string(),
        };

        let notification = map_agent_message_to_session_notification(session_id, &message).expect("notification");

        match notification.update {
            acp::SessionUpdate::ToolCall(tool_call) => {
                assert_eq!(tool_call.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(tool_call.title, "Read file");
                assert_eq!(tool_call.status, acp::ToolCallStatus::InProgress);
            }
            other => panic!("Expected ToolCall, got {other:?}"),
        }
    }

    /// A tool call update carries the parsed JSON arguments as raw input.
    #[test]
    fn test_tool_call_update_maps_to_tool_call_update_notification() {
        let session_id = acp::SessionId::new("test-session");
        let message = AgentMessage::ToolCallUpdate {
            tool_call_id: "call_1".to_string(),
            chunk: r#"{"filePath":"Cargo.toml"}"#.to_string(),
            model_name: "TestModel".to_string(),
        };

        let notification = map_agent_message_to_session_notification(session_id, &message).expect("notification");

        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.tool_call_id.0.as_ref(), "call_1");
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::InProgress));
                assert_eq!(update.fields.raw_input, Some(serde_json::json!({ "filePath": "Cargo.toml" })));
            }
            other => panic!("Expected ToolCallUpdate, got {other:?}"),
        }
    }

    /// Tool call updates are not affected by the live/replay distinction.
    #[test]
    fn test_tool_call_update_has_same_live_and_replay_mapping() {
        let session_id = acp::SessionId::new("test-session");
        let message = AgentMessage::ToolCallUpdate {
            tool_call_id: "call_1".to_string(),
            chunk: r#"{"filePath":"Cargo.toml"}"#.to_string(),
            model_name: "TestModel".to_string(),
        };

        let live = map_agent_message_to_notification(session_id.clone(), &message, NotificationMode::Live)
            .expect("live notification");
        let replay = map_agent_message_to_notification(session_id, &message, NotificationMode::Replay)
            .expect("replay notification");

        match (live.update, replay.update) {
            (acp::SessionUpdate::ToolCallUpdate(live), acp::SessionUpdate::ToolCallUpdate(replay)) => {
                assert_eq!(live.tool_call_id.0, replay.tool_call_id.0);
                assert_eq!(live.fields.status, replay.fields.status);
                assert_eq!(live.fields.raw_input, replay.fields.raw_input);
            }
            other => panic!("Expected ToolCallUpdate pair, got {other:?}"),
        }
    }

    /// Completed text/thought chunks are dropped live but kept on replay.
    #[test]
    fn test_live_mapping_skips_completed_chunks_but_replay_keeps_them() {
        let cases: Vec<(AgentMessage, &str)> = vec![
            (
                AgentMessage::Text {
                    message_id: "msg_1".to_string(),
                    chunk: "done".to_string(),
                    is_complete: true,
                    model_name: "TestModel".to_string(),
                },
                "done",
            ),
            (
                AgentMessage::Thought {
                    message_id: "msg_1".to_string(),
                    chunk: "final reasoning".to_string(),
                    is_complete: true,
                    model_name: "TestModel".to_string(),
                },
                "final reasoning",
            ),
        ];

        for (message, expected_text) in cases {
            let session_id = acp::SessionId::new("test-session");
            assert!(
                map_agent_message_to_notification(session_id.clone(), &message, NotificationMode::Live).is_none(),
                "live mode should skip completed chunk"
            );

            let notification = map_agent_message_to_notification(session_id, &message, NotificationMode::Replay)
                .expect("replay notification");

            match notification.update {
                acp::SessionUpdate::AgentMessageChunk(chunk) | acp::SessionUpdate::AgentThoughtChunk(chunk) => {
                    match chunk.content {
                        acp::ContentBlock::Text(text) => assert_eq!(text.text, expected_text),
                        other => panic!("Expected text content, got {other:?}"),
                    }
                }
                other => panic!("Expected chunk update, got {other:?}"),
            }
        }
    }

    /// `ContextCleared` is surfaced as an extension notification.
    #[test]
    fn test_context_cleared_maps_to_agent_notification() {
        let notif = try_into_agent_notification(&AgentMessage::ContextCleared)
            .expect("context cleared should emit agent notification");
        assert!(matches!(notif, AgentExtNotification::ContextCleared(_)));
    }

    /// A progress message that is not JSON falls back to a plain text update.
    ///
    /// Every shape mismatch panics so the test cannot pass without actually
    /// checking the text content.
    #[test]
    fn test_tool_progress_with_invalid_json_falls_back_to_simple_message() {
        let session_id = acp::SessionId::new("test-session");

        let tool_progress = AgentMessage::ToolProgress {
            request: ToolCallRequest {
                id: "call_456".to_string(),
                name: "some_tool".to_string(),
                arguments: "{}".to_string(),
            },
            progress: 50.0,
            total: None,
            message: Some("not valid json".to_string()),
        };

        let notification = map_agent_message_to_session_notification(session_id, &tool_progress)
            .expect("plain-text progress should still produce a notification");

        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                let content = update.fields.content.expect("fallback progress should carry content");
                match content.first() {
                    Some(acp::ToolCallContent::Content(c)) => match &c.content {
                        acp::ContentBlock::Text(text) => assert!(text.text.contains("not valid json")),
                        other => panic!("Expected text content, got {other:?}"),
                    },
                    other => panic!("Expected a content entry, got {other:?}"),
                }
            }
            _ => panic!("Expected ToolCallUpdate"),
        }
    }

    /// Stdio servers keep their name, command, args and environment.
    #[test]
    fn test_map_acp_stdio_server() {
        let server = acp::McpServer::Stdio(
            acp::McpServerStdio::new("my-server", "/usr/bin/server")
                .args(vec!["--port".into(), "8080".into()])
                .env(vec![acp::EnvVariable::new("FOO", "bar")]),
        );

        let configs = map_acp_mcp_servers(vec![server]);
        assert_eq!(configs.len(), 1);

        match &configs[0] {
            McpServerConfig::Server(ServerConfig::Stdio { name, command, args, env }) => {
                assert_eq!(name, "my-server");
                assert_eq!(command, "/usr/bin/server");
                assert_eq!(args, &["--port", "8080"]);
                assert_eq!(env.get("FOO").unwrap(), "bar");
            }
            other => panic!("Expected Stdio, got {other:?}"),
        }
    }

    /// HTTP servers keep their URL and forward the bearer token.
    #[test]
    fn test_map_acp_http_server() {
        let server = acp::McpServer::Http(
            acp::McpServerHttp::new("http-server", "https://example.com/mcp")
                .headers(vec![acp::HttpHeader::new("Authorization", "Bearer token123")]),
        );

        let configs = map_acp_mcp_servers(vec![server]);
        assert_eq!(configs.len(), 1);

        match &configs[0] {
            McpServerConfig::Server(ServerConfig::Http { name, config }) => {
                assert_eq!(name, "http-server");
                assert_eq!(config.uri.as_ref(), "https://example.com/mcp");
                assert_eq!(config.auth_header.as_deref(), Some("token123"));
            }
            other => panic!("Expected Http, got {other:?}"),
        }
    }

    /// The `Bearer` scheme is stripped regardless of case; other values pass through.
    #[test]
    fn test_http_auth_header_strips_bearer_case_insensitively() {
        let cases = [
            ("Bearer token123", "token123"),
            ("bearer token123", "token123"),
            ("BEARER token123", "token123"),
            ("bEaReR token123", "token123"),
            ("Token foo", "Token foo"),
            ("token123", "token123"),
        ];

        for (input, expected) in cases {
            let server = acp::McpServer::Http(
                acp::McpServerHttp::new("http-server", "https://example.com/mcp")
                    .headers(vec![acp::HttpHeader::new("Authorization", input)]),
            );
            let configs = map_acp_mcp_servers(vec![server]);
            match &configs[0] {
                McpServerConfig::Server(ServerConfig::Http { config, .. }) => {
                    assert_eq!(config.auth_header.as_deref(), Some(expected), "input was {input:?}");
                }
                other => panic!("Expected Http, got {other:?}"),
            }
        }
    }

    /// SSE servers are mapped onto the HTTP config without an auth header.
    #[test]
    fn test_map_acp_sse_server() {
        let server = acp::McpServer::Sse(acp::McpServerSse::new("sse-server", "https://example.com/sse"));

        let configs = map_acp_mcp_servers(vec![server]);
        assert_eq!(configs.len(), 1);

        match &configs[0] {
            McpServerConfig::Server(ServerConfig::Http { name, config }) => {
                assert_eq!(name, "sse-server");
                assert_eq!(config.uri.as_ref(), "https://example.com/sse");
                assert_eq!(config.auth_header, None);
            }
            other => panic!("Expected Http, got {other:?}"),
        }
    }

    /// Namespaces are dropped, underscores become spaces, first letter is capitalized.
    #[test]
    fn test_humanize_tool_name() {
        assert_eq!(humanize_tool_name("coding__read_file"), "Read file");
        assert_eq!(humanize_tool_name("read_file"), "Read file");
        assert_eq!(humanize_tool_name("bash"), "Bash");
        assert_eq!(humanize_tool_name("plugins__coding__read_file"), "Read file");
    }

    /// Display metadata sets the native title and a flat `display_value` meta key.
    #[test]
    fn test_result_with_result_meta_sets_meta() {
        use mcp_utils::display_meta::ToolDisplayMeta;

        let session_id = acp::SessionId::new("test-session");
        let result = ToolCallResult {
            id: "call_1".to_string(),
            name: "coding__read_file".to_string(),
            arguments: "{}".to_string(),
            result: "file contents".to_string(),
        };
        let rm: ToolResultMeta = ToolDisplayMeta::new("Read file", "Cargo.toml, 156 lines").into();

        let notification = map_tool_result_to_notification(session_id, &result, Some(&rm));
        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(update.fields.title.as_deref(), Some("Read file"), "native title should be set");
                let meta = update.meta.expect("meta should be present");
                assert_eq!(
                    meta.get("display_value").and_then(|v| v.as_str()),
                    Some("Cargo.toml, 156 lines"),
                    "display_value should be a flat key in _meta"
                );
                assert!(meta.get("display").is_none(), "old nested display object should not be in _meta");
            }
            other => panic!("Expected ToolCallUpdate, got {other:?}"),
        }
    }

    /// Without metadata neither a title nor a meta map is set.
    #[test]
    fn test_result_without_result_meta() {
        let session_id = acp::SessionId::new("test-session");
        let result = ToolCallResult {
            id: "call_1".to_string(),
            name: "external__some_tool".to_string(),
            arguments: "{}".to_string(),
            result: "ok".to_string(),
        };

        let notification = map_tool_result_to_notification(session_id, &result, None);
        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert!(update.fields.title.is_none());
                assert!(update.meta.is_none());
            }
            other => panic!("Expected ToolCallUpdate, got {other:?}"),
        }
    }

    /// Plan metadata is converted entry-by-entry into an ACP plan update.
    #[test]
    fn test_plan_notification_extracted_from_result_meta() {
        use mcp_utils::display_meta::{PlanMeta, PlanMetaEntry, PlanMetaStatus, ToolDisplayMeta};

        let session_id = acp::SessionId::new("test-session");
        let meta = ToolResultMeta::with_plan(
            ToolDisplayMeta::new("Todo", "Research AI agents"),
            PlanMeta {
                entries: vec![
                    PlanMetaEntry { content: "Research AI agents".to_string(), status: PlanMetaStatus::InProgress },
                    PlanMetaEntry { content: "Write tests".to_string(), status: PlanMetaStatus::Pending },
                ],
            },
        );

        let notification = try_extract_plan_notification(session_id, Some(&meta)).expect("should produce plan");
        match notification.update {
            acp::SessionUpdate::Plan(plan) => {
                assert_eq!(plan.entries.len(), 2);
                assert_eq!(plan.entries[0].content, "Research AI agents");
                assert_eq!(plan.entries[0].status, acp::PlanEntryStatus::InProgress);
                assert_eq!(plan.entries[1].content, "Write tests");
                assert_eq!(plan.entries[1].status, acp::PlanEntryStatus::Pending);
            }
            other => panic!("Expected Plan, got {other:?}"),
        }
    }

    /// No plan notification is produced without a plan or without metadata.
    #[test]
    fn test_plan_notification_none_when_no_plan_or_no_meta() {
        use mcp_utils::display_meta::ToolDisplayMeta;

        let sid = acp::SessionId::new("test-session");
        let meta: ToolResultMeta = ToolDisplayMeta::new("Read file", "main.rs").into();
        assert!(try_extract_plan_notification(sid.clone(), Some(&meta)).is_none());
        assert!(try_extract_plan_notification(sid, None).is_none());
    }

    /// Serialized display metadata in a progress message updates title and meta only.
    #[test]
    fn test_tool_progress_with_display_meta_emits_meta_update() {
        use mcp_utils::display_meta::ToolDisplayMeta;

        let session_id = acp::SessionId::new("test-session");
        let meta = ToolResultMeta::from(ToolDisplayMeta::new("Read file", "main.rs"));
        let serialized = serde_json::to_string(&meta).unwrap();

        let request = ToolCallRequest {
            id: "call_789".to_string(),
            name: "coding__read_file".to_string(),
            arguments: "{}".to_string(),
        };

        let notification = map_tool_progress_to_notification(session_id, &request, 0.0, None, Some(&serialized))
            .expect("should produce notification");

        match notification.update {
            acp::SessionUpdate::ToolCallUpdate(update) => {
                assert_eq!(&*update.tool_call_id.0, "call_789");
                assert_eq!(update.fields.title.as_deref(), Some("Read file"), "native title should be set");
                let meta_map = update.meta.expect("meta should be present");
                assert_eq!(
                    meta_map.get("display_value").and_then(|v| v.as_str()),
                    Some("main.rs"),
                    "display_value should be a flat key in _meta"
                );
                assert!(meta_map.get("display").is_none(), "old nested display object should not be in _meta");
                assert_eq!(update.fields.status, Some(acp::ToolCallStatus::InProgress));
                assert!(update.fields.content.is_none());
            }
            other => panic!("Expected ToolCallUpdate, got {other:?}"),
        }
    }

    /// Each replayed event reaches the peer as a session notification.
    #[tokio::test(flavor = "current_thread")]
    async fn replay_to_client_forwards_each_event_as_session_notification() {
        use acp_utils::testing::test_connection;
        use llm::ContentBlock as LlmContentBlock;
        use tokio::task::LocalSet;

        LocalSet::new()
            .run_until(async {
                let (cx, mut peer) = test_connection().await;
                let session_id = acp::SessionId::new("test-session");
                let events = vec![SessionEvent::User(UserEvent::Message {
                    content: vec![LlmContentBlock::text("hello"), LlmContentBlock::text("world")],
                })];

                replay_to_client(&events, &cx, &session_id).await;

                for _ in 0..2 {
                    let notif = peer.next_session_notification().await;
                    assert_eq!(notif.session_id, session_id);
                    assert!(matches!(notif.update, SessionUpdate::UserMessageChunk(_)));
                }
            })
            .await;
    }
}