Skip to main content

aether_cli/acp/
session_manager.rs

1use acp_utils::notifications::{AuthMethodsUpdatedParams, McpRequest};
2use acp_utils::server::AcpServerError;
3use aether_auth::OAuthCredentialStorage;
4use agent_client_protocol::schema::{
5    self as acp, AgentCapabilities, AuthMethod, AuthenticateRequest, AuthenticateResponse, AvailableCommandsUpdate,
6    ConfigOptionUpdate, Implementation, InitializeRequest, InitializeResponse, ListSessionsRequest,
7    ListSessionsResponse, LoadSessionRequest, LoadSessionResponse, McpCapabilities, NewSessionRequest,
8    NewSessionResponse, PromptCapabilities, PromptResponse, ProtocolVersion, SessionId, SessionNotification,
9    SessionUpdate, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse,
10};
11use agent_client_protocol::{Client, ConnectionTo};
12use llm::catalog::{LlmModel, get_local_models};
13use llm::types::IsoString;
14use llm::{ContentBlock, ProviderConnectionOverrides, ReasoningEffort};
15use std::collections::HashSet;
16use std::path::Path;
17use std::sync::Arc;
18use tokio::sync::oneshot;
19use tracing::{error, info, warn};
20
21use super::config_setting::ConfigSetting;
22use super::mappers::{map_acp_mcp_servers, replay_to_client};
23use super::model_config::{
24    ValidatedMode, build_config_options_from_modes, model_exists, pick_default_model, supports_prompt_audio,
25    validated_modes_from_specs,
26};
27use super::relay::{SessionCommand, spawn_relay};
28use super::session::Session;
29use super::session_registry::{ConfigSnapshot, SessionRegistry};
30use super::session_store::{SessionMeta, SessionStore};
31use crate::settings_args::SettingsSourceArgs;
32use acp_utils::content::format_embedded_resource;
33use aether_core::agent_spec::AgentSpec;
34use aether_core::context::ext::ContextExt;
35use aether_project::{AetherSettings, AgentCatalog};
36use llm::Context;
37
38/// Initial session selection supplied when `aether acp` starts.
39#[derive(Clone, Debug, Default)]
40pub enum InitialSessionSelection {
41    #[default]
42    Default,
43    Agent(String),
44    Model {
45        model: String,
46        reasoning_effort: Option<ReasoningEffort>,
47    },
48}
49
50impl InitialSessionSelection {
51    pub fn agent(name: String) -> Self {
52        Self::Agent(name)
53    }
54
55    pub fn model(model: String, reasoning_effort: Option<ReasoningEffort>) -> Self {
56        Self::Model { model, reasoning_effort }
57    }
58}
59
60/// Manages ACP sessions, each session has its own agent and state
61pub struct SessionManager {
62    registry: Arc<SessionRegistry>,
63    session_store: Arc<SessionStore>,
64    oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
65    initial_selection: InitialSessionSelection,
66    settings_source: SettingsSourceArgs,
67    provider_connections: ProviderConnectionOverrides,
68}
69
70pub(crate) struct SessionManagerConfig {
71    pub(crate) registry: Arc<SessionRegistry>,
72    pub(crate) session_store: Arc<SessionStore>,
73    pub(crate) oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
74    pub(crate) initial_selection: InitialSessionSelection,
75    pub(crate) settings_source: SettingsSourceArgs,
76    pub(crate) provider_connections: ProviderConnectionOverrides,
77}
78
79struct SessionModeCatalog {
80    catalog: AgentCatalog,
81    modes: Vec<ValidatedMode>,
82    available: Vec<LlmModel>,
83}
84
85struct ResolvedInitialSession {
86    spec: AgentSpec,
87    selected_mode: Option<String>,
88}
89
/// Flags recording which media kinds are present in a prompt.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct PromptModalities {
    /// At least one image block is present.
    image: bool,
    /// At least one audio block is present.
    audio: bool,
}
95
96impl PromptModalities {
97    fn from_content(content: &[ContentBlock]) -> Self {
98        Self {
99            image: content.iter().any(ContentBlock::is_image),
100            audio: content.iter().any(|block| matches!(block, ContentBlock::Audio { .. })),
101        }
102    }
103
104    fn is_empty(self) -> bool {
105        !self.image && !self.audio
106    }
107}
108
109impl SessionManager {
110    pub(crate) fn new(deps: SessionManagerConfig) -> Self {
111        Self {
112            registry: deps.registry,
113            session_store: deps.session_store,
114            oauth_credential_store: deps.oauth_credential_store,
115            initial_selection: deps.initial_selection,
116            settings_source: deps.settings_source,
117            provider_connections: deps.provider_connections,
118        }
119    }
120
121    fn apply_provider_connection_overrides(&self, mut spec: AgentSpec) -> AgentSpec {
122        spec.provider_connections.merge(self.provider_connections.clone());
123        spec
124    }
125
126    fn resolve_initial_session(
127        &self,
128        mode_catalog: &SessionModeCatalog,
129        default_model: &LlmModel,
130    ) -> Result<ResolvedInitialSession, acp::Error> {
131        let mut resolved = match &self.initial_selection {
132            InitialSessionSelection::Default => resolve_default_initial_session(mode_catalog, default_model),
133            InitialSessionSelection::Agent(agent) => {
134                if !mode_catalog.modes.iter().any(|mode| mode.name == *agent) {
135                    warn!("Unknown agent `{agent}` requested via --agent");
136                    return Err(acp::Error::invalid_params());
137                }
138                resolve_agent_spec(&mode_catalog.catalog, agent)
139                    .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(agent.clone()) })
140            }
141            InitialSessionSelection::Model { model, reasoning_effort } => {
142                let model = parse_available_model(model, &mode_catalog.available)?;
143                Ok(ResolvedInitialSession {
144                    spec: AgentSpec::default_spec(&model, *reasoning_effort, Vec::new()),
145                    selected_mode: None,
146                })
147            }
148        }?;
149        resolved.spec = self.apply_provider_connection_overrides(resolved.spec);
150        Ok(resolved)
151    }
152
153    async fn load_mode_catalog(&self, cwd: &Path) -> Result<SessionModeCatalog, acp::Error> {
154        let config = if let Some(source) = self.settings_source.source(cwd) {
155            AetherSettings::load(cwd, [source])
156        } else {
157            AetherSettings::load_default(cwd)
158        }
159        .map_err(|e| {
160            error!("Failed to load agent catalog: {e}");
161            acp::Error::invalid_params()
162        })?;
163        let catalog = if config.agents.is_empty() {
164            AgentCatalog::empty(cwd.to_path_buf())
165        } else {
166            AgentCatalog::from_settings(cwd, config).map_err(|e| {
167                error!("Failed to load agent catalog: {e}");
168                acp::Error::invalid_params()
169            })?
170        };
171
172        let available = get_local_models().await;
173        let specs: Vec<_> = catalog.user_invocable().cloned().collect();
174        let modes = validated_modes_from_specs(&specs, &available);
175
176        Ok(SessionModeCatalog { catalog, modes, available })
177    }
178
179    #[allow(clippy::too_many_arguments, clippy::similar_names)]
180    async fn register_session(
181        &self,
182        session: Session,
183        session_id: &str,
184        acp_session_id: &SessionId,
185        model: &str,
186        selected_mode: Option<String>,
187        reasoning_effort: Option<ReasoningEffort>,
188        modes: Vec<ValidatedMode>,
189        cx: &ConnectionTo<Client>,
190    ) -> Vec<acp::SessionConfigOption> {
191        let relay = spawn_relay(
192            session,
193            cx.clone(),
194            acp_session_id.clone(),
195            self.session_store.clone(),
196            Arc::clone(&self.oauth_credential_store),
197        );
198
199        self.registry
200            .insert(
201                session_id.to_string(),
202                relay,
203                model.to_string(),
204                selected_mode.clone(),
205                reasoning_effort,
206                modes.clone(),
207            )
208            .await;
209
210        let available = get_local_models().await;
211        let all_models = get_all_models(&available);
212        build_config_options_from_modes(
213            &modes,
214            &available,
215            selected_mode.as_deref(),
216            model,
217            reasoning_effort,
218            &all_models,
219            self.oauth_credential_store.as_ref(),
220        )
221    }
222
223    fn send_available_commands_notification(
224        available_commands: Vec<acp::AvailableCommand>,
225        acp_session_id: SessionId,
226        session_id: &str,
227        cx: &ConnectionTo<Client>,
228    ) {
229        if available_commands.is_empty() {
230            return;
231        }
232        let command_count = available_commands.len();
233        let notification = SessionNotification::new(
234            acp_session_id,
235            SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate::new(available_commands)),
236        );
237        if let Err(e) = cx.send_notification(notification).map_err(|e| AcpServerError::protocol("session/update", e)) {
238            error!("Failed to send available commands notification: {:?}", e);
239        } else {
240            info!("Sent available commands update for session {} ({} commands)", session_id, command_count);
241        }
242    }
243
244    /// Drain every session and stop its relay task. Blocks until every relay
245    /// has exited.
246    pub async fn shutdown_all_sessions(&self) {
247        self.registry.shutdown_all().await;
248    }
249}
250
251fn options_from_snapshot(
252    snapshot: &ConfigSnapshot,
253    available: &[LlmModel],
254    all_models: &[LlmModel],
255    credential_store: &dyn OAuthCredentialStorage,
256) -> Vec<acp::SessionConfigOption> {
257    build_config_options_from_modes(
258        &snapshot.modes,
259        available,
260        snapshot.selected_mode.as_deref(),
261        &snapshot.effective_model,
262        snapshot.reasoning_effort,
263        all_models,
264        credential_store,
265    )
266}
267
268/// Merge catalog `all()` with locally-discovered models for the `all_models`
269/// parameter of `build_model_config_option`.
270fn get_all_models(discovered: &[LlmModel]) -> Vec<LlmModel> {
271    let mut all = LlmModel::all().to_vec();
272    for m in discovered {
273        if !all.contains(m) {
274            all.push(m.clone());
275        }
276    }
277    all
278}
279
280fn build_auth_methods(store: &dyn OAuthCredentialStorage) -> Vec<AuthMethod> {
281    let mut seen = HashSet::new();
282    LlmModel::all()
283        .iter()
284        .filter_map(LlmModel::oauth_provider_id)
285        .filter(|id| seen.insert(*id))
286        .map(|id| {
287            let display = LlmModel::all()
288                .iter()
289                .find(|m| m.oauth_provider_id() == Some(id))
290                .map_or(id, |m| m.provider_display_name());
291            let mut method = acp::AuthMethodAgent::new(id, display);
292            if store.has_credential(id) {
293                method = method.description("authenticated");
294            }
295            AuthMethod::Agent(method)
296        })
297        .collect()
298}
299
300fn map_acp_to_content_blocks(blocks: Vec<acp::ContentBlock>) -> Vec<ContentBlock> {
301    blocks
302        .into_iter()
303        .map(|block| match block {
304            acp::ContentBlock::Text(t) => ContentBlock::text(t.text),
305            acp::ContentBlock::Image(img) => ContentBlock::Image { data: img.data, mime_type: img.mime_type },
306            acp::ContentBlock::Audio(aud) => ContentBlock::Audio { data: aud.data, mime_type: aud.mime_type },
307            acp::ContentBlock::Resource(r) => ContentBlock::text(format_embedded_resource(&r)),
308            acp::ContentBlock::ResourceLink(l) => ContentBlock::text(format!("[Resource: {}]", l.uri)),
309            _ => ContentBlock::text("[Unknown content]"),
310        })
311        .collect()
312}
313
314fn resolve_agent_spec(catalog: &AgentCatalog, mode_name: &str) -> Result<AgentSpec, acp::Error> {
315    catalog.resolve(mode_name).map_err(|e| {
316        error!("Failed to resolve runtime inputs for mode '{}': {e}", mode_name);
317        acp::Error::invalid_params()
318    })
319}
320
321fn resolve_default_initial_session(
322    mode_catalog: &SessionModeCatalog,
323    default_model: &LlmModel,
324) -> Result<ResolvedInitialSession, acp::Error> {
325    if let Some(mode) = mode_catalog.modes.first() {
326        return resolve_agent_spec(&mode_catalog.catalog, &mode.name)
327            .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(mode.name.clone()) });
328    }
329
330    Ok(ResolvedInitialSession { spec: AgentSpec::default_spec(default_model, None, Vec::new()), selected_mode: None })
331}
332
333fn parse_available_model(model: &str, available: &[LlmModel]) -> Result<LlmModel, acp::Error> {
334    let parsed = model.parse().map_err(|e: String| {
335        warn!("Failed to parse --model `{model}`: {e}");
336        acp::Error::invalid_params()
337    })?;
338
339    if model_exists(available, model) {
340        Ok(parsed)
341    } else {
342        warn!("Requested model `{model}` is not available");
343        Err(acp::Error::invalid_params())
344    }
345}
346
347fn prompt_capabilities_for_models(models: &[LlmModel]) -> PromptCapabilities {
348    PromptCapabilities::new()
349        .embedded_context(true)
350        .image(models.iter().any(LlmModel::supports_image))
351        .audio(models.iter().any(supports_prompt_audio))
352}
353
354fn selected_models(model_value: &str) -> Result<Vec<LlmModel>, acp::Error> {
355    model_value
356        .split(',')
357        .map(str::trim)
358        .filter(|part| !part.is_empty())
359        .map(|part| part.parse::<LlmModel>().map_err(|_| acp::Error::invalid_params()))
360        .collect()
361}
362
363fn validate_prompt_support(model_value: &str, content: &[ContentBlock]) -> Result<(), acp::Error> {
364    let modalities = PromptModalities::from_content(content);
365    if modalities.is_empty() {
366        return Ok(());
367    }
368
369    let selected = selected_models(model_value)?;
370    if modalities.image && selected.iter().any(|model| !model.supports_image()) {
371        return Err(acp::Error::invalid_params());
372    }
373    if modalities.audio && selected.iter().any(|model| !supports_prompt_audio(model)) {
374        return Err(acp::Error::invalid_params());
375    }
376
377    Ok(())
378}
379
#[cfg(test)]
// The second `impl SessionManager` block follows this module in the file.
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use agent_client_protocol::schema::{InitializeRequest, ProtocolVersion};

    const SONNET: &str = "anthropic:claude-sonnet-4-5";
    const DEEPSEEK: &str = "deepseek:deepseek-chat";
    const GEMINI_NATIVE_AUDIO: &str = "gemini:gemini-live-2.5-flash-preview-native-audio";
    const BEDROCK_ARN_AS_MODEL_REJECTED: &str =
        "bedrock:arn:aws:bedrock:us-west-2:000000000000:application-inference-profile/000000000000";

    /// In-memory credential store for tests.
    fn mock_oauth_store() -> Arc<dyn OAuthCredentialStorage> {
        Arc::new(aether_auth::FakeOAuthCredentialStore::new())
    }

    /// Parses a model id that the test expects to be valid.
    fn model(id: &str) -> LlmModel {
        id.parse().unwrap()
    }

    fn image_prompt() -> Vec<ContentBlock> {
        vec![ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() }]
    }

    fn audio_prompt() -> Vec<ContentBlock> {
        vec![ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() }]
    }

    #[tokio::test]
    async fn initialize_always_advertises_load_session_support() {
        let session_store = match SessionStore::new() {
            Ok(store) => Arc::new(store),
            Err(e) => panic!("Failed to initialize session store: {e}"),
        };
        let manager = SessionManager::new(SessionManagerConfig {
            registry: Arc::new(SessionRegistry::new()),
            session_store,
            oauth_credential_store: mock_oauth_store(),
            initial_selection: InitialSessionSelection::default(),
            settings_source: SettingsSourceArgs::default(),
            provider_connections: ProviderConnectionOverrides::default(),
        });

        let request = InitializeRequest::new(ProtocolVersion::LATEST);
        let response = manager.initialize(request).await.expect("initialize succeeds");
        let json = serde_json::to_string(&response).expect("response serializes");
        assert!(json.contains("\"loadSession\":true"));
    }

    #[test]
    fn prompt_capabilities_reflect_available_modalities() {
        let image_only = prompt_capabilities_for_models(&[model(SONNET)]);
        assert!(image_only.image);
        assert!(!image_only.audio);

        let audio_capable = prompt_capabilities_for_models(&[model(GEMINI_NATIVE_AUDIO)]);
        assert!(!audio_capable.image);
        assert!(audio_capable.audio);

        let text_only = prompt_capabilities_for_models(&[model(DEEPSEEK)]);
        assert!(!text_only.image);
        assert!(!text_only.audio);
    }

    #[test]
    fn validate_prompt_support_requires_all_selected_models_to_support_media() {
        let image_content = image_prompt();
        let audio_content = audio_prompt();

        // Single-model selections.
        assert!(validate_prompt_support(SONNET, &image_content).is_ok());
        assert!(validate_prompt_support(DEEPSEEK, &image_content).is_err());
        assert!(validate_prompt_support(GEMINI_NATIVE_AUDIO, &audio_content).is_ok());
        assert!(validate_prompt_support(SONNET, &audio_content).is_err());

        // One unsupported model in a multi-model selection is enough to reject.
        let sonnet_and_deepseek = format!("{SONNET},{DEEPSEEK}");
        assert!(validate_prompt_support(&sonnet_and_deepseek, &image_content).is_err());

        let gemini_and_deepseek = format!("{GEMINI_NATIVE_AUDIO},{DEEPSEEK}");
        assert!(validate_prompt_support(&gemini_and_deepseek, &audio_content).is_err());
    }

    #[test]
    fn parse_available_model_rejects_bedrock_inference_profile_arn() {
        let available: Vec<LlmModel> = Vec::new();
        let error = parse_available_model(BEDROCK_ARN_AS_MODEL_REJECTED, &available).unwrap_err();
        assert_eq!(error, acp::Error::invalid_params());
    }

    #[test]
    fn parse_available_model_rejects_unknown_catalog_model() {
        let available = vec![model(DEEPSEEK)];
        assert!(parse_available_model(SONNET, &available).is_err());
    }

    #[test]
    fn parse_available_model_accepts_catalog_model_when_present() {
        let sonnet = model(SONNET);
        let available = vec![sonnet.clone()];
        assert_eq!(parse_available_model(SONNET, &available).unwrap(), sonnet);
    }
}
471
472impl SessionManager {
473    pub async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse, acp::Error> {
474        info!("Received initialize request: {:?}", args);
475        let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
476        let available = get_local_models().await;
477        Ok(InitializeResponse::new(ProtocolVersion::V1)
478            .agent_info(Implementation::new("Aether", "0.1.0"))
479            .agent_capabilities(
480                AgentCapabilities::new()
481                    .load_session(true)
482                    .mcp_capabilities(McpCapabilities::new().http(true).sse(true))
483                    .session_capabilities(acp::SessionCapabilities::new().list(acp::SessionListCapabilities::new()))
484                    .prompt_capabilities(prompt_capabilities_for_models(&available)),
485            )
486            .auth_methods(auth_methods))
487    }
488
489    pub async fn authenticate(
490        &self,
491        args: AuthenticateRequest,
492        cx: &ConnectionTo<Client>,
493    ) -> Result<AuthenticateResponse, acp::Error> {
494        info!("Received authenticate request: {:?}", args);
495        let method_id = args.method_id.0.as_ref();
496        match method_id {
497            "codex" => {
498                llm::perform_codex_oauth_flow(self.oauth_credential_store.as_ref()).await.map_err(|e| {
499                    error!("OAuth flow failed for {method_id}: {e}");
500                    acp::Error::internal_error()
501                })?;
502            }
503            _ => return Err(acp::Error::invalid_params()),
504        }
505        let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
506        if let Err(e) = cx
507            .send_notification(AuthMethodsUpdatedParams { auth_methods })
508            .map_err(|e| AcpServerError::protocol("_aether/auth_methods_updated", e))
509        {
510            error!("Failed to send auth methods updated notification: {:?}", e);
511        }
512
513        let available = get_local_models().await;
514        let all_models = get_all_models(&available);
515        let snapshots = self.registry.snapshot_all_configs().await;
516
517        for (id, snap) in snapshots {
518            let options = options_from_snapshot(&snap, &available, &all_models, self.oauth_credential_store.as_ref());
519            let notification = SessionNotification::new(
520                SessionId::new(id),
521                SessionUpdate::ConfigOptionUpdate(ConfigOptionUpdate::new(options)),
522            );
523            let _ = cx.send_notification(notification);
524        }
525
526        Ok(AuthenticateResponse::default())
527    }
528
529    pub async fn new_session(
530        &self,
531        mut args: NewSessionRequest,
532        cx: &ConnectionTo<Client>,
533    ) -> Result<NewSessionResponse, acp::Error> {
534        // Inside a sandbox container the client sends the *host* cwd, but the
535        // project is mounted at the container's working directory.
536        if std::env::var("AETHER_INSIDE_SANDBOX").is_ok() {
537            let container_cwd = std::env::current_dir().unwrap_or_else(|_| "/workspace".into());
538            info!("Sandbox: remapping cwd {:?} -> {:?}", args.cwd, container_cwd);
539            args.cwd = container_cwd;
540        }
541
542        info!("Creating new session with cwd: {:?}", args.cwd);
543        let session_id = uuid::Uuid::new_v4().to_string();
544        let acp_session_id = acp::SessionId::new(session_id.clone());
545
546        let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
547        let default_model = pick_default_model(&mode_catalog.available).ok_or_else(|| {
548            error!("No models available — set an API key env var (e.g. ANTHROPIC_API_KEY)");
549            acp::Error::internal_error()
550        })?;
551
552        let ResolvedInitialSession { spec, selected_mode } =
553            self.resolve_initial_session(&mode_catalog, default_model)?;
554        let model_str = spec.model.clone();
555        let reasoning_effort = spec.reasoning_effort;
556
557        let session = Session::new(
558            spec,
559            args.cwd.clone(),
560            map_acp_mcp_servers(args.mcp_servers),
561            None,
562            Some(session_id.clone()),
563            Arc::clone(&self.oauth_credential_store),
564        )
565        .await
566        .map_err(|e| {
567            error!("Failed to create session: {}", e);
568            acp::Error::internal_error()
569        })?;
570
571        let available_commands = session.list_available_commands().await.map_err(|e| {
572            error!("Failed to list available commands: {}", e);
573            acp::Error::internal_error()
574        })?;
575
576        let meta = SessionMeta {
577            session_id: session_id.clone(),
578            cwd: args.cwd.clone(),
579            model: model_str.clone(),
580            selected_mode: selected_mode.clone(),
581            created_at: IsoString::now().0,
582        };
583        if let Err(e) = self.session_store.append_meta(&session_id, &meta) {
584            error!("Failed to write session meta: {e}");
585        }
586
587        let config_options = self
588            .register_session(
589                session,
590                &session_id,
591                &acp_session_id,
592                &model_str,
593                selected_mode,
594                reasoning_effort,
595                mode_catalog.modes,
596                cx,
597            )
598            .await;
599
600        info!("Session {} created successfully", session_id);
601
602        let response = NewSessionResponse::new(acp_session_id.clone()).config_options(config_options);
603
604        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
605
606        Ok(response)
607    }
608
609    pub fn list_sessions(&self, args: &ListSessionsRequest) -> Result<ListSessionsResponse, acp::Error> {
610        info!("Listing sessions, cwd filter: {:?}", args.cwd);
611        let mut summaries = self.session_store.list();
612
613        if let Some(cwd) = args.cwd.as_ref() {
614            summaries.retain(|s| s.meta.cwd == *cwd);
615        }
616
617        let sessions: Vec<acp::SessionInfo> = summaries
618            .into_iter()
619            .map(|s| acp::SessionInfo::new(s.meta.session_id, s.meta.cwd).updated_at(s.meta.created_at).title(s.title))
620            .collect();
621
622        info!("Found {} sessions", sessions.len());
623        Ok(ListSessionsResponse::new(sessions))
624    }
625
626    pub async fn load_session(
627        &self,
628        args: LoadSessionRequest,
629        cx: &ConnectionTo<Client>,
630    ) -> Result<LoadSessionResponse, acp::Error> {
631        let session_id = args.session_id.0.to_string();
632        info!("Loading session: {session_id}");
633
634        let (meta, events) = self.session_store.load(&session_id).ok_or_else(|| {
635            error!("Session not found: {session_id}");
636            acp::Error::invalid_params()
637        })?;
638
639        let context = Context::from_events(&events);
640        let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
641
642        let spec = self.apply_provider_connection_overrides(if let Some(mode_name) = meta.selected_mode.as_deref() {
643            resolve_agent_spec(&mode_catalog.catalog, mode_name)?
644        } else {
645            let parsed_model: LlmModel = meta.model.parse().map_err(|e: String| {
646                error!("Failed to parse restored model '{}': {e}", meta.model);
647                acp::Error::invalid_params()
648            })?;
649            AgentSpec::default_spec(&parsed_model, None, Vec::new())
650        });
651
652        let model = spec.model.clone();
653
654        let restored_messages: Vec<_> = context.messages().iter().filter(|m| !m.is_system()).cloned().collect();
655
656        let session = Session::new(
657            spec,
658            args.cwd.clone(),
659            map_acp_mcp_servers(args.mcp_servers),
660            Some(restored_messages),
661            Some(session_id.clone()),
662            Arc::clone(&self.oauth_credential_store),
663        )
664        .await
665        .map_err(|e| {
666            error!("Failed to create session for load: {e}");
667            acp::Error::internal_error()
668        })?;
669
670        let available_commands = session.list_available_commands().await.map_err(|e| {
671            error!("Failed to list available commands: {e}");
672            acp::Error::internal_error()
673        })?;
674
675        let acp_session_id = acp::SessionId::new(session_id.clone());
676
677        let config_options = self
678            .register_session(
679                session,
680                &session_id,
681                &acp_session_id,
682                &model,
683                meta.selected_mode,
684                None,
685                mode_catalog.modes,
686                cx,
687            )
688            .await;
689
690        info!("Session {session_id} loaded successfully");
691
692        let response = LoadSessionResponse::new().config_options(config_options);
693        replay_to_client(&events, cx, &acp_session_id).await;
694        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
695
696        Ok(response)
697    }
698
699    pub async fn prompt(&self, args: acp::PromptRequest) -> Result<acp::PromptResponse, acp::Error> {
700        info!("Received prompt for session: {:?}", args.session_id);
701        let session_id_str = args.session_id.0.to_string();
702        let content = map_acp_to_content_blocks(args.prompt);
703
704        let model = self.registry.effective_model(&session_id_str).await.ok_or_else(|| {
705            error!("Session not found: {}", session_id_str);
706            acp::Error::invalid_params()
707        })?;
708        validate_prompt_support(&model, &content)?;
709
710        let dispatch = self.registry.begin_prompt(&session_id_str).await.ok_or_else(|| {
711            error!("Session not found: {}", session_id_str);
712            acp::Error::invalid_params()
713        })?;
714
715        let (result_tx, result_rx) = oneshot::channel();
716        dispatch
717            .relay_tx
718            .send(SessionCommand::Prompt {
719                content,
720                switch_model: dispatch.switch_model,
721                reasoning_effort: dispatch.reasoning_effort,
722                result_tx,
723            })
724            .await
725            .map_err(|_| {
726                error!("Relay channel closed for session {}", session_id_str);
727                acp::Error::internal_error()
728            })?;
729
730        let stop_reason = result_rx
731            .await
732            .map_err(|_| {
733                error!("Relay dropped result channel for session {}", session_id_str);
734                acp::Error::internal_error()
735            })?
736            .map_err(|e| {
737                error!("Relay error for session {}: {}", session_id_str, e);
738                acp::Error::internal_error()
739            })?;
740
741        info!("Prompt completed with stop reason: {:?}", stop_reason);
742        Ok(PromptResponse::new(stop_reason))
743    }
744
745    pub async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> {
746        info!("Received cancel for session: {:?}", args.session_id);
747        let session_id_str = args.session_id.0.to_string();
748        let relay = self.registry.relay(&session_id_str).await.ok_or_else(|| {
749            error!("Session not found for cancel: {}", session_id_str);
750            acp::Error::invalid_params()
751        })?;
752
753        relay.cmd.send(SessionCommand::Cancel).await.map_err(|_| {
754            error!("Relay channel closed for cancel: {}", session_id_str);
755            acp::Error::internal_error()
756        })?;
757
758        Ok(())
759    }
760
761    pub async fn set_session_config_option(
762        &self,
763        args: SetSessionConfigOptionRequest,
764    ) -> Result<SetSessionConfigOptionResponse, acp::Error> {
765        let session_id_str = args.session_id.0.to_string();
766        let config_id = args.config_id.0.to_string();
767        let value = args.value.0.to_string();
768
769        info!("set_session_config_option: session={}, config={}, value={}", session_id_str, config_id, value);
770
771        let setting = ConfigSetting::parse(&config_id, &value).map_err(|e| {
772            error!("{e}");
773            acp::Error::invalid_params()
774        })?;
775
776        let available = get_local_models().await;
777        let all_models = get_all_models(&available);
778
779        let snapshot =
780            self.registry.apply_config_change(&session_id_str, &setting, &available).await.ok_or_else(|| {
781                error!("Session not found: {}", session_id_str);
782                acp::Error::invalid_params()
783            })??;
784
785        let options = options_from_snapshot(&snapshot, &available, &all_models, self.oauth_credential_store.as_ref());
786        Ok(SetSessionConfigOptionResponse::new(options))
787    }
788
789    pub async fn on_mcp_request(&self, request: McpRequest) -> Result<(), acp::Error> {
790        info!("Received MCP ext request: {:?}", request);
791        match request {
792            McpRequest::Authenticate { session_id, server_name } => {
793                let relay = self.registry.relay(&session_id).await.ok_or_else(|| {
794                    error!("Session not found for authenticate_mcp_server: {}", session_id);
795                    acp::Error::invalid_params()
796                })?;
797
798                relay.mcp_request.send(McpRequest::Authenticate { session_id, server_name }).await.map_err(|_| {
799                    error!("MCP request channel closed for session");
800                    acp::Error::internal_error()
801                })?;
802            }
803        }
804
805        Ok(())
806    }
807}