Skip to main content

aether_cli/acp/
session_manager.rs

1use acp_utils::notifications::{AuthMethodsUpdatedParams, McpRequest};
2use acp_utils::server::AcpServerError;
3use aether_auth::OAuthCredentialStorage;
4use agent_client_protocol::schema::{
5    self as acp, AgentCapabilities, AuthMethod, AuthenticateRequest, AuthenticateResponse, AvailableCommandsUpdate,
6    ConfigOptionUpdate, Implementation, InitializeRequest, InitializeResponse, ListSessionsRequest,
7    ListSessionsResponse, LoadSessionRequest, LoadSessionResponse, McpCapabilities, NewSessionRequest,
8    NewSessionResponse, PromptCapabilities, PromptResponse, ProtocolVersion, SessionId, SessionNotification,
9    SessionUpdate, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse,
10};
11use agent_client_protocol::{Client, ConnectionTo};
12use llm::catalog::{LlmModel, get_local_models};
13use llm::types::IsoString;
14use llm::{ContentBlock, ProviderConnectionOverrides, ReasoningEffort};
15use std::collections::HashSet;
16use std::path::Path;
17use std::sync::Arc;
18use tokio::sync::oneshot;
19use tracing::{error, info, warn};
20
21use super::config_setting::ConfigSetting;
22use super::mappers::{map_acp_mcp_servers, replay_to_client};
23use super::model_config::{
24    ValidatedMode, build_config_options_from_modes, model_exists, pick_default_model, supports_prompt_audio,
25    validated_modes_from_specs,
26};
27use super::relay::{SessionCommand, spawn_relay};
28use super::session::Session;
29use super::session_registry::{ConfigSnapshot, SessionRegistry};
30use super::session_store::{SessionMeta, SessionStore};
31use crate::settings_args::SettingsSourceArgs;
32use acp_utils::content::format_embedded_resource;
33use aether_core::agent_spec::AgentSpec;
34use aether_core::context::ext::ContextExt;
35use aether_project::{AetherSettings, AgentCatalog};
36use llm::Context;
37
38/// Initial session selection supplied when `aether acp` starts.
39#[derive(Clone, Debug, Default)]
40pub enum InitialSessionSelection {
41    #[default]
42    Default,
43    Agent(String),
44    Model {
45        model: String,
46        reasoning_effort: Option<ReasoningEffort>,
47    },
48}
49
50impl InitialSessionSelection {
51    pub fn agent(name: String) -> Self {
52        Self::Agent(name)
53    }
54
55    pub fn model(model: String, reasoning_effort: Option<ReasoningEffort>) -> Self {
56        Self::Model { model, reasoning_effort }
57    }
58}
59
60/// Manages ACP sessions, each session has its own agent and state
61pub struct SessionManager {
62    registry: Arc<SessionRegistry>,
63    session_store: Arc<SessionStore>,
64    oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
65    initial_selection: InitialSessionSelection,
66    settings_source: SettingsSourceArgs,
67    provider_connections: ProviderConnectionOverrides,
68}
69
70pub(crate) struct SessionManagerConfig {
71    pub(crate) registry: Arc<SessionRegistry>,
72    pub(crate) session_store: Arc<SessionStore>,
73    pub(crate) oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
74    pub(crate) initial_selection: InitialSessionSelection,
75    pub(crate) settings_source: SettingsSourceArgs,
76    pub(crate) provider_connections: ProviderConnectionOverrides,
77}
78
79struct SessionModeCatalog {
80    catalog: AgentCatalog,
81    modes: Vec<ValidatedMode>,
82    available: Vec<LlmModel>,
83}
84
85struct ResolvedInitialSession {
86    spec: AgentSpec,
87    selected_mode: Option<String>,
88}
89
90#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
91struct PromptModalities {
92    image: bool,
93    audio: bool,
94}
95
96impl PromptModalities {
97    fn from_content(content: &[ContentBlock]) -> Self {
98        Self {
99            image: content.iter().any(ContentBlock::is_image),
100            audio: content.iter().any(|block| matches!(block, ContentBlock::Audio { .. })),
101        }
102    }
103
104    fn is_empty(self) -> bool {
105        !self.image && !self.audio
106    }
107}
108
109impl SessionManager {
110    pub(crate) fn new(deps: SessionManagerConfig) -> Self {
111        Self {
112            registry: deps.registry,
113            session_store: deps.session_store,
114            oauth_credential_store: deps.oauth_credential_store,
115            initial_selection: deps.initial_selection,
116            settings_source: deps.settings_source,
117            provider_connections: deps.provider_connections,
118        }
119    }
120
121    fn apply_provider_connection_overrides(&self, mut spec: AgentSpec) -> AgentSpec {
122        spec.provider_connections.merge(self.provider_connections.clone());
123        spec
124    }
125
126    fn resolve_initial_session(
127        &self,
128        mode_catalog: &SessionModeCatalog,
129        default_model: &LlmModel,
130    ) -> Result<ResolvedInitialSession, acp::Error> {
131        let mut resolved = match &self.initial_selection {
132            InitialSessionSelection::Default => resolve_default_initial_session(mode_catalog, default_model),
133            InitialSessionSelection::Agent(agent) => {
134                if !mode_catalog.modes.iter().any(|mode| mode.name == *agent) {
135                    warn!("Unknown agent `{agent}` requested via --agent");
136                    return Err(acp::Error::invalid_params());
137                }
138                resolve_agent_spec(&mode_catalog.catalog, agent)
139                    .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(agent.clone()) })
140            }
141            InitialSessionSelection::Model { model, reasoning_effort } => {
142                let model = parse_available_model(model, &mode_catalog.available)?;
143                Ok(ResolvedInitialSession {
144                    spec: AgentSpec::default_spec(&model, *reasoning_effort, Vec::new()),
145                    selected_mode: None,
146                })
147            }
148        }?;
149        resolved.spec = self.apply_provider_connection_overrides(resolved.spec);
150        Ok(resolved)
151    }
152
153    async fn load_mode_catalog(&self, cwd: &Path) -> Result<SessionModeCatalog, acp::Error> {
154        let config = if let Some(source) = self.settings_source.source(cwd) {
155            AetherSettings::load(cwd, [source])
156        } else {
157            AetherSettings::load_default(cwd)
158        }
159        .map_err(|e| {
160            error!("Failed to load agent catalog: {e}");
161            acp::Error::invalid_params()
162        })?;
163        let catalog = if config.agents.is_empty() {
164            AgentCatalog::empty(cwd.to_path_buf())
165        } else {
166            AgentCatalog::from_settings(cwd, config).map_err(|e| {
167                error!("Failed to load agent catalog: {e}");
168                acp::Error::invalid_params()
169            })?
170        };
171
172        let available = get_local_models().await;
173        let specs: Vec<_> = catalog.user_invocable().cloned().collect();
174        let modes = validated_modes_from_specs(&specs, &available);
175
176        Ok(SessionModeCatalog { catalog, modes, available })
177    }
178
179    #[allow(clippy::too_many_arguments, clippy::similar_names)]
180    async fn register_session(
181        &self,
182        session: Session,
183        session_id: &str,
184        acp_session_id: &SessionId,
185        model: &str,
186        selected_mode: Option<String>,
187        reasoning_effort: Option<ReasoningEffort>,
188        modes: Vec<ValidatedMode>,
189        cx: &ConnectionTo<Client>,
190    ) -> Vec<acp::SessionConfigOption> {
191        let relay = spawn_relay(
192            session,
193            cx.clone(),
194            acp_session_id.clone(),
195            self.session_store.clone(),
196            Arc::clone(&self.oauth_credential_store),
197        );
198
199        self.registry
200            .insert(
201                session_id.to_string(),
202                relay,
203                model.to_string(),
204                selected_mode.clone(),
205                reasoning_effort,
206                modes.clone(),
207            )
208            .await;
209
210        let available = get_local_models().await;
211        let all_models = get_all_models(&available);
212        build_config_options_from_modes(
213            &modes,
214            &available,
215            selected_mode.as_deref(),
216            model,
217            reasoning_effort,
218            &all_models,
219            self.oauth_credential_store.as_ref(),
220        )
221    }
222
223    fn send_available_commands_notification(
224        available_commands: Vec<acp::AvailableCommand>,
225        acp_session_id: SessionId,
226        session_id: &str,
227        cx: &ConnectionTo<Client>,
228    ) {
229        if available_commands.is_empty() {
230            return;
231        }
232        let command_count = available_commands.len();
233        let notification = SessionNotification::new(
234            acp_session_id,
235            SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate::new(available_commands)),
236        );
237        if let Err(e) = cx.send_notification(notification).map_err(|e| AcpServerError::protocol("session/update", e)) {
238            error!("Failed to send available commands notification: {:?}", e);
239        } else {
240            info!("Sent available commands update for session {} ({} commands)", session_id, command_count);
241        }
242    }
243
244    /// Drain every session and stop its relay task. Blocks until every relay
245    /// has exited.
246    pub async fn shutdown_all_sessions(&self) {
247        self.registry.shutdown_all().await;
248    }
249}
250
251fn options_from_snapshot(
252    snapshot: &ConfigSnapshot,
253    available: &[LlmModel],
254    all_models: &[LlmModel],
255    credential_store: &dyn OAuthCredentialStorage,
256) -> Vec<acp::SessionConfigOption> {
257    build_config_options_from_modes(
258        &snapshot.modes,
259        available,
260        snapshot.selected_mode.as_deref(),
261        &snapshot.effective_model,
262        snapshot.reasoning_effort,
263        all_models,
264        credential_store,
265    )
266}
267
268/// Merge catalog `all()` with locally-discovered models for the `all_models`
269/// parameter of `build_model_config_option`.
270fn get_all_models(discovered: &[LlmModel]) -> Vec<LlmModel> {
271    let mut all = LlmModel::all().to_vec();
272    for m in discovered {
273        if !all.contains(m) {
274            all.push(m.clone());
275        }
276    }
277    all
278}
279
280fn build_auth_methods(store: &dyn OAuthCredentialStorage) -> Vec<AuthMethod> {
281    let mut seen = HashSet::new();
282    LlmModel::all()
283        .iter()
284        .filter_map(LlmModel::oauth_provider_id)
285        .filter(|id| seen.insert(*id))
286        .map(|id| {
287            let display = LlmModel::all()
288                .iter()
289                .find(|m| m.oauth_provider_id() == Some(id))
290                .map_or(id, |m| m.provider_display_name());
291            let mut method = acp::AuthMethodAgent::new(id, display);
292            if store.has_credential(id) {
293                method = method.description("authenticated");
294            }
295            AuthMethod::Agent(method)
296        })
297        .collect()
298}
299
300fn map_acp_to_content_blocks(blocks: Vec<acp::ContentBlock>) -> Vec<ContentBlock> {
301    blocks
302        .into_iter()
303        .map(|block| match block {
304            acp::ContentBlock::Text(t) => ContentBlock::text(t.text),
305            acp::ContentBlock::Image(img) => ContentBlock::Image { data: img.data, mime_type: img.mime_type },
306            acp::ContentBlock::Audio(aud) => ContentBlock::Audio { data: aud.data, mime_type: aud.mime_type },
307            acp::ContentBlock::Resource(r) => ContentBlock::text(format_embedded_resource(&r)),
308            acp::ContentBlock::ResourceLink(l) => ContentBlock::text(format!("[Resource: {}]", l.uri)),
309            _ => ContentBlock::text("[Unknown content]"),
310        })
311        .collect()
312}
313
314fn resolve_agent_spec(catalog: &AgentCatalog, mode_name: &str) -> Result<AgentSpec, acp::Error> {
315    catalog.resolve(mode_name).map_err(|e| {
316        error!("Failed to resolve runtime inputs for mode '{}': {e}", mode_name);
317        acp::Error::invalid_params()
318    })
319}
320
321fn resolve_default_initial_session(
322    mode_catalog: &SessionModeCatalog,
323    default_model: &LlmModel,
324) -> Result<ResolvedInitialSession, acp::Error> {
325    if let Some(mode) = mode_catalog.modes.first() {
326        return resolve_agent_spec(&mode_catalog.catalog, &mode.name)
327            .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(mode.name.clone()) });
328    }
329
330    Ok(ResolvedInitialSession { spec: AgentSpec::default_spec(default_model, None, Vec::new()), selected_mode: None })
331}
332
333fn parse_available_model(model: &str, available: &[LlmModel]) -> Result<LlmModel, acp::Error> {
334    let parsed = model.parse().map_err(|e: String| {
335        warn!("Failed to parse --model `{model}`: {e}");
336        acp::Error::invalid_params()
337    })?;
338
339    if model_exists(available, model) {
340        Ok(parsed)
341    } else {
342        warn!("Requested model `{model}` is not available");
343        Err(acp::Error::invalid_params())
344    }
345}
346
347fn prompt_capabilities_for_models(models: &[LlmModel]) -> PromptCapabilities {
348    PromptCapabilities::new()
349        .embedded_context(true)
350        .image(models.iter().any(LlmModel::supports_image))
351        .audio(models.iter().any(supports_prompt_audio))
352}
353
354fn selected_models(model_value: &str) -> Result<Vec<LlmModel>, acp::Error> {
355    model_value
356        .split(',')
357        .map(str::trim)
358        .filter(|part| !part.is_empty())
359        .map(|part| part.parse::<LlmModel>().map_err(|_| acp::Error::invalid_params()))
360        .collect()
361}
362
363fn validate_prompt_support(model_value: &str, content: &[ContentBlock]) -> Result<(), acp::Error> {
364    let modalities = PromptModalities::from_content(content);
365    if modalities.is_empty() {
366        return Ok(());
367    }
368
369    let selected = selected_models(model_value)?;
370    if modalities.image && selected.iter().any(|model| !model.supports_image()) {
371        return Err(acp::Error::invalid_params());
372    }
373    if modalities.audio && selected.iter().any(|model| !supports_prompt_audio(model)) {
374        return Err(acp::Error::invalid_params());
375    }
376
377    Ok(())
378}
379
380#[cfg(test)]
381#[allow(clippy::items_after_test_module)]
382mod tests {
383    use super::*;
384    use agent_client_protocol::schema::{InitializeRequest, ProtocolVersion};
385    use llm::catalog::BedrockModel;
386
387    const SONNET: &str = "anthropic:claude-sonnet-4-5";
388    const DEEPSEEK: &str = "deepseek:deepseek-chat";
389    const BEDROCK_PROFILE_ARN: &str =
390        "bedrock:arn:aws:bedrock:us-west-2:000000000000:application-inference-profile/000000000000";
391
392    fn mock_oauth_store() -> Arc<dyn OAuthCredentialStorage> {
393        Arc::new(aether_auth::FakeOAuthCredentialStore::new())
394    }
395
396    #[tokio::test]
397    async fn initialize_always_advertises_load_session_support() {
398        let session_store =
399            SessionStore::new().map_or_else(|e| panic!("Failed to initialize session store: {e}"), Arc::new);
400        let manager = SessionManager::new(SessionManagerConfig {
401            registry: Arc::new(SessionRegistry::new()),
402            session_store,
403            oauth_credential_store: mock_oauth_store(),
404            initial_selection: InitialSessionSelection::default(),
405            settings_source: SettingsSourceArgs::default(),
406            provider_connections: ProviderConnectionOverrides::default(),
407        });
408        let response =
409            manager.initialize(InitializeRequest::new(ProtocolVersion::LATEST)).await.expect("initialize succeeds");
410        let json = serde_json::to_string(&response).expect("response serializes");
411        assert!(json.contains("\"loadSession\":true"));
412    }
413
414    #[test]
415    fn prompt_capabilities_reflect_available_modalities() {
416        let image_only = prompt_capabilities_for_models(&["anthropic:claude-sonnet-4-5".parse().unwrap()]);
417        assert!(image_only.image);
418        assert!(!image_only.audio);
419
420        let audio_capable =
421            prompt_capabilities_for_models(&["gemini:gemini-live-2.5-flash-preview-native-audio".parse().unwrap()]);
422        assert!(!audio_capable.image);
423        assert!(audio_capable.audio);
424
425        let text_only = prompt_capabilities_for_models(&[DEEPSEEK.parse().unwrap()]);
426        assert!(!text_only.image);
427        assert!(!text_only.audio);
428    }
429
430    #[test]
431    fn validate_prompt_support_requires_all_selected_models_to_support_media() {
432        let image_content = vec![ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() }];
433        let audio_content =
434            vec![ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() }];
435
436        assert!(validate_prompt_support(SONNET, &image_content).is_ok());
437        assert!(validate_prompt_support(DEEPSEEK, &image_content).is_err());
438        assert!(validate_prompt_support("gemini:gemini-live-2.5-flash-preview-native-audio", &audio_content,).is_ok());
439        assert!(validate_prompt_support(SONNET, &audio_content).is_err());
440        assert!(
441            validate_prompt_support("anthropic:claude-sonnet-4-5,deepseek:deepseek-chat", &image_content,).is_err()
442        );
443        assert!(
444            validate_prompt_support(
445                "gemini:gemini-live-2.5-flash-preview-native-audio,deepseek:deepseek-chat",
446                &audio_content,
447            )
448            .is_err()
449        );
450    }
451
452    #[test]
453    fn parse_available_model_accepts_bedrock_inference_profile_arn() {
454        let available: Vec<LlmModel> = Vec::new();
455        let parsed = parse_available_model(BEDROCK_PROFILE_ARN, &available)
456            .expect("Bedrock inference-profile ARN should be accepted");
457        assert!(matches!(parsed, LlmModel::Bedrock(BedrockModel::Profile(_))));
458    }
459
460    #[test]
461    fn parse_available_model_rejects_unknown_catalog_model() {
462        let available: Vec<LlmModel> = vec![DEEPSEEK.parse().unwrap()];
463        assert!(parse_available_model(SONNET, &available).is_err());
464    }
465
466    #[test]
467    fn parse_available_model_accepts_catalog_model_when_present() {
468        let sonnet: LlmModel = SONNET.parse().unwrap();
469        let available = vec![sonnet.clone()];
470        assert_eq!(parse_available_model(SONNET, &available).unwrap(), sonnet);
471    }
472}
473
474impl SessionManager {
475    pub async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse, acp::Error> {
476        info!("Received initialize request: {:?}", args);
477        let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
478        let available = get_local_models().await;
479        Ok(InitializeResponse::new(ProtocolVersion::V1)
480            .agent_info(Implementation::new("Aether", "0.1.0"))
481            .agent_capabilities(
482                AgentCapabilities::new()
483                    .load_session(true)
484                    .mcp_capabilities(McpCapabilities::new().http(true).sse(true))
485                    .session_capabilities(acp::SessionCapabilities::new().list(acp::SessionListCapabilities::new()))
486                    .prompt_capabilities(prompt_capabilities_for_models(&available)),
487            )
488            .auth_methods(auth_methods))
489    }
490
491    pub async fn authenticate(
492        &self,
493        args: AuthenticateRequest,
494        cx: &ConnectionTo<Client>,
495    ) -> Result<AuthenticateResponse, acp::Error> {
496        info!("Received authenticate request: {:?}", args);
497        let method_id = args.method_id.0.as_ref();
498        match method_id {
499            "codex" => {
500                llm::perform_codex_oauth_flow(self.oauth_credential_store.as_ref()).await.map_err(|e| {
501                    error!("OAuth flow failed for {method_id}: {e}");
502                    acp::Error::internal_error()
503                })?;
504            }
505            _ => return Err(acp::Error::invalid_params()),
506        }
507        let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
508        if let Err(e) = cx
509            .send_notification(AuthMethodsUpdatedParams { auth_methods })
510            .map_err(|e| AcpServerError::protocol("_aether/auth_methods_updated", e))
511        {
512            error!("Failed to send auth methods updated notification: {:?}", e);
513        }
514
515        let available = get_local_models().await;
516        let all_models = get_all_models(&available);
517        let snapshots = self.registry.snapshot_all_configs().await;
518
519        for (id, snap) in snapshots {
520            let options = options_from_snapshot(&snap, &available, &all_models, self.oauth_credential_store.as_ref());
521            let notification = SessionNotification::new(
522                SessionId::new(id),
523                SessionUpdate::ConfigOptionUpdate(ConfigOptionUpdate::new(options)),
524            );
525            let _ = cx.send_notification(notification);
526        }
527
528        Ok(AuthenticateResponse::default())
529    }
530
531    pub async fn new_session(
532        &self,
533        mut args: NewSessionRequest,
534        cx: &ConnectionTo<Client>,
535    ) -> Result<NewSessionResponse, acp::Error> {
536        // Inside a sandbox container the client sends the *host* cwd, but the
537        // project is mounted at the container's working directory.
538        if std::env::var("AETHER_INSIDE_SANDBOX").is_ok() {
539            let container_cwd = std::env::current_dir().unwrap_or_else(|_| "/workspace".into());
540            info!("Sandbox: remapping cwd {:?} -> {:?}", args.cwd, container_cwd);
541            args.cwd = container_cwd;
542        }
543
544        info!("Creating new session with cwd: {:?}", args.cwd);
545        let session_id = uuid::Uuid::new_v4().to_string();
546        let acp_session_id = acp::SessionId::new(session_id.clone());
547
548        let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
549        let default_model = pick_default_model(&mode_catalog.available).ok_or_else(|| {
550            error!("No models available — set an API key env var (e.g. ANTHROPIC_API_KEY)");
551            acp::Error::internal_error()
552        })?;
553
554        let ResolvedInitialSession { spec, selected_mode } =
555            self.resolve_initial_session(&mode_catalog, default_model)?;
556        let model_str = spec.model.clone();
557        let reasoning_effort = spec.reasoning_effort;
558
559        let session = Session::new(
560            spec,
561            args.cwd.clone(),
562            map_acp_mcp_servers(args.mcp_servers),
563            None,
564            Some(session_id.clone()),
565            Arc::clone(&self.oauth_credential_store),
566        )
567        .await
568        .map_err(|e| {
569            error!("Failed to create session: {}", e);
570            acp::Error::internal_error()
571        })?;
572
573        let available_commands = session.list_available_commands().await.map_err(|e| {
574            error!("Failed to list available commands: {}", e);
575            acp::Error::internal_error()
576        })?;
577
578        let meta = SessionMeta {
579            session_id: session_id.clone(),
580            cwd: args.cwd.clone(),
581            model: model_str.clone(),
582            selected_mode: selected_mode.clone(),
583            created_at: IsoString::now().0,
584        };
585        if let Err(e) = self.session_store.append_meta(&session_id, &meta) {
586            error!("Failed to write session meta: {e}");
587        }
588
589        let config_options = self
590            .register_session(
591                session,
592                &session_id,
593                &acp_session_id,
594                &model_str,
595                selected_mode,
596                reasoning_effort,
597                mode_catalog.modes,
598                cx,
599            )
600            .await;
601
602        info!("Session {} created successfully", session_id);
603
604        let response = NewSessionResponse::new(acp_session_id.clone()).config_options(config_options);
605
606        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
607
608        Ok(response)
609    }
610
611    pub fn list_sessions(&self, args: &ListSessionsRequest) -> Result<ListSessionsResponse, acp::Error> {
612        info!("Listing sessions, cwd filter: {:?}", args.cwd);
613        let mut summaries = self.session_store.list();
614
615        if let Some(cwd) = args.cwd.as_ref() {
616            summaries.retain(|s| s.meta.cwd == *cwd);
617        }
618
619        let sessions: Vec<acp::SessionInfo> = summaries
620            .into_iter()
621            .map(|s| acp::SessionInfo::new(s.meta.session_id, s.meta.cwd).updated_at(s.meta.created_at).title(s.title))
622            .collect();
623
624        info!("Found {} sessions", sessions.len());
625        Ok(ListSessionsResponse::new(sessions))
626    }
627
628    pub async fn load_session(
629        &self,
630        args: LoadSessionRequest,
631        cx: &ConnectionTo<Client>,
632    ) -> Result<LoadSessionResponse, acp::Error> {
633        let session_id = args.session_id.0.to_string();
634        info!("Loading session: {session_id}");
635
636        let (meta, events) = self.session_store.load(&session_id).ok_or_else(|| {
637            error!("Session not found: {session_id}");
638            acp::Error::invalid_params()
639        })?;
640
641        let context = Context::from_events(&events);
642        let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
643
644        let spec = self.apply_provider_connection_overrides(if let Some(mode_name) = meta.selected_mode.as_deref() {
645            resolve_agent_spec(&mode_catalog.catalog, mode_name)?
646        } else {
647            let parsed_model: LlmModel = meta.model.parse().map_err(|e: String| {
648                error!("Failed to parse restored model '{}': {e}", meta.model);
649                acp::Error::invalid_params()
650            })?;
651            AgentSpec::default_spec(&parsed_model, None, Vec::new())
652        });
653
654        let model = spec.model.clone();
655
656        let restored_messages: Vec<_> = context.messages().iter().filter(|m| !m.is_system()).cloned().collect();
657
658        let session = Session::new(
659            spec,
660            args.cwd.clone(),
661            map_acp_mcp_servers(args.mcp_servers),
662            Some(restored_messages),
663            Some(session_id.clone()),
664            Arc::clone(&self.oauth_credential_store),
665        )
666        .await
667        .map_err(|e| {
668            error!("Failed to create session for load: {e}");
669            acp::Error::internal_error()
670        })?;
671
672        let available_commands = session.list_available_commands().await.map_err(|e| {
673            error!("Failed to list available commands: {e}");
674            acp::Error::internal_error()
675        })?;
676
677        let acp_session_id = acp::SessionId::new(session_id.clone());
678
679        let config_options = self
680            .register_session(
681                session,
682                &session_id,
683                &acp_session_id,
684                &model,
685                meta.selected_mode,
686                None,
687                mode_catalog.modes,
688                cx,
689            )
690            .await;
691
692        info!("Session {session_id} loaded successfully");
693
694        let response = LoadSessionResponse::new().config_options(config_options);
695        replay_to_client(&events, cx, &acp_session_id).await;
696        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
697
698        Ok(response)
699    }
700
701    pub async fn prompt(&self, args: acp::PromptRequest) -> Result<acp::PromptResponse, acp::Error> {
702        info!("Received prompt for session: {:?}", args.session_id);
703        let session_id_str = args.session_id.0.to_string();
704        let content = map_acp_to_content_blocks(args.prompt);
705
706        let model = self.registry.effective_model(&session_id_str).await.ok_or_else(|| {
707            error!("Session not found: {}", session_id_str);
708            acp::Error::invalid_params()
709        })?;
710        validate_prompt_support(&model, &content)?;
711
712        let dispatch = self.registry.begin_prompt(&session_id_str).await.ok_or_else(|| {
713            error!("Session not found: {}", session_id_str);
714            acp::Error::invalid_params()
715        })?;
716
717        let (result_tx, result_rx) = oneshot::channel();
718        dispatch
719            .relay_tx
720            .send(SessionCommand::Prompt {
721                content,
722                switch_model: dispatch.switch_model,
723                reasoning_effort: dispatch.reasoning_effort,
724                result_tx,
725            })
726            .await
727            .map_err(|_| {
728                error!("Relay channel closed for session {}", session_id_str);
729                acp::Error::internal_error()
730            })?;
731
732        let stop_reason = result_rx
733            .await
734            .map_err(|_| {
735                error!("Relay dropped result channel for session {}", session_id_str);
736                acp::Error::internal_error()
737            })?
738            .map_err(|e| {
739                error!("Relay error for session {}: {}", session_id_str, e);
740                acp::Error::internal_error()
741            })?;
742
743        info!("Prompt completed with stop reason: {:?}", stop_reason);
744        Ok(PromptResponse::new(stop_reason))
745    }
746
747    pub async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> {
748        info!("Received cancel for session: {:?}", args.session_id);
749        let session_id_str = args.session_id.0.to_string();
750        let relay = self.registry.relay(&session_id_str).await.ok_or_else(|| {
751            error!("Session not found for cancel: {}", session_id_str);
752            acp::Error::invalid_params()
753        })?;
754
755        relay.cmd.send(SessionCommand::Cancel).await.map_err(|_| {
756            error!("Relay channel closed for cancel: {}", session_id_str);
757            acp::Error::internal_error()
758        })?;
759
760        Ok(())
761    }
762
763    pub async fn set_session_config_option(
764        &self,
765        args: SetSessionConfigOptionRequest,
766    ) -> Result<SetSessionConfigOptionResponse, acp::Error> {
767        let session_id_str = args.session_id.0.to_string();
768        let config_id = args.config_id.0.to_string();
769        let value = args.value.0.to_string();
770
771        info!("set_session_config_option: session={}, config={}, value={}", session_id_str, config_id, value);
772
773        let setting = ConfigSetting::parse(&config_id, &value).map_err(|e| {
774            error!("{e}");
775            acp::Error::invalid_params()
776        })?;
777
778        let available = get_local_models().await;
779        let all_models = get_all_models(&available);
780
781        let snapshot =
782            self.registry.apply_config_change(&session_id_str, &setting, &available).await.ok_or_else(|| {
783                error!("Session not found: {}", session_id_str);
784                acp::Error::invalid_params()
785            })??;
786
787        let options = options_from_snapshot(&snapshot, &available, &all_models, self.oauth_credential_store.as_ref());
788        Ok(SetSessionConfigOptionResponse::new(options))
789    }
790
791    pub async fn on_mcp_request(&self, request: McpRequest) -> Result<(), acp::Error> {
792        info!("Received MCP ext request: {:?}", request);
793        match request {
794            McpRequest::Authenticate { session_id, server_name } => {
795                let relay = self.registry.relay(&session_id).await.ok_or_else(|| {
796                    error!("Session not found for authenticate_mcp_server: {}", session_id);
797                    acp::Error::invalid_params()
798                })?;
799
800                relay.mcp_request.send(McpRequest::Authenticate { session_id, server_name }).await.map_err(|_| {
801                    error!("MCP request channel closed for session");
802                    acp::Error::internal_error()
803                })?;
804            }
805        }
806
807        Ok(())
808    }
809}