Skip to main content

aether_cli/acp/
session_manager.rs

1use acp_utils::notifications::{AuthMethodsUpdatedParams, McpRequest};
2use acp_utils::server::AcpServerError;
3use agent_client_protocol::schema::{
4    self as acp, AgentCapabilities, AuthMethod, AuthenticateRequest, AuthenticateResponse, AvailableCommandsUpdate,
5    ConfigOptionUpdate, Implementation, InitializeRequest, InitializeResponse, ListSessionsRequest,
6    ListSessionsResponse, LoadSessionRequest, LoadSessionResponse, McpCapabilities, NewSessionRequest,
7    NewSessionResponse, PromptCapabilities, PromptResponse, ProtocolVersion, SessionId, SessionNotification,
8    SessionUpdate, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse,
9};
10use agent_client_protocol::{Client, ConnectionTo};
11use llm::catalog::{LlmModel, get_local_models};
12use llm::oauth::OAuthCredentialStore;
13use llm::types::IsoString;
14use llm::{ContentBlock, ReasoningEffort};
15use std::collections::HashSet;
16use std::path::Path;
17use std::sync::Arc;
18use tokio::spawn;
19use tokio::sync::oneshot;
20use tracing::{error, info, warn};
21
22use super::config_setting::ConfigSetting;
23use super::mappers::{map_acp_mcp_servers, replay_to_client};
24use super::model_config::{
25    ValidatedMode, build_config_options_from_modes, pick_default_model, supports_prompt_audio,
26    validated_modes_from_specs,
27};
28use super::relay::{SessionCommand, spawn_relay};
29use super::session::Session;
30use super::session_registry::{ConfigSnapshot, SessionRegistry};
31use super::session_store::{SessionMeta, SessionStore};
32use acp_utils::content::format_embedded_resource;
33use aether_core::agent_spec::AgentSpec;
34use aether_core::context::ext::ContextExt;
35use aether_project::{AetherSettings, AetherSettingsSource, AgentCatalog};
36use llm::Context;
37
38/// Initial session selection supplied when `aether acp` starts.
39#[derive(Clone, Debug, Default)]
40pub enum InitialSessionSelection {
41    #[default]
42    Default,
43    Agent(String),
44    Model {
45        model: String,
46        reasoning_effort: Option<ReasoningEffort>,
47    },
48}
49
50impl InitialSessionSelection {
51    pub fn agent(name: String) -> Self {
52        Self::Agent(name)
53    }
54
55    pub fn model(model: String, reasoning_effort: Option<ReasoningEffort>) -> Self {
56        Self::Model { model, reasoning_effort }
57    }
58}
59
/// Manages ACP sessions, each session has its own agent and state
pub struct SessionManager {
    /// Live sessions: relay handles plus per-session config state.
    registry: Arc<SessionRegistry>,
    /// Store used to write session metadata and to list/load past sessions.
    session_store: Arc<SessionStore>,
    /// Predicate handed to `build_auth_methods`; a `true` result marks that
    /// provider's auth method as "authenticated".
    has_oauth_credential: fn(&str) -> bool,
    /// Selection consulted when resolving the spec for a new session.
    initial_selection: InitialSessionSelection,
    /// When set, settings are loaded from this source instead of the defaults.
    settings_source: Option<AetherSettingsSource>,
}
68
/// Dependencies consumed by `SessionManager::new`; each field is moved into
/// the manager field of the same name.
pub(crate) struct SessionManagerConfig {
    pub(crate) registry: Arc<SessionRegistry>,
    pub(crate) session_store: Arc<SessionStore>,
    pub(crate) has_oauth_credential: fn(&str) -> bool,
    pub(crate) initial_selection: InitialSessionSelection,
    pub(crate) settings_source: Option<AetherSettingsSource>,
}
76
/// Result of `load_mode_catalog`: the agent catalog for a working directory
/// together with the modes and models derived alongside it.
struct SessionModeCatalog {
    /// Agent catalog built from the loaded settings (or an empty one).
    catalog: AgentCatalog,
    /// Modes produced by validating the user-invocable specs against `available`.
    modes: Vec<ValidatedMode>,
    /// Models returned by `get_local_models` at load time.
    available: Vec<LlmModel>,
}
82
/// Outcome of resolving the initial selection for a new session.
struct ResolvedInitialSession {
    /// Agent spec the session is created from.
    spec: AgentSpec,
    /// Name of the chosen mode, or `None` when the spec is a default spec
    /// built directly from a model.
    selected_mode: Option<String>,
}
87
88#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
89struct PromptModalities {
90    image: bool,
91    audio: bool,
92}
93
94impl PromptModalities {
95    fn from_content(content: &[ContentBlock]) -> Self {
96        Self {
97            image: content.iter().any(ContentBlock::is_image),
98            audio: content.iter().any(|block| matches!(block, ContentBlock::Audio { .. })),
99        }
100    }
101
102    fn is_empty(self) -> bool {
103        !self.image && !self.audio
104    }
105}
106
107impl SessionManager {
108    pub(crate) fn new(deps: SessionManagerConfig) -> Self {
109        Self {
110            registry: deps.registry,
111            session_store: deps.session_store,
112            has_oauth_credential: deps.has_oauth_credential,
113            initial_selection: deps.initial_selection,
114            settings_source: deps.settings_source,
115        }
116    }
117
118    fn resolve_initial_session(
119        &self,
120        mode_catalog: &SessionModeCatalog,
121        default_model: &LlmModel,
122    ) -> Result<ResolvedInitialSession, acp::Error> {
123        match &self.initial_selection {
124            InitialSessionSelection::Default => resolve_default_initial_session(mode_catalog, default_model),
125            InitialSessionSelection::Agent(agent) => {
126                if !mode_catalog.modes.iter().any(|mode| mode.name == *agent) {
127                    warn!("Unknown agent `{agent}` requested via --agent");
128                    return Err(acp::Error::invalid_params());
129                }
130                resolve_agent_spec(&mode_catalog.catalog, agent)
131                    .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(agent.clone()) })
132            }
133            InitialSessionSelection::Model { model, reasoning_effort } => {
134                let model = parse_available_model(model, &mode_catalog.available)?;
135                Ok(ResolvedInitialSession {
136                    spec: AgentSpec::default_spec(&model, *reasoning_effort, Vec::new()),
137                    selected_mode: None,
138                })
139            }
140        }
141    }
142
143    async fn load_mode_catalog(&self, cwd: &Path) -> Result<SessionModeCatalog, acp::Error> {
144        let config = if let Some(source) = self.settings_source.clone() {
145            AetherSettings::load(cwd, [source])
146        } else {
147            AetherSettings::load_default(cwd)
148        }
149        .map_err(|e| {
150            error!("Failed to load agent catalog: {e}");
151            acp::Error::invalid_params()
152        })?;
153        let catalog = if config.agents.is_empty() {
154            AgentCatalog::empty(cwd.to_path_buf())
155        } else {
156            AgentCatalog::from_settings(cwd, config).map_err(|e| {
157                error!("Failed to load agent catalog: {e}");
158                acp::Error::invalid_params()
159            })?
160        };
161
162        let available = get_local_models().await;
163        let specs: Vec<_> = catalog.user_invocable().cloned().collect();
164        let modes = validated_modes_from_specs(&specs, &available);
165
166        Ok(SessionModeCatalog { catalog, modes, available })
167    }
168
169    #[allow(clippy::too_many_arguments, clippy::similar_names)]
170    async fn register_session(
171        &self,
172        session: Session,
173        session_id: &str,
174        acp_session_id: &SessionId,
175        model: &str,
176        selected_mode: Option<String>,
177        reasoning_effort: Option<ReasoningEffort>,
178        modes: Vec<ValidatedMode>,
179        cx: &ConnectionTo<Client>,
180    ) -> Vec<acp::SessionConfigOption> {
181        let relay = spawn_relay(session, cx.clone(), acp_session_id.clone(), self.session_store.clone());
182
183        self.registry
184            .insert(
185                session_id.to_string(),
186                relay,
187                model.to_string(),
188                selected_mode.clone(),
189                reasoning_effort,
190                modes.clone(),
191            )
192            .await;
193
194        let available = get_local_models().await;
195        let all_models = get_all_models(&available);
196        build_config_options_from_modes(
197            &modes,
198            &available,
199            selected_mode.as_deref(),
200            model,
201            reasoning_effort,
202            &all_models,
203            &OAuthCredentialStore::default(),
204        )
205    }
206
207    fn send_available_commands_notification(
208        available_commands: Vec<acp::AvailableCommand>,
209        acp_session_id: SessionId,
210        session_id: &str,
211        cx: &ConnectionTo<Client>,
212    ) {
213        if available_commands.is_empty() {
214            return;
215        }
216        let command_count = available_commands.len();
217        let notification = SessionNotification::new(
218            acp_session_id,
219            SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate::new(available_commands)),
220        );
221        if let Err(e) = cx.send_notification(notification).map_err(|e| AcpServerError::protocol("session/update", e)) {
222            error!("Failed to send available commands notification: {:?}", e);
223        } else {
224            info!("Sent available commands update for session {} ({} commands)", session_id, command_count);
225        }
226    }
227
    /// Drain every session and stop its relay task. Blocks until every relay
    /// has exited.
    ///
    /// This delegates entirely to the registry's `shutdown_all` and awaits it.
    pub async fn shutdown_all_sessions(&self) {
        self.registry.shutdown_all().await;
    }
233}
234
235fn options_from_snapshot(
236    snapshot: &ConfigSnapshot,
237    available: &[LlmModel],
238    all_models: &[LlmModel],
239    credential_store: &OAuthCredentialStore,
240) -> Vec<acp::SessionConfigOption> {
241    build_config_options_from_modes(
242        &snapshot.modes,
243        available,
244        snapshot.selected_mode.as_deref(),
245        &snapshot.effective_model,
246        snapshot.reasoning_effort,
247        all_models,
248        credential_store,
249    )
250}
251
252/// Merge catalog `all()` with locally-discovered models for the `all_models`
253/// parameter of `build_model_config_option`.
254fn get_all_models(discovered: &[LlmModel]) -> Vec<LlmModel> {
255    let mut all = LlmModel::all().to_vec();
256    for m in discovered {
257        if !all.contains(m) {
258            all.push(m.clone());
259        }
260    }
261    all
262}
263
264fn build_auth_methods(has_credential: impl Fn(&str) -> bool) -> Vec<AuthMethod> {
265    let mut seen = HashSet::new();
266    LlmModel::all()
267        .iter()
268        .filter_map(LlmModel::oauth_provider_id)
269        .filter(|id| seen.insert(*id))
270        .map(|id| {
271            let display = LlmModel::all()
272                .iter()
273                .find(|m| m.oauth_provider_id() == Some(id))
274                .map_or(id, |m| m.provider_display_name());
275            let mut method = acp::AuthMethodAgent::new(id, display);
276            if has_credential(id) {
277                method = method.description("authenticated");
278            }
279            AuthMethod::Agent(method)
280        })
281        .collect()
282}
283
284fn map_acp_to_content_blocks(blocks: Vec<acp::ContentBlock>) -> Vec<ContentBlock> {
285    blocks
286        .into_iter()
287        .map(|block| match block {
288            acp::ContentBlock::Text(t) => ContentBlock::text(t.text),
289            acp::ContentBlock::Image(img) => ContentBlock::Image { data: img.data, mime_type: img.mime_type },
290            acp::ContentBlock::Audio(aud) => ContentBlock::Audio { data: aud.data, mime_type: aud.mime_type },
291            acp::ContentBlock::Resource(r) => ContentBlock::text(format_embedded_resource(&r)),
292            acp::ContentBlock::ResourceLink(l) => ContentBlock::text(format!("[Resource: {}]", l.uri)),
293            _ => ContentBlock::text("[Unknown content]"),
294        })
295        .collect()
296}
297
298fn resolve_agent_spec(catalog: &AgentCatalog, mode_name: &str) -> Result<AgentSpec, acp::Error> {
299    catalog.resolve(mode_name).map_err(|e| {
300        error!("Failed to resolve runtime inputs for mode '{}': {e}", mode_name);
301        acp::Error::invalid_params()
302    })
303}
304
305fn resolve_default_initial_session(
306    mode_catalog: &SessionModeCatalog,
307    default_model: &LlmModel,
308) -> Result<ResolvedInitialSession, acp::Error> {
309    if let Some(mode) = mode_catalog.modes.first() {
310        return resolve_agent_spec(&mode_catalog.catalog, &mode.name)
311            .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(mode.name.clone()) });
312    }
313
314    Ok(ResolvedInitialSession { spec: AgentSpec::default_spec(default_model, None, Vec::new()), selected_mode: None })
315}
316
317fn parse_available_model(model: &str, available: &[LlmModel]) -> Result<LlmModel, acp::Error> {
318    let parsed = model.parse().map_err(|e: String| {
319        warn!("Failed to parse --model `{model}`: {e}");
320        acp::Error::invalid_params()
321    })?;
322    if available.iter().any(|available| available == &parsed) {
323        Ok(parsed)
324    } else {
325        warn!("Requested model `{model}` is not available");
326        Err(acp::Error::invalid_params())
327    }
328}
329
330fn prompt_capabilities_for_models(models: &[LlmModel]) -> PromptCapabilities {
331    PromptCapabilities::new()
332        .embedded_context(true)
333        .image(models.iter().any(LlmModel::supports_image))
334        .audio(models.iter().any(supports_prompt_audio))
335}
336
337fn selected_models(model_value: &str) -> Result<Vec<LlmModel>, acp::Error> {
338    model_value
339        .split(',')
340        .map(str::trim)
341        .filter(|part| !part.is_empty())
342        .map(|part| part.parse::<LlmModel>().map_err(|_| acp::Error::invalid_params()))
343        .collect()
344}
345
346fn validate_prompt_support(model_value: &str, content: &[ContentBlock]) -> Result<(), acp::Error> {
347    let modalities = PromptModalities::from_content(content);
348    if modalities.is_empty() {
349        return Ok(());
350    }
351
352    let selected = selected_models(model_value)?;
353    if modalities.image && selected.iter().any(|model| !model.supports_image()) {
354        return Err(acp::Error::invalid_params());
355    }
356    if modalities.audio && selected.iter().any(|model| !supports_prompt_audio(model)) {
357        return Err(acp::Error::invalid_params());
358    }
359
360    Ok(())
361}
362
#[cfg(test)]
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use agent_client_protocol::schema::{InitializeRequest, ProtocolVersion};

    const SONNET: &str = "anthropic:claude-sonnet-4-5";
    const DEEPSEEK: &str = "deepseek:deepseek-chat";
    const GEMINI_AUDIO: &str = "gemini:gemini-live-2.5-flash-preview-native-audio";

    /// Parses a model identifier, panicking on failure.
    fn model(id: &str) -> LlmModel {
        id.parse().unwrap()
    }

    #[tokio::test]
    async fn initialize_always_advertises_load_session_support() {
        let session_store = match SessionStore::new() {
            Ok(store) => Arc::new(store),
            Err(e) => panic!("Failed to initialize session store: {e}"),
        };
        let manager = SessionManager::new(SessionManagerConfig {
            registry: Arc::new(SessionRegistry::new()),
            session_store,
            has_oauth_credential: |_| false,
            initial_selection: InitialSessionSelection::default(),
            settings_source: None,
        });

        let request = InitializeRequest::new(ProtocolVersion::LATEST);
        let response = manager.initialize(request).await.expect("initialize succeeds");
        let json = serde_json::to_string(&response).expect("response serializes");
        assert!(json.contains("\"loadSession\":true"));
    }

    #[test]
    fn prompt_capabilities_reflect_available_modalities() {
        let image_only = prompt_capabilities_for_models(&[model(SONNET)]);
        assert!(image_only.image);
        assert!(!image_only.audio);

        let audio_capable = prompt_capabilities_for_models(&[model(GEMINI_AUDIO)]);
        assert!(!audio_capable.image);
        assert!(audio_capable.audio);

        let text_only = prompt_capabilities_for_models(&[model(DEEPSEEK)]);
        assert!(!text_only.image);
        assert!(!text_only.audio);
    }

    #[test]
    fn validate_prompt_support_requires_all_selected_models_to_support_media() {
        let image_content = vec![ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() }];
        let audio_content =
            vec![ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() }];

        // Single-model selections.
        assert!(validate_prompt_support(SONNET, &image_content).is_ok());
        assert!(validate_prompt_support(DEEPSEEK, &image_content).is_err());
        assert!(validate_prompt_support(GEMINI_AUDIO, &audio_content).is_ok());
        assert!(validate_prompt_support(SONNET, &audio_content).is_err());

        // Mixed selections fail as soon as one member lacks the modality.
        let sonnet_and_deepseek = format!("{SONNET},{DEEPSEEK}");
        assert!(validate_prompt_support(&sonnet_and_deepseek, &image_content).is_err());

        let gemini_and_deepseek = format!("{GEMINI_AUDIO},{DEEPSEEK}");
        assert!(validate_prompt_support(&gemini_and_deepseek, &audio_content).is_err());
    }
}
427
428impl SessionManager {
429    pub async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse, acp::Error> {
430        info!("Received initialize request: {:?}", args);
431        let auth_methods = build_auth_methods(self.has_oauth_credential);
432        let available = get_local_models().await;
433        Ok(InitializeResponse::new(ProtocolVersion::V1)
434            .agent_info(Implementation::new("Aether", "0.1.0"))
435            .agent_capabilities(
436                AgentCapabilities::new()
437                    .load_session(true)
438                    .mcp_capabilities(McpCapabilities::new().http(true).sse(true))
439                    .session_capabilities(acp::SessionCapabilities::new().list(acp::SessionListCapabilities::new()))
440                    .prompt_capabilities(prompt_capabilities_for_models(&available)),
441            )
442            .auth_methods(auth_methods))
443    }
444
445    pub async fn authenticate(
446        &self,
447        args: AuthenticateRequest,
448        cx: &ConnectionTo<Client>,
449    ) -> Result<AuthenticateResponse, acp::Error> {
450        info!("Received authenticate request: {:?}", args);
451        let method_id = args.method_id.0.as_ref();
452        match method_id {
453            "codex" => {
454                llm::perform_codex_oauth_flow().await.map_err(|e| {
455                    error!("OAuth flow failed for {method_id}: {e}");
456                    acp::Error::internal_error()
457                })?;
458            }
459            _ => return Err(acp::Error::invalid_params()),
460        }
461        let auth_methods = build_auth_methods(self.has_oauth_credential);
462        if let Err(e) = cx
463            .send_notification(AuthMethodsUpdatedParams { auth_methods })
464            .map_err(|e| AcpServerError::protocol("_aether/auth_methods_updated", e))
465        {
466            error!("Failed to send auth methods updated notification: {:?}", e);
467        }
468
469        let credential_store = OAuthCredentialStore::default();
470        let available = get_local_models().await;
471        let all_models = get_all_models(&available);
472        let snapshots = self.registry.snapshot_all_configs().await;
473
474        for (id, snap) in snapshots {
475            let options = options_from_snapshot(&snap, &available, &all_models, &credential_store);
476            let notification = SessionNotification::new(
477                SessionId::new(id),
478                SessionUpdate::ConfigOptionUpdate(ConfigOptionUpdate::new(options)),
479            );
480            let _ = cx.send_notification(notification);
481        }
482
483        Ok(AuthenticateResponse::default())
484    }
485
486    pub async fn new_session(
487        &self,
488        mut args: NewSessionRequest,
489        cx: &ConnectionTo<Client>,
490    ) -> Result<NewSessionResponse, acp::Error> {
491        // Inside a sandbox container the client sends the *host* cwd, but the
492        // project is mounted at the container's working directory.
493        if std::env::var("AETHER_INSIDE_SANDBOX").is_ok() {
494            let container_cwd = std::env::current_dir().unwrap_or_else(|_| "/workspace".into());
495            info!("Sandbox: remapping cwd {:?} -> {:?}", args.cwd, container_cwd);
496            args.cwd = container_cwd;
497        }
498
499        info!("Creating new session with cwd: {:?}", args.cwd);
500        let session_id = uuid::Uuid::new_v4().to_string();
501        let acp_session_id = acp::SessionId::new(session_id.clone());
502
503        let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
504        let default_model = pick_default_model(&mode_catalog.available).ok_or_else(|| {
505            error!("No models available — set an API key env var (e.g. ANTHROPIC_API_KEY)");
506            acp::Error::internal_error()
507        })?;
508
509        let ResolvedInitialSession { spec, selected_mode } =
510            self.resolve_initial_session(&mode_catalog, default_model)?;
511        let model_str = spec.model.clone();
512        let reasoning_effort = spec.reasoning_effort;
513
514        let session =
515            Session::new(spec, args.cwd.clone(), map_acp_mcp_servers(args.mcp_servers), None, Some(session_id.clone()))
516                .await
517                .map_err(|e| {
518                    error!("Failed to create session: {}", e);
519                    acp::Error::internal_error()
520                })?;
521
522        let available_commands = session.list_available_commands().await.map_err(|e| {
523            error!("Failed to list available commands: {}", e);
524            acp::Error::internal_error()
525        })?;
526
527        let meta = SessionMeta {
528            session_id: session_id.clone(),
529            cwd: args.cwd.clone(),
530            model: model_str.clone(),
531            selected_mode: selected_mode.clone(),
532            created_at: IsoString::now().0,
533        };
534        if let Err(e) = self.session_store.append_meta(&session_id, &meta) {
535            error!("Failed to write session meta: {e}");
536        }
537
538        let config_options = self
539            .register_session(
540                session,
541                &session_id,
542                &acp_session_id,
543                &model_str,
544                selected_mode,
545                reasoning_effort,
546                mode_catalog.modes,
547                cx,
548            )
549            .await;
550
551        info!("Session {} created successfully", session_id);
552
553        let response = NewSessionResponse::new(acp_session_id.clone()).config_options(config_options);
554
555        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
556
557        Ok(response)
558    }
559
560    pub fn list_sessions(&self, args: &ListSessionsRequest) -> Result<ListSessionsResponse, acp::Error> {
561        info!("Listing sessions, cwd filter: {:?}", args.cwd);
562        let mut summaries = self.session_store.list();
563
564        if let Some(cwd) = args.cwd.as_ref() {
565            summaries.retain(|s| s.meta.cwd == *cwd);
566        }
567
568        let sessions: Vec<acp::SessionInfo> = summaries
569            .into_iter()
570            .map(|s| acp::SessionInfo::new(s.meta.session_id, s.meta.cwd).updated_at(s.meta.created_at).title(s.title))
571            .collect();
572
573        info!("Found {} sessions", sessions.len());
574        Ok(ListSessionsResponse::new(sessions))
575    }
576
577    pub async fn load_session(
578        &self,
579        args: LoadSessionRequest,
580        cx: &ConnectionTo<Client>,
581    ) -> Result<LoadSessionResponse, acp::Error> {
582        let session_id = args.session_id.0.to_string();
583        info!("Loading session: {session_id}");
584
585        let (meta, events) = self.session_store.load(&session_id).ok_or_else(|| {
586            error!("Session not found: {session_id}");
587            acp::Error::invalid_params()
588        })?;
589
590        let context = Context::from_events(&events);
591        let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
592
593        let spec = if let Some(mode_name) = meta.selected_mode.as_deref() {
594            resolve_agent_spec(&mode_catalog.catalog, mode_name)?
595        } else {
596            let parsed_model: LlmModel = meta.model.parse().map_err(|e: String| {
597                error!("Failed to parse restored model '{}': {e}", meta.model);
598                acp::Error::invalid_params()
599            })?;
600            AgentSpec::default_spec(&parsed_model, None, Vec::new())
601        };
602
603        let model = spec.model.clone();
604
605        let restored_messages: Vec<_> = context.messages().iter().filter(|m| !m.is_system()).cloned().collect();
606
607        let session = Session::new(
608            spec,
609            args.cwd.clone(),
610            map_acp_mcp_servers(args.mcp_servers),
611            Some(restored_messages),
612            Some(session_id.clone()),
613        )
614        .await
615        .map_err(|e| {
616            error!("Failed to create session for load: {e}");
617            acp::Error::internal_error()
618        })?;
619
620        let available_commands = session.list_available_commands().await.map_err(|e| {
621            error!("Failed to list available commands: {e}");
622            acp::Error::internal_error()
623        })?;
624
625        let acp_session_id = acp::SessionId::new(session_id.clone());
626
627        let config_options = self
628            .register_session(
629                session,
630                &session_id,
631                &acp_session_id,
632                &model,
633                meta.selected_mode,
634                None,
635                mode_catalog.modes,
636                cx,
637            )
638            .await;
639
640        info!("Session {session_id} loaded successfully");
641
642        let response = LoadSessionResponse::new().config_options(config_options);
643
644        let cx_clone = cx.clone();
645        let replay_session_id = acp_session_id.clone();
646        spawn(async move {
647            replay_to_client(&events, &cx_clone, &replay_session_id).await;
648        });
649
650        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
651
652        Ok(response)
653    }
654
655    pub async fn prompt(&self, args: acp::PromptRequest) -> Result<acp::PromptResponse, acp::Error> {
656        info!("Received prompt for session: {:?}", args.session_id);
657        let session_id_str = args.session_id.0.to_string();
658        let content = map_acp_to_content_blocks(args.prompt);
659
660        let model = self.registry.effective_model(&session_id_str).await.ok_or_else(|| {
661            error!("Session not found: {}", session_id_str);
662            acp::Error::invalid_params()
663        })?;
664        validate_prompt_support(&model, &content)?;
665
666        let dispatch = self.registry.begin_prompt(&session_id_str).await.ok_or_else(|| {
667            error!("Session not found: {}", session_id_str);
668            acp::Error::invalid_params()
669        })?;
670
671        let (result_tx, result_rx) = oneshot::channel();
672        dispatch
673            .relay_tx
674            .send(SessionCommand::Prompt {
675                content,
676                switch_model: dispatch.switch_model,
677                reasoning_effort: dispatch.reasoning_effort,
678                result_tx,
679            })
680            .await
681            .map_err(|_| {
682                error!("Relay channel closed for session {}", session_id_str);
683                acp::Error::internal_error()
684            })?;
685
686        let stop_reason = result_rx
687            .await
688            .map_err(|_| {
689                error!("Relay dropped result channel for session {}", session_id_str);
690                acp::Error::internal_error()
691            })?
692            .map_err(|e| {
693                error!("Relay error for session {}: {}", session_id_str, e);
694                acp::Error::internal_error()
695            })?;
696
697        info!("Prompt completed with stop reason: {:?}", stop_reason);
698        Ok(PromptResponse::new(stop_reason))
699    }
700
701    pub async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> {
702        info!("Received cancel for session: {:?}", args.session_id);
703        let session_id_str = args.session_id.0.to_string();
704        let relay = self.registry.relay(&session_id_str).await.ok_or_else(|| {
705            error!("Session not found for cancel: {}", session_id_str);
706            acp::Error::invalid_params()
707        })?;
708
709        relay.cmd.send(SessionCommand::Cancel).await.map_err(|_| {
710            error!("Relay channel closed for cancel: {}", session_id_str);
711            acp::Error::internal_error()
712        })?;
713
714        Ok(())
715    }
716
717    pub async fn set_session_config_option(
718        &self,
719        args: SetSessionConfigOptionRequest,
720    ) -> Result<SetSessionConfigOptionResponse, acp::Error> {
721        let session_id_str = args.session_id.0.to_string();
722        let config_id = args.config_id.0.to_string();
723        let value = args.value.0.to_string();
724
725        info!("set_session_config_option: session={}, config={}, value={}", session_id_str, config_id, value);
726
727        let setting = ConfigSetting::parse(&config_id, &value).map_err(|e| {
728            error!("{e}");
729            acp::Error::invalid_params()
730        })?;
731
732        let available = get_local_models().await;
733        let all_models = get_all_models(&available);
734
735        let snapshot =
736            self.registry.apply_config_change(&session_id_str, &setting, &available).await.ok_or_else(|| {
737                error!("Session not found: {}", session_id_str);
738                acp::Error::invalid_params()
739            })??;
740
741        let options = options_from_snapshot(&snapshot, &available, &all_models, &OAuthCredentialStore::default());
742        Ok(SetSessionConfigOptionResponse::new(options))
743    }
744
745    pub async fn on_mcp_request(&self, request: McpRequest) -> Result<(), acp::Error> {
746        info!("Received MCP ext request: {:?}", request);
747        match request {
748            McpRequest::Authenticate { session_id, server_name } => {
749                let relay = self.registry.relay(&session_id).await.ok_or_else(|| {
750                    error!("Session not found for authenticate_mcp_server: {}", session_id);
751                    acp::Error::invalid_params()
752                })?;
753
754                relay.mcp_request.send(McpRequest::Authenticate { session_id, server_name }).await.map_err(|_| {
755                    error!("MCP request channel closed for session");
756                    acp::Error::internal_error()
757                })?;
758            }
759        }
760
761        Ok(())
762    }
763}