Skip to main content

aether_cli/acp/
session_manager.rs

1use acp_utils::notifications::{AuthMethodsUpdatedParams, McpRequest};
2use acp_utils::server::AcpServerError;
3use aether_auth::OAuthCredentialStorage;
4use agent_client_protocol::schema::{
5    self as acp, AgentCapabilities, AuthMethod, AuthenticateRequest, AuthenticateResponse, AvailableCommandsUpdate,
6    ConfigOptionUpdate, Implementation, InitializeRequest, InitializeResponse, ListSessionsRequest,
7    ListSessionsResponse, LoadSessionRequest, LoadSessionResponse, McpCapabilities, NewSessionRequest,
8    NewSessionResponse, PromptCapabilities, PromptResponse, ProtocolVersion, SessionId, SessionNotification,
9    SessionUpdate, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse,
10};
11use agent_client_protocol::{Client, ConnectionTo};
12use llm::catalog::{LlmModel, get_local_models};
13use llm::types::IsoString;
14use llm::{ContentBlock, ProviderConnectionOverrides, ReasoningEffort};
15use std::collections::HashSet;
16use std::path::Path;
17use std::sync::Arc;
18use tokio::spawn;
19use tokio::sync::oneshot;
20use tracing::{error, info, warn};
21
22use super::config_setting::ConfigSetting;
23use super::mappers::{map_acp_mcp_servers, replay_to_client};
24use super::model_config::{
25    ValidatedMode, build_config_options_from_modes, model_exists, pick_default_model, supports_prompt_audio,
26    validated_modes_from_specs,
27};
28use super::relay::{SessionCommand, spawn_relay};
29use super::session::Session;
30use super::session_registry::{ConfigSnapshot, SessionRegistry};
31use super::session_store::{SessionMeta, SessionStore};
32use crate::settings_args::SettingsSourceArgs;
33use acp_utils::content::format_embedded_resource;
34use aether_core::agent_spec::AgentSpec;
35use aether_core::context::ext::ContextExt;
36use aether_project::{AetherSettings, AgentCatalog};
37use llm::Context;
38
39/// Initial session selection supplied when `aether acp` starts.
40#[derive(Clone, Debug, Default)]
41pub enum InitialSessionSelection {
42    #[default]
43    Default,
44    Agent(String),
45    Model {
46        model: String,
47        reasoning_effort: Option<ReasoningEffort>,
48    },
49}
50
51impl InitialSessionSelection {
52    pub fn agent(name: String) -> Self {
53        Self::Agent(name)
54    }
55
56    pub fn model(model: String, reasoning_effort: Option<ReasoningEffort>) -> Self {
57        Self::Model { model, reasoning_effort }
58    }
59}
60
61/// Manages ACP sessions, each session has its own agent and state
62pub struct SessionManager {
63    registry: Arc<SessionRegistry>,
64    session_store: Arc<SessionStore>,
65    oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
66    initial_selection: InitialSessionSelection,
67    settings_source: SettingsSourceArgs,
68    provider_connections: ProviderConnectionOverrides,
69}
70
71pub(crate) struct SessionManagerConfig {
72    pub(crate) registry: Arc<SessionRegistry>,
73    pub(crate) session_store: Arc<SessionStore>,
74    pub(crate) oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
75    pub(crate) initial_selection: InitialSessionSelection,
76    pub(crate) settings_source: SettingsSourceArgs,
77    pub(crate) provider_connections: ProviderConnectionOverrides,
78}
79
80struct SessionModeCatalog {
81    catalog: AgentCatalog,
82    modes: Vec<ValidatedMode>,
83    available: Vec<LlmModel>,
84}
85
86struct ResolvedInitialSession {
87    spec: AgentSpec,
88    selected_mode: Option<String>,
89}
90
/// Flags describing which non-text media kinds appear in a prompt.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct PromptModalities {
    /// At least one image block is present.
    image: bool,
    /// At least one audio block is present.
    audio: bool,
}
96
97impl PromptModalities {
98    fn from_content(content: &[ContentBlock]) -> Self {
99        Self {
100            image: content.iter().any(ContentBlock::is_image),
101            audio: content.iter().any(|block| matches!(block, ContentBlock::Audio { .. })),
102        }
103    }
104
105    fn is_empty(self) -> bool {
106        !self.image && !self.audio
107    }
108}
109
110impl SessionManager {
111    pub(crate) fn new(deps: SessionManagerConfig) -> Self {
112        Self {
113            registry: deps.registry,
114            session_store: deps.session_store,
115            oauth_credential_store: deps.oauth_credential_store,
116            initial_selection: deps.initial_selection,
117            settings_source: deps.settings_source,
118            provider_connections: deps.provider_connections,
119        }
120    }
121
122    fn apply_provider_connection_overrides(&self, mut spec: AgentSpec) -> AgentSpec {
123        spec.provider_connections.merge(self.provider_connections.clone());
124        spec
125    }
126
127    fn resolve_initial_session(
128        &self,
129        mode_catalog: &SessionModeCatalog,
130        default_model: &LlmModel,
131    ) -> Result<ResolvedInitialSession, acp::Error> {
132        let mut resolved = match &self.initial_selection {
133            InitialSessionSelection::Default => resolve_default_initial_session(mode_catalog, default_model),
134            InitialSessionSelection::Agent(agent) => {
135                if !mode_catalog.modes.iter().any(|mode| mode.name == *agent) {
136                    warn!("Unknown agent `{agent}` requested via --agent");
137                    return Err(acp::Error::invalid_params());
138                }
139                resolve_agent_spec(&mode_catalog.catalog, agent)
140                    .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(agent.clone()) })
141            }
142            InitialSessionSelection::Model { model, reasoning_effort } => {
143                let model = parse_available_model(model, &mode_catalog.available)?;
144                Ok(ResolvedInitialSession {
145                    spec: AgentSpec::default_spec(&model, *reasoning_effort, Vec::new()),
146                    selected_mode: None,
147                })
148            }
149        }?;
150        resolved.spec = self.apply_provider_connection_overrides(resolved.spec);
151        Ok(resolved)
152    }
153
154    async fn load_mode_catalog(&self, cwd: &Path) -> Result<SessionModeCatalog, acp::Error> {
155        let config = if let Some(source) = self.settings_source.source(cwd) {
156            AetherSettings::load(cwd, [source])
157        } else {
158            AetherSettings::load_default(cwd)
159        }
160        .map_err(|e| {
161            error!("Failed to load agent catalog: {e}");
162            acp::Error::invalid_params()
163        })?;
164        let catalog = if config.agents.is_empty() {
165            AgentCatalog::empty(cwd.to_path_buf())
166        } else {
167            AgentCatalog::from_settings(cwd, config).map_err(|e| {
168                error!("Failed to load agent catalog: {e}");
169                acp::Error::invalid_params()
170            })?
171        };
172
173        let available = get_local_models().await;
174        let specs: Vec<_> = catalog.user_invocable().cloned().collect();
175        let modes = validated_modes_from_specs(&specs, &available);
176
177        Ok(SessionModeCatalog { catalog, modes, available })
178    }
179
180    #[allow(clippy::too_many_arguments, clippy::similar_names)]
181    async fn register_session(
182        &self,
183        session: Session,
184        session_id: &str,
185        acp_session_id: &SessionId,
186        model: &str,
187        selected_mode: Option<String>,
188        reasoning_effort: Option<ReasoningEffort>,
189        modes: Vec<ValidatedMode>,
190        cx: &ConnectionTo<Client>,
191    ) -> Vec<acp::SessionConfigOption> {
192        let relay = spawn_relay(
193            session,
194            cx.clone(),
195            acp_session_id.clone(),
196            self.session_store.clone(),
197            Arc::clone(&self.oauth_credential_store),
198        );
199
200        self.registry
201            .insert(
202                session_id.to_string(),
203                relay,
204                model.to_string(),
205                selected_mode.clone(),
206                reasoning_effort,
207                modes.clone(),
208            )
209            .await;
210
211        let available = get_local_models().await;
212        let all_models = get_all_models(&available);
213        build_config_options_from_modes(
214            &modes,
215            &available,
216            selected_mode.as_deref(),
217            model,
218            reasoning_effort,
219            &all_models,
220            self.oauth_credential_store.as_ref(),
221        )
222    }
223
224    fn send_available_commands_notification(
225        available_commands: Vec<acp::AvailableCommand>,
226        acp_session_id: SessionId,
227        session_id: &str,
228        cx: &ConnectionTo<Client>,
229    ) {
230        if available_commands.is_empty() {
231            return;
232        }
233        let command_count = available_commands.len();
234        let notification = SessionNotification::new(
235            acp_session_id,
236            SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate::new(available_commands)),
237        );
238        if let Err(e) = cx.send_notification(notification).map_err(|e| AcpServerError::protocol("session/update", e)) {
239            error!("Failed to send available commands notification: {:?}", e);
240        } else {
241            info!("Sent available commands update for session {} ({} commands)", session_id, command_count);
242        }
243    }
244
245    /// Drain every session and stop its relay task. Blocks until every relay
246    /// has exited.
247    pub async fn shutdown_all_sessions(&self) {
248        self.registry.shutdown_all().await;
249    }
250}
251
252fn options_from_snapshot(
253    snapshot: &ConfigSnapshot,
254    available: &[LlmModel],
255    all_models: &[LlmModel],
256    credential_store: &dyn OAuthCredentialStorage,
257) -> Vec<acp::SessionConfigOption> {
258    build_config_options_from_modes(
259        &snapshot.modes,
260        available,
261        snapshot.selected_mode.as_deref(),
262        &snapshot.effective_model,
263        snapshot.reasoning_effort,
264        all_models,
265        credential_store,
266    )
267}
268
269/// Merge catalog `all()` with locally-discovered models for the `all_models`
270/// parameter of `build_model_config_option`.
271fn get_all_models(discovered: &[LlmModel]) -> Vec<LlmModel> {
272    let mut all = LlmModel::all().to_vec();
273    for m in discovered {
274        if !all.contains(m) {
275            all.push(m.clone());
276        }
277    }
278    all
279}
280
281fn build_auth_methods(store: &dyn OAuthCredentialStorage) -> Vec<AuthMethod> {
282    let mut seen = HashSet::new();
283    LlmModel::all()
284        .iter()
285        .filter_map(LlmModel::oauth_provider_id)
286        .filter(|id| seen.insert(*id))
287        .map(|id| {
288            let display = LlmModel::all()
289                .iter()
290                .find(|m| m.oauth_provider_id() == Some(id))
291                .map_or(id, |m| m.provider_display_name());
292            let mut method = acp::AuthMethodAgent::new(id, display);
293            if store.has_credential(id) {
294                method = method.description("authenticated");
295            }
296            AuthMethod::Agent(method)
297        })
298        .collect()
299}
300
301fn map_acp_to_content_blocks(blocks: Vec<acp::ContentBlock>) -> Vec<ContentBlock> {
302    blocks
303        .into_iter()
304        .map(|block| match block {
305            acp::ContentBlock::Text(t) => ContentBlock::text(t.text),
306            acp::ContentBlock::Image(img) => ContentBlock::Image { data: img.data, mime_type: img.mime_type },
307            acp::ContentBlock::Audio(aud) => ContentBlock::Audio { data: aud.data, mime_type: aud.mime_type },
308            acp::ContentBlock::Resource(r) => ContentBlock::text(format_embedded_resource(&r)),
309            acp::ContentBlock::ResourceLink(l) => ContentBlock::text(format!("[Resource: {}]", l.uri)),
310            _ => ContentBlock::text("[Unknown content]"),
311        })
312        .collect()
313}
314
315fn resolve_agent_spec(catalog: &AgentCatalog, mode_name: &str) -> Result<AgentSpec, acp::Error> {
316    catalog.resolve(mode_name).map_err(|e| {
317        error!("Failed to resolve runtime inputs for mode '{}': {e}", mode_name);
318        acp::Error::invalid_params()
319    })
320}
321
322fn resolve_default_initial_session(
323    mode_catalog: &SessionModeCatalog,
324    default_model: &LlmModel,
325) -> Result<ResolvedInitialSession, acp::Error> {
326    if let Some(mode) = mode_catalog.modes.first() {
327        return resolve_agent_spec(&mode_catalog.catalog, &mode.name)
328            .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(mode.name.clone()) });
329    }
330
331    Ok(ResolvedInitialSession { spec: AgentSpec::default_spec(default_model, None, Vec::new()), selected_mode: None })
332}
333
334fn parse_available_model(model: &str, available: &[LlmModel]) -> Result<LlmModel, acp::Error> {
335    let parsed = model.parse().map_err(|e: String| {
336        warn!("Failed to parse --model `{model}`: {e}");
337        acp::Error::invalid_params()
338    })?;
339
340    if model_exists(available, model) {
341        Ok(parsed)
342    } else {
343        warn!("Requested model `{model}` is not available");
344        Err(acp::Error::invalid_params())
345    }
346}
347
348fn prompt_capabilities_for_models(models: &[LlmModel]) -> PromptCapabilities {
349    PromptCapabilities::new()
350        .embedded_context(true)
351        .image(models.iter().any(LlmModel::supports_image))
352        .audio(models.iter().any(supports_prompt_audio))
353}
354
355fn selected_models(model_value: &str) -> Result<Vec<LlmModel>, acp::Error> {
356    model_value
357        .split(',')
358        .map(str::trim)
359        .filter(|part| !part.is_empty())
360        .map(|part| part.parse::<LlmModel>().map_err(|_| acp::Error::invalid_params()))
361        .collect()
362}
363
364fn validate_prompt_support(model_value: &str, content: &[ContentBlock]) -> Result<(), acp::Error> {
365    let modalities = PromptModalities::from_content(content);
366    if modalities.is_empty() {
367        return Ok(());
368    }
369
370    let selected = selected_models(model_value)?;
371    if modalities.image && selected.iter().any(|model| !model.supports_image()) {
372        return Err(acp::Error::invalid_params());
373    }
374    if modalities.audio && selected.iter().any(|model| !supports_prompt_audio(model)) {
375        return Err(acp::Error::invalid_params());
376    }
377
378    Ok(())
379}
380
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use agent_client_protocol::schema::{InitializeRequest, ProtocolVersion};
    use llm::catalog::BedrockModel;

    const SONNET: &str = "anthropic:claude-sonnet-4-5";
    const DEEPSEEK: &str = "deepseek:deepseek-chat";
    const GEMINI_NATIVE_AUDIO: &str = "gemini:gemini-live-2.5-flash-preview-native-audio";
    const BEDROCK_PROFILE_ARN: &str =
        "bedrock:arn:aws:bedrock:us-west-2:000000000000:application-inference-profile/000000000000";

    /// In-memory credential store so tests never touch real credentials.
    fn mock_oauth_store() -> Arc<dyn OAuthCredentialStorage> {
        Arc::new(aether_auth::FakeOAuthCredentialStore::new())
    }

    /// Parses a model id that the test expects to be valid.
    fn model(id: &str) -> LlmModel {
        id.parse().unwrap()
    }

    #[tokio::test]
    async fn initialize_always_advertises_load_session_support() {
        let session_store = Arc::new(
            SessionStore::new().unwrap_or_else(|e| panic!("Failed to initialize session store: {e}")),
        );
        let config = SessionManagerConfig {
            registry: Arc::new(SessionRegistry::new()),
            session_store,
            oauth_credential_store: mock_oauth_store(),
            initial_selection: InitialSessionSelection::default(),
            settings_source: SettingsSourceArgs::default(),
            provider_connections: ProviderConnectionOverrides::default(),
        };
        let manager = SessionManager::new(config);

        let request = InitializeRequest::new(ProtocolVersion::LATEST);
        let response = manager.initialize(request).await.expect("initialize succeeds");

        let json = serde_json::to_string(&response).expect("response serializes");
        assert!(json.contains("\"loadSession\":true"));
    }

    #[test]
    fn prompt_capabilities_reflect_available_modalities() {
        let image_only = prompt_capabilities_for_models(&[model(SONNET)]);
        assert!(image_only.image);
        assert!(!image_only.audio);

        let audio_capable = prompt_capabilities_for_models(&[model(GEMINI_NATIVE_AUDIO)]);
        assert!(!audio_capable.image);
        assert!(audio_capable.audio);

        let text_only = prompt_capabilities_for_models(&[model(DEEPSEEK)]);
        assert!(!text_only.image);
        assert!(!text_only.audio);
    }

    #[test]
    fn validate_prompt_support_requires_all_selected_models_to_support_media() {
        let image_content = vec![ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() }];
        let audio_content =
            vec![ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() }];

        // Single-model selections.
        assert!(validate_prompt_support(SONNET, &image_content).is_ok());
        assert!(validate_prompt_support(DEEPSEEK, &image_content).is_err());
        assert!(validate_prompt_support(GEMINI_NATIVE_AUDIO, &audio_content).is_ok());
        assert!(validate_prompt_support(SONNET, &audio_content).is_err());

        // Multi-model selections fail as soon as one member lacks support.
        let sonnet_and_deepseek = format!("{SONNET},{DEEPSEEK}");
        assert!(validate_prompt_support(&sonnet_and_deepseek, &image_content).is_err());
        let gemini_and_deepseek = format!("{GEMINI_NATIVE_AUDIO},{DEEPSEEK}");
        assert!(validate_prompt_support(&gemini_and_deepseek, &audio_content).is_err());
    }

    #[test]
    fn parse_available_model_accepts_bedrock_inference_profile_arn() {
        let available: Vec<LlmModel> = Vec::new();
        let parsed = parse_available_model(BEDROCK_PROFILE_ARN, &available)
            .expect("Bedrock inference-profile ARN should be accepted");
        assert!(matches!(parsed, LlmModel::Bedrock(BedrockModel::Profile(_))));
    }

    #[test]
    fn parse_available_model_rejects_unknown_catalog_model() {
        let available = vec![model(DEEPSEEK)];
        assert!(parse_available_model(SONNET, &available).is_err());
    }

    #[test]
    fn parse_available_model_accepts_catalog_model_when_present() {
        let sonnet = model(SONNET);
        let available = vec![sonnet.clone()];
        assert_eq!(parse_available_model(SONNET, &available).unwrap(), sonnet);
    }
}
474
475impl SessionManager {
476    pub async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse, acp::Error> {
477        info!("Received initialize request: {:?}", args);
478        let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
479        let available = get_local_models().await;
480        Ok(InitializeResponse::new(ProtocolVersion::V1)
481            .agent_info(Implementation::new("Aether", "0.1.0"))
482            .agent_capabilities(
483                AgentCapabilities::new()
484                    .load_session(true)
485                    .mcp_capabilities(McpCapabilities::new().http(true).sse(true))
486                    .session_capabilities(acp::SessionCapabilities::new().list(acp::SessionListCapabilities::new()))
487                    .prompt_capabilities(prompt_capabilities_for_models(&available)),
488            )
489            .auth_methods(auth_methods))
490    }
491
492    pub async fn authenticate(
493        &self,
494        args: AuthenticateRequest,
495        cx: &ConnectionTo<Client>,
496    ) -> Result<AuthenticateResponse, acp::Error> {
497        info!("Received authenticate request: {:?}", args);
498        let method_id = args.method_id.0.as_ref();
499        match method_id {
500            "codex" => {
501                llm::perform_codex_oauth_flow(self.oauth_credential_store.as_ref()).await.map_err(|e| {
502                    error!("OAuth flow failed for {method_id}: {e}");
503                    acp::Error::internal_error()
504                })?;
505            }
506            _ => return Err(acp::Error::invalid_params()),
507        }
508        let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
509        if let Err(e) = cx
510            .send_notification(AuthMethodsUpdatedParams { auth_methods })
511            .map_err(|e| AcpServerError::protocol("_aether/auth_methods_updated", e))
512        {
513            error!("Failed to send auth methods updated notification: {:?}", e);
514        }
515
516        let available = get_local_models().await;
517        let all_models = get_all_models(&available);
518        let snapshots = self.registry.snapshot_all_configs().await;
519
520        for (id, snap) in snapshots {
521            let options = options_from_snapshot(&snap, &available, &all_models, self.oauth_credential_store.as_ref());
522            let notification = SessionNotification::new(
523                SessionId::new(id),
524                SessionUpdate::ConfigOptionUpdate(ConfigOptionUpdate::new(options)),
525            );
526            let _ = cx.send_notification(notification);
527        }
528
529        Ok(AuthenticateResponse::default())
530    }
531
532    pub async fn new_session(
533        &self,
534        mut args: NewSessionRequest,
535        cx: &ConnectionTo<Client>,
536    ) -> Result<NewSessionResponse, acp::Error> {
537        // Inside a sandbox container the client sends the *host* cwd, but the
538        // project is mounted at the container's working directory.
539        if std::env::var("AETHER_INSIDE_SANDBOX").is_ok() {
540            let container_cwd = std::env::current_dir().unwrap_or_else(|_| "/workspace".into());
541            info!("Sandbox: remapping cwd {:?} -> {:?}", args.cwd, container_cwd);
542            args.cwd = container_cwd;
543        }
544
545        info!("Creating new session with cwd: {:?}", args.cwd);
546        let session_id = uuid::Uuid::new_v4().to_string();
547        let acp_session_id = acp::SessionId::new(session_id.clone());
548
549        let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
550        let default_model = pick_default_model(&mode_catalog.available).ok_or_else(|| {
551            error!("No models available — set an API key env var (e.g. ANTHROPIC_API_KEY)");
552            acp::Error::internal_error()
553        })?;
554
555        let ResolvedInitialSession { spec, selected_mode } =
556            self.resolve_initial_session(&mode_catalog, default_model)?;
557        let model_str = spec.model.clone();
558        let reasoning_effort = spec.reasoning_effort;
559
560        let session = Session::new(
561            spec,
562            args.cwd.clone(),
563            map_acp_mcp_servers(args.mcp_servers),
564            None,
565            Some(session_id.clone()),
566            Arc::clone(&self.oauth_credential_store),
567        )
568        .await
569        .map_err(|e| {
570            error!("Failed to create session: {}", e);
571            acp::Error::internal_error()
572        })?;
573
574        let available_commands = session.list_available_commands().await.map_err(|e| {
575            error!("Failed to list available commands: {}", e);
576            acp::Error::internal_error()
577        })?;
578
579        let meta = SessionMeta {
580            session_id: session_id.clone(),
581            cwd: args.cwd.clone(),
582            model: model_str.clone(),
583            selected_mode: selected_mode.clone(),
584            created_at: IsoString::now().0,
585        };
586        if let Err(e) = self.session_store.append_meta(&session_id, &meta) {
587            error!("Failed to write session meta: {e}");
588        }
589
590        let config_options = self
591            .register_session(
592                session,
593                &session_id,
594                &acp_session_id,
595                &model_str,
596                selected_mode,
597                reasoning_effort,
598                mode_catalog.modes,
599                cx,
600            )
601            .await;
602
603        info!("Session {} created successfully", session_id);
604
605        let response = NewSessionResponse::new(acp_session_id.clone()).config_options(config_options);
606
607        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
608
609        Ok(response)
610    }
611
612    pub fn list_sessions(&self, args: &ListSessionsRequest) -> Result<ListSessionsResponse, acp::Error> {
613        info!("Listing sessions, cwd filter: {:?}", args.cwd);
614        let mut summaries = self.session_store.list();
615
616        if let Some(cwd) = args.cwd.as_ref() {
617            summaries.retain(|s| s.meta.cwd == *cwd);
618        }
619
620        let sessions: Vec<acp::SessionInfo> = summaries
621            .into_iter()
622            .map(|s| acp::SessionInfo::new(s.meta.session_id, s.meta.cwd).updated_at(s.meta.created_at).title(s.title))
623            .collect();
624
625        info!("Found {} sessions", sessions.len());
626        Ok(ListSessionsResponse::new(sessions))
627    }
628
629    pub async fn load_session(
630        &self,
631        args: LoadSessionRequest,
632        cx: &ConnectionTo<Client>,
633    ) -> Result<LoadSessionResponse, acp::Error> {
634        let session_id = args.session_id.0.to_string();
635        info!("Loading session: {session_id}");
636
637        let (meta, events) = self.session_store.load(&session_id).ok_or_else(|| {
638            error!("Session not found: {session_id}");
639            acp::Error::invalid_params()
640        })?;
641
642        let context = Context::from_events(&events);
643        let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
644
645        let spec = self.apply_provider_connection_overrides(if let Some(mode_name) = meta.selected_mode.as_deref() {
646            resolve_agent_spec(&mode_catalog.catalog, mode_name)?
647        } else {
648            let parsed_model: LlmModel = meta.model.parse().map_err(|e: String| {
649                error!("Failed to parse restored model '{}': {e}", meta.model);
650                acp::Error::invalid_params()
651            })?;
652            AgentSpec::default_spec(&parsed_model, None, Vec::new())
653        });
654
655        let model = spec.model.clone();
656
657        let restored_messages: Vec<_> = context.messages().iter().filter(|m| !m.is_system()).cloned().collect();
658
659        let session = Session::new(
660            spec,
661            args.cwd.clone(),
662            map_acp_mcp_servers(args.mcp_servers),
663            Some(restored_messages),
664            Some(session_id.clone()),
665            Arc::clone(&self.oauth_credential_store),
666        )
667        .await
668        .map_err(|e| {
669            error!("Failed to create session for load: {e}");
670            acp::Error::internal_error()
671        })?;
672
673        let available_commands = session.list_available_commands().await.map_err(|e| {
674            error!("Failed to list available commands: {e}");
675            acp::Error::internal_error()
676        })?;
677
678        let acp_session_id = acp::SessionId::new(session_id.clone());
679
680        let config_options = self
681            .register_session(
682                session,
683                &session_id,
684                &acp_session_id,
685                &model,
686                meta.selected_mode,
687                None,
688                mode_catalog.modes,
689                cx,
690            )
691            .await;
692
693        info!("Session {session_id} loaded successfully");
694
695        let response = LoadSessionResponse::new().config_options(config_options);
696
697        let cx_clone = cx.clone();
698        let replay_session_id = acp_session_id.clone();
699        spawn(async move {
700            replay_to_client(&events, &cx_clone, &replay_session_id).await;
701        });
702
703        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
704
705        Ok(response)
706    }
707
708    pub async fn prompt(&self, args: acp::PromptRequest) -> Result<acp::PromptResponse, acp::Error> {
709        info!("Received prompt for session: {:?}", args.session_id);
710        let session_id_str = args.session_id.0.to_string();
711        let content = map_acp_to_content_blocks(args.prompt);
712
713        let model = self.registry.effective_model(&session_id_str).await.ok_or_else(|| {
714            error!("Session not found: {}", session_id_str);
715            acp::Error::invalid_params()
716        })?;
717        validate_prompt_support(&model, &content)?;
718
719        let dispatch = self.registry.begin_prompt(&session_id_str).await.ok_or_else(|| {
720            error!("Session not found: {}", session_id_str);
721            acp::Error::invalid_params()
722        })?;
723
724        let (result_tx, result_rx) = oneshot::channel();
725        dispatch
726            .relay_tx
727            .send(SessionCommand::Prompt {
728                content,
729                switch_model: dispatch.switch_model,
730                reasoning_effort: dispatch.reasoning_effort,
731                result_tx,
732            })
733            .await
734            .map_err(|_| {
735                error!("Relay channel closed for session {}", session_id_str);
736                acp::Error::internal_error()
737            })?;
738
739        let stop_reason = result_rx
740            .await
741            .map_err(|_| {
742                error!("Relay dropped result channel for session {}", session_id_str);
743                acp::Error::internal_error()
744            })?
745            .map_err(|e| {
746                error!("Relay error for session {}: {}", session_id_str, e);
747                acp::Error::internal_error()
748            })?;
749
750        info!("Prompt completed with stop reason: {:?}", stop_reason);
751        Ok(PromptResponse::new(stop_reason))
752    }
753
754    pub async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> {
755        info!("Received cancel for session: {:?}", args.session_id);
756        let session_id_str = args.session_id.0.to_string();
757        let relay = self.registry.relay(&session_id_str).await.ok_or_else(|| {
758            error!("Session not found for cancel: {}", session_id_str);
759            acp::Error::invalid_params()
760        })?;
761
762        relay.cmd.send(SessionCommand::Cancel).await.map_err(|_| {
763            error!("Relay channel closed for cancel: {}", session_id_str);
764            acp::Error::internal_error()
765        })?;
766
767        Ok(())
768    }
769
770    pub async fn set_session_config_option(
771        &self,
772        args: SetSessionConfigOptionRequest,
773    ) -> Result<SetSessionConfigOptionResponse, acp::Error> {
774        let session_id_str = args.session_id.0.to_string();
775        let config_id = args.config_id.0.to_string();
776        let value = args.value.0.to_string();
777
778        info!("set_session_config_option: session={}, config={}, value={}", session_id_str, config_id, value);
779
780        let setting = ConfigSetting::parse(&config_id, &value).map_err(|e| {
781            error!("{e}");
782            acp::Error::invalid_params()
783        })?;
784
785        let available = get_local_models().await;
786        let all_models = get_all_models(&available);
787
788        let snapshot =
789            self.registry.apply_config_change(&session_id_str, &setting, &available).await.ok_or_else(|| {
790                error!("Session not found: {}", session_id_str);
791                acp::Error::invalid_params()
792            })??;
793
794        let options = options_from_snapshot(&snapshot, &available, &all_models, self.oauth_credential_store.as_ref());
795        Ok(SetSessionConfigOptionResponse::new(options))
796    }
797
798    pub async fn on_mcp_request(&self, request: McpRequest) -> Result<(), acp::Error> {
799        info!("Received MCP ext request: {:?}", request);
800        match request {
801            McpRequest::Authenticate { session_id, server_name } => {
802                let relay = self.registry.relay(&session_id).await.ok_or_else(|| {
803                    error!("Session not found for authenticate_mcp_server: {}", session_id);
804                    acp::Error::invalid_params()
805                })?;
806
807                relay.mcp_request.send(McpRequest::Authenticate { session_id, server_name }).await.map_err(|_| {
808                    error!("MCP request channel closed for session");
809                    acp::Error::internal_error()
810                })?;
811            }
812        }
813
814        Ok(())
815    }
816}