1use acp_utils::notifications::{AuthMethodsUpdatedParams, McpRequest};
2use acp_utils::server::AcpServerError;
3use aether_auth::OAuthCredentialStorage;
4use agent_client_protocol::schema::{
5 self as acp, AgentCapabilities, AuthMethod, AuthenticateRequest, AuthenticateResponse, AvailableCommandsUpdate,
6 ConfigOptionUpdate, Implementation, InitializeRequest, InitializeResponse, ListSessionsRequest,
7 ListSessionsResponse, LoadSessionRequest, LoadSessionResponse, McpCapabilities, NewSessionRequest,
8 NewSessionResponse, PromptCapabilities, PromptResponse, ProtocolVersion, SessionId, SessionNotification,
9 SessionUpdate, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse,
10};
11use agent_client_protocol::{Client, ConnectionTo};
12use llm::catalog::{LlmModel, get_local_models};
13use llm::types::IsoString;
14use llm::{ContentBlock, ProviderConnectionOverrides, ReasoningEffort};
15use std::collections::HashSet;
16use std::path::Path;
17use std::sync::Arc;
18use tokio::sync::oneshot;
19use tracing::{error, info, warn};
20
21use super::config_setting::ConfigSetting;
22use super::mappers::{map_acp_mcp_servers, replay_to_client};
23use super::model_config::{
24 ValidatedMode, build_config_options_from_modes, model_exists, pick_default_model, supports_prompt_audio,
25 validated_modes_from_specs,
26};
27use super::relay::{SessionCommand, spawn_relay};
28use super::session::Session;
29use super::session_registry::{ConfigSnapshot, SessionRegistry};
30use super::session_store::{SessionMeta, SessionStore};
31use crate::settings_args::SettingsSourceArgs;
32use acp_utils::content::format_embedded_resource;
33use aether_core::agent_spec::AgentSpec;
34use aether_core::context::ext::ContextExt;
35use aether_project::{AetherSettings, AgentCatalog};
36use llm::Context;
37
38#[derive(Clone, Debug, Default)]
40pub enum InitialSessionSelection {
41 #[default]
42 Default,
43 Agent(String),
44 Model {
45 model: String,
46 reasoning_effort: Option<ReasoningEffort>,
47 },
48}
49
50impl InitialSessionSelection {
51 pub fn agent(name: String) -> Self {
52 Self::Agent(name)
53 }
54
55 pub fn model(model: String, reasoning_effort: Option<ReasoningEffort>) -> Self {
56 Self::Model { model, reasoning_effort }
57 }
58}
59
/// Serves ACP session requests on top of a shared [`SessionRegistry`] and [`SessionStore`].
pub struct SessionManager {
    /// Live sessions: relay handles plus each session's model/mode configuration.
    registry: Arc<SessionRegistry>,
    /// Store used to write session metadata and to list/load previously recorded sessions.
    session_store: Arc<SessionStore>,
    /// OAuth credential storage consulted when building auth methods and config options.
    oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
    /// Agent/model choice applied when a new session is created.
    initial_selection: InitialSessionSelection,
    /// Settings source used when loading the agent catalog for a working directory.
    settings_source: SettingsSourceArgs,
    /// Provider connection overrides merged into every resolved agent spec.
    provider_connections: ProviderConnectionOverrides,
}
69
/// Dependencies handed to [`SessionManager::new`]; each field is moved into the
/// manager field of the same name.
pub(crate) struct SessionManagerConfig {
    /// Registry of live sessions.
    pub(crate) registry: Arc<SessionRegistry>,
    /// Store for session metadata and recorded events.
    pub(crate) session_store: Arc<SessionStore>,
    /// OAuth credential storage shared with every session.
    pub(crate) oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
    /// Agent/model choice for newly created sessions.
    pub(crate) initial_selection: InitialSessionSelection,
    /// Where agent settings are loaded from.
    pub(crate) settings_source: SettingsSourceArgs,
    /// Provider connection overrides merged into resolved agent specs.
    pub(crate) provider_connections: ProviderConnectionOverrides,
}
78
/// Result of loading the agent catalog for a working directory.
struct SessionModeCatalog {
    /// Agent catalog built from the loaded settings (empty when no agents are configured).
    catalog: AgentCatalog,
    /// Modes produced by `validated_modes_from_specs` from the catalog's user-invocable specs.
    modes: Vec<ValidatedMode>,
    /// Models reported by `get_local_models` at load time.
    available: Vec<LlmModel>,
}
84
/// Agent spec chosen for a new session, plus the mode it came from (if any).
struct ResolvedInitialSession {
    /// Spec the session will be created with.
    spec: AgentSpec,
    /// Name of the catalog mode the spec was resolved from; `None` when the spec was
    /// built directly from a model.
    selected_mode: Option<String>,
}
89
/// Flags recording which non-text modalities appear in a prompt.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct PromptModalities {
    /// At least one image block is present.
    image: bool,
    /// At least one audio block is present.
    audio: bool,
}
95
96impl PromptModalities {
97 fn from_content(content: &[ContentBlock]) -> Self {
98 Self {
99 image: content.iter().any(ContentBlock::is_image),
100 audio: content.iter().any(|block| matches!(block, ContentBlock::Audio { .. })),
101 }
102 }
103
104 fn is_empty(self) -> bool {
105 !self.image && !self.audio
106 }
107}
108
109impl SessionManager {
110 pub(crate) fn new(deps: SessionManagerConfig) -> Self {
111 Self {
112 registry: deps.registry,
113 session_store: deps.session_store,
114 oauth_credential_store: deps.oauth_credential_store,
115 initial_selection: deps.initial_selection,
116 settings_source: deps.settings_source,
117 provider_connections: deps.provider_connections,
118 }
119 }
120
121 fn apply_provider_connection_overrides(&self, mut spec: AgentSpec) -> AgentSpec {
122 spec.provider_connections.merge(self.provider_connections.clone());
123 spec
124 }
125
126 fn resolve_initial_session(
127 &self,
128 mode_catalog: &SessionModeCatalog,
129 default_model: &LlmModel,
130 ) -> Result<ResolvedInitialSession, acp::Error> {
131 let mut resolved = match &self.initial_selection {
132 InitialSessionSelection::Default => resolve_default_initial_session(mode_catalog, default_model),
133 InitialSessionSelection::Agent(agent) => {
134 if !mode_catalog.modes.iter().any(|mode| mode.name == *agent) {
135 warn!("Unknown agent `{agent}` requested via --agent");
136 return Err(acp::Error::invalid_params());
137 }
138 resolve_agent_spec(&mode_catalog.catalog, agent)
139 .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(agent.clone()) })
140 }
141 InitialSessionSelection::Model { model, reasoning_effort } => {
142 let model = parse_available_model(model, &mode_catalog.available)?;
143 Ok(ResolvedInitialSession {
144 spec: AgentSpec::default_spec(&model, *reasoning_effort, Vec::new()),
145 selected_mode: None,
146 })
147 }
148 }?;
149 resolved.spec = self.apply_provider_connection_overrides(resolved.spec);
150 Ok(resolved)
151 }
152
153 async fn load_mode_catalog(&self, cwd: &Path) -> Result<SessionModeCatalog, acp::Error> {
154 let config = if let Some(source) = self.settings_source.source(cwd) {
155 AetherSettings::load(cwd, [source])
156 } else {
157 AetherSettings::load_default(cwd)
158 }
159 .map_err(|e| {
160 error!("Failed to load agent catalog: {e}");
161 acp::Error::invalid_params()
162 })?;
163 let catalog = if config.agents.is_empty() {
164 AgentCatalog::empty(cwd.to_path_buf())
165 } else {
166 AgentCatalog::from_settings(cwd, config).map_err(|e| {
167 error!("Failed to load agent catalog: {e}");
168 acp::Error::invalid_params()
169 })?
170 };
171
172 let available = get_local_models().await;
173 let specs: Vec<_> = catalog.user_invocable().cloned().collect();
174 let modes = validated_modes_from_specs(&specs, &available);
175
176 Ok(SessionModeCatalog { catalog, modes, available })
177 }
178
179 #[allow(clippy::too_many_arguments, clippy::similar_names)]
180 async fn register_session(
181 &self,
182 session: Session,
183 session_id: &str,
184 acp_session_id: &SessionId,
185 model: &str,
186 selected_mode: Option<String>,
187 reasoning_effort: Option<ReasoningEffort>,
188 modes: Vec<ValidatedMode>,
189 cx: &ConnectionTo<Client>,
190 ) -> Vec<acp::SessionConfigOption> {
191 let relay = spawn_relay(
192 session,
193 cx.clone(),
194 acp_session_id.clone(),
195 self.session_store.clone(),
196 Arc::clone(&self.oauth_credential_store),
197 );
198
199 self.registry
200 .insert(
201 session_id.to_string(),
202 relay,
203 model.to_string(),
204 selected_mode.clone(),
205 reasoning_effort,
206 modes.clone(),
207 )
208 .await;
209
210 let available = get_local_models().await;
211 let all_models = get_all_models(&available);
212 build_config_options_from_modes(
213 &modes,
214 &available,
215 selected_mode.as_deref(),
216 model,
217 reasoning_effort,
218 &all_models,
219 self.oauth_credential_store.as_ref(),
220 )
221 }
222
223 fn send_available_commands_notification(
224 available_commands: Vec<acp::AvailableCommand>,
225 acp_session_id: SessionId,
226 session_id: &str,
227 cx: &ConnectionTo<Client>,
228 ) {
229 if available_commands.is_empty() {
230 return;
231 }
232 let command_count = available_commands.len();
233 let notification = SessionNotification::new(
234 acp_session_id,
235 SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate::new(available_commands)),
236 );
237 if let Err(e) = cx.send_notification(notification).map_err(|e| AcpServerError::protocol("session/update", e)) {
238 error!("Failed to send available commands notification: {:?}", e);
239 } else {
240 info!("Sent available commands update for session {} ({} commands)", session_id, command_count);
241 }
242 }
243
244 pub async fn shutdown_all_sessions(&self) {
247 self.registry.shutdown_all().await;
248 }
249}
250
251fn options_from_snapshot(
252 snapshot: &ConfigSnapshot,
253 available: &[LlmModel],
254 all_models: &[LlmModel],
255 credential_store: &dyn OAuthCredentialStorage,
256) -> Vec<acp::SessionConfigOption> {
257 build_config_options_from_modes(
258 &snapshot.modes,
259 available,
260 snapshot.selected_mode.as_deref(),
261 &snapshot.effective_model,
262 snapshot.reasoning_effort,
263 all_models,
264 credential_store,
265 )
266}
267
268fn get_all_models(discovered: &[LlmModel]) -> Vec<LlmModel> {
271 let mut all = LlmModel::all().to_vec();
272 for m in discovered {
273 if !all.contains(m) {
274 all.push(m.clone());
275 }
276 }
277 all
278}
279
280fn build_auth_methods(store: &dyn OAuthCredentialStorage) -> Vec<AuthMethod> {
281 let mut seen = HashSet::new();
282 LlmModel::all()
283 .iter()
284 .filter_map(LlmModel::oauth_provider_id)
285 .filter(|id| seen.insert(*id))
286 .map(|id| {
287 let display = LlmModel::all()
288 .iter()
289 .find(|m| m.oauth_provider_id() == Some(id))
290 .map_or(id, |m| m.provider_display_name());
291 let mut method = acp::AuthMethodAgent::new(id, display);
292 if store.has_credential(id) {
293 method = method.description("authenticated");
294 }
295 AuthMethod::Agent(method)
296 })
297 .collect()
298}
299
300fn map_acp_to_content_blocks(blocks: Vec<acp::ContentBlock>) -> Vec<ContentBlock> {
301 blocks
302 .into_iter()
303 .map(|block| match block {
304 acp::ContentBlock::Text(t) => ContentBlock::text(t.text),
305 acp::ContentBlock::Image(img) => ContentBlock::Image { data: img.data, mime_type: img.mime_type },
306 acp::ContentBlock::Audio(aud) => ContentBlock::Audio { data: aud.data, mime_type: aud.mime_type },
307 acp::ContentBlock::Resource(r) => ContentBlock::text(format_embedded_resource(&r)),
308 acp::ContentBlock::ResourceLink(l) => ContentBlock::text(format!("[Resource: {}]", l.uri)),
309 _ => ContentBlock::text("[Unknown content]"),
310 })
311 .collect()
312}
313
314fn resolve_agent_spec(catalog: &AgentCatalog, mode_name: &str) -> Result<AgentSpec, acp::Error> {
315 catalog.resolve(mode_name).map_err(|e| {
316 error!("Failed to resolve runtime inputs for mode '{}': {e}", mode_name);
317 acp::Error::invalid_params()
318 })
319}
320
321fn resolve_default_initial_session(
322 mode_catalog: &SessionModeCatalog,
323 default_model: &LlmModel,
324) -> Result<ResolvedInitialSession, acp::Error> {
325 if let Some(mode) = mode_catalog.modes.first() {
326 return resolve_agent_spec(&mode_catalog.catalog, &mode.name)
327 .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(mode.name.clone()) });
328 }
329
330 Ok(ResolvedInitialSession { spec: AgentSpec::default_spec(default_model, None, Vec::new()), selected_mode: None })
331}
332
333fn parse_available_model(model: &str, available: &[LlmModel]) -> Result<LlmModel, acp::Error> {
334 let parsed = model.parse().map_err(|e: String| {
335 warn!("Failed to parse --model `{model}`: {e}");
336 acp::Error::invalid_params()
337 })?;
338
339 if model_exists(available, model) {
340 Ok(parsed)
341 } else {
342 warn!("Requested model `{model}` is not available");
343 Err(acp::Error::invalid_params())
344 }
345}
346
347fn prompt_capabilities_for_models(models: &[LlmModel]) -> PromptCapabilities {
348 PromptCapabilities::new()
349 .embedded_context(true)
350 .image(models.iter().any(LlmModel::supports_image))
351 .audio(models.iter().any(supports_prompt_audio))
352}
353
354fn selected_models(model_value: &str) -> Result<Vec<LlmModel>, acp::Error> {
355 model_value
356 .split(',')
357 .map(str::trim)
358 .filter(|part| !part.is_empty())
359 .map(|part| part.parse::<LlmModel>().map_err(|_| acp::Error::invalid_params()))
360 .collect()
361}
362
363fn validate_prompt_support(model_value: &str, content: &[ContentBlock]) -> Result<(), acp::Error> {
364 let modalities = PromptModalities::from_content(content);
365 if modalities.is_empty() {
366 return Ok(());
367 }
368
369 let selected = selected_models(model_value)?;
370 if modalities.image && selected.iter().any(|model| !model.supports_image()) {
371 return Err(acp::Error::invalid_params());
372 }
373 if modalities.audio && selected.iter().any(|model| !supports_prompt_audio(model)) {
374 return Err(acp::Error::invalid_params());
375 }
376
377 Ok(())
378}
379
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use agent_client_protocol::schema::{InitializeRequest, ProtocolVersion};

    const SONNET: &str = "anthropic:claude-sonnet-4-5";
    const DEEPSEEK: &str = "deepseek:deepseek-chat";
    const GEMINI_NATIVE_AUDIO: &str = "gemini:gemini-live-2.5-flash-preview-native-audio";
    const BEDROCK_ARN_AS_MODEL_REJECTED: &str =
        "bedrock:arn:aws:bedrock:us-west-2:000000000000:application-inference-profile/000000000000";

    /// In-memory credential store used by tests.
    fn mock_oauth_store() -> Arc<dyn OAuthCredentialStorage> {
        Arc::new(aether_auth::FakeOAuthCredentialStore::new())
    }

    /// Parses a model id that the test expects to be valid.
    fn model(id: &str) -> LlmModel {
        id.parse().unwrap()
    }

    #[tokio::test]
    async fn initialize_always_advertises_load_session_support() {
        let session_store = match SessionStore::new() {
            Ok(store) => Arc::new(store),
            Err(e) => panic!("Failed to initialize session store: {e}"),
        };
        let config = SessionManagerConfig {
            registry: Arc::new(SessionRegistry::new()),
            session_store,
            oauth_credential_store: mock_oauth_store(),
            initial_selection: InitialSessionSelection::default(),
            settings_source: SettingsSourceArgs::default(),
            provider_connections: ProviderConnectionOverrides::default(),
        };
        let manager = SessionManager::new(config);

        let request = InitializeRequest::new(ProtocolVersion::LATEST);
        let response = manager.initialize(request).await.expect("initialize succeeds");
        let json = serde_json::to_string(&response).expect("response serializes");
        assert!(json.contains("\"loadSession\":true"));
    }

    #[test]
    fn prompt_capabilities_reflect_available_modalities() {
        let image_only = prompt_capabilities_for_models(&[model(SONNET)]);
        assert!(image_only.image);
        assert!(!image_only.audio);

        let audio_capable = prompt_capabilities_for_models(&[model(GEMINI_NATIVE_AUDIO)]);
        assert!(!audio_capable.image);
        assert!(audio_capable.audio);

        let text_only = prompt_capabilities_for_models(&[model(DEEPSEEK)]);
        assert!(!text_only.image);
        assert!(!text_only.audio);
    }

    #[test]
    fn validate_prompt_support_requires_all_selected_models_to_support_media() {
        let image_content = vec![ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() }];
        let audio_content =
            vec![ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() }];

        // Single-model selections.
        assert!(validate_prompt_support(SONNET, &image_content).is_ok());
        assert!(validate_prompt_support(DEEPSEEK, &image_content).is_err());
        assert!(validate_prompt_support(GEMINI_NATIVE_AUDIO, &audio_content).is_ok());
        assert!(validate_prompt_support(SONNET, &audio_content).is_err());

        // Multi-model selections fail when any one model lacks the modality.
        let sonnet_and_deepseek = format!("{SONNET},{DEEPSEEK}");
        assert!(validate_prompt_support(&sonnet_and_deepseek, &image_content).is_err());

        let gemini_and_deepseek = format!("{GEMINI_NATIVE_AUDIO},{DEEPSEEK}");
        assert!(validate_prompt_support(&gemini_and_deepseek, &audio_content).is_err());
    }

    #[test]
    fn parse_available_model_rejects_bedrock_inference_profile_arn() {
        let available: Vec<LlmModel> = Vec::new();
        let error = parse_available_model(BEDROCK_ARN_AS_MODEL_REJECTED, &available).unwrap_err();
        assert_eq!(error, acp::Error::invalid_params());
    }

    #[test]
    fn parse_available_model_rejects_unknown_catalog_model() {
        let available: Vec<LlmModel> = vec![model(DEEPSEEK)];
        assert!(parse_available_model(SONNET, &available).is_err());
    }

    #[test]
    fn parse_available_model_accepts_catalog_model_when_present() {
        let sonnet = model(SONNET);
        let available = vec![sonnet.clone()];
        assert_eq!(parse_available_model(SONNET, &available).unwrap(), sonnet);
    }
}
471
472impl SessionManager {
473 pub async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse, acp::Error> {
474 info!("Received initialize request: {:?}", args);
475 let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
476 let available = get_local_models().await;
477 Ok(InitializeResponse::new(ProtocolVersion::V1)
478 .agent_info(Implementation::new("Aether", "0.1.0"))
479 .agent_capabilities(
480 AgentCapabilities::new()
481 .load_session(true)
482 .mcp_capabilities(McpCapabilities::new().http(true).sse(true))
483 .session_capabilities(acp::SessionCapabilities::new().list(acp::SessionListCapabilities::new()))
484 .prompt_capabilities(prompt_capabilities_for_models(&available)),
485 )
486 .auth_methods(auth_methods))
487 }
488
489 pub async fn authenticate(
490 &self,
491 args: AuthenticateRequest,
492 cx: &ConnectionTo<Client>,
493 ) -> Result<AuthenticateResponse, acp::Error> {
494 info!("Received authenticate request: {:?}", args);
495 let method_id = args.method_id.0.as_ref();
496 match method_id {
497 "codex" => {
498 llm::perform_codex_oauth_flow(self.oauth_credential_store.as_ref()).await.map_err(|e| {
499 error!("OAuth flow failed for {method_id}: {e}");
500 acp::Error::internal_error()
501 })?;
502 }
503 _ => return Err(acp::Error::invalid_params()),
504 }
505 let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
506 if let Err(e) = cx
507 .send_notification(AuthMethodsUpdatedParams { auth_methods })
508 .map_err(|e| AcpServerError::protocol("_aether/auth_methods_updated", e))
509 {
510 error!("Failed to send auth methods updated notification: {:?}", e);
511 }
512
513 let available = get_local_models().await;
514 let all_models = get_all_models(&available);
515 let snapshots = self.registry.snapshot_all_configs().await;
516
517 for (id, snap) in snapshots {
518 let options = options_from_snapshot(&snap, &available, &all_models, self.oauth_credential_store.as_ref());
519 let notification = SessionNotification::new(
520 SessionId::new(id),
521 SessionUpdate::ConfigOptionUpdate(ConfigOptionUpdate::new(options)),
522 );
523 let _ = cx.send_notification(notification);
524 }
525
526 Ok(AuthenticateResponse::default())
527 }
528
529 pub async fn new_session(
530 &self,
531 mut args: NewSessionRequest,
532 cx: &ConnectionTo<Client>,
533 ) -> Result<NewSessionResponse, acp::Error> {
534 if std::env::var("AETHER_INSIDE_SANDBOX").is_ok() {
537 let container_cwd = std::env::current_dir().unwrap_or_else(|_| "/workspace".into());
538 info!("Sandbox: remapping cwd {:?} -> {:?}", args.cwd, container_cwd);
539 args.cwd = container_cwd;
540 }
541
542 info!("Creating new session with cwd: {:?}", args.cwd);
543 let session_id = uuid::Uuid::new_v4().to_string();
544 let acp_session_id = acp::SessionId::new(session_id.clone());
545
546 let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
547 let default_model = pick_default_model(&mode_catalog.available).ok_or_else(|| {
548 error!("No models available — set an API key env var (e.g. ANTHROPIC_API_KEY)");
549 acp::Error::internal_error()
550 })?;
551
552 let ResolvedInitialSession { spec, selected_mode } =
553 self.resolve_initial_session(&mode_catalog, default_model)?;
554 let model_str = spec.model.clone();
555 let reasoning_effort = spec.reasoning_effort;
556
557 let session = Session::new(
558 spec,
559 args.cwd.clone(),
560 map_acp_mcp_servers(args.mcp_servers),
561 None,
562 Some(session_id.clone()),
563 Arc::clone(&self.oauth_credential_store),
564 )
565 .await
566 .map_err(|e| {
567 error!("Failed to create session: {}", e);
568 acp::Error::internal_error()
569 })?;
570
571 let available_commands = session.list_available_commands().await.map_err(|e| {
572 error!("Failed to list available commands: {}", e);
573 acp::Error::internal_error()
574 })?;
575
576 let meta = SessionMeta {
577 session_id: session_id.clone(),
578 cwd: args.cwd.clone(),
579 model: model_str.clone(),
580 selected_mode: selected_mode.clone(),
581 created_at: IsoString::now().0,
582 };
583 if let Err(e) = self.session_store.append_meta(&session_id, &meta) {
584 error!("Failed to write session meta: {e}");
585 }
586
587 let config_options = self
588 .register_session(
589 session,
590 &session_id,
591 &acp_session_id,
592 &model_str,
593 selected_mode,
594 reasoning_effort,
595 mode_catalog.modes,
596 cx,
597 )
598 .await;
599
600 info!("Session {} created successfully", session_id);
601
602 let response = NewSessionResponse::new(acp_session_id.clone()).config_options(config_options);
603
604 Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
605
606 Ok(response)
607 }
608
609 pub fn list_sessions(&self, args: &ListSessionsRequest) -> Result<ListSessionsResponse, acp::Error> {
610 info!("Listing sessions, cwd filter: {:?}", args.cwd);
611 let mut summaries = self.session_store.list();
612
613 if let Some(cwd) = args.cwd.as_ref() {
614 summaries.retain(|s| s.meta.cwd == *cwd);
615 }
616
617 let sessions: Vec<acp::SessionInfo> = summaries
618 .into_iter()
619 .map(|s| acp::SessionInfo::new(s.meta.session_id, s.meta.cwd).updated_at(s.meta.created_at).title(s.title))
620 .collect();
621
622 info!("Found {} sessions", sessions.len());
623 Ok(ListSessionsResponse::new(sessions))
624 }
625
626 pub async fn load_session(
627 &self,
628 args: LoadSessionRequest,
629 cx: &ConnectionTo<Client>,
630 ) -> Result<LoadSessionResponse, acp::Error> {
631 let session_id = args.session_id.0.to_string();
632 info!("Loading session: {session_id}");
633
634 let (meta, events) = self.session_store.load(&session_id).ok_or_else(|| {
635 error!("Session not found: {session_id}");
636 acp::Error::invalid_params()
637 })?;
638
639 let context = Context::from_events(&events);
640 let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
641
642 let spec = self.apply_provider_connection_overrides(if let Some(mode_name) = meta.selected_mode.as_deref() {
643 resolve_agent_spec(&mode_catalog.catalog, mode_name)?
644 } else {
645 let parsed_model: LlmModel = meta.model.parse().map_err(|e: String| {
646 error!("Failed to parse restored model '{}': {e}", meta.model);
647 acp::Error::invalid_params()
648 })?;
649 AgentSpec::default_spec(&parsed_model, None, Vec::new())
650 });
651
652 let model = spec.model.clone();
653
654 let restored_messages: Vec<_> = context.messages().iter().filter(|m| !m.is_system()).cloned().collect();
655
656 let session = Session::new(
657 spec,
658 args.cwd.clone(),
659 map_acp_mcp_servers(args.mcp_servers),
660 Some(restored_messages),
661 Some(session_id.clone()),
662 Arc::clone(&self.oauth_credential_store),
663 )
664 .await
665 .map_err(|e| {
666 error!("Failed to create session for load: {e}");
667 acp::Error::internal_error()
668 })?;
669
670 let available_commands = session.list_available_commands().await.map_err(|e| {
671 error!("Failed to list available commands: {e}");
672 acp::Error::internal_error()
673 })?;
674
675 let acp_session_id = acp::SessionId::new(session_id.clone());
676
677 let config_options = self
678 .register_session(
679 session,
680 &session_id,
681 &acp_session_id,
682 &model,
683 meta.selected_mode,
684 None,
685 mode_catalog.modes,
686 cx,
687 )
688 .await;
689
690 info!("Session {session_id} loaded successfully");
691
692 let response = LoadSessionResponse::new().config_options(config_options);
693 replay_to_client(&events, cx, &acp_session_id).await;
694 Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
695
696 Ok(response)
697 }
698
699 pub async fn prompt(&self, args: acp::PromptRequest) -> Result<acp::PromptResponse, acp::Error> {
700 info!("Received prompt for session: {:?}", args.session_id);
701 let session_id_str = args.session_id.0.to_string();
702 let content = map_acp_to_content_blocks(args.prompt);
703
704 let model = self.registry.effective_model(&session_id_str).await.ok_or_else(|| {
705 error!("Session not found: {}", session_id_str);
706 acp::Error::invalid_params()
707 })?;
708 validate_prompt_support(&model, &content)?;
709
710 let dispatch = self.registry.begin_prompt(&session_id_str).await.ok_or_else(|| {
711 error!("Session not found: {}", session_id_str);
712 acp::Error::invalid_params()
713 })?;
714
715 let (result_tx, result_rx) = oneshot::channel();
716 dispatch
717 .relay_tx
718 .send(SessionCommand::Prompt {
719 content,
720 switch_model: dispatch.switch_model,
721 reasoning_effort: dispatch.reasoning_effort,
722 result_tx,
723 })
724 .await
725 .map_err(|_| {
726 error!("Relay channel closed for session {}", session_id_str);
727 acp::Error::internal_error()
728 })?;
729
730 let stop_reason = result_rx
731 .await
732 .map_err(|_| {
733 error!("Relay dropped result channel for session {}", session_id_str);
734 acp::Error::internal_error()
735 })?
736 .map_err(|e| {
737 error!("Relay error for session {}: {}", session_id_str, e);
738 acp::Error::internal_error()
739 })?;
740
741 info!("Prompt completed with stop reason: {:?}", stop_reason);
742 Ok(PromptResponse::new(stop_reason))
743 }
744
745 pub async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> {
746 info!("Received cancel for session: {:?}", args.session_id);
747 let session_id_str = args.session_id.0.to_string();
748 let relay = self.registry.relay(&session_id_str).await.ok_or_else(|| {
749 error!("Session not found for cancel: {}", session_id_str);
750 acp::Error::invalid_params()
751 })?;
752
753 relay.cmd.send(SessionCommand::Cancel).await.map_err(|_| {
754 error!("Relay channel closed for cancel: {}", session_id_str);
755 acp::Error::internal_error()
756 })?;
757
758 Ok(())
759 }
760
761 pub async fn set_session_config_option(
762 &self,
763 args: SetSessionConfigOptionRequest,
764 ) -> Result<SetSessionConfigOptionResponse, acp::Error> {
765 let session_id_str = args.session_id.0.to_string();
766 let config_id = args.config_id.0.to_string();
767 let value = args.value.0.to_string();
768
769 info!("set_session_config_option: session={}, config={}, value={}", session_id_str, config_id, value);
770
771 let setting = ConfigSetting::parse(&config_id, &value).map_err(|e| {
772 error!("{e}");
773 acp::Error::invalid_params()
774 })?;
775
776 let available = get_local_models().await;
777 let all_models = get_all_models(&available);
778
779 let snapshot =
780 self.registry.apply_config_change(&session_id_str, &setting, &available).await.ok_or_else(|| {
781 error!("Session not found: {}", session_id_str);
782 acp::Error::invalid_params()
783 })??;
784
785 let options = options_from_snapshot(&snapshot, &available, &all_models, self.oauth_credential_store.as_ref());
786 Ok(SetSessionConfigOptionResponse::new(options))
787 }
788
789 pub async fn on_mcp_request(&self, request: McpRequest) -> Result<(), acp::Error> {
790 info!("Received MCP ext request: {:?}", request);
791 match request {
792 McpRequest::Authenticate { session_id, server_name } => {
793 let relay = self.registry.relay(&session_id).await.ok_or_else(|| {
794 error!("Session not found for authenticate_mcp_server: {}", session_id);
795 acp::Error::invalid_params()
796 })?;
797
798 relay.mcp_request.send(McpRequest::Authenticate { session_id, server_name }).await.map_err(|_| {
799 error!("MCP request channel closed for session");
800 acp::Error::internal_error()
801 })?;
802 }
803 }
804
805 Ok(())
806 }
807}