Skip to main content

aether_cli/acp/
session_manager.rs

1use acp_utils::notifications::{
2    AuthMethodsUpdatedParams, McpRequest, PromptSearchParams, PromptSearchResponse, prompt_search_capability,
3};
4use acp_utils::server::AcpServerError;
5use aether_auth::OAuthCredentialStorage;
6use agent_client_protocol::schema::{
7    self as acp, AgentCapabilities, AuthMethod, AuthenticateRequest, AuthenticateResponse, AvailableCommandsUpdate,
8    ConfigOptionUpdate, Implementation, InitializeRequest, InitializeResponse, ListSessionsRequest,
9    ListSessionsResponse, LoadSessionRequest, LoadSessionResponse, McpCapabilities, NewSessionRequest,
10    NewSessionResponse, PromptCapabilities, PromptResponse, ProtocolVersion, SessionId, SessionNotification,
11    SessionUpdate, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse,
12};
13use agent_client_protocol::{Client, ConnectionTo};
14use llm::catalog::{LlmModel, get_local_models};
15use llm::types::IsoString;
16use llm::{ContentBlock, ProviderConnectionOverrides, ReasoningEffort};
17use std::collections::HashSet;
18use std::path::Path;
19use std::sync::Arc;
20use tokio::sync::oneshot;
21use tracing::{error, info, warn};
22
23use super::config_setting::ConfigSetting;
24use super::mappers::{map_acp_mcp_servers, replay_to_client};
25use super::model_config::{
26    ValidatedMode, build_config_options_from_modes, model_exists, pick_default_model, supports_prompt_audio,
27    validated_modes_from_specs,
28};
29use super::relay::{SessionCommand, spawn_relay};
30use super::session::Session;
31use super::session_registry::{ConfigSnapshot, SessionRegistry};
32use super::session_store::{SessionMeta, SessionStore};
33use crate::settings_args::SettingsSourceArgs;
34use acp_utils::content::format_embedded_resource;
35use aether_core::agent_spec::AgentSpec;
36use aether_core::context::ext::ContextExt;
37use aether_project::{AetherSettings, AgentCatalog};
38use llm::Context;
39
40/// Initial session selection supplied when `aether acp` starts.
41#[derive(Clone, Debug, Default)]
42pub enum InitialSessionSelection {
43    #[default]
44    Default,
45    Agent(String),
46    Model {
47        model: String,
48        reasoning_effort: Option<ReasoningEffort>,
49    },
50}
51
52impl InitialSessionSelection {
53    pub fn agent(name: String) -> Self {
54        Self::Agent(name)
55    }
56
57    pub fn model(model: String, reasoning_effort: Option<ReasoningEffort>) -> Self {
58        Self::Model { model, reasoning_effort }
59    }
60}
61
62/// Manages ACP sessions, each session has its own agent and state
63pub struct SessionManager {
64    registry: Arc<SessionRegistry>,
65    session_store: Arc<SessionStore>,
66    oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
67    initial_selection: InitialSessionSelection,
68    settings_source: SettingsSourceArgs,
69    provider_connections: ProviderConnectionOverrides,
70}
71
72pub(crate) struct SessionManagerConfig {
73    pub(crate) registry: Arc<SessionRegistry>,
74    pub(crate) session_store: Arc<SessionStore>,
75    pub(crate) oauth_credential_store: Arc<dyn OAuthCredentialStorage>,
76    pub(crate) initial_selection: InitialSessionSelection,
77    pub(crate) settings_source: SettingsSourceArgs,
78    pub(crate) provider_connections: ProviderConnectionOverrides,
79}
80
81struct SessionModeCatalog {
82    catalog: AgentCatalog,
83    modes: Vec<ValidatedMode>,
84    available: Vec<LlmModel>,
85}
86
87struct ResolvedInitialSession {
88    spec: AgentSpec,
89    selected_mode: Option<String>,
90}
91
92#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
93struct PromptModalities {
94    image: bool,
95    audio: bool,
96}
97
98impl PromptModalities {
99    fn from_content(content: &[ContentBlock]) -> Self {
100        Self {
101            image: content.iter().any(ContentBlock::is_image),
102            audio: content.iter().any(|block| matches!(block, ContentBlock::Audio { .. })),
103        }
104    }
105
106    fn is_empty(self) -> bool {
107        !self.image && !self.audio
108    }
109}
110
111impl SessionManager {
112    pub(crate) fn new(deps: SessionManagerConfig) -> Self {
113        Self {
114            registry: deps.registry,
115            session_store: deps.session_store,
116            oauth_credential_store: deps.oauth_credential_store,
117            initial_selection: deps.initial_selection,
118            settings_source: deps.settings_source,
119            provider_connections: deps.provider_connections,
120        }
121    }
122
123    fn apply_provider_connection_overrides(&self, mut spec: AgentSpec) -> AgentSpec {
124        spec.provider_connections.merge(self.provider_connections.clone());
125        spec
126    }
127
128    fn resolve_initial_session(
129        &self,
130        mode_catalog: &SessionModeCatalog,
131        default_model: &LlmModel,
132    ) -> Result<ResolvedInitialSession, acp::Error> {
133        let mut resolved = match &self.initial_selection {
134            InitialSessionSelection::Default => resolve_default_initial_session(mode_catalog, default_model),
135            InitialSessionSelection::Agent(agent) => {
136                if !mode_catalog.modes.iter().any(|mode| mode.name == *agent) {
137                    warn!("Unknown agent `{agent}` requested via --agent");
138                    return Err(acp::Error::invalid_params());
139                }
140                resolve_agent_spec(&mode_catalog.catalog, agent)
141                    .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(agent.clone()) })
142            }
143            InitialSessionSelection::Model { model, reasoning_effort } => {
144                let model = parse_available_model(model, &mode_catalog.available)?;
145                Ok(ResolvedInitialSession {
146                    spec: AgentSpec::default_spec(&model, *reasoning_effort, Vec::new()),
147                    selected_mode: None,
148                })
149            }
150        }?;
151        resolved.spec = self.apply_provider_connection_overrides(resolved.spec);
152        Ok(resolved)
153    }
154
155    async fn load_mode_catalog(&self, cwd: &Path) -> Result<SessionModeCatalog, acp::Error> {
156        let config = if let Some(source) = self.settings_source.source(cwd) {
157            AetherSettings::load(cwd, [source])
158        } else {
159            AetherSettings::load_default(cwd)
160        }
161        .map_err(|e| {
162            error!("Failed to load agent catalog: {e}");
163            acp::Error::invalid_params()
164        })?;
165        let catalog = if config.agents.is_empty() {
166            AgentCatalog::empty(cwd.to_path_buf())
167        } else {
168            AgentCatalog::from_settings(cwd, config).map_err(|e| {
169                error!("Failed to load agent catalog: {e}");
170                acp::Error::invalid_params()
171            })?
172        };
173
174        let available = get_local_models().await;
175        let specs: Vec<_> = catalog.user_invocable().cloned().collect();
176        let modes = validated_modes_from_specs(&specs, &available);
177
178        Ok(SessionModeCatalog { catalog, modes, available })
179    }
180
181    #[allow(clippy::too_many_arguments, clippy::similar_names)]
182    async fn register_session(
183        &self,
184        session: Session,
185        session_id: &str,
186        acp_session_id: &SessionId,
187        model: &str,
188        selected_mode: Option<String>,
189        reasoning_effort: Option<ReasoningEffort>,
190        modes: Vec<ValidatedMode>,
191        cx: &ConnectionTo<Client>,
192    ) -> Vec<acp::SessionConfigOption> {
193        let relay = spawn_relay(
194            session,
195            cx.clone(),
196            acp_session_id.clone(),
197            self.session_store.clone(),
198            Arc::clone(&self.oauth_credential_store),
199        );
200
201        self.registry
202            .insert(
203                session_id.to_string(),
204                relay,
205                model.to_string(),
206                selected_mode.clone(),
207                reasoning_effort,
208                modes.clone(),
209            )
210            .await;
211
212        let available = get_local_models().await;
213        let all_models = get_all_models(&available);
214        build_config_options_from_modes(
215            &modes,
216            &available,
217            selected_mode.as_deref(),
218            model,
219            reasoning_effort,
220            &all_models,
221            self.oauth_credential_store.as_ref(),
222        )
223    }
224
225    fn send_available_commands_notification(
226        available_commands: Vec<acp::AvailableCommand>,
227        acp_session_id: SessionId,
228        session_id: &str,
229        cx: &ConnectionTo<Client>,
230    ) {
231        if available_commands.is_empty() {
232            return;
233        }
234        let command_count = available_commands.len();
235        let notification = SessionNotification::new(
236            acp_session_id,
237            SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate::new(available_commands)),
238        );
239        if let Err(e) = cx.send_notification(notification).map_err(|e| AcpServerError::protocol("session/update", e)) {
240            error!("Failed to send available commands notification: {:?}", e);
241        } else {
242            info!("Sent available commands update for session {} ({} commands)", session_id, command_count);
243        }
244    }
245
246    /// Drain every session and stop its relay task. Blocks until every relay
247    /// has exited.
248    pub async fn shutdown_all_sessions(&self) {
249        self.registry.shutdown_all().await;
250    }
251}
252
253fn options_from_snapshot(
254    snapshot: &ConfigSnapshot,
255    available: &[LlmModel],
256    all_models: &[LlmModel],
257    credential_store: &dyn OAuthCredentialStorage,
258) -> Vec<acp::SessionConfigOption> {
259    build_config_options_from_modes(
260        &snapshot.modes,
261        available,
262        snapshot.selected_mode.as_deref(),
263        &snapshot.effective_model,
264        snapshot.reasoning_effort,
265        all_models,
266        credential_store,
267    )
268}
269
270/// Merge catalog `all()` with locally-discovered models for the `all_models`
271/// parameter of `build_model_config_option`.
272fn get_all_models(discovered: &[LlmModel]) -> Vec<LlmModel> {
273    let mut all = LlmModel::all().to_vec();
274    for m in discovered {
275        if !all.contains(m) {
276            all.push(m.clone());
277        }
278    }
279    all
280}
281
282fn build_auth_methods(store: &dyn OAuthCredentialStorage) -> Vec<AuthMethod> {
283    let mut seen = HashSet::new();
284    LlmModel::all()
285        .iter()
286        .filter_map(LlmModel::oauth_provider_id)
287        .filter(|id| seen.insert(*id))
288        .map(|id| {
289            let display = LlmModel::all()
290                .iter()
291                .find(|m| m.oauth_provider_id() == Some(id))
292                .map_or(id, |m| m.provider_display_name());
293            let mut method = acp::AuthMethodAgent::new(id, display);
294            if store.has_credential(id) {
295                method = method.description("authenticated");
296            }
297            AuthMethod::Agent(method)
298        })
299        .collect()
300}
301
302fn map_acp_to_content_blocks(blocks: Vec<acp::ContentBlock>) -> Vec<ContentBlock> {
303    blocks
304        .into_iter()
305        .map(|block| match block {
306            acp::ContentBlock::Text(t) => ContentBlock::text(t.text),
307            acp::ContentBlock::Image(img) => ContentBlock::Image { data: img.data, mime_type: img.mime_type },
308            acp::ContentBlock::Audio(aud) => ContentBlock::Audio { data: aud.data, mime_type: aud.mime_type },
309            acp::ContentBlock::Resource(r) => ContentBlock::text(format_embedded_resource(&r)),
310            acp::ContentBlock::ResourceLink(l) => ContentBlock::text(format!("[Resource: {}]", l.uri)),
311            _ => ContentBlock::text("[Unknown content]"),
312        })
313        .collect()
314}
315
316fn resolve_agent_spec(catalog: &AgentCatalog, mode_name: &str) -> Result<AgentSpec, acp::Error> {
317    catalog.resolve(mode_name).map_err(|e| {
318        error!("Failed to resolve runtime inputs for mode '{}': {e}", mode_name);
319        acp::Error::invalid_params()
320    })
321}
322
323fn resolve_default_initial_session(
324    mode_catalog: &SessionModeCatalog,
325    default_model: &LlmModel,
326) -> Result<ResolvedInitialSession, acp::Error> {
327    if let Some(mode) = mode_catalog.modes.first() {
328        return resolve_agent_spec(&mode_catalog.catalog, &mode.name)
329            .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(mode.name.clone()) });
330    }
331
332    Ok(ResolvedInitialSession { spec: AgentSpec::default_spec(default_model, None, Vec::new()), selected_mode: None })
333}
334
335fn parse_available_model(model: &str, available: &[LlmModel]) -> Result<LlmModel, acp::Error> {
336    let parsed = model.parse().map_err(|e: String| {
337        warn!("Failed to parse --model `{model}`: {e}");
338        acp::Error::invalid_params()
339    })?;
340
341    if model_exists(available, model) {
342        Ok(parsed)
343    } else {
344        warn!("Requested model `{model}` is not available");
345        Err(acp::Error::invalid_params())
346    }
347}
348
349fn prompt_capabilities_for_models(models: &[LlmModel]) -> PromptCapabilities {
350    PromptCapabilities::new()
351        .embedded_context(true)
352        .image(models.iter().any(LlmModel::supports_image))
353        .audio(models.iter().any(supports_prompt_audio))
354}
355
356fn selected_models(model_value: &str) -> Result<Vec<LlmModel>, acp::Error> {
357    model_value
358        .split(',')
359        .map(str::trim)
360        .filter(|part| !part.is_empty())
361        .map(|part| part.parse::<LlmModel>().map_err(|_| acp::Error::invalid_params()))
362        .collect()
363}
364
365fn validate_prompt_support(model_value: &str, content: &[ContentBlock]) -> Result<(), acp::Error> {
366    let modalities = PromptModalities::from_content(content);
367    if modalities.is_empty() {
368        return Ok(());
369    }
370
371    let selected = selected_models(model_value)?;
372    if modalities.image && selected.iter().any(|model| !model.supports_image()) {
373        return Err(acp::Error::invalid_params());
374    }
375    if modalities.audio && selected.iter().any(|model| !supports_prompt_audio(model)) {
376        return Err(acp::Error::invalid_params());
377    }
378
379    Ok(())
380}
381
382#[cfg(test)]
383#[allow(clippy::items_after_test_module)]
384mod tests {
385    use super::*;
386    use agent_client_protocol::schema::{InitializeRequest, ProtocolVersion};
387
388    const SONNET: &str = "anthropic:claude-sonnet-4-5";
389    const DEEPSEEK: &str = "deepseek:deepseek-chat";
390    const BEDROCK_ARN_AS_MODEL_REJECTED: &str =
391        "bedrock:arn:aws:bedrock:us-west-2:000000000000:application-inference-profile/000000000000";
392
393    fn mock_oauth_store() -> Arc<dyn OAuthCredentialStorage> {
394        Arc::new(aether_auth::FakeOAuthCredentialStore::new())
395    }
396
397    #[tokio::test]
398    async fn initialize_always_advertises_load_session_support() {
399        let session_store =
400            SessionStore::new().map_or_else(|e| panic!("Failed to initialize session store: {e}"), Arc::new);
401        let manager = SessionManager::new(SessionManagerConfig {
402            registry: Arc::new(SessionRegistry::new()),
403            session_store,
404            oauth_credential_store: mock_oauth_store(),
405            initial_selection: InitialSessionSelection::default(),
406            settings_source: SettingsSourceArgs::default(),
407            provider_connections: ProviderConnectionOverrides::default(),
408        });
409        let response =
410            manager.initialize(InitializeRequest::new(ProtocolVersion::LATEST)).await.expect("initialize succeeds");
411        let json = serde_json::to_string(&response).expect("response serializes");
412        assert!(json.contains("\"loadSession\":true"));
413    }
414
415    #[tokio::test]
416    async fn initialize_advertises_prompt_search_capability() {
417        let session_store =
418            SessionStore::new().map_or_else(|e| panic!("Failed to initialize session store: {e}"), Arc::new);
419        let manager = SessionManager::new(SessionManagerConfig {
420            registry: Arc::new(SessionRegistry::new()),
421            session_store,
422            oauth_credential_store: mock_oauth_store(),
423            initial_selection: InitialSessionSelection::default(),
424            settings_source: SettingsSourceArgs::default(),
425            provider_connections: ProviderConnectionOverrides::default(),
426        });
427
428        let response =
429            manager.initialize(InitializeRequest::new(ProtocolVersion::LATEST)).await.expect("initialize succeeds");
430
431        assert!(prompt_search_capability::is_advertised(response.agent_capabilities.prompt_capabilities.meta.as_ref()));
432    }
433    #[test]
434    fn prompt_capabilities_reflect_available_modalities() {
435        let image_only = prompt_capabilities_for_models(&["anthropic:claude-sonnet-4-5".parse().unwrap()]);
436        assert!(image_only.image);
437        assert!(!image_only.audio);
438
439        let audio_capable =
440            prompt_capabilities_for_models(&["gemini:gemini-live-2.5-flash-preview-native-audio".parse().unwrap()]);
441        assert!(!audio_capable.image);
442        assert!(audio_capable.audio);
443
444        let text_only = prompt_capabilities_for_models(&[DEEPSEEK.parse().unwrap()]);
445        assert!(!text_only.image);
446        assert!(!text_only.audio);
447    }
448
449    #[test]
450    fn validate_prompt_support_requires_all_selected_models_to_support_media() {
451        let image_content = vec![ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() }];
452        let audio_content =
453            vec![ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() }];
454
455        assert!(validate_prompt_support(SONNET, &image_content).is_ok());
456        assert!(validate_prompt_support(DEEPSEEK, &image_content).is_err());
457        assert!(validate_prompt_support("gemini:gemini-live-2.5-flash-preview-native-audio", &audio_content,).is_ok());
458        assert!(validate_prompt_support(SONNET, &audio_content).is_err());
459        assert!(
460            validate_prompt_support("anthropic:claude-sonnet-4-5,deepseek:deepseek-chat", &image_content,).is_err()
461        );
462        assert!(
463            validate_prompt_support(
464                "gemini:gemini-live-2.5-flash-preview-native-audio,deepseek:deepseek-chat",
465                &audio_content,
466            )
467            .is_err()
468        );
469    }
470
471    #[test]
472    fn parse_available_model_rejects_bedrock_inference_profile_arn() {
473        let available: Vec<LlmModel> = Vec::new();
474        let error = parse_available_model(BEDROCK_ARN_AS_MODEL_REJECTED, &available).unwrap_err();
475        assert_eq!(error, acp::Error::invalid_params());
476    }
477
478    #[test]
479    fn parse_available_model_rejects_unknown_catalog_model() {
480        let available: Vec<LlmModel> = vec![DEEPSEEK.parse().unwrap()];
481        assert!(parse_available_model(SONNET, &available).is_err());
482    }
483
484    #[test]
485    fn parse_available_model_accepts_catalog_model_when_present() {
486        let sonnet: LlmModel = SONNET.parse().unwrap();
487        let available = vec![sonnet.clone()];
488        assert_eq!(parse_available_model(SONNET, &available).unwrap(), sonnet);
489    }
490}
491
492impl SessionManager {
493    pub async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse, acp::Error> {
494        info!("Received initialize request: {:?}", args);
495        let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
496        let available = get_local_models().await;
497        let prompt_capabilities =
498            prompt_capabilities_for_models(&available).meta(Some(prompt_search_capability::to_meta()));
499
500        Ok(InitializeResponse::new(ProtocolVersion::V1)
501            .agent_info(Implementation::new("Aether", "0.1.0"))
502            .agent_capabilities(
503                AgentCapabilities::new()
504                    .load_session(true)
505                    .mcp_capabilities(McpCapabilities::new().http(true).sse(true))
506                    .session_capabilities(acp::SessionCapabilities::new().list(acp::SessionListCapabilities::new()))
507                    .prompt_capabilities(prompt_capabilities),
508            )
509            .auth_methods(auth_methods))
510    }
511
512    pub fn search_prompts(&self, params: &PromptSearchParams) -> Result<PromptSearchResponse, acp::Error> {
513        self.session_store.search_prompts(params).map_err(|e| {
514            error!("Prompt search failed: {e}");
515            acp::Error::internal_error()
516        })
517    }
518
519    pub async fn authenticate(
520        &self,
521        args: AuthenticateRequest,
522        cx: &ConnectionTo<Client>,
523    ) -> Result<AuthenticateResponse, acp::Error> {
524        info!("Received authenticate request: {:?}", args);
525        let method_id = args.method_id.0.as_ref();
526        match method_id {
527            "codex" => {
528                llm::perform_codex_oauth_flow(self.oauth_credential_store.as_ref()).await.map_err(|e| {
529                    error!("OAuth flow failed for {method_id}: {e}");
530                    acp::Error::internal_error()
531                })?;
532            }
533            _ => return Err(acp::Error::invalid_params()),
534        }
535        let auth_methods = build_auth_methods(self.oauth_credential_store.as_ref());
536        if let Err(e) = cx
537            .send_notification(AuthMethodsUpdatedParams { auth_methods })
538            .map_err(|e| AcpServerError::protocol("_aether/auth_methods_updated", e))
539        {
540            error!("Failed to send auth methods updated notification: {:?}", e);
541        }
542
543        let available = get_local_models().await;
544        let all_models = get_all_models(&available);
545        let snapshots = self.registry.snapshot_all_configs().await;
546
547        for (id, snap) in snapshots {
548            let options = options_from_snapshot(&snap, &available, &all_models, self.oauth_credential_store.as_ref());
549            let notification = SessionNotification::new(
550                SessionId::new(id),
551                SessionUpdate::ConfigOptionUpdate(ConfigOptionUpdate::new(options)),
552            );
553            let _ = cx.send_notification(notification);
554        }
555
556        Ok(AuthenticateResponse::default())
557    }
558
559    pub async fn new_session(
560        &self,
561        mut args: NewSessionRequest,
562        cx: &ConnectionTo<Client>,
563    ) -> Result<NewSessionResponse, acp::Error> {
564        // Inside a sandbox container the client sends the *host* cwd, but the
565        // project is mounted at the container's working directory.
566        if std::env::var("AETHER_INSIDE_SANDBOX").is_ok() {
567            let container_cwd = std::env::current_dir().unwrap_or_else(|_| "/workspace".into());
568            info!("Sandbox: remapping cwd {:?} -> {:?}", args.cwd, container_cwd);
569            args.cwd = container_cwd;
570        }
571
572        info!("Creating new session with cwd: {:?}", args.cwd);
573        let session_id = uuid::Uuid::new_v4().to_string();
574        let acp_session_id = acp::SessionId::new(session_id.clone());
575
576        let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
577        let default_model = pick_default_model(&mode_catalog.available).ok_or_else(|| {
578            error!("No models available — set an API key env var (e.g. ANTHROPIC_API_KEY)");
579            acp::Error::internal_error()
580        })?;
581
582        let ResolvedInitialSession { spec, selected_mode } =
583            self.resolve_initial_session(&mode_catalog, default_model)?;
584        let model_str = spec.model.clone();
585        let reasoning_effort = spec.reasoning_effort;
586
587        let session = Session::new(
588            spec,
589            args.cwd.clone(),
590            map_acp_mcp_servers(args.mcp_servers),
591            None,
592            Some(session_id.clone()),
593            Arc::clone(&self.oauth_credential_store),
594        )
595        .await
596        .map_err(|e| {
597            error!("Failed to create session: {}", e);
598            acp::Error::internal_error()
599        })?;
600
601        let available_commands = session.list_available_commands().await.map_err(|e| {
602            error!("Failed to list available commands: {}", e);
603            acp::Error::internal_error()
604        })?;
605
606        let meta = SessionMeta {
607            session_id: session_id.clone(),
608            cwd: args.cwd.clone(),
609            model: model_str.clone(),
610            selected_mode: selected_mode.clone(),
611            created_at: IsoString::now().0,
612        };
613        if let Err(e) = self.session_store.append_meta(&session_id, &meta) {
614            error!("Failed to write session meta: {e}");
615        }
616
617        let config_options = self
618            .register_session(
619                session,
620                &session_id,
621                &acp_session_id,
622                &model_str,
623                selected_mode,
624                reasoning_effort,
625                mode_catalog.modes,
626                cx,
627            )
628            .await;
629
630        info!("Session {} created successfully", session_id);
631
632        let response = NewSessionResponse::new(acp_session_id.clone()).config_options(config_options);
633
634        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
635
636        Ok(response)
637    }
638
639    pub fn list_sessions(&self, args: &ListSessionsRequest) -> Result<ListSessionsResponse, acp::Error> {
640        info!("Listing sessions, cwd filter: {:?}", args.cwd);
641        let mut summaries = self.session_store.list();
642
643        if let Some(cwd) = args.cwd.as_ref() {
644            summaries.retain(|s| s.meta.cwd == *cwd);
645        }
646
647        let sessions: Vec<acp::SessionInfo> = summaries
648            .into_iter()
649            .map(|s| acp::SessionInfo::new(s.meta.session_id, s.meta.cwd).updated_at(s.meta.created_at).title(s.title))
650            .collect();
651
652        info!("Found {} sessions", sessions.len());
653        Ok(ListSessionsResponse::new(sessions))
654    }
655
656    pub async fn load_session(
657        &self,
658        args: LoadSessionRequest,
659        cx: &ConnectionTo<Client>,
660    ) -> Result<LoadSessionResponse, acp::Error> {
661        let session_id = args.session_id.0.to_string();
662        info!("Loading session: {session_id}");
663
664        let (meta, events) = self.session_store.load(&session_id).ok_or_else(|| {
665            error!("Session not found: {session_id}");
666            acp::Error::invalid_params()
667        })?;
668
669        let context = Context::from_events(&events);
670        let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
671
672        let spec = self.apply_provider_connection_overrides(if let Some(mode_name) = meta.selected_mode.as_deref() {
673            resolve_agent_spec(&mode_catalog.catalog, mode_name)?
674        } else {
675            let parsed_model: LlmModel = meta.model.parse().map_err(|e: String| {
676                error!("Failed to parse restored model '{}': {e}", meta.model);
677                acp::Error::invalid_params()
678            })?;
679            AgentSpec::default_spec(&parsed_model, None, Vec::new())
680        });
681
682        let model = spec.model.clone();
683
684        let restored_messages: Vec<_> = context.messages().iter().filter(|m| !m.is_system()).cloned().collect();
685
686        let session = Session::new(
687            spec,
688            args.cwd.clone(),
689            map_acp_mcp_servers(args.mcp_servers),
690            Some(restored_messages),
691            Some(session_id.clone()),
692            Arc::clone(&self.oauth_credential_store),
693        )
694        .await
695        .map_err(|e| {
696            error!("Failed to create session for load: {e}");
697            acp::Error::internal_error()
698        })?;
699
700        let available_commands = session.list_available_commands().await.map_err(|e| {
701            error!("Failed to list available commands: {e}");
702            acp::Error::internal_error()
703        })?;
704
705        let acp_session_id = acp::SessionId::new(session_id.clone());
706
707        let config_options = self
708            .register_session(
709                session,
710                &session_id,
711                &acp_session_id,
712                &model,
713                meta.selected_mode,
714                None,
715                mode_catalog.modes,
716                cx,
717            )
718            .await;
719
720        info!("Session {session_id} loaded successfully");
721
722        let response = LoadSessionResponse::new().config_options(config_options);
723        replay_to_client(&events, cx, &acp_session_id).await;
724        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
725
726        Ok(response)
727    }
728
729    pub async fn prompt(&self, args: acp::PromptRequest) -> Result<acp::PromptResponse, acp::Error> {
730        info!("Received prompt for session: {:?}", args.session_id);
731        let session_id_str = args.session_id.0.to_string();
732        let content = map_acp_to_content_blocks(args.prompt);
733
734        let model = self.registry.effective_model(&session_id_str).await.ok_or_else(|| {
735            error!("Session not found: {}", session_id_str);
736            acp::Error::invalid_params()
737        })?;
738        validate_prompt_support(&model, &content)?;
739
740        let dispatch = self.registry.begin_prompt(&session_id_str).await.ok_or_else(|| {
741            error!("Session not found: {}", session_id_str);
742            acp::Error::invalid_params()
743        })?;
744
745        let (result_tx, result_rx) = oneshot::channel();
746        dispatch
747            .relay_tx
748            .send(SessionCommand::Prompt {
749                content,
750                switch_model: dispatch.switch_model,
751                reasoning_effort: dispatch.reasoning_effort,
752                result_tx,
753            })
754            .await
755            .map_err(|_| {
756                error!("Relay channel closed for session {}", session_id_str);
757                acp::Error::internal_error()
758            })?;
759
760        let stop_reason = result_rx
761            .await
762            .map_err(|_| {
763                error!("Relay dropped result channel for session {}", session_id_str);
764                acp::Error::internal_error()
765            })?
766            .map_err(|e| {
767                error!("Relay error for session {}: {}", session_id_str, e);
768                acp::Error::internal_error()
769            })?;
770
771        info!("Prompt completed with stop reason: {:?}", stop_reason);
772        Ok(PromptResponse::new(stop_reason))
773    }
774
775    pub async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> {
776        info!("Received cancel for session: {:?}", args.session_id);
777        let session_id_str = args.session_id.0.to_string();
778        let relay = self.registry.relay(&session_id_str).await.ok_or_else(|| {
779            error!("Session not found for cancel: {}", session_id_str);
780            acp::Error::invalid_params()
781        })?;
782
783        relay.cmd.send(SessionCommand::Cancel).await.map_err(|_| {
784            error!("Relay channel closed for cancel: {}", session_id_str);
785            acp::Error::internal_error()
786        })?;
787
788        Ok(())
789    }
790
791    pub async fn set_session_config_option(
792        &self,
793        args: SetSessionConfigOptionRequest,
794    ) -> Result<SetSessionConfigOptionResponse, acp::Error> {
795        let session_id_str = args.session_id.0.to_string();
796        let config_id = args.config_id.0.to_string();
797        let value = args.value.0.to_string();
798
799        info!("set_session_config_option: session={}, config={}, value={}", session_id_str, config_id, value);
800
801        let setting = ConfigSetting::parse(&config_id, &value).map_err(|e| {
802            error!("{e}");
803            acp::Error::invalid_params()
804        })?;
805
806        let available = get_local_models().await;
807        let all_models = get_all_models(&available);
808
809        let snapshot =
810            self.registry.apply_config_change(&session_id_str, &setting, &available).await.ok_or_else(|| {
811                error!("Session not found: {}", session_id_str);
812                acp::Error::invalid_params()
813            })??;
814
815        let options = options_from_snapshot(&snapshot, &available, &all_models, self.oauth_credential_store.as_ref());
816        Ok(SetSessionConfigOptionResponse::new(options))
817    }
818
819    pub async fn on_mcp_request(&self, request: McpRequest) -> Result<(), acp::Error> {
820        info!("Received MCP ext request: {:?}", request);
821        match request {
822            McpRequest::Authenticate { session_id, server_name } => {
823                let relay = self.registry.relay(&session_id).await.ok_or_else(|| {
824                    error!("Session not found for authenticate_mcp_server: {}", session_id);
825                    acp::Error::invalid_params()
826                })?;
827
828                relay.mcp_request.send(McpRequest::Authenticate { session_id, server_name }).await.map_err(|_| {
829                    error!("MCP request channel closed for session");
830                    acp::Error::internal_error()
831                })?;
832            }
833        }
834
835        Ok(())
836    }
837}