1use acp_utils::notifications::{AuthMethodsUpdatedParams, McpRequest};
2use acp_utils::server::AcpServerError;
3use agent_client_protocol::schema::{
4 self as acp, AgentCapabilities, AuthMethod, AuthenticateRequest, AuthenticateResponse, AvailableCommandsUpdate,
5 ConfigOptionUpdate, Implementation, InitializeRequest, InitializeResponse, ListSessionsRequest,
6 ListSessionsResponse, LoadSessionRequest, LoadSessionResponse, McpCapabilities, NewSessionRequest,
7 NewSessionResponse, PromptCapabilities, PromptResponse, ProtocolVersion, SessionId, SessionNotification,
8 SessionUpdate, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse,
9};
10use agent_client_protocol::{Client, ConnectionTo};
11use llm::catalog::{LlmModel, get_local_models};
12use llm::oauth::OAuthCredentialStore;
13use llm::types::IsoString;
14use llm::{ContentBlock, ReasoningEffort};
15use std::collections::HashSet;
16use std::path::Path;
17use std::sync::Arc;
18use tokio::spawn;
19use tokio::sync::oneshot;
20use tracing::{error, info, warn};
21
22use super::config_setting::ConfigSetting;
23use super::mappers::{map_acp_mcp_servers, replay_to_client};
24use super::model_config::{
25 ValidatedMode, build_config_options_from_modes, pick_default_model, supports_prompt_audio,
26 validated_modes_from_specs,
27};
28use super::relay::{SessionCommand, spawn_relay};
29use super::session::Session;
30use super::session_registry::{ConfigSnapshot, SessionRegistry};
31use super::session_store::{SessionMeta, SessionStore};
32use acp_utils::content::format_embedded_resource;
33use aether_core::agent_spec::AgentSpec;
34use aether_core::context::ext::ContextExt;
35use aether_project::{AetherSettings, AetherSettingsSource, AgentCatalog};
36use llm::Context;
37
38#[derive(Clone, Debug, Default)]
40pub enum InitialSessionSelection {
41 #[default]
42 Default,
43 Agent(String),
44 Model {
45 model: String,
46 reasoning_effort: Option<ReasoningEffort>,
47 },
48}
49
50impl InitialSessionSelection {
51 pub fn agent(name: String) -> Self {
52 Self::Agent(name)
53 }
54
55 pub fn model(model: String, reasoning_effort: Option<ReasoningEffort>) -> Self {
56 Self::Model { model, reasoning_effort }
57 }
58}
59
60pub struct SessionManager {
62 registry: Arc<SessionRegistry>,
63 session_store: Arc<SessionStore>,
64 has_oauth_credential: fn(&str) -> bool,
65 initial_selection: InitialSessionSelection,
66 settings_source: Option<AetherSettingsSource>,
67}
68
69pub(crate) struct SessionManagerConfig {
70 pub(crate) registry: Arc<SessionRegistry>,
71 pub(crate) session_store: Arc<SessionStore>,
72 pub(crate) has_oauth_credential: fn(&str) -> bool,
73 pub(crate) initial_selection: InitialSessionSelection,
74 pub(crate) settings_source: Option<AetherSettingsSource>,
75}
76
77struct SessionModeCatalog {
78 catalog: AgentCatalog,
79 modes: Vec<ValidatedMode>,
80 available: Vec<LlmModel>,
81}
82
83struct ResolvedInitialSession {
84 spec: AgentSpec,
85 selected_mode: Option<String>,
86}
87
/// Which non-text modalities a prompt contains.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct PromptModalities {
    /// At least one content block is an image.
    image: bool,
    /// At least one content block is audio.
    audio: bool,
}
93
94impl PromptModalities {
95 fn from_content(content: &[ContentBlock]) -> Self {
96 Self {
97 image: content.iter().any(ContentBlock::is_image),
98 audio: content.iter().any(|block| matches!(block, ContentBlock::Audio { .. })),
99 }
100 }
101
102 fn is_empty(self) -> bool {
103 !self.image && !self.audio
104 }
105}
106
107impl SessionManager {
108 pub(crate) fn new(deps: SessionManagerConfig) -> Self {
109 Self {
110 registry: deps.registry,
111 session_store: deps.session_store,
112 has_oauth_credential: deps.has_oauth_credential,
113 initial_selection: deps.initial_selection,
114 settings_source: deps.settings_source,
115 }
116 }
117
118 fn resolve_initial_session(
119 &self,
120 mode_catalog: &SessionModeCatalog,
121 default_model: &LlmModel,
122 ) -> Result<ResolvedInitialSession, acp::Error> {
123 match &self.initial_selection {
124 InitialSessionSelection::Default => resolve_default_initial_session(mode_catalog, default_model),
125 InitialSessionSelection::Agent(agent) => {
126 if !mode_catalog.modes.iter().any(|mode| mode.name == *agent) {
127 warn!("Unknown agent `{agent}` requested via --agent");
128 return Err(acp::Error::invalid_params());
129 }
130 resolve_agent_spec(&mode_catalog.catalog, agent)
131 .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(agent.clone()) })
132 }
133 InitialSessionSelection::Model { model, reasoning_effort } => {
134 let model = parse_available_model(model, &mode_catalog.available)?;
135 Ok(ResolvedInitialSession {
136 spec: AgentSpec::default_spec(&model, *reasoning_effort, Vec::new()),
137 selected_mode: None,
138 })
139 }
140 }
141 }
142
143 async fn load_mode_catalog(&self, cwd: &Path) -> Result<SessionModeCatalog, acp::Error> {
144 let config = if let Some(source) = self.settings_source.clone() {
145 AetherSettings::load(cwd, [source])
146 } else {
147 AetherSettings::load_default(cwd)
148 }
149 .map_err(|e| {
150 error!("Failed to load agent catalog: {e}");
151 acp::Error::invalid_params()
152 })?;
153 let catalog = if config.agents.is_empty() {
154 AgentCatalog::empty(cwd.to_path_buf())
155 } else {
156 AgentCatalog::from_settings(cwd, config).map_err(|e| {
157 error!("Failed to load agent catalog: {e}");
158 acp::Error::invalid_params()
159 })?
160 };
161
162 let available = get_local_models().await;
163 let specs: Vec<_> = catalog.user_invocable().cloned().collect();
164 let modes = validated_modes_from_specs(&specs, &available);
165
166 Ok(SessionModeCatalog { catalog, modes, available })
167 }
168
169 #[allow(clippy::too_many_arguments, clippy::similar_names)]
170 async fn register_session(
171 &self,
172 session: Session,
173 session_id: &str,
174 acp_session_id: &SessionId,
175 model: &str,
176 selected_mode: Option<String>,
177 reasoning_effort: Option<ReasoningEffort>,
178 modes: Vec<ValidatedMode>,
179 cx: &ConnectionTo<Client>,
180 ) -> Vec<acp::SessionConfigOption> {
181 let relay = spawn_relay(session, cx.clone(), acp_session_id.clone(), self.session_store.clone());
182
183 self.registry
184 .insert(
185 session_id.to_string(),
186 relay,
187 model.to_string(),
188 selected_mode.clone(),
189 reasoning_effort,
190 modes.clone(),
191 )
192 .await;
193
194 let available = get_local_models().await;
195 let all_models = get_all_models(&available);
196 build_config_options_from_modes(
197 &modes,
198 &available,
199 selected_mode.as_deref(),
200 model,
201 reasoning_effort,
202 &all_models,
203 &OAuthCredentialStore::default(),
204 )
205 }
206
207 fn send_available_commands_notification(
208 available_commands: Vec<acp::AvailableCommand>,
209 acp_session_id: SessionId,
210 session_id: &str,
211 cx: &ConnectionTo<Client>,
212 ) {
213 if available_commands.is_empty() {
214 return;
215 }
216 let command_count = available_commands.len();
217 let notification = SessionNotification::new(
218 acp_session_id,
219 SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate::new(available_commands)),
220 );
221 if let Err(e) = cx.send_notification(notification).map_err(|e| AcpServerError::protocol("session/update", e)) {
222 error!("Failed to send available commands notification: {:?}", e);
223 } else {
224 info!("Sent available commands update for session {} ({} commands)", session_id, command_count);
225 }
226 }
227
228 pub async fn shutdown_all_sessions(&self) {
231 self.registry.shutdown_all().await;
232 }
233}
234
235fn options_from_snapshot(
236 snapshot: &ConfigSnapshot,
237 available: &[LlmModel],
238 all_models: &[LlmModel],
239 credential_store: &OAuthCredentialStore,
240) -> Vec<acp::SessionConfigOption> {
241 build_config_options_from_modes(
242 &snapshot.modes,
243 available,
244 snapshot.selected_mode.as_deref(),
245 &snapshot.effective_model,
246 snapshot.reasoning_effort,
247 all_models,
248 credential_store,
249 )
250}
251
252fn get_all_models(discovered: &[LlmModel]) -> Vec<LlmModel> {
255 let mut all = LlmModel::all().to_vec();
256 for m in discovered {
257 if !all.contains(m) {
258 all.push(m.clone());
259 }
260 }
261 all
262}
263
264fn build_auth_methods(has_credential: impl Fn(&str) -> bool) -> Vec<AuthMethod> {
265 let mut seen = HashSet::new();
266 LlmModel::all()
267 .iter()
268 .filter_map(LlmModel::oauth_provider_id)
269 .filter(|id| seen.insert(*id))
270 .map(|id| {
271 let display = LlmModel::all()
272 .iter()
273 .find(|m| m.oauth_provider_id() == Some(id))
274 .map_or(id, |m| m.provider_display_name());
275 let mut method = acp::AuthMethodAgent::new(id, display);
276 if has_credential(id) {
277 method = method.description("authenticated");
278 }
279 AuthMethod::Agent(method)
280 })
281 .collect()
282}
283
284fn map_acp_to_content_blocks(blocks: Vec<acp::ContentBlock>) -> Vec<ContentBlock> {
285 blocks
286 .into_iter()
287 .map(|block| match block {
288 acp::ContentBlock::Text(t) => ContentBlock::text(t.text),
289 acp::ContentBlock::Image(img) => ContentBlock::Image { data: img.data, mime_type: img.mime_type },
290 acp::ContentBlock::Audio(aud) => ContentBlock::Audio { data: aud.data, mime_type: aud.mime_type },
291 acp::ContentBlock::Resource(r) => ContentBlock::text(format_embedded_resource(&r)),
292 acp::ContentBlock::ResourceLink(l) => ContentBlock::text(format!("[Resource: {}]", l.uri)),
293 _ => ContentBlock::text("[Unknown content]"),
294 })
295 .collect()
296}
297
298fn resolve_agent_spec(catalog: &AgentCatalog, mode_name: &str) -> Result<AgentSpec, acp::Error> {
299 catalog.resolve(mode_name).map_err(|e| {
300 error!("Failed to resolve runtime inputs for mode '{}': {e}", mode_name);
301 acp::Error::invalid_params()
302 })
303}
304
305fn resolve_default_initial_session(
306 mode_catalog: &SessionModeCatalog,
307 default_model: &LlmModel,
308) -> Result<ResolvedInitialSession, acp::Error> {
309 if let Some(mode) = mode_catalog.modes.first() {
310 return resolve_agent_spec(&mode_catalog.catalog, &mode.name)
311 .map(|spec| ResolvedInitialSession { spec, selected_mode: Some(mode.name.clone()) });
312 }
313
314 Ok(ResolvedInitialSession { spec: AgentSpec::default_spec(default_model, None, Vec::new()), selected_mode: None })
315}
316
317fn parse_available_model(model: &str, available: &[LlmModel]) -> Result<LlmModel, acp::Error> {
318 let parsed = model.parse().map_err(|e: String| {
319 warn!("Failed to parse --model `{model}`: {e}");
320 acp::Error::invalid_params()
321 })?;
322 if available.iter().any(|available| available == &parsed) {
323 Ok(parsed)
324 } else {
325 warn!("Requested model `{model}` is not available");
326 Err(acp::Error::invalid_params())
327 }
328}
329
330fn prompt_capabilities_for_models(models: &[LlmModel]) -> PromptCapabilities {
331 PromptCapabilities::new()
332 .embedded_context(true)
333 .image(models.iter().any(LlmModel::supports_image))
334 .audio(models.iter().any(supports_prompt_audio))
335}
336
337fn selected_models(model_value: &str) -> Result<Vec<LlmModel>, acp::Error> {
338 model_value
339 .split(',')
340 .map(str::trim)
341 .filter(|part| !part.is_empty())
342 .map(|part| part.parse::<LlmModel>().map_err(|_| acp::Error::invalid_params()))
343 .collect()
344}
345
346fn validate_prompt_support(model_value: &str, content: &[ContentBlock]) -> Result<(), acp::Error> {
347 let modalities = PromptModalities::from_content(content);
348 if modalities.is_empty() {
349 return Ok(());
350 }
351
352 let selected = selected_models(model_value)?;
353 if modalities.image && selected.iter().any(|model| !model.supports_image()) {
354 return Err(acp::Error::invalid_params());
355 }
356 if modalities.audio && selected.iter().any(|model| !supports_prompt_audio(model)) {
357 return Err(acp::Error::invalid_params());
358 }
359
360 Ok(())
361}
362
#[cfg(test)]
// The request-handler `impl SessionManager` block is placed after this module.
#[allow(clippy::items_after_test_module)]
mod tests {
    use super::*;
    use agent_client_protocol::schema::{InitializeRequest, ProtocolVersion};

    const SONNET: &str = "anthropic:claude-sonnet-4-5";
    const DEEPSEEK: &str = "deepseek:deepseek-chat";
    const GEMINI_LIVE_AUDIO: &str = "gemini:gemini-live-2.5-flash-preview-native-audio";

    /// Parses a model id that the test expects to be valid.
    fn model(id: &str) -> LlmModel {
        id.parse().unwrap()
    }

    /// A single-image prompt.
    fn image_prompt() -> Vec<ContentBlock> {
        vec![ContentBlock::Image { data: "aW1n".to_string(), mime_type: "image/png".to_string() }]
    }

    /// A single-audio prompt.
    fn audio_prompt() -> Vec<ContentBlock> {
        vec![ContentBlock::Audio { data: "YXVkaW8=".to_string(), mime_type: "audio/wav".to_string() }]
    }

    #[tokio::test]
    async fn initialize_always_advertises_load_session_support() {
        let session_store = match SessionStore::new() {
            Ok(store) => Arc::new(store),
            Err(e) => panic!("Failed to initialize session store: {e}"),
        };
        let config = SessionManagerConfig {
            registry: Arc::new(SessionRegistry::new()),
            session_store,
            has_oauth_credential: |_| false,
            initial_selection: InitialSessionSelection::default(),
            settings_source: None,
        };
        let manager = SessionManager::new(config);

        let request = InitializeRequest::new(ProtocolVersion::LATEST);
        let response = manager.initialize(request).await.expect("initialize succeeds");
        let json = serde_json::to_string(&response).expect("response serializes");
        assert!(json.contains("\"loadSession\":true"));
    }

    #[test]
    fn prompt_capabilities_reflect_available_modalities() {
        let image_only = prompt_capabilities_for_models(&[model(SONNET)]);
        assert!(image_only.image);
        assert!(!image_only.audio);

        let audio_capable = prompt_capabilities_for_models(&[model(GEMINI_LIVE_AUDIO)]);
        assert!(!audio_capable.image);
        assert!(audio_capable.audio);

        let text_only = prompt_capabilities_for_models(&[model(DEEPSEEK)]);
        assert!(!text_only.image);
        assert!(!text_only.audio);
    }

    #[test]
    fn validate_prompt_support_requires_all_selected_models_to_support_media() {
        let image_content = image_prompt();
        let audio_content = audio_prompt();

        // Single-model selections.
        assert!(validate_prompt_support(SONNET, &image_content).is_ok());
        assert!(validate_prompt_support(DEEPSEEK, &image_content).is_err());
        assert!(validate_prompt_support(GEMINI_LIVE_AUDIO, &audio_content).is_ok());
        assert!(validate_prompt_support(SONNET, &audio_content).is_err());

        // Mixed selections fail as soon as one model lacks the modality.
        let sonnet_and_deepseek = format!("{SONNET},{DEEPSEEK}");
        assert!(validate_prompt_support(&sonnet_and_deepseek, &image_content).is_err());

        let gemini_and_deepseek = format!("{GEMINI_LIVE_AUDIO},{DEEPSEEK}");
        assert!(validate_prompt_support(&gemini_and_deepseek, &audio_content).is_err());
    }
}
427
428impl SessionManager {
429 pub async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse, acp::Error> {
430 info!("Received initialize request: {:?}", args);
431 let auth_methods = build_auth_methods(self.has_oauth_credential);
432 let available = get_local_models().await;
433 Ok(InitializeResponse::new(ProtocolVersion::V1)
434 .agent_info(Implementation::new("Aether", "0.1.0"))
435 .agent_capabilities(
436 AgentCapabilities::new()
437 .load_session(true)
438 .mcp_capabilities(McpCapabilities::new().http(true).sse(true))
439 .session_capabilities(acp::SessionCapabilities::new().list(acp::SessionListCapabilities::new()))
440 .prompt_capabilities(prompt_capabilities_for_models(&available)),
441 )
442 .auth_methods(auth_methods))
443 }
444
445 pub async fn authenticate(
446 &self,
447 args: AuthenticateRequest,
448 cx: &ConnectionTo<Client>,
449 ) -> Result<AuthenticateResponse, acp::Error> {
450 info!("Received authenticate request: {:?}", args);
451 let method_id = args.method_id.0.as_ref();
452 match method_id {
453 "codex" => {
454 llm::perform_codex_oauth_flow().await.map_err(|e| {
455 error!("OAuth flow failed for {method_id}: {e}");
456 acp::Error::internal_error()
457 })?;
458 }
459 _ => return Err(acp::Error::invalid_params()),
460 }
461 let auth_methods = build_auth_methods(self.has_oauth_credential);
462 if let Err(e) = cx
463 .send_notification(AuthMethodsUpdatedParams { auth_methods })
464 .map_err(|e| AcpServerError::protocol("_aether/auth_methods_updated", e))
465 {
466 error!("Failed to send auth methods updated notification: {:?}", e);
467 }
468
469 let credential_store = OAuthCredentialStore::default();
470 let available = get_local_models().await;
471 let all_models = get_all_models(&available);
472 let snapshots = self.registry.snapshot_all_configs().await;
473
474 for (id, snap) in snapshots {
475 let options = options_from_snapshot(&snap, &available, &all_models, &credential_store);
476 let notification = SessionNotification::new(
477 SessionId::new(id),
478 SessionUpdate::ConfigOptionUpdate(ConfigOptionUpdate::new(options)),
479 );
480 let _ = cx.send_notification(notification);
481 }
482
483 Ok(AuthenticateResponse::default())
484 }
485
486 pub async fn new_session(
487 &self,
488 mut args: NewSessionRequest,
489 cx: &ConnectionTo<Client>,
490 ) -> Result<NewSessionResponse, acp::Error> {
491 if std::env::var("AETHER_INSIDE_SANDBOX").is_ok() {
494 let container_cwd = std::env::current_dir().unwrap_or_else(|_| "/workspace".into());
495 info!("Sandbox: remapping cwd {:?} -> {:?}", args.cwd, container_cwd);
496 args.cwd = container_cwd;
497 }
498
499 info!("Creating new session with cwd: {:?}", args.cwd);
500 let session_id = uuid::Uuid::new_v4().to_string();
501 let acp_session_id = acp::SessionId::new(session_id.clone());
502
503 let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
504 let default_model = pick_default_model(&mode_catalog.available).ok_or_else(|| {
505 error!("No models available — set an API key env var (e.g. ANTHROPIC_API_KEY)");
506 acp::Error::internal_error()
507 })?;
508
509 let ResolvedInitialSession { spec, selected_mode } =
510 self.resolve_initial_session(&mode_catalog, default_model)?;
511 let model_str = spec.model.clone();
512 let reasoning_effort = spec.reasoning_effort;
513
514 let session =
515 Session::new(spec, args.cwd.clone(), map_acp_mcp_servers(args.mcp_servers), None, Some(session_id.clone()))
516 .await
517 .map_err(|e| {
518 error!("Failed to create session: {}", e);
519 acp::Error::internal_error()
520 })?;
521
522 let available_commands = session.list_available_commands().await.map_err(|e| {
523 error!("Failed to list available commands: {}", e);
524 acp::Error::internal_error()
525 })?;
526
527 let meta = SessionMeta {
528 session_id: session_id.clone(),
529 cwd: args.cwd.clone(),
530 model: model_str.clone(),
531 selected_mode: selected_mode.clone(),
532 created_at: IsoString::now().0,
533 };
534 if let Err(e) = self.session_store.append_meta(&session_id, &meta) {
535 error!("Failed to write session meta: {e}");
536 }
537
538 let config_options = self
539 .register_session(
540 session,
541 &session_id,
542 &acp_session_id,
543 &model_str,
544 selected_mode,
545 reasoning_effort,
546 mode_catalog.modes,
547 cx,
548 )
549 .await;
550
551 info!("Session {} created successfully", session_id);
552
553 let response = NewSessionResponse::new(acp_session_id.clone()).config_options(config_options);
554
555 Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
556
557 Ok(response)
558 }
559
560 pub fn list_sessions(&self, args: &ListSessionsRequest) -> Result<ListSessionsResponse, acp::Error> {
561 info!("Listing sessions, cwd filter: {:?}", args.cwd);
562 let mut summaries = self.session_store.list();
563
564 if let Some(cwd) = args.cwd.as_ref() {
565 summaries.retain(|s| s.meta.cwd == *cwd);
566 }
567
568 let sessions: Vec<acp::SessionInfo> = summaries
569 .into_iter()
570 .map(|s| acp::SessionInfo::new(s.meta.session_id, s.meta.cwd).updated_at(s.meta.created_at).title(s.title))
571 .collect();
572
573 info!("Found {} sessions", sessions.len());
574 Ok(ListSessionsResponse::new(sessions))
575 }
576
577 pub async fn load_session(
578 &self,
579 args: LoadSessionRequest,
580 cx: &ConnectionTo<Client>,
581 ) -> Result<LoadSessionResponse, acp::Error> {
582 let session_id = args.session_id.0.to_string();
583 info!("Loading session: {session_id}");
584
585 let (meta, events) = self.session_store.load(&session_id).ok_or_else(|| {
586 error!("Session not found: {session_id}");
587 acp::Error::invalid_params()
588 })?;
589
590 let context = Context::from_events(&events);
591 let mode_catalog = self.load_mode_catalog(&args.cwd).await?;
592
593 let spec = if let Some(mode_name) = meta.selected_mode.as_deref() {
594 resolve_agent_spec(&mode_catalog.catalog, mode_name)?
595 } else {
596 let parsed_model: LlmModel = meta.model.parse().map_err(|e: String| {
597 error!("Failed to parse restored model '{}': {e}", meta.model);
598 acp::Error::invalid_params()
599 })?;
600 AgentSpec::default_spec(&parsed_model, None, Vec::new())
601 };
602
603 let model = spec.model.clone();
604
605 let restored_messages: Vec<_> = context.messages().iter().filter(|m| !m.is_system()).cloned().collect();
606
607 let session = Session::new(
608 spec,
609 args.cwd.clone(),
610 map_acp_mcp_servers(args.mcp_servers),
611 Some(restored_messages),
612 Some(session_id.clone()),
613 )
614 .await
615 .map_err(|e| {
616 error!("Failed to create session for load: {e}");
617 acp::Error::internal_error()
618 })?;
619
620 let available_commands = session.list_available_commands().await.map_err(|e| {
621 error!("Failed to list available commands: {e}");
622 acp::Error::internal_error()
623 })?;
624
625 let acp_session_id = acp::SessionId::new(session_id.clone());
626
627 let config_options = self
628 .register_session(
629 session,
630 &session_id,
631 &acp_session_id,
632 &model,
633 meta.selected_mode,
634 None,
635 mode_catalog.modes,
636 cx,
637 )
638 .await;
639
640 info!("Session {session_id} loaded successfully");
641
642 let response = LoadSessionResponse::new().config_options(config_options);
643
644 let cx_clone = cx.clone();
645 let replay_session_id = acp_session_id.clone();
646 spawn(async move {
647 replay_to_client(&events, &cx_clone, &replay_session_id).await;
648 });
649
650 Self::send_available_commands_notification(available_commands, acp_session_id, &session_id, cx);
651
652 Ok(response)
653 }
654
655 pub async fn prompt(&self, args: acp::PromptRequest) -> Result<acp::PromptResponse, acp::Error> {
656 info!("Received prompt for session: {:?}", args.session_id);
657 let session_id_str = args.session_id.0.to_string();
658 let content = map_acp_to_content_blocks(args.prompt);
659
660 let model = self.registry.effective_model(&session_id_str).await.ok_or_else(|| {
661 error!("Session not found: {}", session_id_str);
662 acp::Error::invalid_params()
663 })?;
664 validate_prompt_support(&model, &content)?;
665
666 let dispatch = self.registry.begin_prompt(&session_id_str).await.ok_or_else(|| {
667 error!("Session not found: {}", session_id_str);
668 acp::Error::invalid_params()
669 })?;
670
671 let (result_tx, result_rx) = oneshot::channel();
672 dispatch
673 .relay_tx
674 .send(SessionCommand::Prompt {
675 content,
676 switch_model: dispatch.switch_model,
677 reasoning_effort: dispatch.reasoning_effort,
678 result_tx,
679 })
680 .await
681 .map_err(|_| {
682 error!("Relay channel closed for session {}", session_id_str);
683 acp::Error::internal_error()
684 })?;
685
686 let stop_reason = result_rx
687 .await
688 .map_err(|_| {
689 error!("Relay dropped result channel for session {}", session_id_str);
690 acp::Error::internal_error()
691 })?
692 .map_err(|e| {
693 error!("Relay error for session {}: {}", session_id_str, e);
694 acp::Error::internal_error()
695 })?;
696
697 info!("Prompt completed with stop reason: {:?}", stop_reason);
698 Ok(PromptResponse::new(stop_reason))
699 }
700
701 pub async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> {
702 info!("Received cancel for session: {:?}", args.session_id);
703 let session_id_str = args.session_id.0.to_string();
704 let relay = self.registry.relay(&session_id_str).await.ok_or_else(|| {
705 error!("Session not found for cancel: {}", session_id_str);
706 acp::Error::invalid_params()
707 })?;
708
709 relay.cmd.send(SessionCommand::Cancel).await.map_err(|_| {
710 error!("Relay channel closed for cancel: {}", session_id_str);
711 acp::Error::internal_error()
712 })?;
713
714 Ok(())
715 }
716
717 pub async fn set_session_config_option(
718 &self,
719 args: SetSessionConfigOptionRequest,
720 ) -> Result<SetSessionConfigOptionResponse, acp::Error> {
721 let session_id_str = args.session_id.0.to_string();
722 let config_id = args.config_id.0.to_string();
723 let value = args.value.0.to_string();
724
725 info!("set_session_config_option: session={}, config={}, value={}", session_id_str, config_id, value);
726
727 let setting = ConfigSetting::parse(&config_id, &value).map_err(|e| {
728 error!("{e}");
729 acp::Error::invalid_params()
730 })?;
731
732 let available = get_local_models().await;
733 let all_models = get_all_models(&available);
734
735 let snapshot =
736 self.registry.apply_config_change(&session_id_str, &setting, &available).await.ok_or_else(|| {
737 error!("Session not found: {}", session_id_str);
738 acp::Error::invalid_params()
739 })??;
740
741 let options = options_from_snapshot(&snapshot, &available, &all_models, &OAuthCredentialStore::default());
742 Ok(SetSessionConfigOptionResponse::new(options))
743 }
744
745 pub async fn on_mcp_request(&self, request: McpRequest) -> Result<(), acp::Error> {
746 info!("Received MCP ext request: {:?}", request);
747 match request {
748 McpRequest::Authenticate { session_id, server_name } => {
749 let relay = self.registry.relay(&session_id).await.ok_or_else(|| {
750 error!("Session not found for authenticate_mcp_server: {}", session_id);
751 acp::Error::invalid_params()
752 })?;
753
754 relay.mcp_request.send(McpRequest::Authenticate { session_id, server_name }).await.map_err(|_| {
755 error!("MCP request channel closed for session");
756 acp::Error::internal_error()
757 })?;
758 }
759 }
760
761 Ok(())
762 }
763}