1use acp_utils::notifications::{AuthMethodsUpdatedParams, McpRequest};
2use acp_utils::server::AcpServerError;
3use aether_auth::OAuthCredentialStorage;
4use agent_client_protocol::schema::{
5 self as acp, AgentCapabilities, AuthMethod, AuthenticateRequest, AuthenticateResponse, AvailableCommandsUpdate,
6 ConfigOptionUpdate, Implementation, InitializeRequest, InitializeResponse, ListSessionsRequest,
7 ListSessionsResponse, LoadSessionRequest, LoadSessionResponse, McpCapabilities, NewSessionRequest,
8 NewSessionResponse, PromptCapabilities, PromptResponse, ProtocolVersion, SessionId, SessionNotification,
9 SessionUpdate, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse,
10};
11use agent_client_protocol::{Client, ConnectionTo};
12use llm::catalog::{LlmModel, get_local_models};
13use llm::types::IsoString;
14use llm::{ContentBlock, ReasoningEffort};
15use std::collections::HashSet;
16use std::path::Path;
17use std::sync::Arc;
18use tokio::spawn;
19use tokio::sync::oneshot;
20use tracing::{error, info, warn};
21
22use super::config_setting::ConfigSetting;
23use super::mappers::{map_acp_mcp_servers, replay_to_client};
24use super::model_config::{
25 ValidatedMode, build_config_options_from_modes, model_exists, pick_default_model, supports_prompt_audio,
26 validated_modes_from_specs,
27};
28use super::relay::{SessionCommand, spawn_relay};
29use super::session::Session;
30use super::session_registry::{ConfigSnapshot, SessionRegistry};
31use super::session_store::{SessionMeta, SessionStore};
32use crate::settings_args::SettingsSourceArgs;
33use acp_utils::content::format_embedded_resource;
34use aether_core::agent_spec::AgentSpec;
35use aether_core::context::ext::ContextExt;
36use aether_project::{AetherSettings, AgentCatalog};
37use llm::Context;
38
/// Which agent mode or model a newly created session should start with.
#[derive(Clone, Debug, Default)]
pub enum InitialSessionSelection {
    /// No explicit choice: the first validated mode is used, or a default spec built on the
    /// default model when no modes are available.
    #[default]
    Default,
    /// Start in the agent mode with this name; session creation is rejected if the mode is unknown.
    Agent(String),
    /// Start on an explicit model instead of an agent mode.
    Model {
        /// Model identifier; parsed and checked against the available models at session creation.
        model: String,
        /// Optional reasoning effort passed to the default spec built for `model`.
        reasoning_effort: Option<ReasoningEffort>,
    },
}
50
51impl InitialSessionSelection {
52 pub fn agent(name: String) -> Self {
53 Self::Agent(name)
54 }
55
56 pub fn model(model: String, reasoning_effort: Option<ReasoningEffort>) -> Self {
57 Self::Model { model, reasoning_effort }
58 }
59}
60
/// Serves the ACP session requests handled in this file: initialize, authenticate, session
/// creation/loading/listing, prompting, cancellation and config changes.
pub struct SessionManager {
    /// Registry of live sessions: their relays and current configuration.
    registry: Arc<SessionRegistry>,
    /// Persistent store for session metadata and events.
    session_store: Arc<SessionStore>,
    /// OAuth credential storage consulted for auth methods and passed on to sessions.
    oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
    /// How the initial agent/model of a new session is chosen.
    initial_selection: InitialSessionSelection,
    /// Selects which settings source is loaded for a given working directory.
    settings_source: SettingsSourceArgs,
}
69
/// Dependencies handed to [`SessionManager::new`]; each field is moved into the manager as-is.
pub(crate) struct SessionManagerConfig {
    /// Registry of live sessions.
    pub(crate) registry: Arc<SessionRegistry>,
    /// Persistent store for session metadata and events.
    pub(crate) session_store: Arc<SessionStore>,
    /// OAuth credential storage.
    pub(crate) oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
    /// How the initial agent/model of a new session is chosen.
    pub(crate) initial_selection: InitialSessionSelection,
    /// Selects which settings source is loaded for a given working directory.
    pub(crate) settings_source: SettingsSourceArgs,
}
77
/// Everything `load_mode_catalog` derives for one working directory.
struct SessionModeCatalog {
    /// Agent catalog built from the loaded settings (empty when no agents are configured).
    catalog: AgentCatalog,
    /// Modes produced by validating the catalog's user-invocable specs against `available`.
    modes: Vec<ValidatedMode>,
    /// Models returned by `get_local_models()` when the catalog was loaded.
    available: Vec<LlmModel>,
}
83
/// Outcome of resolving the initial selection for a new session.
struct ResolvedInitialSession {
    /// Agent spec the session is created from.
    spec: AgentSpec,
    /// Name of the selected mode, or `None` when the spec is a default (model-only) spec.
    selected_mode: Option<String>,
}
88
/// Non-text modalities present in a prompt's content blocks.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct PromptModalities {
    /// At least one content block is an image.
    image: bool,
    /// At least one content block is audio.
    audio: bool,
}
94
95impl PromptModalities {
96 fn from_content(content: &[ContentBlock]) -> Self {
97 Self {
98 image: content.iter().any(ContentBlock::is_image),
99 audio: content.iter().any(|block| matches!(block, ContentBlock::Audio { .. })),
100 }
101 }
102
103 fn is_empty(self) -> bool {
104 !self.image && !self.audio
105 }
106}
107
108impl SessionManager {
109 pub(crate) fn new(deps: SessionManagerConfig) -> Self {
110 Self {
111 registry: deps.registry,
112 session_store: deps.session_store,
113 oauth_credential_store: deps.oauth_credential_store,
114 initial_selection: deps.initial_selection,
115 settings_source: deps.settings_source,
116 }
117 }
118
119 fn resolve_initial_session(
120 &self,
121 mode_catalog: &SessionModeCatalog,
122 default_model: &LlmModel,
123 ) -> Result<ResolvedInitialSession, acp::Error> {
124 match &self.initial_selection {
125 InitialSessionSelection::Default => resolve_default_initial_session(mode_catalog, default_model),
126 InitialSessionSelection::Agent(agent) => {
127 if !mode_catalog.modes.iter().any(|mode| mode.name == *agent) {
128 warn!("Unknown agent `{agent}` requested via --agent");
129 return Err(acp::Error::invalid_params());
130 }
131 resolve_agent_spec(&mode_catalog.catalog, agent)
132 .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(agent.clone()) })
133 }
134 InitialSessionSelection::Model { model, reasoning_effort } => {
135 let model = parse_available_model(model, &mode_catalog.available)?;
136 Ok(ResolvedInitialSession {
137 spec: AgentSpec::default_spec(&model, *reasoning_effort, Vec::new()),
138 selected_mode: None,
139 })
140 }
141 }
142 }
143
144 async fn load_mode_catalog(&self, cwd: &Path) -> Result<SessionModeCatalog, acp::Error> {
145 let config = if let Some(source) = self.settings_source.source(cwd) {
146 AetherSettings::load(cwd, [source])
147 } else {
148 AetherSettings::load_default(cwd)
149 }
150 .map_err(|e| {
151 error!("Failed to load agent catalog: {e}");
152 acp::Error::invalid_params()
153 })?;
154 let catalog = if config.agents.is_empty() {
155 AgentCatalog::empty(cwd.to_path_buf())
156 } else {
157 AgentCatalog::from_settings(cwd, config).map_err(|e| {
158 error!("Failed to load agent catalog: {e}");
159 acp::Error::invalid_params()
160 })?
161 };
162
163 let available = get_local_models().await;
164 let specs: Vec<_> = catalog.user_invocable().cloned().collect();
165 let modes = validated_modes_from_specs(&specs, &available);
166
167 Ok(SessionModeCatalog { catalog, modes, available })
168 }
169
170 #[allow(clippy::too_many_arguments, clippy::similar_names)]
171 async fn register_session(
172 &self,
173 session: Session,
174 session_id: &str,
175 acp_session_id: &SessionId,
176 model: &str,
177 selected_mode: Option<String>,
178 reasoning_effort: Option<ReasoningEffort>,
179 modes: Vec<ValidatedMode>,
180 cx: &ConnectionTo<Client>,
181 ) -> Vec<acp::SessionConfigOption> {
182 let relay = spawn_relay(
183 session,
184 cx.clone(),
185 acp_session_id.clone(),
186 self.session_store.clone(),
187 Arc::clone(&self.oauth_credential_store),
188 );
189
190 self.registry
191 .insert(
192 session_id.to_string(),
193 relay,
194 model.to_string(),
195 selected_mode.clone(),
196 reasoning_effort,
197 modes.clone(),
198 )
199 .await;
200
201 let available = get_local_models().await;
202 let all_models = get_all_models(&available);
203 build_config_options_from_modes(
204 &modes,
205 &available,
206 selected_mode.as_deref(),
207 model,
208 reasoning_effort,
209 &all_models,
210 self.oauth_credential_store.as_ref(),
211 )
212 }
213
214 fn send_available_commands_notification(
215 available_commands: Vec<acp::AvailableCommand>,
216 acp_session_id: SessionId,
217 session_id: &str,
218 cx: &ConnectionTo<Client>,
219 ) {
220 if available_commands.is_empty() {
221 return;
222 }
223 let command_count = available_commands.len();
224 let notification = SessionNotification::new(
225 acp_session_id,
226 SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate::new(available_commands)),
227 );
228 if let Err(e) = cx.send_notification(notification).map_err(|e| AcpServerError::protocol("session/update", e)) {
229 error!("Failed to send available commands notification: {:?}", e);
230 } else {
231 info!("Sent available commands update for session {} ({} commands)", session_id, command_count);
232 }
233 }
234
235 pub async fn shutdown_all_sessions(&self) {
238 self.registry.shutdown_all().await;
239 }
240}
241
242fn options_from_snapshot(
243 snapshot: &ConfigSnapshot,
244 available: &[LlmModel],
245 all_models: &[LlmModel],
246 credential_store: &dyn OAuthCredentialStorage,
247) -> Vec<acp::SessionConfigOption> {
248 build_config_options_from_modes(
249 &snapshot.modes,
250 available,
251 snapshot.selected_mode.as_deref(),
252 &snapshot.effective_model,
253 snapshot.reasoning_effort,
254 all_models,
255 credential_store,
256 )
257}
258
259fn get_all_models(discovered: &[LlmModel]) -> Vec<LlmModel> {
262 let mut all = LlmModel::all().to_vec();
263 for m in discovered {
264 if !all.contains(m) {
265 all.push(m.clone());
266 }
267 }
268 all
269}
270
271fn build_auth_methods(store: &dyn OAuthCredentialStorage) -> Vec<AuthMethod> {
272 let mut seen = HashSet::new();
273 LlmModel::all()
274 .iter()
275 .filter_map(LlmModel::oauth_provider_id)
276 .filter(|id| seen.insert(*id))
277 .map(|id| {
278 let display = LlmModel::all()
279 .iter()
280 .find(|m| m.oauth_provider_id() == Some(id))
281 .map_or(id, |m| m.provider_display_name());
282 let mut method = acp::AuthMethodAgent::new(id, display);
283 if store.has_credential(id) {
284 method = method.description("authenticated");
285 }
286 AuthMethod::Agent(method)
287 })
288 .collect()
289}
290
291fn map_acp_to_content_blocks(blocks: Vec<acp::ContentBlock>) -> Vec<ContentBlock> {
292 blocks
293 .into_iter()
294 .map(|block| match block {
295 acp::ContentBlock::Text(t) => ContentBlock::text(t.text),
296 acp::ContentBlock::Image(img) => ContentBlock::Image { data: img.data, mime_type: img.mime_type },
297 acp::ContentBlock::Audio(aud) => ContentBlock::Audio { data: aud.data, mime_type: aud.mime_type },
298 acp::ContentBlock::Resource(r) => ContentBlock::text(format_embedded_resource(&r)),
299 acp::ContentBlock::ResourceLink(l) => ContentBlock::text(format!("[Resource: {}]", l.uri)),
300 _ => ContentBlock::text("[Unknown content]"),
301 })
302 .collect()
303}
304
305fn resolve_agent_spec(catalog: &AgentCatalog, mode_name: &str) -> Result<AgentSpec, acp::Error> {
306 catalog.resolve(mode_name).map_err(|e| {
307 error!("Failed to resolve runtime inputs for mode '{}': {e}", mode_name);
308 acp::Error::invalid_params()
309 })
310}
311
312fn resolve_default_initial_session(
313 mode_catalog: &SessionModeCatalog,
314 default_model: &LlmModel,
315) -> Result<ResolvedInitialSession, acp::Error> {
316 if let Some(mode) = mode_catalog.modes.first() {
317 return resolve_agent_spec(&mode_catalog.catalog, &mode.name)
318 .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(mode.name.clone()) });
319 }
320
321 Ok(ResolvedInitialSession { spec: AgentSpec::default_spec(default_model, None, Vec::new()), selected_mode: None })
322}
323
324fn parse_available_model(model: &str, available: &[LlmModel]) -> Result<LlmModel, acp::Error> {
325 let parsed = model.parse().map_err(|e: String| {
326 warn!("Failed to parse --model `{model}`: {e}");
327 acp::Error::invalid_params()
328 })?;
329
330 if model_exists(available, model) {
331 Ok(parsed)
332 } else {
333 warn!("Requested model `{model}` is not available");
334 Err(acp::Error::invalid_params())
335 }
336}
337
338fn prompt_capabilities_for_models(models: &[LlmModel]) -> PromptCapabilities {
339 PromptCapabilities::new()
340 .embedded_context(true)
341 .image(models.iter().any(LlmModel::supports_image))
342 .audio(models.iter().any(supports_prompt_audio))
343}
344
345fn selected_models(model_value: &str) -> Result<Vec<LlmModel>, acp::Error> {
346 model_value
347 .split(',')
348 .map(str::trim)
349 .filter(|part| !part.is_empty())
350 .map(|part| part.parse::<LlmModel>().map_err(|_| acp::Error::invalid_params()))
351 .collect()
352}
353
354fn validate_prompt_support(model_value: &str, content: &[ContentBlock]) -> Result<(), acp::Error> {
355 let modalities = PromptModalities::from_content(content);
356 if modalities.is_empty() {
357 return Ok(());
358 }
359
360 let selected = selected_models(model_value)?;
361 if modalities.image && selected.iter().any(|model| !model.supports_image()) {
362 return Err(acp::Error::invalid_params());
363 }
364 if modalities.audio && selected.iter().any(|model| !supports_prompt_audio(model)) {
365 return Err(acp::Error::invalid_params());
366 }
367
368 Ok(())
369}
370
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use agent_client_protocol::schema::{InitializeRequest, ProtocolVersion};
    use llm::catalog::BedrockModel;

    const SONNET: &str = "anthropic:claude-sonnet-4-5";
    const DEEPSEEK: &str = "deepseek:deepseek-chat";
    const GEMINI_LIVE_AUDIO: &str = "gemini:gemini-live-2.5-flash-preview-native-audio";
    const BEDROCK_PROFILE_ARN: &str =
        "bedrock:arn:aws:bedrock:us-west-2:000000000000:application-inference-profile/000000000000";

    /// In-memory credential store used in place of real OAuth storage.
    fn mock_oauth_store() -> Arc<dyn OAuthCredentialStorage> {
        Arc::new(aether_auth::FakeOAuthCredentialStore::new())
    }

    /// Parses a model identifier that the test expects to be valid.
    fn model(id: &str) -> LlmModel {
        id.parse().unwrap()
    }

    #[tokio::test]
    async fn initialize_always_advertises_load_session_support() {
        let session_store = match SessionStore::new() {
            Ok(store) => Arc::new(store),
            Err(e) => panic!("Failed to initialize session store: {e}"),
        };
        let config = SessionManagerConfig {
            registry: Arc::new(SessionRegistry::new()),
            session_store,
            oauth_credential_store: mock_oauth_store(),
            initial_selection: InitialSessionSelection::default(),
            settings_source: SettingsSourceArgs::default(),
        };
        let manager = SessionManager::new(config);

        let request = InitializeRequest::new(ProtocolVersion::LATEST);
        let response = manager.initialize(request).await.expect("initialize succeeds");
        let json = serde_json::to_string(&response).expect("response serializes");
        assert!(json.contains("\"loadSession\":true"));
    }

    #[test]
    fn prompt_capabilities_reflect_available_modalities() {
        let image_only = prompt_capabilities_for_models(&[model(SONNET)]);
        assert!(image_only.image);
        assert!(!image_only.audio);

        let audio_capable = prompt_capabilities_for_models(&[model(GEMINI_LIVE_AUDIO)]);
        assert!(!audio_capable.image);
        assert!(audio_capable.audio);

        let text_only = prompt_capabilities_for_models(&[model(DEEPSEEK)]);
        assert!(!text_only.image);
        assert!(!text_only.audio);
    }

    #[test]
    fn validate_prompt_support_requires_all_selected_models_to_support_media() {
        let image_content = vec![ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() }];
        let audio_content =
            vec![ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() }];
        let sonnet_and_deepseek = format!("{SONNET},{DEEPSEEK}");
        let gemini_and_deepseek = format!("{GEMINI_LIVE_AUDIO},{DEEPSEEK}");

        // Single-model selections.
        assert!(validate_prompt_support(SONNET, &image_content).is_ok());
        assert!(validate_prompt_support(DEEPSEEK, &image_content).is_err());
        assert!(validate_prompt_support(GEMINI_LIVE_AUDIO, &audio_content).is_ok());
        assert!(validate_prompt_support(SONNET, &audio_content).is_err());

        // Multi-model selections fail as soon as one model lacks the modality.
        assert!(validate_prompt_support(&sonnet_and_deepseek, &image_content).is_err());
        assert!(validate_prompt_support(&gemini_and_deepseek, &audio_content).is_err());
    }

    #[test]
    fn parse_available_model_accepts_bedrock_inference_profile_arn() {
        let available: Vec<LlmModel> = Vec::new();
        let parsed = parse_available_model(BEDROCK_PROFILE_ARN, &available)
            .expect("Bedrock inference-profile ARN should be accepted");
        assert!(matches!(parsed, LlmModel::Bedrock(BedrockModel::Profile(_))));
    }

    #[test]
    fn parse_available_model_rejects_unknown_catalog_model() {
        let available: Vec<LlmModel> = vec![model(DEEPSEEK)];
        assert!(parse_available_model(SONNET, &available).is_err());
    }

    #[test]
    fn parse_available_model_accepts_catalog_model_when_present() {
        let sonnet = model(SONNET);
        let available = vec![sonnet.clone()];
        assert_eq!(parse_available_model(SONNET, &available).unwrap(), sonnet);
    }
}
463
464impl SessionManager {
465 pub async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse, acp::Error> {
466 info!("Received initialize request: {:?}", args);
467 let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
468 let available = get_local_models().await;
469 Ok(InitializeResponse::new(ProtocolVersion::V1)
470 .agent_info(Implementation::new("Aether", "0.1.0"))
471 .agent_capabilities(
472 AgentCapabilities::new()
473 .load_session(true)
474 .mcp_capabilities(McpCapabilities::new().http(true).sse(true))
475 .session_capabilities(acp::SessionCapabilities::new().list(acp::SessionListCapabilities::new()))
476 .prompt_capabilities(prompt_capabilities_for_models(&available)),
477 )
478 .auth_methods(auth_methods))
479 }
480
481 pub async fn authenticate(
482 &self,
483 args: AuthenticateRequest,
484 cx: &ConnectionTo<Client>,
485 ) -> Result<AuthenticateResponse, acp::Error> {
486 info!("Received authenticate request: {:?}", args);
487 let method_id = args.method_id.0.as_ref();
488 match method_id {
489 "codex" => {
490 llm::perform_codex_oauth_flow(self.oauth_credential_store.as_ref()).await.map_err(|e| {
491 error!("OAuth flow failed for {method_id}: {e}");
492 acp::Error::internal_error()
493 })?;
494 }
495 _ => return Err(acp::Error::invalid_params()),
496 }
497 let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
498 if let Err(e) = cx
499 .send_notification(AuthMethodsUpdatedParams { auth_methods })
500 .map_err(|e| AcpServerError::protocol("_aether/auth_methods_updated", e))
501 {
502 error!("Failed to send auth methods updated notification: {:?}", e);
503 }
504
505 let available = get_local_models().await;
506 let all_models = get_all_models(&available);
507 let snapshots = self.registry.snapshot_all_configs().await;
508
509 for (id, snap) in snapshots {
510 let options = options_from_snapshot(&snap, &available, &all_models, self.oauth_credential_store.as_ref());
511 let notification = SessionNotification::new(
512 SessionId::new(id),
513 SessionUpdate::ConfigOptionUpdate(ConfigOptionUpdate::new(options)),
514 );
515 let _ = cx.send_notification(notification);
516 }
517
518 Ok(AuthenticateResponse::default())
519 }
520
    /// Creates a new session for the requested working directory and registers it.
    ///
    /// Steps, in order: resolve the working directory, load the mode catalog, resolve the
    /// initial spec, create the session, persist its metadata, register it, and send the
    /// available-commands notification.
    ///
    /// # Errors
    /// Propagates errors from loading the mode catalog and resolving the initial selection.
    /// Returns `internal_error` when no model is available, when the session cannot be created,
    /// or when its available commands cannot be listed.
    pub async fn new_session(
        &self,
        mut args: NewSessionRequest,
        cx: &ConnectionTo<Client>,
    ) -> Result<NewSessionResponse, acp::Error> {
        // When AETHER_INSIDE_SANDBOX is set, the client-supplied cwd is replaced by this process's
        // current directory, falling back to `/workspace` if that cannot be read.
        if std::env::var("AETHER_INSIDE_SANDBOX").is_ok() {
            let container_cwd = std::env::current_dir().unwrap_or_else(|_| "/workspace".into());
            info!("Sandbox: remapping cwd {:?} -> {:?}", args.cwd, container_cwd);
            args.cwd = container_cwd;
        }

        info!("Creating new session with cwd: {:?}", args.cwd);
        let session_id = uuid::Uuid::new_v4().to_string();
        let acp_session_id = acp::SessionId::new(session_id.clone());

        let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
        let default_model = pick_default_model(&mode_catalog.available).ok_or_else(|| {
            error!("No models available — set an API key env var (e.g. ANTHROPIC_API_KEY)");
            acp::Error::internal_error()
        })?;

        let ResolvedInitialSession { spec, selected_mode } =
            self.resolve_initial_session(&mode_catalog, default_model)?;
        // Read these before `spec` is moved into the session.
        let model_str = spec.model.clone();
        let reasoning_effort = spec.reasoning_effort;

        let session = Session::new(
            spec,
            args.cwd.clone(),
            map_acp_mcp_servers(args.mcp_servers),
            None,
            Some(session_id.clone()),
            Arc::clone(&self.oauth_credential_store),
        )
        .await
        .map_err(|e| {
            error!("Failed to create session: {}", e);
            acp::Error::internal_error()
        })?;

        let available_commands = session.list_available_commands().await.map_err(|e| {
            error!("Failed to list available commands: {}", e);
            acp::Error::internal_error()
        })?;

        let meta = SessionMeta {
            session_id: session_id.clone(),
            cwd: args.cwd.clone(),
            model: model_str.clone(),
            selected_mode: selected_mode.clone(),
            created_at: IsoString::now().0,
        };
        // A failed metadata write is logged only; session creation continues.
        if let Err(e) = self.session_store.append_meta(&session_id, &meta) {
            error!("Failed to write session meta: {e}");
        }

        let config_options = self
            .register_session(
                session,
                &session_id,
                &acp_session_id,
                &model_str,
                selected_mode,
                reasoning_effort,
                mode_catalog.modes,
                cx,
            )
            .await;

        info!("Session {} created successfully", session_id);

        let response = NewSessionResponse::new(acp_session_id.clone()).config_options(config_options);

        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);

        Ok(response)
    }
600
601 pub fn list_sessions(&self, args: &ListSessionsRequest) -> Result<ListSessionsResponse, acp::Error> {
602 info!("Listing sessions, cwd filter: {:?}", args.cwd);
603 let mut summaries = self.session_store.list();
604
605 if let Some(cwd) = args.cwd.as_ref() {
606 summaries.retain(|s| s.meta.cwd == *cwd);
607 }
608
609 let sessions: Vec<acp::SessionInfo> = summaries
610 .into_iter()
611 .map(|s| acp::SessionInfo::new(s.meta.session_id, s.meta.cwd).updated_at(s.meta.created_at).title(s.title))
612 .collect();
613
614 info!("Found {} sessions", sessions.len());
615 Ok(ListSessionsResponse::new(sessions))
616 }
617
618 pub async fn load_session(
619 &self,
620 args: LoadSessionRequest,
621 cx: &ConnectionTo<Client>,
622 ) -> Result<LoadSessionResponse, acp::Error> {
623 let session_id = args.session_id.0.to_string();
624 info!("Loading session: {session_id}");
625
626 let (meta, events) = self.session_store.load(&session_id).ok_or_else(|| {
627 error!("Session not found: {session_id}");
628 acp::Error::invalid_params()
629 })?;
630
631 let context = Context::from_events(&events);
632 let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
633
634 let spec = if let Some(mode_name) = meta.selected_mode.as_deref() {
635 resolve_agent_spec(&mode_catalog.catalog, mode_name)?
636 } else {
637 let parsed_model: LlmModel = meta.model.parse().map_err(|e: String| {
638 error!("Failed to parse restored model '{}': {e}", meta.model);
639 acp::Error::invalid_params()
640 })?;
641 AgentSpec::default_spec(&parsed_model, None, Vec::new())
642 };
643
644 let model = spec.model.clone();
645
646 let restored_messages: Vec<_> = context.messages().iter().filter(|m| !m.is_system()).cloned().collect();
647
648 let session = Session::new(
649 spec,
650 args.cwd.clone(),
651 map_acp_mcp_servers(args.mcp_servers),
652 Some(restored_messages),
653 Some(session_id.clone()),
654 Arc::clone(&self.oauth_credential_store),
655 )
656 .await
657 .map_err(|e| {
658 error!("Failed to create session for load: {e}");
659 acp::Error::internal_error()
660 })?;
661
662 let available_commands = session.list_available_commands().await.map_err(|e| {
663 error!("Failed to list available commands: {e}");
664 acp::Error::internal_error()
665 })?;
666
667 let acp_session_id = acp::SessionId::new(session_id.clone());
668
669 let config_options = self
670 .register_session(
671 session,
672 &session_id,
673 &acp_session_id,
674 &model,
675 meta.selected_mode,
676 None,
677 mode_catalog.modes,
678 cx,
679 )
680 .await;
681
682 info!("Session {session_id} loaded successfully");
683
684 let response = LoadSessionResponse::new().config_options(config_options);
685
686 let cx_clone = cx.clone();
687 let replay_session_id = acp_session_id.clone();
688 spawn(async move {
689 replay_to_client(&events, &cx_clone, &replay_session_id).await;
690 });
691
692 Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
693
694 Ok(response)
695 }
696
697 pub async fn prompt(&self, args: acp::PromptRequest) -> Result<acp::PromptResponse, acp::Error> {
698 info!("Received prompt for session: {:?}", args.session_id);
699 let session_id_str = args.session_id.0.to_string();
700 let content = map_acp_to_content_blocks(args.prompt);
701
702 let model = self.registry.effective_model(&session_id_str).await.ok_or_else(|| {
703 error!("Session not found: {}", session_id_str);
704 acp::Error::invalid_params()
705 })?;
706 validate_prompt_support(&model, &content)?;
707
708 let dispatch = self.registry.begin_prompt(&session_id_str).await.ok_or_else(|| {
709 error!("Session not found: {}", session_id_str);
710 acp::Error::invalid_params()
711 })?;
712
713 let (result_tx, result_rx) = oneshot::channel();
714 dispatch
715 .relay_tx
716 .send(SessionCommand::Prompt {
717 content,
718 switch_model: dispatch.switch_model,
719 reasoning_effort: dispatch.reasoning_effort,
720 result_tx,
721 })
722 .await
723 .map_err(|_| {
724 error!("Relay channel closed for session {}", session_id_str);
725 acp::Error::internal_error()
726 })?;
727
728 let stop_reason = result_rx
729 .await
730 .map_err(|_| {
731 error!("Relay dropped result channel for session {}", session_id_str);
732 acp::Error::internal_error()
733 })?
734 .map_err(|e| {
735 error!("Relay error for session {}: {}", session_id_str, e);
736 acp::Error::internal_error()
737 })?;
738
739 info!("Prompt completed with stop reason: {:?}", stop_reason);
740 Ok(PromptResponse::new(stop_reason))
741 }
742
743 pub async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> {
744 info!("Received cancel for session: {:?}", args.session_id);
745 let session_id_str = args.session_id.0.to_string();
746 let relay = self.registry.relay(&session_id_str).await.ok_or_else(|| {
747 error!("Session not found for cancel: {}", session_id_str);
748 acp::Error::invalid_params()
749 })?;
750
751 relay.cmd.send(SessionCommand::Cancel).await.map_err(|_| {
752 error!("Relay channel closed for cancel: {}", session_id_str);
753 acp::Error::internal_error()
754 })?;
755
756 Ok(())
757 }
758
759 pub async fn set_session_config_option(
760 &self,
761 args: SetSessionConfigOptionRequest,
762 ) -> Result<SetSessionConfigOptionResponse, acp::Error> {
763 let session_id_str = args.session_id.0.to_string();
764 let config_id = args.config_id.0.to_string();
765 let value = args.value.0.to_string();
766
767 info!("set_session_config_option: session={}, config={}, value={}", session_id_str, config_id, value);
768
769 let setting = ConfigSetting::parse(&config_id, &value).map_err(|e| {
770 error!("{e}");
771 acp::Error::invalid_params()
772 })?;
773
774 let available = get_local_models().await;
775 let all_models = get_all_models(&available);
776
777 let snapshot =
778 self.registry.apply_config_change(&session_id_str, &setting, &available).await.ok_or_else(|| {
779 error!("Session not found: {}", session_id_str);
780 acp::Error::invalid_params()
781 })??;
782
783 let options = options_from_snapshot(&snapshot, &available, &all_models, self.oauth_credential_store.as_ref());
784 Ok(SetSessionConfigOptionResponse::new(options))
785 }
786
787 pub async fn on_mcp_request(&self, request: McpRequest) -> Result<(), acp::Error> {
788 info!("Received MCP ext request: {:?}", request);
789 match request {
790 McpRequest::Authenticate { session_id, server_name } => {
791 let relay = self.registry.relay(&session_id).await.ok_or_else(|| {
792 error!("Session not found for authenticate_mcp_server: {}", session_id);
793 acp::Error::invalid_params()
794 })?;
795
796 relay.mcp_request.send(McpRequest::Authenticate { session_id, server_name }).await.map_err(|_| {
797 error!("MCP request channel closed for session");
798 acp::Error::internal_error()
799 })?;
800 }
801 }
802
803 Ok(())
804 }
805}