1use acp_utils::config_option_id::ConfigOptionId;
2use acp_utils::notifications::{AuthMethodsUpdatedParams, McpRequest};
3use agent_client_protocol::{
4 self as acp, Agent, AgentCapabilities, AuthMethod, AuthenticateRequest, AuthenticateResponse,
5 AvailableCommandsUpdate, ConfigOptionUpdate, ExtNotification, Implementation,
6 InitializeRequest, InitializeResponse, ListSessionsRequest, ListSessionsResponse,
7 LoadSessionRequest, LoadSessionResponse, McpCapabilities, NewSessionRequest,
8 NewSessionResponse, PromptCapabilities, PromptResponse, ProtocolVersion, SessionId,
9 SessionNotification, SessionUpdate, SetSessionConfigOptionRequest,
10 SetSessionConfigOptionResponse, SetSessionModeRequest, SetSessionModeResponse,
11};
12use llm::catalog::{self, LlmModel, get_local_models};
13use llm::oauth::OAuthCredentialStore;
14use llm::types::IsoString;
15use llm::{ContentBlock, ReasoningEffort};
16use std::collections::{HashMap, HashSet};
17use std::path::Path;
18use std::sync::Arc;
19use tokio::spawn;
20use tokio::sync::oneshot;
21use tokio::sync::{Mutex, mpsc};
22use tokio::task::JoinHandle;
23use tracing::{error, info};
24
25use super::config_setting::ConfigSetting;
26use super::mappers::{map_acp_mcp_servers, replay_to_client};
27use super::model_config::{
28 ValidatedMode, build_config_options_from_modes, effective_model,
29 mode_name_for_state_from_modes, model_exists, pick_default_model, resolve_mode_from_modes,
30 supports_prompt_audio, validated_modes_from_specs,
31};
32use super::relay::{RelayHandle, SessionCommand, spawn_relay};
33use super::session::Session;
34use super::session_store::{SessionMeta, SessionStore};
35use acp_utils::content::format_embedded_resource;
36use acp_utils::server::AcpActorHandle;
37use aether_core::context::ext::ContextExt;
38use aether_project::{AgentCatalog, load_agent_catalog};
39use llm::Context;
40
41struct SessionModeCatalog {
42 catalog: AgentCatalog,
43 modes: Vec<ValidatedMode>,
44}
45
46#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
47struct PromptModalities {
48 image: bool,
49 audio: bool,
50}
51
52impl PromptModalities {
53 fn from_content(content: &[ContentBlock]) -> Self {
54 Self {
55 image: content.iter().any(ContentBlock::is_image),
56 audio: content
57 .iter()
58 .any(|block| matches!(block, ContentBlock::Audio { .. })),
59 }
60 }
61
62 fn is_empty(self) -> bool {
63 !self.image && !self.audio
64 }
65}
66
67#[derive(Clone, Debug, Default, PartialEq, Eq)]
69struct SessionConfigState {
70 active_model: String,
71 pending_model: Option<String>,
72 reasoning_effort: Option<ReasoningEffort>,
73 selected_mode: Option<String>,
74}
75
76impl SessionConfigState {
77 fn new(active_model: String) -> Self {
78 Self {
79 active_model,
80 pending_model: None,
81 reasoning_effort: None,
82 selected_mode: None,
83 }
84 }
85
86 fn apply_config_change(
87 &mut self,
88 validated_modes: &[ValidatedMode],
89 available: &[LlmModel],
90 setting: &ConfigSetting,
91 ) -> Result<(), acp::Error> {
92 match setting {
93 ConfigSetting::Mode(value) => {
94 let Some((mode_model, mode_reasoning_effort)) =
95 resolve_mode_from_modes(validated_modes, value)
96 else {
97 error!("Unknown or invalid mode: {}", value);
98 return Err(acp::Error::invalid_params());
99 };
100
101 self.pending_model = (self.active_model != mode_model).then_some(mode_model);
102 self.reasoning_effort = mode_reasoning_effort;
103 self.selected_mode = Some(value.clone());
104 }
105 ConfigSetting::Model(value) => {
106 if !model_exists(available, value) {
107 error!("Unknown model in set_session_config_option: {}", value);
108 return Err(acp::Error::invalid_params());
109 }
110 self.pending_model = (self.active_model != *value).then_some(value.clone());
111 }
112 ConfigSetting::ReasoningEffort(effort) => {
113 self.reasoning_effort = *effort;
114 }
115 }
116
117 let effective = effective_model(&self.active_model, self.pending_model.as_deref());
118 if setting.config_id() == ConfigOptionId::Model {
119 self.selected_mode =
120 mode_name_for_state_from_modes(validated_modes, effective, self.reasoning_effort);
121 }
122
123 Ok(())
124 }
125}
126
127struct SessionState {
129 relay_tx: mpsc::Sender<SessionCommand>,
130 mcp_request_tx: mpsc::Sender<McpRequest>,
131 #[allow(dead_code)]
132 _relay_handle: JoinHandle<()>,
133 config: SessionConfigState,
134 modes: Vec<ValidatedMode>,
135}
136
137pub struct SessionManager {
139 sessions: Arc<Mutex<HashMap<String, SessionState>>>,
140 actor_handle: AcpActorHandle,
141 session_store: Arc<SessionStore>,
142 has_oauth_credential: fn(&str) -> bool,
143}
144
145impl SessionManager {
146 pub fn new(actor_handle: AcpActorHandle) -> Self {
147 info!("Creating SessionManager");
148 let session_store = SessionStore::new().map_or_else(
149 |e| panic!("Failed to initialize session store: {e}"),
150 Arc::new,
151 );
152 Self {
153 sessions: Arc::new(Mutex::new(HashMap::new())),
154 actor_handle,
155 session_store,
156 has_oauth_credential: OAuthCredentialStore::has_credential,
157 }
158 }
159
160 async fn load_mode_catalog(cwd: &Path) -> Result<SessionModeCatalog, acp::Error> {
161 let catalog = load_agent_catalog(cwd).map_err(|e| {
162 error!("Failed to load agent catalog: {e}");
163 acp::Error::invalid_params()
164 })?;
165
166 let available = get_local_models().await;
167 let specs: Vec<_> = catalog.user_invocable().cloned().collect();
168 let modes = validated_modes_from_specs(&specs, &available);
169
170 Ok(SessionModeCatalog { catalog, modes })
171 }
172
173 #[allow(clippy::too_many_arguments, clippy::similar_names)]
174 async fn register_session(
175 &self,
176 session: Session,
177 session_id: &str,
178 acp_session_id: &SessionId,
179 model: &str,
180 selected_mode: Option<String>,
181 reasoning_effort: Option<ReasoningEffort>,
182 modes: Vec<ValidatedMode>,
183 ) -> Vec<acp::SessionConfigOption> {
184 let RelayHandle {
185 cmd_tx,
186 mcp_request_tx,
187 join_handle,
188 } = spawn_relay(
189 session,
190 self.actor_handle.clone(),
191 acp_session_id.clone(),
192 self.session_store.clone(),
193 );
194
195 let mut config = SessionConfigState::new(model.to_string());
196 config.reasoning_effort = reasoning_effort;
197 config.selected_mode.clone_from(&selected_mode);
198
199 let state = SessionState {
200 relay_tx: cmd_tx,
201 mcp_request_tx,
202 _relay_handle: join_handle,
203 config,
204 modes: modes.clone(),
205 };
206
207 let mut sessions = self.sessions.lock().await;
208 sessions.insert(session_id.to_string(), state);
209
210 let available = get_local_models().await;
211 let all_models = get_all_models(&available);
212 build_config_options_from_modes(
213 &modes,
214 &available,
215 selected_mode.as_deref(),
216 model,
217 reasoning_effort,
218 &all_models,
219 &OAuthCredentialStore::default(),
220 )
221 }
222
223 fn spawn_available_commands_notification(
224 &self,
225 available_commands: Vec<acp::AvailableCommand>,
226 acp_session_id: SessionId,
227 session_id: &str,
228 ) {
229 if available_commands.is_empty() {
230 return;
231 }
232 let command_count = available_commands.len();
233 let notification = SessionNotification::new(
234 acp_session_id,
235 SessionUpdate::AvailableCommandsUpdate(AvailableCommandsUpdate::new(
236 available_commands,
237 )),
238 );
239 let actor_handle = self.actor_handle.clone();
240 let session_id_log = session_id.to_string();
241 spawn(async move {
242 if let Err(e) = actor_handle.send_session_notification(notification).await {
243 error!("Failed to send available commands notification: {:?}", e);
244 } else {
245 info!(
246 "Sent available commands update for session {} ({} commands)",
247 session_id_log, command_count
248 );
249 }
250 });
251 }
252}
253
254fn get_all_models(discovered: &[LlmModel]) -> Vec<LlmModel> {
257 let mut all = LlmModel::all().to_vec();
258 for m in discovered {
259 if !all.contains(m) {
260 all.push(m.clone());
261 }
262 }
263 all
264}
265
266fn build_auth_methods(has_credential: impl Fn(&str) -> bool) -> Vec<AuthMethod> {
267 let mut seen = HashSet::new();
268 LlmModel::all()
269 .iter()
270 .filter_map(LlmModel::oauth_provider_id)
271 .filter(|id| seen.insert(*id))
272 .map(|id| {
273 let display = LlmModel::all()
274 .iter()
275 .find(|m| m.oauth_provider_id() == Some(id))
276 .map_or(id, |m| m.provider_display_name());
277 let mut method = acp::AuthMethodAgent::new(id, display);
278 if has_credential(id) {
279 method = method.description("authenticated");
280 }
281 AuthMethod::Agent(method)
282 })
283 .collect()
284}
285
286fn map_acp_to_content_blocks(blocks: Vec<acp::ContentBlock>) -> Vec<ContentBlock> {
287 blocks
288 .into_iter()
289 .map(|block| match block {
290 acp::ContentBlock::Text(t) => ContentBlock::text(t.text),
291 acp::ContentBlock::Image(img) => ContentBlock::Image {
292 data: img.data,
293 mime_type: img.mime_type,
294 },
295 acp::ContentBlock::Audio(aud) => ContentBlock::Audio {
296 data: aud.data,
297 mime_type: aud.mime_type,
298 },
299 acp::ContentBlock::Resource(r) => ContentBlock::text(format_embedded_resource(&r)),
300 acp::ContentBlock::ResourceLink(l) => {
301 ContentBlock::text(format!("[Resource: {}]", l.uri))
302 }
303 _ => ContentBlock::text("[Unknown content]"),
304 })
305 .collect()
306}
307
308fn select_initial_mode(
309 validated_modes: &[ValidatedMode],
310) -> (Option<String>, Option<(String, Option<ReasoningEffort>)>) {
311 validated_modes.first().map_or((None, None), |mode| {
312 (
313 Some(mode.name.clone()),
314 Some((mode.model.clone(), mode.reasoning_effort)),
315 )
316 })
317}
318
319fn prompt_capabilities_for_models(models: &[LlmModel]) -> PromptCapabilities {
320 PromptCapabilities::new()
321 .embedded_context(true)
322 .image(models.iter().any(LlmModel::supports_image))
323 .audio(models.iter().any(supports_prompt_audio))
324}
325
326fn selected_models(model_value: &str) -> Result<Vec<LlmModel>, acp::Error> {
327 model_value
328 .split(',')
329 .map(str::trim)
330 .filter(|part| !part.is_empty())
331 .map(|part| {
332 part.parse::<LlmModel>()
333 .map_err(|_| acp::Error::invalid_params())
334 })
335 .collect()
336}
337
338fn validate_prompt_support(model_value: &str, content: &[ContentBlock]) -> Result<(), acp::Error> {
339 let modalities = PromptModalities::from_content(content);
340 if modalities.is_empty() {
341 return Ok(());
342 }
343
344 let selected = selected_models(model_value)?;
345 if modalities.image && selected.iter().any(|model| !model.supports_image()) {
346 return Err(acp::Error::invalid_params());
347 }
348 if modalities.audio && selected.iter().any(|model| !supports_prompt_audio(model)) {
349 return Err(acp::Error::invalid_params());
350 }
351
352 Ok(())
353}
354
355#[cfg(test)]
356#[allow(clippy::items_after_test_module)]
357mod tests {
358 use super::*;
359 use crate::acp::config_setting::ConfigSetting;
360 use ReasoningEffort as RE;
361 use agent_client_protocol::{InitializeRequest, ProtocolVersion};
362
363 const SONNET: &str = "anthropic:claude-sonnet-4-5";
364 const DEEPSEEK: &str = "deepseek:deepseek-chat";
365
366 fn available_models() -> Vec<LlmModel> {
367 [SONNET, "anthropic:claude-opus-4-6", DEEPSEEK]
368 .into_iter()
369 .map(|s| s.parse().expect("valid model"))
370 .collect()
371 }
372
373 fn validated_modes() -> Vec<ValidatedMode> {
374 let m = |name: &str, model: &str, effort| ValidatedMode {
375 name: name.into(),
376 model: model.into(),
377 reasoning_effort: effort,
378 };
379 vec![
380 m("Planner", SONNET, Some(RE::High)),
381 m("Coder", DEEPSEEK, None),
382 ]
383 }
384
385 fn apply(
386 active: &str,
387 effort: Option<RE>,
388 mode: Option<&str>,
389 setting: ConfigSetting,
390 ) -> (Result<(), acp::Error>, SessionConfigState) {
391 let mut state = SessionConfigState::new(active.into());
392 state.reasoning_effort = effort;
393 state.selected_mode = mode.map(Into::into);
394 let result = state.apply_config_change(&validated_modes(), &available_models(), &setting);
395 (result, state)
396 }
397
398 #[test]
399 fn new_state_has_no_pending_model_or_mode() {
400 let s = SessionConfigState::new(DEEPSEEK.into());
401 assert!(
402 (s.pending_model.is_none()
403 && s.reasoning_effort.is_none()
404 && s.selected_mode.is_none())
405 );
406 }
407
408 #[test]
409 fn mode_selection_updates_pending_model_and_reasoning() {
410 let (res, s) = apply(DEEPSEEK, None, None, ConfigSetting::Mode("Planner".into()));
411 assert!(res.is_ok());
412 assert_eq!(s.pending_model.as_deref(), Some(SONNET));
413 assert_eq!(s.reasoning_effort, Some(RE::High));
414 assert_eq!(s.selected_mode.as_deref(), Some("Planner"));
415 }
416
417 #[test]
418 fn unknown_mode_is_rejected() {
419 let (res, _) = apply(DEEPSEEK, None, None, ConfigSetting::Mode("Unknown".into()));
420 assert!(res.is_err());
421 }
422
423 #[test]
424 fn effort_change_preserves_mode_and_model_change_clears_it() {
425 let (res, s) = apply(
427 SONNET,
428 Some(RE::High),
429 Some("Planner"),
430 ConfigSetting::ReasoningEffort(Some(RE::Low)),
431 );
432 assert!(res.is_ok());
433 assert_eq!(s.reasoning_effort, Some(RE::Low));
434 assert_eq!(s.selected_mode.as_deref(), Some("Planner"));
435
436 let (res, s) = apply(
438 SONNET,
439 Some(RE::Medium),
440 Some("Planner"),
441 ConfigSetting::Model(DEEPSEEK.into()),
442 );
443 assert!(res.is_ok());
444 assert_eq!(s.pending_model.as_deref(), Some(DEEPSEEK));
445 assert!(s.selected_mode.is_none());
446 }
447
448 #[tokio::test]
449 async fn initialize_always_advertises_load_session_support() {
450 let (tx, _rx) = mpsc::unbounded_channel();
451 let mut manager = SessionManager::new(AcpActorHandle::new(tx));
452 manager.has_oauth_credential = |_| false;
453 let response = manager
454 .initialize(InitializeRequest::new(ProtocolVersion::LATEST))
455 .await
456 .expect("initialize succeeds");
457 let json = serde_json::to_string(&response).expect("response serializes");
458 assert!(json.contains("\"loadSession\":true"));
459 }
460
461 #[test]
462 fn prompt_capabilities_reflect_available_modalities() {
463 let image_only =
464 prompt_capabilities_for_models(&["anthropic:claude-sonnet-4-5".parse().unwrap()]);
465 assert!(image_only.image);
466 assert!(!image_only.audio);
467
468 let audio_capable =
469 prompt_capabilities_for_models(&["gemini:gemini-live-2.5-flash-preview-native-audio"
470 .parse()
471 .unwrap()]);
472 assert!(!audio_capable.image);
473 assert!(audio_capable.audio);
474
475 let text_only = prompt_capabilities_for_models(&[DEEPSEEK.parse().unwrap()]);
476 assert!(!text_only.image);
477 assert!(!text_only.audio);
478 }
479
480 #[test]
481 fn validate_prompt_support_requires_all_selected_models_to_support_media() {
482 let image_content = vec![ContentBlock::Image {
483 data: "aW1n".to_string(),
484 mime_type: "image/png".to_string(),
485 }];
486 let audio_content = vec![ContentBlock::Audio {
487 data: "YXVkaW8=".to_string(),
488 mime_type: "audio/wav".to_string(),
489 }];
490
491 assert!(validate_prompt_support(SONNET, &image_content).is_ok());
492 assert!(validate_prompt_support(DEEPSEEK, &image_content).is_err());
493 assert!(
494 validate_prompt_support(
495 "gemini:gemini-live-2.5-flash-preview-native-audio",
496 &audio_content,
497 )
498 .is_ok()
499 );
500 assert!(validate_prompt_support(SONNET, &audio_content).is_err());
501 assert!(
502 validate_prompt_support(
503 "anthropic:claude-sonnet-4-5,deepseek:deepseek-chat",
504 &image_content,
505 )
506 .is_err()
507 );
508 assert!(
509 validate_prompt_support(
510 "gemini:gemini-live-2.5-flash-preview-native-audio,deepseek:deepseek-chat",
511 &audio_content,
512 )
513 .is_err()
514 );
515 }
516}
517
518#[async_trait::async_trait(?Send)]
519impl Agent for SessionManager {
520 async fn initialize(&self, args: InitializeRequest) -> Result<InitializeResponse, acp::Error> {
521 info!("Received initialize request: {:?}", args);
522 let auth_methods = build_auth_methods(self.has_oauth_credential);
523 let available = get_local_models().await;
524 Ok(InitializeResponse::new(ProtocolVersion::V1)
525 .agent_info(Implementation::new("Aether", "0.1.0"))
526 .agent_capabilities(
527 AgentCapabilities::new()
528 .load_session(true)
529 .mcp_capabilities(McpCapabilities::new().http(true).sse(true))
530 .session_capabilities(
531 acp::SessionCapabilities::new().list(acp::SessionListCapabilities::new()),
532 )
533 .prompt_capabilities(prompt_capabilities_for_models(&available)),
534 )
535 .auth_methods(auth_methods))
536 }
537
538 async fn authenticate(
539 &self,
540 args: AuthenticateRequest,
541 ) -> Result<AuthenticateResponse, acp::Error> {
542 info!("Received authenticate request: {:?}", args);
543 let method_id = args.method_id.0.as_ref();
544 match method_id {
545 "codex" => {
546 llm::perform_codex_oauth_flow().await.map_err(|e| {
547 error!("OAuth flow failed for {method_id}: {e}");
548 acp::Error::internal_error()
549 })?;
550 }
551 _ => return Err(acp::Error::invalid_params()),
552 }
553 let auth_methods = build_auth_methods(self.has_oauth_credential);
554 let auth_methods_notification: ExtNotification =
555 AuthMethodsUpdatedParams { auth_methods }.into();
556 if let Err(e) = self
557 .actor_handle
558 .send_ext_notification(auth_methods_notification)
559 .await
560 {
561 error!("Failed to send auth methods updated notification: {:?}", e);
562 }
563
564 let credential_store = OAuthCredentialStore::default();
566 let available = catalog::get_local_models().await;
567 let all_models = get_all_models(&available);
568 let sessions = self.sessions.lock().await;
569 for (id, state) in sessions.iter() {
570 let model = effective_model(
571 &state.config.active_model,
572 state.config.pending_model.as_deref(),
573 );
574 let options = build_config_options_from_modes(
575 &state.modes,
576 &available,
577 state.config.selected_mode.as_deref(),
578 model,
579 state.config.reasoning_effort,
580 &all_models,
581 &credential_store,
582 );
583 let notification = SessionNotification::new(
584 SessionId::new(id.clone()),
585 SessionUpdate::ConfigOptionUpdate(ConfigOptionUpdate::new(options)),
586 );
587 let _ = self
588 .actor_handle
589 .send_session_notification(notification)
590 .await;
591 }
592
593 Ok(AuthenticateResponse::default())
594 }
595
596 async fn new_session(
597 &self,
598 mut args: NewSessionRequest,
599 ) -> Result<NewSessionResponse, acp::Error> {
600 if std::env::var("AETHER_INSIDE_SANDBOX").is_ok() {
603 let container_cwd = std::env::current_dir().unwrap_or_else(|_| "/workspace".into());
604 info!(
605 "Sandbox: remapping cwd {:?} -> {:?}",
606 args.cwd, container_cwd
607 );
608 args.cwd = container_cwd;
609 }
610
611 info!("Creating new session with cwd: {:?}", args.cwd);
612 let session_id = uuid::Uuid::new_v4().to_string();
613 let acp_session_id = acp::SessionId::new(session_id.clone());
614
615 let mode_catalog = Self::load_mode_catalog(&args.cwd).await?;
616 let available = catalog::get_local_models().await;
617 let default_model = pick_default_model(&available).ok_or_else(|| {
618 error!("No models available — set an API key env var (e.g. ANTHROPIC_API_KEY)");
619 acp::Error::internal_error()
620 })?;
621
622 let (initial_selected_mode, mode_config) = select_initial_mode(&mode_catalog.modes);
623
624 let spec = if let Some(selected_mode) = initial_selected_mode.as_deref() {
625 mode_catalog
626 .catalog
627 .resolve(selected_mode, &args.cwd)
628 .map_err(|e| {
629 error!(
630 "Failed to resolve runtime inputs for mode '{}': {e}",
631 selected_mode
632 );
633 acp::Error::invalid_params()
634 })?
635 } else {
636 mode_catalog
637 .catalog
638 .resolve_default(default_model, None, &args.cwd)
639 };
640
641 let model_str = spec.model.clone();
642 let initial_reasoning_effort = mode_config.and_then(|(_, effort)| effort);
643
644 let session = Session::new(
645 spec,
646 args.cwd.clone(),
647 map_acp_mcp_servers(args.mcp_servers),
648 None,
649 Some(session_id.clone()),
650 )
651 .await
652 .map_err(|e| {
653 error!("Failed to create session: {}", e);
654 acp::Error::internal_error()
655 })?;
656
657 let available_commands = session.list_available_commands().await.map_err(|e| {
658 error!("Failed to list available commands: {}", e);
659 acp::Error::internal_error()
660 })?;
661
662 let meta = SessionMeta {
663 session_id: session_id.clone(),
664 cwd: args.cwd.clone(),
665 model: model_str.clone(),
666 selected_mode: initial_selected_mode.clone(),
667 created_at: IsoString::now().0,
668 };
669 if let Err(e) = self.session_store.append_meta(&session_id, &meta) {
670 error!("Failed to write session meta: {e}");
671 }
672
673 let config_options = self
674 .register_session(
675 session,
676 &session_id,
677 &acp_session_id,
678 &model_str,
679 initial_selected_mode,
680 initial_reasoning_effort,
681 mode_catalog.modes,
682 )
683 .await;
684
685 info!("Session {} created successfully", session_id);
686
687 let response =
688 NewSessionResponse::new(acp_session_id.clone()).config_options(config_options);
689
690 self.spawn_available_commands_notification(available_commands, acp_session_id, &session_id);
691
692 Ok(response)
693 }
694
695 async fn list_sessions(
696 &self,
697 args: ListSessionsRequest,
698 ) -> Result<ListSessionsResponse, acp::Error> {
699 info!("Listing sessions, cwd filter: {:?}", args.cwd);
700 let mut summaries = self.session_store.list();
701
702 if let Some(ref cwd) = args.cwd {
703 summaries.retain(|s| s.meta.cwd == *cwd);
704 }
705
706 let sessions: Vec<acp::SessionInfo> = summaries
707 .into_iter()
708 .map(|s| {
709 acp::SessionInfo::new(s.meta.session_id, s.meta.cwd)
710 .updated_at(s.meta.created_at)
711 .title(s.title)
712 })
713 .collect();
714
715 info!("Found {} sessions", sessions.len());
716 Ok(ListSessionsResponse::new(sessions))
717 }
718
719 async fn load_session(
720 &self,
721 args: LoadSessionRequest,
722 ) -> Result<LoadSessionResponse, acp::Error> {
723 let session_id = args.session_id.0.to_string();
724 info!("Loading session: {session_id}");
725
726 let (meta, events) = self.session_store.load(&session_id).ok_or_else(|| {
727 error!("Session not found: {session_id}");
728 acp::Error::invalid_params()
729 })?;
730
731 let context = Context::from_events(&events);
732 let mode_catalog = Self::load_mode_catalog(&args.cwd).await?;
733
734 let spec = if let Some(mode_name) = meta.selected_mode.as_deref() {
735 mode_catalog
736 .catalog
737 .resolve(mode_name, &args.cwd)
738 .map_err(|e| {
739 error!(
740 "Failed to resolve runtime inputs for mode '{}': {e}",
741 mode_name
742 );
743 acp::Error::invalid_params()
744 })?
745 } else {
746 let parsed_model: LlmModel = meta.model.parse().map_err(|e: String| {
747 error!("Failed to parse restored model '{}': {e}", meta.model);
748 acp::Error::invalid_params()
749 })?;
750 mode_catalog
751 .catalog
752 .resolve_default(&parsed_model, None, &args.cwd)
753 };
754
755 let model = spec.model.clone();
756
757 let restored_messages: Vec<_> = context
758 .messages()
759 .iter()
760 .filter(|m| !m.is_system())
761 .cloned()
762 .collect();
763
764 let session = Session::new(
765 spec,
766 args.cwd.clone(),
767 map_acp_mcp_servers(args.mcp_servers),
768 Some(restored_messages),
769 Some(session_id.clone()),
770 )
771 .await
772 .map_err(|e| {
773 error!("Failed to create session for load: {e}");
774 acp::Error::internal_error()
775 })?;
776
777 let available_commands = session.list_available_commands().await.map_err(|e| {
778 error!("Failed to list available commands: {e}");
779 acp::Error::internal_error()
780 })?;
781
782 let acp_session_id = acp::SessionId::new(session_id.clone());
783
784 let config_options = self
785 .register_session(
786 session,
787 &session_id,
788 &acp_session_id,
789 &model,
790 meta.selected_mode,
791 None,
792 mode_catalog.modes,
793 )
794 .await;
795
796 info!("Session {session_id} loaded successfully");
797
798 let response = LoadSessionResponse::new().config_options(config_options);
799
800 let actor_handle = self.actor_handle.clone();
801 let replay_session_id = acp_session_id.clone();
802 spawn(async move {
803 replay_to_client(&events, &actor_handle, &replay_session_id).await;
804 });
805
806 self.spawn_available_commands_notification(available_commands, acp_session_id, &session_id);
807
808 Ok(response)
809 }
810
811 async fn prompt(&self, args: acp::PromptRequest) -> Result<acp::PromptResponse, acp::Error> {
812 info!("Received prompt for session: {:?}", args.session_id);
813 let session_id_str = args.session_id.0.to_string();
814
815 let (relay_tx, content, switch_model, reasoning_effort) = {
816 let mut sessions = self.sessions.lock().await;
817 let state = sessions.get_mut(&session_id_str).ok_or_else(|| {
818 error!("Session not found: {}", session_id_str);
819 acp::Error::invalid_params()
820 })?;
821
822 let content = map_acp_to_content_blocks(args.prompt);
823 let effective_model = effective_model(
824 &state.config.active_model,
825 state.config.pending_model.as_deref(),
826 );
827 validate_prompt_support(effective_model, &content)?;
828
829 let switch_model = state.config.pending_model.take().and_then(|pending| {
830 if pending == state.config.active_model {
831 None
832 } else {
833 state.config.active_model.clone_from(&pending);
834 Some(pending)
835 }
836 });
837
838 let reasoning_effort = state.config.reasoning_effort;
839
840 (
841 state.relay_tx.clone(),
842 content,
843 switch_model,
844 reasoning_effort,
845 )
846 };
847
848 let (result_tx, result_rx) = oneshot::channel();
849 relay_tx
850 .send(SessionCommand::Prompt {
851 content,
852 switch_model,
853 reasoning_effort,
854 result_tx,
855 })
856 .await
857 .map_err(|_| {
858 error!("Relay channel closed for session {}", session_id_str);
859 acp::Error::internal_error()
860 })?;
861
862 let stop_reason = result_rx
863 .await
864 .map_err(|_| {
865 error!(
866 "Relay dropped result channel for session {}",
867 session_id_str
868 );
869 acp::Error::internal_error()
870 })?
871 .map_err(|e| {
872 error!("Relay error for session {}: {}", session_id_str, e);
873 acp::Error::internal_error()
874 })?;
875
876 info!("Prompt completed with stop reason: {:?}", stop_reason);
877 Ok(PromptResponse::new(stop_reason))
878 }
879
880 async fn cancel(&self, args: acp::CancelNotification) -> Result<(), acp::Error> {
881 info!("Received cancel for session: {:?}", args.session_id);
882 let session_id_str = args.session_id.0.to_string();
883 let relay_tx = {
884 let sessions = self.sessions.lock().await;
885 sessions
886 .get(&session_id_str)
887 .ok_or_else(|| {
888 error!("Session not found for cancel: {}", session_id_str);
889 acp::Error::invalid_params()
890 })?
891 .relay_tx
892 .clone()
893 };
894
895 relay_tx.send(SessionCommand::Cancel).await.map_err(|_| {
896 error!("Relay channel closed for cancel: {}", session_id_str);
897 acp::Error::internal_error()
898 })?;
899
900 Ok(())
901 }
902
903 async fn set_session_config_option(
904 &self,
905 args: SetSessionConfigOptionRequest,
906 ) -> Result<SetSessionConfigOptionResponse, acp::Error> {
907 let session_id_str = args.session_id.0.to_string();
908 let config_id = args.config_id.0.to_string();
909 let value = args.value.0.to_string();
910
911 info!(
912 "set_session_config_option: session={}, config={}, value={}",
913 session_id_str, config_id, value
914 );
915
916 let setting = ConfigSetting::parse(&config_id, &value).map_err(|e| {
917 error!("{e}");
918 acp::Error::invalid_params()
919 })?;
920
921 let available = get_local_models().await;
922 let all_models = get_all_models(&available);
923
924 let mut sessions = self.sessions.lock().await;
925 let state = sessions.get_mut(&session_id_str).ok_or_else(|| {
926 error!("Session not found: {}", session_id_str);
927 acp::Error::invalid_params()
928 })?;
929
930 state
931 .config
932 .apply_config_change(&state.modes, &available, &setting)?;
933
934 let effective_model = effective_model(
935 &state.config.active_model,
936 state.config.pending_model.as_deref(),
937 );
938 let options = build_config_options_from_modes(
939 &state.modes,
940 &available,
941 state.config.selected_mode.as_deref(),
942 effective_model,
943 state.config.reasoning_effort,
944 &all_models,
945 &OAuthCredentialStore::default(),
946 );
947 Ok(SetSessionConfigOptionResponse::new(options))
948 }
949
950 async fn set_session_mode(
951 &self,
952 args: SetSessionModeRequest,
953 ) -> Result<SetSessionModeResponse, acp::Error> {
954 info!("Received set_session_mode request: {:?}", args);
955 Err(acp::Error::method_not_found())
956 }
957
958 async fn ext_method(&self, args: acp::ExtRequest) -> Result<acp::ExtResponse, acp::Error> {
959 info!(
960 "Received extension method: {}, params: {:?}",
961 args.method, args.params
962 );
963 let null_value: Arc<serde_json::value::RawValue> =
964 serde_json::from_str("null").expect("null is valid JSON");
965 Ok(null_value.into())
966 }
967
968 async fn ext_notification(&self, args: acp::ExtNotification) -> Result<(), acp::Error> {
969 info!(
970 "Received extension notification: {}, params: {:?}",
971 args.method, args.params
972 );
973
974 if let Ok(msg) = McpRequest::try_from(&args) {
975 match msg {
976 McpRequest::Authenticate {
977 session_id,
978 server_name,
979 } => {
980 let mcp_request_tx = {
981 let sessions = self.sessions.lock().await;
982 let session = sessions.get(&session_id);
983 session
984 .ok_or_else(|| {
985 error!(
986 "Session not found for authenticate_mcp_server: {}",
987 session_id
988 );
989 acp::Error::invalid_params()
990 })?
991 .mcp_request_tx
992 .clone()
993 };
994
995 mcp_request_tx
996 .send(McpRequest::Authenticate {
997 session_id,
998 server_name,
999 })
1000 .await
1001 .map_err(|_| {
1002 error!("MCP request channel closed for session");
1003 acp::Error::internal_error()
1004 })?;
1005 }
1006 }
1007 }
1008
1009 Ok(())
1010 }
1011}