1use acp_utils::notifications::{AuthMethodsUpdatedParams, McpRequest};
2use acp_utils::server::AcpServerError;
3use aether_auth::OAuthCredentialStorage;
4use agent_client_protocol::schema::{
5 self as acp, AgentCapabilities, AuthMethod, AuthenticateRequest, AuthenticateResponse, AvailableCommandsUpdate,
6 ConfigOptionUpdate, Implementation, InitializeRequest, InitializeResponse, ListSessionsRequest,
7 ListSessionsResponse, LoadSessionRequest, LoadSessionResponse, McpCapabilities, NewSessionRequest,
8 NewSessionResponse, PromptCapabilities, PromptResponse, ProtocolVersion, SessionId, SessionNotification,
9 SessionUpdate, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse,
10};
11use agent_client_protocol::{Client, ConnectionTo};
12use llm::catalog::{LlmModel, get_local_models};
13use llm::types::IsoString;
14use llm::{ContentBlock, ProviderConnectionOverrides, ReasoningEffort};
15use std::collections::HashSet;
16use std::path::Path;
17use std::sync::Arc;
18use tokio::spawn;
19use tokio::sync::oneshot;
20use tracing::{error, info, warn};
21
22use super::config_setting::ConfigSetting;
23use super::mappers::{map_acp_mcp_servers, replay_to_client};
24use super::model_config::{
25 ValidatedMode, build_config_options_from_modes, model_exists, pick_default_model, supports_prompt_audio,
26 validated_modes_from_specs,
27};
28use super::relay::{SessionCommand, spawn_relay};
29use super::session::Session;
30use super::session_registry::{ConfigSnapshot, SessionRegistry};
31use super::session_store::{SessionMeta, SessionStore};
32use crate::settings_args::SettingsSourceArgs;
33use acp_utils::content::format_embedded_resource;
34use aether_core::agent_spec::AgentSpec;
35use aether_core::context::ext::ContextExt;
36use aether_project::{AetherSettings, AgentCatalog};
37use llm::Context;
38
39#[derive(Clone, Debug, Default)]
41pub enum InitialSessionSelection {
42 #[default]
43 Default,
44 Agent(String),
45 Model {
46 model: String,
47 reasoning_effort: Option<ReasoningEffort>,
48 },
49}
50
51impl InitialSessionSelection {
52 pub fn agent(name: String) -> Self {
53 Self::Agent(name)
54 }
55
56 pub fn model(model: String, reasoning_effort: Option<ReasoningEffort>) -> Self {
57 Self::Model { model, reasoning_effort }
58 }
59}
60
61pub struct SessionManager {
63 registry: Arc<SessionRegistry>,
64 session_store: Arc<SessionStore>,
65 oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
66 initial_selection: InitialSessionSelection,
67 settings_source: SettingsSourceArgs,
68 provider_connections: ProviderConnectionOverrides,
69}
70
71pub(crate) struct SessionManagerConfig {
72 pub(crate) registry: Arc<SessionRegistry>,
73 pub(crate) session_store: Arc<SessionStore>,
74 pub(crate) oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
75 pub(crate) initial_selection: InitialSessionSelection,
76 pub(crate) settings_source: SettingsSourceArgs,
77 pub(crate) provider_connections: ProviderConnectionOverrides,
78}
79
80struct SessionModeCatalog {
81 catalog: AgentCatalog,
82 modes: Vec<ValidatedMode>,
83 available: Vec<LlmModel>,
84}
85
86struct ResolvedInitialSession {
87 spec: AgentSpec,
88 selected_mode: Option<String>,
89}
90
/// Flags recording which non-text media kinds a prompt contains.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct PromptModalities {
    /// At least one image block is present.
    image: bool,
    /// At least one audio block is present.
    audio: bool,
}
96
97impl PromptModalities {
98 fn from_content(content: &[ContentBlock]) -> Self {
99 Self {
100 image: content.iter().any(ContentBlock::is_image),
101 audio: content.iter().any(|block| matches!(block, ContentBlock::Audio { .. })),
102 }
103 }
104
105 fn is_empty(self) -> bool {
106 !self.image && !self.audio
107 }
108}
109
110impl SessionManager {
111 pub(crate) fn new(deps: SessionManagerConfig) -> Self {
112 Self {
113 registry: deps.registry,
114 session_store: deps.session_store,
115 oauth_credential_store: deps.oauth_credential_store,
116 initial_selection: deps.initial_selection,
117 settings_source: deps.settings_source,
118 provider_connections: deps.provider_connections,
119 }
120 }
121
122 fn apply_provider_connection_overrides(&self, mut spec: AgentSpec) -> AgentSpec {
123 spec.provider_connections.merge(self.provider_connections.clone());
124 spec
125 }
126
127 fn resolve_initial_session(
128 &self,
129 mode_catalog: &SessionModeCatalog,
130 default_model: &LlmModel,
131 ) -> Result<ResolvedInitialSession, acp::Error> {
132 let mut resolved = match &self.initial_selection {
133 InitialSessionSelection::Default => resolve_default_initial_session(mode_catalog, default_model),
134 InitialSessionSelection::Agent(agent) => {
135 if !mode_catalog.modes.iter().any(|mode| mode.name == *agent) {
136 warn!("Unknown agent `{agent}` requested via --agent");
137 return Err(acp::Error::invalid_params());
138 }
139 resolve_agent_spec(&mode_catalog.catalog, agent)
140 .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(agent.clone()) })
141 }
142 InitialSessionSelection::Model { model, reasoning_effort } => {
143 let model = parse_available_model(model, &mode_catalog.available)?;
144 Ok(ResolvedInitialSession {
145 spec: AgentSpec::default_spec(&model, *reasoning_effort, Vec::new()),
146 selected_mode: None,
147 })
148 }
149 }?;
150 resolved.spec = self.apply_provider_connection_overrides(resolved.spec);
151 Ok(resolved)
152 }
153
154 async fn load_mode_catalog(&self, cwd: &Path) -> Result<SessionModeCatalog, acp::Error> {
155 let config = if let Some(source) = self.settings_source.source(cwd) {
156 AetherSettings::load(cwd, [source])
157 } else {
158 AetherSettings::load_default(cwd)
159 }
160 .map_err(|e| {
161 error!("Failed to load agent catalog: {e}");
162 acp::Error::invalid_params()
163 })?;
164 let catalog = if config.agents.is_empty() {
165 AgentCatalog::empty(cwd.to_path_buf())
166 } else {
167 AgentCatalog::from_settings(cwd, config).map_err(|e| {
168 error!("Failed to load agent catalog: {e}");
169 acp::Error::invalid_params()
170 })?
171 };
172
173 let available = get_local_models().await;
174 let specs: Vec<_> = catalog.user_invocable().cloned().collect();
175 let modes = validated_modes_from_specs(&specs, &available);
176
177 Ok(SessionModeCatalog { catalog, modes, available })
178 }
179
180 #[allow(clippy::too_many_arguments, clippy::similar_names)]
181 async fn register_session(
182 &self,
183 session: Session,
184 session_id: &str,
185 acp_session_id: &SessionId,
186 model: &str,
187 selected_mode: Option<String>,
188 reasoning_effort: Option<ReasoningEffort>,
189 modes: Vec<ValidatedMode>,
190 cx: &ConnectionTo<Client>,
191 ) -> Vec<acp::SessionConfigOption> {
192 let relay = spawn_relay(
193 session,
194 cx.clone(),
195 acp_session_id.clone(),
196 self.session_store.clone(),
197 Arc::clone(&self.oauth_credential_store),
198 );
199
200 self.registry
201 .insert(
202 session_id.to_string(),
203 relay,
204 model.to_string(),
205 selected_mode.clone(),
206 reasoning_effort,
207 modes.clone(),
208 )
209 .await;
210
211 let available = get_local_models().await;
212 let all_models = get_all_models(&available);
213 build_config_options_from_modes(
214 &modes,
215 &available,
216 selected_mode.as_deref(),
217 model,
218 reasoning_effort,
219 &all_models,
220 self.oauth_credential_store.as_ref(),
221 )
222 }
223
224 fn send_available_commands_notification(
225 available_commands: Vec<acp::AvailableCommand>,
226 acp_session_id: SessionId,
227 session_id: &str,
228 cx: &ConnectionTo<Client>,
229 ) {
230 if available_commands.is_empty() {
231 return;
232 }
233 let command_count = available_commands.len();
234 let notification = SessionNotification::new(
235 acp_session_id,
236 SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate::new(available_commands)),
237 );
238 if let Err(e) = cx.send_notification(notification).map_err(|e| AcpServerError::protocol("session/update", e)) {
239 error!("Failed to send available commands notification: {:?}", e);
240 } else {
241 info!("Sent available commands update for session {} ({} commands)", session_id, command_count);
242 }
243 }
244
245 pub async fn shutdown_all_sessions(&self) {
248 self.registry.shutdown_all().await;
249 }
250}
251
252fn options_from_snapshot(
253 snapshot: &ConfigSnapshot,
254 available: &[LlmModel],
255 all_models: &[LlmModel],
256 credential_store: &dyn OAuthCredentialStorage,
257) -> Vec<acp::SessionConfigOption> {
258 build_config_options_from_modes(
259 &snapshot.modes,
260 available,
261 snapshot.selected_mode.as_deref(),
262 &snapshot.effective_model,
263 snapshot.reasoning_effort,
264 all_models,
265 credential_store,
266 )
267}
268
269fn get_all_models(discovered: &[LlmModel]) -> Vec<LlmModel> {
272 let mut all = LlmModel::all().to_vec();
273 for m in discovered {
274 if !all.contains(m) {
275 all.push(m.clone());
276 }
277 }
278 all
279}
280
281fn build_auth_methods(store: &dyn OAuthCredentialStorage) -> Vec<AuthMethod> {
282 let mut seen = HashSet::new();
283 LlmModel::all()
284 .iter()
285 .filter_map(LlmModel::oauth_provider_id)
286 .filter(|id| seen.insert(*id))
287 .map(|id| {
288 let display = LlmModel::all()
289 .iter()
290 .find(|m| m.oauth_provider_id() == Some(id))
291 .map_or(id, |m| m.provider_display_name());
292 let mut method = acp::AuthMethodAgent::new(id, display);
293 if store.has_credential(id) {
294 method = method.description("authenticated");
295 }
296 AuthMethod::Agent(method)
297 })
298 .collect()
299}
300
301fn map_acp_to_content_blocks(blocks: Vec<acp::ContentBlock>) -> Vec<ContentBlock> {
302 blocks
303 .into_iter()
304 .map(|block| match block {
305 acp::ContentBlock::Text(t) => ContentBlock::text(t.text),
306 acp::ContentBlock::Image(img) => ContentBlock::Image { data: img.data, mime_type: img.mime_type },
307 acp::ContentBlock::Audio(aud) => ContentBlock::Audio { data: aud.data, mime_type: aud.mime_type },
308 acp::ContentBlock::Resource(r) => ContentBlock::text(format_embedded_resource(&r)),
309 acp::ContentBlock::ResourceLink(l) => ContentBlock::text(format!("[Resource: {}]", l.uri)),
310 _ => ContentBlock::text("[Unknown content]"),
311 })
312 .collect()
313}
314
315fn resolve_agent_spec(catalog: &AgentCatalog, mode_name: &str) -> Result<AgentSpec, acp::Error> {
316 catalog.resolve(mode_name).map_err(|e| {
317 error!("Failed to resolve runtime inputs for mode '{}': {e}", mode_name);
318 acp::Error::invalid_params()
319 })
320}
321
322fn resolve_default_initial_session(
323 mode_catalog: &SessionModeCatalog,
324 default_model: &LlmModel,
325) -> Result<ResolvedInitialSession, acp::Error> {
326 if let Some(mode) = mode_catalog.modes.first() {
327 return resolve_agent_spec(&mode_catalog.catalog, &mode.name)
328 .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(mode.name.clone()) });
329 }
330
331 Ok(ResolvedInitialSession { spec: AgentSpec::default_spec(default_model, None, Vec::new()), selected_mode: None })
332}
333
334fn parse_available_model(model: &str, available: &[LlmModel]) -> Result<LlmModel, acp::Error> {
335 let parsed = model.parse().map_err(|e: String| {
336 warn!("Failed to parse --model `{model}`: {e}");
337 acp::Error::invalid_params()
338 })?;
339
340 if model_exists(available, model) {
341 Ok(parsed)
342 } else {
343 warn!("Requested model `{model}` is not available");
344 Err(acp::Error::invalid_params())
345 }
346}
347
348fn prompt_capabilities_for_models(models: &[LlmModel]) -> PromptCapabilities {
349 PromptCapabilities::new()
350 .embedded_context(true)
351 .image(models.iter().any(LlmModel::supports_image))
352 .audio(models.iter().any(supports_prompt_audio))
353}
354
355fn selected_models(model_value: &str) -> Result<Vec<LlmModel>, acp::Error> {
356 model_value
357 .split(',')
358 .map(str::trim)
359 .filter(|part| !part.is_empty())
360 .map(|part| part.parse::<LlmModel>().map_err(|_| acp::Error::invalid_params()))
361 .collect()
362}
363
364fn validate_prompt_support(model_value: &str, content: &[ContentBlock]) -> Result<(), acp::Error> {
365 let modalities = PromptModalities::from_content(content);
366 if modalities.is_empty() {
367 return Ok(());
368 }
369
370 let selected = selected_models(model_value)?;
371 if modalities.image && selected.iter().any(|model| !model.supports_image()) {
372 return Err(acp::Error::invalid_params());
373 }
374 if modalities.audio && selected.iter().any(|model| !supports_prompt_audio(model)) {
375 return Err(acp::Error::invalid_params());
376 }
377
378 Ok(())
379}
380
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use agent_client_protocol::schema::{InitializeRequest, ProtocolVersion};
    use llm::catalog::BedrockModel;

    const SONNET: &str = "anthropic:claude-sonnet-4-5";
    const DEEPSEEK: &str = "deepseek:deepseek-chat";
    const GEMINI_NATIVE_AUDIO: &str = "gemini:gemini-live-2.5-flash-preview-native-audio";
    const BEDROCK_PROFILE_ARN: &str =
        "bedrock:arn:aws:bedrock:us-west-2:000000000000:application-inference-profile/000000000000";

    /// In-memory credential store so tests never touch real credentials.
    fn mock_oauth_store() -> Arc<dyn OAuthCredentialStorage> {
        Arc::new(aether_auth::FakeOAuthCredentialStore::new())
    }

    /// Parses a model id that the test expects to be valid.
    fn parse_model(id: &str) -> LlmModel {
        id.parse().unwrap()
    }

    #[tokio::test]
    async fn initialize_always_advertises_load_session_support() {
        let session_store = match SessionStore::new() {
            Ok(store) => Arc::new(store),
            Err(e) => panic!("Failed to initialize session store: {e}"),
        };
        let config = SessionManagerConfig {
            registry: Arc::new(SessionRegistry::new()),
            session_store,
            oauth_credential_store: mock_oauth_store(),
            initial_selection: InitialSessionSelection::default(),
            settings_source: SettingsSourceArgs::default(),
            provider_connections: ProviderConnectionOverrides::default(),
        };
        let manager = SessionManager::new(config);

        let request = InitializeRequest::new(ProtocolVersion::LATEST);
        let response = manager.initialize(request).await.expect("initialize succeeds");
        let json = serde_json::to_string(&response).expect("response serializes");

        assert!(json.contains("\"loadSession\":true"));
    }

    #[test]
    fn prompt_capabilities_reflect_available_modalities() {
        let image_only = prompt_capabilities_for_models(&[parse_model(SONNET)]);
        assert!(image_only.image);
        assert!(!image_only.audio);

        let audio_capable = prompt_capabilities_for_models(&[parse_model(GEMINI_NATIVE_AUDIO)]);
        assert!(!audio_capable.image);
        assert!(audio_capable.audio);

        let text_only = prompt_capabilities_for_models(&[parse_model(DEEPSEEK)]);
        assert!(!text_only.image);
        assert!(!text_only.audio);
    }

    #[test]
    fn validate_prompt_support_requires_all_selected_models_to_support_media() {
        let image_content = vec![ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() }];
        let audio_content =
            vec![ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() }];

        // Single-model selections.
        assert!(validate_prompt_support(SONNET, &image_content).is_ok());
        assert!(validate_prompt_support(DEEPSEEK, &image_content).is_err());
        assert!(validate_prompt_support(GEMINI_NATIVE_AUDIO, &audio_content).is_ok());
        assert!(validate_prompt_support(SONNET, &audio_content).is_err());

        // Mixed selections fail as soon as one member lacks the modality.
        let sonnet_and_deepseek = format!("{SONNET},{DEEPSEEK}");
        assert!(validate_prompt_support(&sonnet_and_deepseek, &image_content).is_err());
        let gemini_and_deepseek = format!("{GEMINI_NATIVE_AUDIO},{DEEPSEEK}");
        assert!(validate_prompt_support(&gemini_and_deepseek, &audio_content).is_err());
    }

    #[test]
    fn parse_available_model_accepts_bedrock_inference_profile_arn() {
        let available: Vec<LlmModel> = Vec::new();
        let parsed = parse_available_model(BEDROCK_PROFILE_ARN, &available)
            .expect("Bedrock inference-profile ARN should be accepted");
        assert!(matches!(parsed, LlmModel::Bedrock(BedrockModel::Profile(_))));
    }

    #[test]
    fn parse_available_model_rejects_unknown_catalog_model() {
        let available = vec![parse_model(DEEPSEEK)];
        assert!(parse_available_model(SONNET, &available).is_err());
    }

    #[test]
    fn parse_available_model_accepts_catalog_model_when_present() {
        let sonnet = parse_model(SONNET);
        let available = vec![sonnet.clone()];
        assert_eq!(parse_available_model(SONNET, &available).unwrap(), sonnet);
    }
}
474
475impl SessionManager {
476 pub async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse, acp::Error> {
477 info!("Received initialize request: {:?}", args);
478 let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
479 let available = get_local_models().await;
480 Ok(InitializeResponse::new(ProtocolVersion::V1)
481 .agent_info(Implementation::new("Aether", "0.1.0"))
482 .agent_capabilities(
483 AgentCapabilities::new()
484 .load_session(true)
485 .mcp_capabilities(McpCapabilities::new().http(true).sse(true))
486 .session_capabilities(acp::SessionCapabilities::new().list(acp::SessionListCapabilities::new()))
487 .prompt_capabilities(prompt_capabilities_for_models(&available)),
488 )
489 .auth_methods(auth_methods))
490 }
491
492 pub async fn authenticate(
493 &self,
494 args: AuthenticateRequest,
495 cx: &ConnectionTo<Client>,
496 ) -> Result<AuthenticateResponse, acp::Error> {
497 info!("Received authenticate request: {:?}", args);
498 let method_id = args.method_id.0.as_ref();
499 match method_id {
500 "codex" => {
501 llm::perform_codex_oauth_flow(self.oauth_credential_store.as_ref()).await.map_err(|e| {
502 error!("OAuth flow failed for {method_id}: {e}");
503 acp::Error::internal_error()
504 })?;
505 }
506 _ => return Err(acp::Error::invalid_params()),
507 }
508 let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
509 if let Err(e) = cx
510 .send_notification(AuthMethodsUpdatedParams { auth_methods })
511 .map_err(|e| AcpServerError::protocol("_aether/auth_methods_updated", e))
512 {
513 error!("Failed to send auth methods updated notification: {:?}", e);
514 }
515
516 let available = get_local_models().await;
517 let all_models = get_all_models(&available);
518 let snapshots = self.registry.snapshot_all_configs().await;
519
520 for (id, snap) in snapshots {
521 let options = options_from_snapshot(&snap, &available, &all_models, self.oauth_credential_store.as_ref());
522 let notification = SessionNotification::new(
523 SessionId::new(id),
524 SessionUpdate::ConfigOptionUpdate(ConfigOptionUpdate::new(options)),
525 );
526 let _ = cx.send_notification(notification);
527 }
528
529 Ok(AuthenticateResponse::default())
530 }
531
532 pub async fn new_session(
533 &self,
534 mut args: NewSessionRequest,
535 cx: &ConnectionTo<Client>,
536 ) -> Result<NewSessionResponse, acp::Error> {
537 if std::env::var("AETHER_INSIDE_SANDBOX").is_ok() {
540 let container_cwd = std::env::current_dir().unwrap_or_else(|_| "/workspace".into());
541 info!("Sandbox: remapping cwd {:?} -> {:?}", args.cwd, container_cwd);
542 args.cwd = container_cwd;
543 }
544
545 info!("Creating new session with cwd: {:?}", args.cwd);
546 let session_id = uuid::Uuid::new_v4().to_string();
547 let acp_session_id = acp::SessionId::new(session_id.clone());
548
549 let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
550 let default_model = pick_default_model(&mode_catalog.available).ok_or_else(|| {
551 error!("No models available — set an API key env var (e.g. ANTHROPIC_API_KEY)");
552 acp::Error::internal_error()
553 })?;
554
555 let ResolvedInitialSession { spec, selected_mode } =
556 self.resolve_initial_session(&mode_catalog, default_model)?;
557 let model_str = spec.model.clone();
558 let reasoning_effort = spec.reasoning_effort;
559
560 let session = Session::new(
561 spec,
562 args.cwd.clone(),
563 map_acp_mcp_servers(args.mcp_servers),
564 None,
565 Some(session_id.clone()),
566 Arc::clone(&self.oauth_credential_store),
567 )
568 .await
569 .map_err(|e| {
570 error!("Failed to create session: {}", e);
571 acp::Error::internal_error()
572 })?;
573
574 let available_commands = session.list_available_commands().await.map_err(|e| {
575 error!("Failed to list available commands: {}", e);
576 acp::Error::internal_error()
577 })?;
578
579 let meta = SessionMeta {
580 session_id: session_id.clone(),
581 cwd: args.cwd.clone(),
582 model: model_str.clone(),
583 selected_mode: selected_mode.clone(),
584 created_at: IsoString::now().0,
585 };
586 if let Err(e) = self.session_store.append_meta(&session_id, &meta) {
587 error!("Failed to write session meta: {e}");
588 }
589
590 let config_options = self
591 .register_session(
592 session,
593 &session_id,
594 &acp_session_id,
595 &model_str,
596 selected_mode,
597 reasoning_effort,
598 mode_catalog.modes,
599 cx,
600 )
601 .await;
602
603 info!("Session {} created successfully", session_id);
604
605 let response = NewSessionResponse::new(acp_session_id.clone()).config_options(config_options);
606
607 Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
608
609 Ok(response)
610 }
611
612 pub fn list_sessions(&self, args: &ListSessionsRequest) -> Result<ListSessionsResponse, acp::Error> {
613 info!("Listing sessions, cwd filter: {:?}", args.cwd);
614 let mut summaries = self.session_store.list();
615
616 if let Some(cwd) = args.cwd.as_ref() {
617 summaries.retain(|s| s.meta.cwd == *cwd);
618 }
619
620 let sessions: Vec<acp::SessionInfo> = summaries
621 .into_iter()
622 .map(|s| acp::SessionInfo::new(s.meta.session_id, s.meta.cwd).updated_at(s.meta.created_at).title(s.title))
623 .collect();
624
625 info!("Found {} sessions", sessions.len());
626 Ok(ListSessionsResponse::new(sessions))
627 }
628
629 pub async fn load_session(
630 &self,
631 args: LoadSessionRequest,
632 cx: &ConnectionTo<Client>,
633 ) -> Result<LoadSessionResponse, acp::Error> {
634 let session_id = args.session_id.0.to_string();
635 info!("Loading session: {session_id}");
636
637 let (meta, events) = self.session_store.load(&session_id).ok_or_else(|| {
638 error!("Session not found: {session_id}");
639 acp::Error::invalid_params()
640 })?;
641
642 let context = Context::from_events(&events);
643 let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
644
645 let spec = self.apply_provider_connection_overrides(if let Some(mode_name) = meta.selected_mode.as_deref() {
646 resolve_agent_spec(&mode_catalog.catalog, mode_name)?
647 } else {
648 let parsed_model: LlmModel = meta.model.parse().map_err(|e: String| {
649 error!("Failed to parse restored model '{}': {e}", meta.model);
650 acp::Error::invalid_params()
651 })?;
652 AgentSpec::default_spec(&parsed_model, None, Vec::new())
653 });
654
655 let model = spec.model.clone();
656
657 let restored_messages: Vec<_> = context.messages().iter().filter(|m| !m.is_system()).cloned().collect();
658
659 let session = Session::new(
660 spec,
661 args.cwd.clone(),
662 map_acp_mcp_servers(args.mcp_servers),
663 Some(restored_messages),
664 Some(session_id.clone()),
665 Arc::clone(&self.oauth_credential_store),
666 )
667 .await
668 .map_err(|e| {
669 error!("Failed to create session for load: {e}");
670 acp::Error::internal_error()
671 })?;
672
673 let available_commands = session.list_available_commands().await.map_err(|e| {
674 error!("Failed to list available commands: {e}");
675 acp::Error::internal_error()
676 })?;
677
678 let acp_session_id = acp::SessionId::new(session_id.clone());
679
680 let config_options = self
681 .register_session(
682 session,
683 &session_id,
684 &acp_session_id,
685 &model,
686 meta.selected_mode,
687 None,
688 mode_catalog.modes,
689 cx,
690 )
691 .await;
692
693 info!("Session {session_id} loaded successfully");
694
695 let response = LoadSessionResponse::new().config_options(config_options);
696
697 let cx_clone = cx.clone();
698 let replay_session_id = acp_session_id.clone();
699 spawn(async move {
700 replay_to_client(&events, &cx_clone, &replay_session_id).await;
701 });
702
703 Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
704
705 Ok(response)
706 }
707
708 pub async fn prompt(&self, args: acp::PromptRequest) -> Result<acp::PromptResponse, acp::Error> {
709 info!("Received prompt for session: {:?}", args.session_id);
710 let session_id_str = args.session_id.0.to_string();
711 let content = map_acp_to_content_blocks(args.prompt);
712
713 let model = self.registry.effective_model(&session_id_str).await.ok_or_else(|| {
714 error!("Session not found: {}", session_id_str);
715 acp::Error::invalid_params()
716 })?;
717 validate_prompt_support(&model, &content)?;
718
719 let dispatch = self.registry.begin_prompt(&session_id_str).await.ok_or_else(|| {
720 error!("Session not found: {}", session_id_str);
721 acp::Error::invalid_params()
722 })?;
723
724 let (result_tx, result_rx) = oneshot::channel();
725 dispatch
726 .relay_tx
727 .send(SessionCommand::Prompt {
728 content,
729 switch_model: dispatch.switch_model,
730 reasoning_effort: dispatch.reasoning_effort,
731 result_tx,
732 })
733 .await
734 .map_err(|_| {
735 error!("Relay channel closed for session {}", session_id_str);
736 acp::Error::internal_error()
737 })?;
738
739 let stop_reason = result_rx
740 .await
741 .map_err(|_| {
742 error!("Relay dropped result channel for session {}", session_id_str);
743 acp::Error::internal_error()
744 })?
745 .map_err(|e| {
746 error!("Relay error for session {}: {}", session_id_str, e);
747 acp::Error::internal_error()
748 })?;
749
750 info!("Prompt completed with stop reason: {:?}", stop_reason);
751 Ok(PromptResponse::new(stop_reason))
752 }
753
754 pub async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> {
755 info!("Received cancel for session: {:?}", args.session_id);
756 let session_id_str = args.session_id.0.to_string();
757 let relay = self.registry.relay(&session_id_str).await.ok_or_else(|| {
758 error!("Session not found for cancel: {}", session_id_str);
759 acp::Error::invalid_params()
760 })?;
761
762 relay.cmd.send(SessionCommand::Cancel).await.map_err(|_| {
763 error!("Relay channel closed for cancel: {}", session_id_str);
764 acp::Error::internal_error()
765 })?;
766
767 Ok(())
768 }
769
770 pub async fn set_session_config_option(
771 &self,
772 args: SetSessionConfigOptionRequest,
773 ) -> Result<SetSessionConfigOptionResponse, acp::Error> {
774 let session_id_str = args.session_id.0.to_string();
775 let config_id = args.config_id.0.to_string();
776 let value = args.value.0.to_string();
777
778 info!("set_session_config_option: session={}, config={}, value={}", session_id_str, config_id, value);
779
780 let setting = ConfigSetting::parse(&config_id, &value).map_err(|e| {
781 error!("{e}");
782 acp::Error::invalid_params()
783 })?;
784
785 let available = get_local_models().await;
786 let all_models = get_all_models(&available);
787
788 let snapshot =
789 self.registry.apply_config_change(&session_id_str, &setting, &available).await.ok_or_else(|| {
790 error!("Session not found: {}", session_id_str);
791 acp::Error::invalid_params()
792 })??;
793
794 let options = options_from_snapshot(&snapshot, &available, &all_models, self.oauth_credential_store.as_ref());
795 Ok(SetSessionConfigOptionResponse::new(options))
796 }
797
798 pub async fn on_mcp_request(&self, request: McpRequest) -> Result<(), acp::Error> {
799 info!("Received MCP ext request: {:?}", request);
800 match request {
801 McpRequest::Authenticate { session_id, server_name } => {
802 let relay = self.registry.relay(&session_id).await.ok_or_else(|| {
803 error!("Session not found for authenticate_mcp_server: {}", session_id);
804 acp::Error::invalid_params()
805 })?;
806
807 relay.mcp_request.send(McpRequest::Authenticate { session_id, server_name }).await.map_err(|_| {
808 error!("MCP request channel closed for session");
809 acp::Error::internal_error()
810 })?;
811 }
812 }
813
814 Ok(())
815 }
816}