Skip to main content

aether_cli/acp/
session_manager.rs

1use acp_utils::notifications::{AuthMethodsUpdatedParams, McpRequest};
2use acp_utils::server::AcpServerError;
3use agent_client_protocol::schema::{
4    self as acp, AgentCapabilities, AuthMethod, AuthenticateRequest, AuthenticateResponse, AvailableCommandsUpdate,
5    ConfigOptionUpdate, Implementation, InitializeRequest, InitializeResponse, ListSessionsRequest,
6    ListSessionsResponse, LoadSessionRequest, LoadSessionResponse, McpCapabilities, NewSessionRequest,
7    NewSessionResponse, PromptCapabilities, PromptResponse, ProtocolVersion, SessionId, SessionNotification,
8    SessionUpdate, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse,
9};
10use agent_client_protocol::{Client, ConnectionTo};
11use llm::catalog::{LlmModel, get_local_models};
12use llm::oauth::OAuthCredentialStore;
13use llm::types::IsoString;
14use llm::{ContentBlock, ReasoningEffort};
15use std::collections::HashSet;
16use std::path::Path;
17use std::sync::Arc;
18use tokio::spawn;
19use tokio::sync::oneshot;
20use tracing::{error, info, warn};
21
22use super::config_setting::ConfigSetting;
23use super::mappers::{map_acp_mcp_servers, replay_to_client};
24use super::model_config::{
25    ValidatedMode, build_config_options_from_modes, pick_default_model, supports_prompt_audio,
26    validated_modes_from_specs,
27};
28use super::relay::{SessionCommand, spawn_relay};
29use super::session::Session;
30use super::session_registry::{ConfigSnapshot, SessionRegistry};
31use super::session_store::{SessionMeta, SessionStore};
32use acp_utils::content::format_embedded_resource;
33use aether_core::agent_spec::AgentSpec;
34use aether_core::context::ext::ContextExt;
35use aether_project::{AgentCatalog, load_agent_catalog};
36use llm::Context;
37
/// Initial session selection supplied when `aether acp` starts.
///
/// The selection is consulted every time a new session is created to decide
/// which agent spec the session starts with.
#[derive(Clone, Debug, Default)]
pub enum InitialSessionSelection {
    /// No explicit choice: use the first validated mode if there is one,
    /// otherwise fall back to the default model.
    #[default]
    Default,
    /// Start with the named agent (the `--agent` flag). The name must match
    /// one of the validated modes or session creation is rejected.
    Agent(String),
    /// Start with an explicit model (the `--model` flag). The model string
    /// must parse and be among the locally available models.
    Model {
        /// Model identifier as typed by the user; parsed when a session is created.
        model: String,
        /// Optional reasoning effort forwarded when resolving the default spec.
        reasoning_effort: Option<ReasoningEffort>,
    },
}
49
50impl InitialSessionSelection {
51    pub fn agent(name: String) -> Self {
52        Self::Agent(name)
53    }
54
55    pub fn model(model: String, reasoning_effort: Option<ReasoningEffort>) -> Self {
56        Self::Model { model, reasoning_effort }
57    }
58}
59
/// Manages ACP sessions, each session has its own agent and state
pub struct SessionManager {
    /// Registry of running sessions: relay handles plus per-session config state.
    registry: Arc<SessionRegistry>,
    /// Store used to write session metadata and to list/load earlier sessions.
    session_store: Arc<SessionStore>,
    /// Predicate called with an OAuth provider id when building the
    /// advertised auth methods; `true` marks the method as authenticated.
    has_oauth_credential: fn(&str) -> bool,
    /// Start-up selection applied when resolving the spec of each new session.
    initial_selection: InitialSessionSelection,
}
67
/// Dependencies handed to [`SessionManager::new`]; each field is moved
/// verbatim into the field of the same name on the manager.
pub(crate) struct SessionManagerConfig {
    /// Shared registry of running sessions.
    pub(crate) registry: Arc<SessionRegistry>,
    /// Shared store for session metadata and history.
    pub(crate) session_store: Arc<SessionStore>,
    /// Predicate telling whether a credential exists for an OAuth provider id.
    pub(crate) has_oauth_credential: fn(&str) -> bool,
    /// Start-up selection used for new sessions.
    pub(crate) initial_selection: InitialSessionSelection,
}
74
/// Everything loaded for a working directory that is needed to pick and
/// validate a session's mode.
struct SessionModeCatalog {
    /// Agent catalog loaded from the session's working directory.
    catalog: AgentCatalog,
    /// Modes derived from the catalog's user-invocable specs and the available models.
    modes: Vec<ValidatedMode>,
    /// Models discovered locally at the time the catalog was loaded.
    available: Vec<LlmModel>,
}
80
/// Outcome of resolving the initial selection for a new session.
struct ResolvedInitialSession {
    /// Agent spec the session will be created with.
    spec: AgentSpec,
    /// Name of the mode the spec came from, or `None` when the spec was
    /// resolved from a bare model rather than a named mode.
    selected_mode: Option<String>,
}
85
/// Which non-text modalities appear in a prompt's content blocks.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct PromptModalities {
    /// At least one image block is present.
    image: bool,
    /// At least one audio block is present.
    audio: bool,
}
91
92impl PromptModalities {
93    fn from_content(content: &[ContentBlock]) -> Self {
94        Self {
95            image: content.iter().any(ContentBlock::is_image),
96            audio: content.iter().any(|block| matches!(block, ContentBlock::Audio { .. })),
97        }
98    }
99
100    fn is_empty(self) -> bool {
101        !self.image && !self.audio
102    }
103}
104
105impl SessionManager {
106    pub(crate) fn new(deps: SessionManagerConfig) -> Self {
107        Self {
108            registry: deps.registry,
109            session_store: deps.session_store,
110            has_oauth_credential: deps.has_oauth_credential,
111            initial_selection: deps.initial_selection,
112        }
113    }
114
115    fn resolve_initial_session(
116        &self,
117        mode_catalog: &SessionModeCatalog,
118        default_model: &LlmModel,
119        cwd: &Path,
120    ) -> Result<ResolvedInitialSession, acp::Error> {
121        match &self.initial_selection {
122            InitialSessionSelection::Default => resolve_default_initial_session(mode_catalog, default_model, cwd),
123            InitialSessionSelection::Agent(agent) => {
124                if !mode_catalog.modes.iter().any(|mode| mode.name == *agent) {
125                    warn!("Unknown agent `{agent}` requested via --agent");
126                    return Err(acp::Error::invalid_params());
127                }
128                resolve_agent_spec(&mode_catalog.catalog, agent, cwd)
129                    .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(agent.clone()) })
130            }
131            InitialSessionSelection::Model { model, reasoning_effort } => {
132                let model = parse_available_model(model, &mode_catalog.available)?;
133                Ok(ResolvedInitialSession {
134                    spec: mode_catalog.catalog.resolve_default(&model, *reasoning_effort, cwd),
135                    selected_mode: None,
136                })
137            }
138        }
139    }
140
141    async fn load_mode_catalog(cwd: &Path) -> Result<SessionModeCatalog, acp::Error> {
142        let catalog = load_agent_catalog(cwd).map_err(|e| {
143            error!("Failed to load agent catalog: {e}");
144            acp::Error::invalid_params()
145        })?;
146
147        let available = get_local_models().await;
148        let specs: Vec<_> = catalog.user_invocable().cloned().collect();
149        let modes = validated_modes_from_specs(&specs, &available);
150
151        Ok(SessionModeCatalog { catalog, modes, available })
152    }
153
154    #[allow(clippy::too_many_arguments, clippy::similar_names)]
155    async fn register_session(
156        &self,
157        session: Session,
158        session_id: &str,
159        acp_session_id: &SessionId,
160        model: &str,
161        selected_mode: Option<String>,
162        reasoning_effort: Option<ReasoningEffort>,
163        modes: Vec<ValidatedMode>,
164        cx: &ConnectionTo<Client>,
165    ) -> Vec<acp::SessionConfigOption> {
166        let relay = spawn_relay(session, cx.clone(), acp_session_id.clone(), self.session_store.clone());
167
168        self.registry
169            .insert(
170                session_id.to_string(),
171                relay,
172                model.to_string(),
173                selected_mode.clone(),
174                reasoning_effort,
175                modes.clone(),
176            )
177            .await;
178
179        let available = get_local_models().await;
180        let all_models = get_all_models(&available);
181        build_config_options_from_modes(
182            &modes,
183            &available,
184            selected_mode.as_deref(),
185            model,
186            reasoning_effort,
187            &all_models,
188            &OAuthCredentialStore::default(),
189        )
190    }
191
192    fn send_available_commands_notification(
193        available_commands: Vec<acp::AvailableCommand>,
194        acp_session_id: SessionId,
195        session_id: &str,
196        cx: &ConnectionTo<Client>,
197    ) {
198        if available_commands.is_empty() {
199            return;
200        }
201        let command_count = available_commands.len();
202        let notification = SessionNotification::new(
203            acp_session_id,
204            SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate::new(available_commands)),
205        );
206        if let Err(e) = cx.send_notification(notification).map_err(|e| AcpServerError::protocol("session/update", e)) {
207            error!("Failed to send available commands notification: {:?}", e);
208        } else {
209            info!("Sent available commands update for session {} ({} commands)", session_id, command_count);
210        }
211    }
212
    /// Drain every session and stop its relay task. Blocks until every relay
    /// has exited.
    ///
    /// This is a thin wrapper: all of the work happens in the registry's
    /// `shutdown_all`.
    pub async fn shutdown_all_sessions(&self) {
        self.registry.shutdown_all().await;
    }
218}
219
220fn options_from_snapshot(
221    snapshot: &ConfigSnapshot,
222    available: &[LlmModel],
223    all_models: &[LlmModel],
224    credential_store: &OAuthCredentialStore,
225) -> Vec<acp::SessionConfigOption> {
226    build_config_options_from_modes(
227        &snapshot.modes,
228        available,
229        snapshot.selected_mode.as_deref(),
230        &snapshot.effective_model,
231        snapshot.reasoning_effort,
232        all_models,
233        credential_store,
234    )
235}
236
237/// Merge catalog `all()` with locally-discovered models for the `all_models`
238/// parameter of `build_model_config_option`.
239fn get_all_models(discovered: &[LlmModel]) -> Vec<LlmModel> {
240    let mut all = LlmModel::all().to_vec();
241    for m in discovered {
242        if !all.contains(m) {
243            all.push(m.clone());
244        }
245    }
246    all
247}
248
249fn build_auth_methods(has_credential: impl Fn(&str) -> bool) -> Vec<AuthMethod> {
250    let mut seen = HashSet::new();
251    LlmModel::all()
252        .iter()
253        .filter_map(LlmModel::oauth_provider_id)
254        .filter(|id| seen.insert(*id))
255        .map(|id| {
256            let display = LlmModel::all()
257                .iter()
258                .find(|m| m.oauth_provider_id() == Some(id))
259                .map_or(id, |m| m.provider_display_name());
260            let mut method = acp::AuthMethodAgent::new(id, display);
261            if has_credential(id) {
262                method = method.description("authenticated");
263            }
264            AuthMethod::Agent(method)
265        })
266        .collect()
267}
268
269fn map_acp_to_content_blocks(blocks: Vec<acp::ContentBlock>) -> Vec<ContentBlock> {
270    blocks
271        .into_iter()
272        .map(|block| match block {
273            acp::ContentBlock::Text(t) => ContentBlock::text(t.text),
274            acp::ContentBlock::Image(img) => ContentBlock::Image { data: img.data, mime_type: img.mime_type },
275            acp::ContentBlock::Audio(aud) => ContentBlock::Audio { data: aud.data, mime_type: aud.mime_type },
276            acp::ContentBlock::Resource(r) => ContentBlock::text(format_embedded_resource(&r)),
277            acp::ContentBlock::ResourceLink(l) => ContentBlock::text(format!("[Resource: {}]", l.uri)),
278            _ => ContentBlock::text("[Unknown content]"),
279        })
280        .collect()
281}
282
283fn resolve_agent_spec(catalog: &AgentCatalog, mode_name: &str, cwd: &Path) -> Result<AgentSpec, acp::Error> {
284    catalog.resolve(mode_name, cwd).map_err(|e| {
285        error!("Failed to resolve runtime inputs for mode '{}': {e}", mode_name);
286        acp::Error::invalid_params()
287    })
288}
289
290fn resolve_default_initial_session(
291    mode_catalog: &SessionModeCatalog,
292    default_model: &LlmModel,
293    cwd: &Path,
294) -> Result<ResolvedInitialSession, acp::Error> {
295    if let Some(mode) = mode_catalog.modes.first() {
296        return resolve_agent_spec(&mode_catalog.catalog, &mode.name, cwd)
297            .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(mode.name.clone()) });
298    }
299
300    Ok(ResolvedInitialSession {
301        spec: mode_catalog.catalog.resolve_default(default_model, None, cwd),
302        selected_mode: None,
303    })
304}
305
306fn parse_available_model(model: &str, available: &[LlmModel]) -> Result<LlmModel, acp::Error> {
307    let parsed = model.parse().map_err(|e: String| {
308        warn!("Failed to parse --model `{model}`: {e}");
309        acp::Error::invalid_params()
310    })?;
311    if available.iter().any(|available| available == &parsed) {
312        Ok(parsed)
313    } else {
314        warn!("Requested model `{model}` is not available");
315        Err(acp::Error::invalid_params())
316    }
317}
318
319fn prompt_capabilities_for_models(models: &[LlmModel]) -> PromptCapabilities {
320    PromptCapabilities::new()
321        .embedded_context(true)
322        .image(models.iter().any(LlmModel::supports_image))
323        .audio(models.iter().any(supports_prompt_audio))
324}
325
326fn selected_models(model_value: &str) -> Result<Vec<LlmModel>, acp::Error> {
327    model_value
328        .split(',')
329        .map(str::trim)
330        .filter(|part| !part.is_empty())
331        .map(|part| part.parse::<LlmModel>().map_err(|_| acp::Error::invalid_params()))
332        .collect()
333}
334
335fn validate_prompt_support(model_value: &str, content: &[ContentBlock]) -> Result<(), acp::Error> {
336    let modalities = PromptModalities::from_content(content);
337    if modalities.is_empty() {
338        return Ok(());
339    }
340
341    let selected = selected_models(model_value)?;
342    if modalities.image && selected.iter().any(|model| !model.supports_image()) {
343        return Err(acp::Error::invalid_params());
344    }
345    if modalities.audio && selected.iter().any(|model| !supports_prompt_audio(model)) {
346        return Err(acp::Error::invalid_params());
347    }
348
349    Ok(())
350}
351
#[cfg(test)]
// The public request handlers live in a second `impl` block below this module.
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use agent_client_protocol::schema::{InitializeRequest, ProtocolVersion};

    const SONNET: &str = "anthropic:claude-sonnet-4-5";
    const DEEPSEEK: &str = "deepseek:deepseek-chat";
    const GEMINI_AUDIO: &str = "gemini:gemini-live-2.5-flash-preview-native-audio";

    /// Parse a model id that the test expects to be valid.
    fn model(id: &str) -> LlmModel {
        id.parse().unwrap()
    }

    #[tokio::test]
    async fn initialize_always_advertises_load_session_support() {
        let session_store =
            Arc::new(SessionStore::new().unwrap_or_else(|e| panic!("Failed to initialize session store: {e}")));
        let manager = SessionManager::new(SessionManagerConfig {
            registry: Arc::new(SessionRegistry::new()),
            session_store,
            has_oauth_credential: |_| false,
            initial_selection: InitialSessionSelection::default(),
        });

        let request = InitializeRequest::new(ProtocolVersion::LATEST);
        let response = manager.initialize(request).await.expect("initialize succeeds");
        let json = serde_json::to_string(&response).expect("response serializes");
        assert!(json.contains("\"loadSession\":true"));
    }

    #[test]
    fn prompt_capabilities_reflect_available_modalities() {
        let image_only = prompt_capabilities_for_models(&[model(SONNET)]);
        assert!(image_only.image);
        assert!(!image_only.audio);

        let audio_capable = prompt_capabilities_for_models(&[model(GEMINI_AUDIO)]);
        assert!(!audio_capable.image);
        assert!(audio_capable.audio);

        let text_only = prompt_capabilities_for_models(&[model(DEEPSEEK)]);
        assert!(!text_only.image);
        assert!(!text_only.audio);
    }

    #[test]
    fn validate_prompt_support_requires_all_selected_models_to_support_media() {
        let image_content = vec![ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() }];
        let audio_content =
            vec![ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() }];

        // Single-model selections.
        assert!(validate_prompt_support(SONNET, &image_content).is_ok());
        assert!(validate_prompt_support(DEEPSEEK, &image_content).is_err());
        assert!(validate_prompt_support(GEMINI_AUDIO, &audio_content).is_ok());
        assert!(validate_prompt_support(SONNET, &audio_content).is_err());

        // Mixed selections fail as soon as one member lacks the modality.
        let sonnet_and_deepseek = format!("{SONNET},{DEEPSEEK}");
        assert!(validate_prompt_support(&sonnet_and_deepseek, &image_content).is_err());
        let gemini_and_deepseek = format!("{GEMINI_AUDIO},{DEEPSEEK}");
        assert!(validate_prompt_support(&gemini_and_deepseek, &audio_content).is_err());
    }
}
415
416impl SessionManager {
417    pub async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse, acp::Error> {
418        info!("Received initialize request: {:?}", args);
419        let auth_methods = build_auth_methods(self.has_oauth_credential);
420        let available = get_local_models().await;
421        Ok(InitializeResponse::new(ProtocolVersion::V1)
422            .agent_info(Implementation::new("Aether", "0.1.0"))
423            .agent_capabilities(
424                AgentCapabilities::new()
425                    .load_session(true)
426                    .mcp_capabilities(McpCapabilities::new().http(true).sse(true))
427                    .session_capabilities(acp::SessionCapabilities::new().list(acp::SessionListCapabilities::new()))
428                    .prompt_capabilities(prompt_capabilities_for_models(&available)),
429            )
430            .auth_methods(auth_methods))
431    }
432
433    pub async fn authenticate(
434        &self,
435        args: AuthenticateRequest,
436        cx: &ConnectionTo<Client>,
437    ) -> Result<AuthenticateResponse, acp::Error> {
438        info!("Received authenticate request: {:?}", args);
439        let method_id = args.method_id.0.as_ref();
440        match method_id {
441            "codex" => {
442                llm::perform_codex_oauth_flow().await.map_err(|e| {
443                    error!("OAuth flow failed for {method_id}: {e}");
444                    acp::Error::internal_error()
445                })?;
446            }
447            _ => return Err(acp::Error::invalid_params()),
448        }
449        let auth_methods = build_auth_methods(self.has_oauth_credential);
450        if let Err(e) = cx
451            .send_notification(AuthMethodsUpdatedParams { auth_methods })
452            .map_err(|e| AcpServerError::protocol("_aether/auth_methods_updated", e))
453        {
454            error!("Failed to send auth methods updated notification: {:?}", e);
455        }
456
457        let credential_store = OAuthCredentialStore::default();
458        let available = get_local_models().await;
459        let all_models = get_all_models(&available);
460        let snapshots = self.registry.snapshot_all_configs().await;
461
462        for (id, snap) in snapshots {
463            let options = options_from_snapshot(&snap, &available, &all_models, &credential_store);
464            let notification = SessionNotification::new(
465                SessionId::new(id),
466                SessionUpdate::ConfigOptionUpdate(ConfigOptionUpdate::new(options)),
467            );
468            let _ = cx.send_notification(notification);
469        }
470
471        Ok(AuthenticateResponse::default())
472    }
473
474    pub async fn new_session(
475        &self,
476        mut args: NewSessionRequest,
477        cx: &ConnectionTo<Client>,
478    ) -> Result<NewSessionResponse, acp::Error> {
479        // Inside a sandbox container the client sends the *host* cwd, but the
480        // project is mounted at the container's working directory.
481        if std::env::var("AETHER_INSIDE_SANDBOX").is_ok() {
482            let container_cwd = std::env::current_dir().unwrap_or_else(|_| "/workspace".into());
483            info!("Sandbox: remapping cwd {:?} -> {:?}", args.cwd, container_cwd);
484            args.cwd = container_cwd;
485        }
486
487        info!("Creating new session with cwd: {:?}", args.cwd);
488        let session_id = uuid::Uuid::new_v4().to_string();
489        let acp_session_id = acp::SessionId::new(session_id.clone());
490
491        let mode_catalog = Self::load_mode_catalog(&args.cwd).await?;
492        let default_model = pick_default_model(&mode_catalog.available).ok_or_else(|| {
493            error!("No models available — set an API key env var (e.g. ANTHROPIC_API_KEY)");
494            acp::Error::internal_error()
495        })?;
496
497        let ResolvedInitialSession { spec, selected_mode } =
498            self.resolve_initial_session(&mode_catalog, default_model, &args.cwd)?;
499        let model_str = spec.model.clone();
500        let reasoning_effort = spec.reasoning_effort;
501
502        let session =
503            Session::new(spec, args.cwd.clone(), map_acp_mcp_servers(args.mcp_servers), None, Some(session_id.clone()))
504                .await
505                .map_err(|e| {
506                    error!("Failed to create session: {}", e);
507                    acp::Error::internal_error()
508                })?;
509
510        let available_commands = session.list_available_commands().await.map_err(|e| {
511            error!("Failed to list available commands: {}", e);
512            acp::Error::internal_error()
513        })?;
514
515        let meta = SessionMeta {
516            session_id: session_id.clone(),
517            cwd: args.cwd.clone(),
518            model: model_str.clone(),
519            selected_mode: selected_mode.clone(),
520            created_at: IsoString::now().0,
521        };
522        if let Err(e) = self.session_store.append_meta(&session_id, &meta) {
523            error!("Failed to write session meta: {e}");
524        }
525
526        let config_options = self
527            .register_session(
528                session,
529                &session_id,
530                &acp_session_id,
531                &model_str,
532                selected_mode,
533                reasoning_effort,
534                mode_catalog.modes,
535                cx,
536            )
537            .await;
538
539        info!("Session {} created successfully", session_id);
540
541        let response = NewSessionResponse::new(acp_session_id.clone()).config_options(config_options);
542
543        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
544
545        Ok(response)
546    }
547
548    pub fn list_sessions(&self, args: &ListSessionsRequest) -> Result<ListSessionsResponse, acp::Error> {
549        info!("Listing sessions, cwd filter: {:?}", args.cwd);
550        let mut summaries = self.session_store.list();
551
552        if let Some(cwd) = args.cwd.as_ref() {
553            summaries.retain(|s| s.meta.cwd == *cwd);
554        }
555
556        let sessions: Vec<acp::SessionInfo> = summaries
557            .into_iter()
558            .map(|s| acp::SessionInfo::new(s.meta.session_id, s.meta.cwd).updated_at(s.meta.created_at).title(s.title))
559            .collect();
560
561        info!("Found {} sessions", sessions.len());
562        Ok(ListSessionsResponse::new(sessions))
563    }
564
565    pub async fn load_session(
566        &self,
567        args: LoadSessionRequest,
568        cx: &ConnectionTo<Client>,
569    ) -> Result<LoadSessionResponse, acp::Error> {
570        let session_id = args.session_id.0.to_string();
571        info!("Loading session: {session_id}");
572
573        let (meta, events) = self.session_store.load(&session_id).ok_or_else(|| {
574            error!("Session not found: {session_id}");
575            acp::Error::invalid_params()
576        })?;
577
578        let context = Context::from_events(&events);
579        let mode_catalog = Self::load_mode_catalog(&args.cwd).await?;
580
581        let spec = if let Some(mode_name) = meta.selected_mode.as_deref() {
582            resolve_agent_spec(&mode_catalog.catalog, mode_name, &args.cwd)?
583        } else {
584            let parsed_model: LlmModel = meta.model.parse().map_err(|e: String| {
585                error!("Failed to parse restored model '{}': {e}", meta.model);
586                acp::Error::invalid_params()
587            })?;
588            mode_catalog.catalog.resolve_default(&parsed_model, None, &args.cwd)
589        };
590
591        let model = spec.model.clone();
592
593        let restored_messages: Vec<_> = context.messages().iter().filter(|m| !m.is_system()).cloned().collect();
594
595        let session = Session::new(
596            spec,
597            args.cwd.clone(),
598            map_acp_mcp_servers(args.mcp_servers),
599            Some(restored_messages),
600            Some(session_id.clone()),
601        )
602        .await
603        .map_err(|e| {
604            error!("Failed to create session for load: {e}");
605            acp::Error::internal_error()
606        })?;
607
608        let available_commands = session.list_available_commands().await.map_err(|e| {
609            error!("Failed to list available commands: {e}");
610            acp::Error::internal_error()
611        })?;
612
613        let acp_session_id = acp::SessionId::new(session_id.clone());
614
615        let config_options = self
616            .register_session(
617                session,
618                &session_id,
619                &acp_session_id,
620                &model,
621                meta.selected_mode,
622                None,
623                mode_catalog.modes,
624                cx,
625            )
626            .await;
627
628        info!("Session {session_id} loaded successfully");
629
630        let response = LoadSessionResponse::new().config_options(config_options);
631
632        let cx_clone = cx.clone();
633        let replay_session_id = acp_session_id.clone();
634        spawn(async move {
635            replay_to_client(&events, &cx_clone, &replay_session_id).await;
636        });
637
638        Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
639
640        Ok(response)
641    }
642
643    pub async fn prompt(&self, args: acp::PromptRequest) -> Result<acp::PromptResponse, acp::Error> {
644        info!("Received prompt for session: {:?}", args.session_id);
645        let session_id_str = args.session_id.0.to_string();
646        let content = map_acp_to_content_blocks(args.prompt);
647
648        let model = self.registry.effective_model(&session_id_str).await.ok_or_else(|| {
649            error!("Session not found: {}", session_id_str);
650            acp::Error::invalid_params()
651        })?;
652        validate_prompt_support(&model, &content)?;
653
654        let dispatch = self.registry.begin_prompt(&session_id_str).await.ok_or_else(|| {
655            error!("Session not found: {}", session_id_str);
656            acp::Error::invalid_params()
657        })?;
658
659        let (result_tx, result_rx) = oneshot::channel();
660        dispatch
661            .relay_tx
662            .send(SessionCommand::Prompt {
663                content,
664                switch_model: dispatch.switch_model,
665                reasoning_effort: dispatch.reasoning_effort,
666                result_tx,
667            })
668            .await
669            .map_err(|_| {
670                error!("Relay channel closed for session {}", session_id_str);
671                acp::Error::internal_error()
672            })?;
673
674        let stop_reason = result_rx
675            .await
676            .map_err(|_| {
677                error!("Relay dropped result channel for session {}", session_id_str);
678                acp::Error::internal_error()
679            })?
680            .map_err(|e| {
681                error!("Relay error for session {}: {}", session_id_str, e);
682                acp::Error::internal_error()
683            })?;
684
685        info!("Prompt completed with stop reason: {:?}", stop_reason);
686        Ok(PromptResponse::new(stop_reason))
687    }
688
689    pub async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> {
690        info!("Received cancel for session: {:?}", args.session_id);
691        let session_id_str = args.session_id.0.to_string();
692        let relay = self.registry.relay(&session_id_str).await.ok_or_else(|| {
693            error!("Session not found for cancel: {}", session_id_str);
694            acp::Error::invalid_params()
695        })?;
696
697        relay.cmd.send(SessionCommand::Cancel).await.map_err(|_| {
698            error!("Relay channel closed for cancel: {}", session_id_str);
699            acp::Error::internal_error()
700        })?;
701
702        Ok(())
703    }
704
705    pub async fn set_session_config_option(
706        &self,
707        args: SetSessionConfigOptionRequest,
708    ) -> Result<SetSessionConfigOptionResponse, acp::Error> {
709        let session_id_str = args.session_id.0.to_string();
710        let config_id = args.config_id.0.to_string();
711        let value = args.value.0.to_string();
712
713        info!("set_session_config_option: session={}, config={}, value={}", session_id_str, config_id, value);
714
715        let setting = ConfigSetting::parse(&config_id, &value).map_err(|e| {
716            error!("{e}");
717            acp::Error::invalid_params()
718        })?;
719
720        let available = get_local_models().await;
721        let all_models = get_all_models(&available);
722
723        let snapshot =
724            self.registry.apply_config_change(&session_id_str, &setting, &available).await.ok_or_else(|| {
725                error!("Session not found: {}", session_id_str);
726                acp::Error::invalid_params()
727            })??;
728
729        let options = options_from_snapshot(&snapshot, &available, &all_models, &OAuthCredentialStore::default());
730        Ok(SetSessionConfigOptionResponse::new(options))
731    }
732
733    pub async fn on_mcp_request(&self, request: McpRequest) -> Result<(), acp::Error> {
734        info!("Received MCP ext request: {:?}", request);
735        match request {
736            McpRequest::Authenticate { session_id, server_name } => {
737                let relay = self.registry.relay(&session_id).await.ok_or_else(|| {
738                    error!("Session not found for authenticate_mcp_server: {}", session_id);
739                    acp::Error::invalid_params()
740                })?;
741
742                relay.mcp_request.send(McpRequest::Authenticate { session_id, server_name }).await.map_err(|_| {
743                    error!("MCP request channel closed for session");
744                    acp::Error::internal_error()
745                })?;
746            }
747        }
748
749        Ok(())
750    }
751}