1#![allow(clippy::significant_drop_tightening)]
7#![allow(clippy::too_many_arguments)]
8#![allow(clippy::too_many_lines)]
9#![allow(clippy::cast_possible_truncation)]
10#![allow(clippy::cast_lossless)]
11#![allow(clippy::ignored_unit_patterns)]
12#![allow(clippy::needless_pass_by_value)]
13
14use crate::agent::{AbortHandle, AgentEvent, AgentSession, QueueMode};
15use crate::agent_cx::AgentCx;
16use crate::auth::AuthStorage;
17use crate::compaction::{
18 ResolvedCompactionSettings, compact, compaction_details_to_value, prepare_compaction,
19};
20use crate::config::Config;
21use crate::error::{Error, Result};
22use crate::error_hints;
23use crate::extensions::{ExtensionManager, ExtensionUiRequest, ExtensionUiResponse};
24use crate::model::{
25 ContentBlock, ImageContent, Message, StopReason, TextContent, UserContent, UserMessage,
26};
27use crate::models::ModelEntry;
28use crate::provider_metadata::{canonical_provider_id, provider_metadata};
29use crate::providers;
30use crate::resources::ResourceLoader;
31use crate::session::SessionMessage;
32use crate::tools::{DEFAULT_MAX_BYTES, DEFAULT_MAX_LINES, truncate_tail};
33use asupersync::channel::{mpsc, oneshot};
34use asupersync::runtime::RuntimeHandle;
35use asupersync::sync::{Mutex, OwnedMutexGuard};
36use asupersync::time::{sleep, wall_now};
37use memchr::memchr_iter;
38use serde_json::{Value, json};
39use std::collections::VecDeque;
40use std::io::{self, BufRead, Write};
41use std::path::PathBuf;
42use std::sync::Arc;
43use std::sync::atomic::{AtomicBool, Ordering};
44use std::time::{Duration, Instant};
45
46fn provider_ids_match(left: &str, right: &str) -> bool {
47 let left = left.trim();
48 let right = right.trim();
49 if left.eq_ignore_ascii_case(right) {
50 return true;
51 }
52
53 let left_canonical = canonical_provider_id(left).unwrap_or(left);
54 let right_canonical = canonical_provider_id(right).unwrap_or(right);
55
56 left_canonical.eq_ignore_ascii_case(right)
57 || right_canonical.eq_ignore_ascii_case(left)
58 || left_canonical.eq_ignore_ascii_case(right_canonical)
59}
60
61#[derive(Clone)]
62pub struct RpcOptions {
63 pub config: Config,
64 pub resources: ResourceLoader,
65 pub available_models: Vec<ModelEntry>,
66 pub scoped_models: Vec<RpcScopedModel>,
67 pub auth: AuthStorage,
68 pub runtime_handle: RuntimeHandle,
69}
70
71#[derive(Debug, Clone)]
72pub struct RpcScopedModel {
73 pub model: ModelEntry,
74 pub thinking_level: Option<crate::model::ThinkingLevel>,
75}
76
/// How a `prompt` that arrives while the agent is streaming should be queued.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamingBehavior {
    /// Push the message onto the steering queue.
    Steer,
    /// Push the message onto the follow-up queue.
    FollowUp,
}
82
83#[derive(Debug, Clone)]
84struct RpcStateSnapshot {
85 steering_count: usize,
86 follow_up_count: usize,
87 steering_mode: QueueMode,
88 follow_up_mode: QueueMode,
89 auto_compaction_enabled: bool,
90 auto_retry_enabled: bool,
91}
92
93impl From<&RpcSharedState> for RpcStateSnapshot {
94 fn from(state: &RpcSharedState) -> Self {
95 Self {
96 steering_count: state.steering.len(),
97 follow_up_count: state.follow_up.len(),
98 steering_mode: state.steering_mode,
99 follow_up_mode: state.follow_up_mode,
100 auto_compaction_enabled: state.auto_compaction_enabled,
101 auto_retry_enabled: state.auto_retry_enabled,
102 }
103 }
104}
105
106impl RpcStateSnapshot {
107 const fn pending_count(&self) -> usize {
108 self.steering_count + self.follow_up_count
109 }
110}
111
112use crate::config::parse_queue_mode;
113
114fn parse_streaming_behavior(value: Option<&Value>) -> Result<Option<StreamingBehavior>> {
115 let Some(value) = value else {
116 return Ok(None);
117 };
118 let Some(s) = value.as_str() else {
119 return Err(Error::validation("streamingBehavior must be a string"));
120 };
121 match s {
122 "steer" => Ok(Some(StreamingBehavior::Steer)),
123 "follow-up" | "followUp" => Ok(Some(StreamingBehavior::FollowUp)),
124 _ => Err(Error::validation(format!("Invalid streamingBehavior: {s}"))),
125 }
126}
127
/// Maps known kebab-case and camelCase command spellings to their snake_case
/// command name.
///
/// Any spelling that is not in the alias table is returned unchanged.
fn normalize_command_type(command_type: &str) -> &str {
    // (alias, canonical command name)
    const ALIASES: &[(&str, &str)] = &[
        ("follow-up", "follow_up"),
        ("followUp", "follow_up"),
        ("queue-follow-up", "follow_up"),
        ("queueFollowUp", "follow_up"),
        ("get-state", "get_state"),
        ("getState", "get_state"),
        ("set-model", "set_model"),
        ("setModel", "set_model"),
        ("set-steering-mode", "set_steering_mode"),
        ("setSteeringMode", "set_steering_mode"),
        ("set-follow-up-mode", "set_follow_up_mode"),
        ("setFollowUpMode", "set_follow_up_mode"),
        ("set-auto-compaction", "set_auto_compaction"),
        ("setAutoCompaction", "set_auto_compaction"),
        ("set-auto-retry", "set_auto_retry"),
        ("setAutoRetry", "set_auto_retry"),
    ];

    ALIASES
        .iter()
        .find(|&&(alias, _)| alias == command_type)
        .map_or(command_type, |&(_, canonical)| canonical)
}
140
141fn build_user_message(text: &str, images: &[ImageContent]) -> Message {
142 let timestamp = chrono::Utc::now().timestamp_millis();
143 if images.is_empty() {
144 return Message::User(UserMessage {
145 content: UserContent::Text(text.to_string()),
146 timestamp,
147 });
148 }
149 let mut blocks = vec![ContentBlock::Text(TextContent::new(text.to_string()))];
150 for image in images {
151 blocks.push(ContentBlock::Image(image.clone()));
152 }
153 Message::User(UserMessage {
154 content: UserContent::Blocks(blocks),
155 timestamp,
156 })
157}
158
/// Returns `true` when `message` starts with `/` (after leading whitespace)
/// and is identical to `expanded`.
fn is_extension_command(message: &str, expanded: &str) -> bool {
    if message != expanded {
        return false;
    }
    message.trim_start().starts_with('/')
}
164
165fn try_send_line_with_backpressure(tx: &mpsc::Sender<String>, mut line: String) -> bool {
166 loop {
167 match tx.try_send(line) {
168 Ok(()) => return true,
169 Err(mpsc::SendError::Full(unsent)) => {
170 line = unsent;
171 std::thread::sleep(Duration::from_millis(10));
172 }
173 Err(mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_)) => {
174 return false;
175 }
176 }
177 }
178}
179
180#[derive(Debug)]
181struct RpcSharedState {
182 steering: VecDeque<Message>,
183 follow_up: VecDeque<Message>,
184 steering_mode: QueueMode,
185 follow_up_mode: QueueMode,
186 auto_compaction_enabled: bool,
187 auto_retry_enabled: bool,
188}
189
190const MAX_RPC_PENDING_MESSAGES: usize = 128;
191
192impl RpcSharedState {
193 fn new(config: &Config) -> Self {
194 Self {
195 steering: VecDeque::new(),
196 follow_up: VecDeque::new(),
197 steering_mode: config.steering_queue_mode(),
198 follow_up_mode: config.follow_up_queue_mode(),
199 auto_compaction_enabled: config.compaction_enabled(),
200 auto_retry_enabled: config.retry_enabled(),
201 }
202 }
203
204 fn pending_count(&self) -> usize {
205 self.steering.len() + self.follow_up.len()
206 }
207
208 fn push_steering(&mut self, message: Message) -> Result<()> {
209 if self.steering.len() >= MAX_RPC_PENDING_MESSAGES {
210 return Err(Error::session(
211 "Steering queue is full (Do you have too many pending commands?)",
212 ));
213 }
214 self.steering.push_back(message);
215 Ok(())
216 }
217
218 fn push_follow_up(&mut self, message: Message) -> Result<()> {
219 if self.follow_up.len() >= MAX_RPC_PENDING_MESSAGES {
220 return Err(Error::session("Follow-up queue is full"));
221 }
222 self.follow_up.push_back(message);
223 Ok(())
224 }
225
226 fn pop_steering(&mut self) -> Vec<Message> {
227 match self.steering_mode {
228 QueueMode::All => self.steering.drain(..).collect(),
229 QueueMode::OneAtATime => self.steering.pop_front().into_iter().collect(),
230 }
231 }
232
233 fn pop_follow_up(&mut self) -> Vec<Message> {
234 match self.follow_up_mode {
235 QueueMode::All => self.follow_up.drain(..).collect(),
236 QueueMode::OneAtATime => self.follow_up.pop_front().into_iter().collect(),
237 }
238 }
239}
240
241struct RunningBash {
243 id: String,
244 abort_tx: oneshot::Sender<()>,
245}
246
247#[derive(Debug, Default)]
248struct RpcUiBridgeState {
249 active: Option<ExtensionUiRequest>,
250 queue: VecDeque<ExtensionUiRequest>,
251}
252
253pub async fn run_stdio(mut session: AgentSession, options: RpcOptions) -> Result<()> {
254 session.agent.set_queue_modes(
255 options.config.steering_queue_mode(),
256 options.config.follow_up_queue_mode(),
257 );
258
259 let (in_tx, in_rx) = mpsc::channel::<String>(1024);
260 let (out_tx, out_rx) = std::sync::mpsc::channel::<String>();
261
262 std::thread::spawn(move || {
263 let stdin = io::stdin();
264 let mut reader = io::BufReader::new(stdin.lock());
265 let mut line = String::new();
266 loop {
267 line.clear();
268 match reader.read_line(&mut line) {
269 Ok(0) | Err(_) => break,
270 Ok(_) => {
271 let line_to_send = std::mem::take(&mut line);
272 if !try_send_line_with_backpressure(&in_tx, line_to_send) {
275 break;
276 }
277 }
278 }
279 }
280 });
281
282 std::thread::spawn(move || {
283 let stdout = io::stdout();
284 let mut writer = io::BufWriter::new(stdout.lock());
285 for line in out_rx {
286 if writer.write_all(line.as_bytes()).is_err() {
287 break;
288 }
289 if writer.write_all(b"\n").is_err() {
290 break;
291 }
292 if writer.flush().is_err() {
293 break;
294 }
295 }
296 });
297
298 run(session, options, in_rx, out_tx).await
299}
300
301#[allow(clippy::too_many_lines)]
302#[allow(
303 clippy::significant_drop_tightening,
304 clippy::significant_drop_in_scrutinee
305)]
306pub async fn run(
307 session: AgentSession,
308 options: RpcOptions,
309 in_rx: mpsc::Receiver<String>,
310 out_tx: std::sync::mpsc::Sender<String>,
311) -> Result<()> {
312 let cx = AgentCx::for_request();
313 let session_handle = Arc::clone(&session.session);
314 let session = Arc::new(Mutex::new(session));
315 let shared_state = Arc::new(Mutex::new(RpcSharedState::new(&options.config)));
316 let is_streaming = Arc::new(AtomicBool::new(false));
317 let is_compacting = Arc::new(AtomicBool::new(false));
318 let abort_handle: Arc<Mutex<Option<AbortHandle>>> = Arc::new(Mutex::new(None));
319 let bash_state: Arc<Mutex<Option<RunningBash>>> = Arc::new(Mutex::new(None));
320 let retry_abort = Arc::new(AtomicBool::new(false));
321
322 {
323 use futures::future::BoxFuture;
324 let steering_state = Arc::clone(&shared_state);
325 let follow_state = Arc::clone(&shared_state);
326 let steering_cx = cx.clone();
327 let follow_cx = cx.clone();
328 let mut guard = session
329 .lock(&cx)
330 .await
331 .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
332 let steering_fetcher = move || -> BoxFuture<'static, Vec<Message>> {
333 let steering_state = Arc::clone(&steering_state);
334 let steering_cx = steering_cx.clone();
335 Box::pin(async move {
336 steering_state
337 .lock(&steering_cx)
338 .await
339 .map_or_else(|_| Vec::new(), |mut state| state.pop_steering())
340 })
341 };
342 let follow_fetcher = move || -> BoxFuture<'static, Vec<Message>> {
343 let follow_state = Arc::clone(&follow_state);
344 let follow_cx = follow_cx.clone();
345 Box::pin(async move {
346 follow_state
347 .lock(&follow_cx)
348 .await
349 .map_or_else(|_| Vec::new(), |mut state| state.pop_follow_up())
350 })
351 };
352 guard.agent.register_message_fetchers(
353 Some(Arc::new(steering_fetcher)),
354 Some(Arc::new(follow_fetcher)),
355 );
356 }
357
358 let rpc_extension_manager = {
362 let cx_ui = cx.clone();
363 let guard = session
364 .lock(&cx_ui)
365 .await
366 .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
367 guard
368 .extensions
369 .as_ref()
370 .map(crate::extensions::ExtensionRegion::manager)
371 .cloned()
372 };
373
374 let rpc_ui_state: Option<Arc<Mutex<RpcUiBridgeState>>> = rpc_extension_manager
375 .as_ref()
376 .map(|_| Arc::new(Mutex::new(RpcUiBridgeState::default())));
377
378 if let Some(ref manager) = rpc_extension_manager {
379 let (extension_ui_tx, extension_ui_rx) =
380 asupersync::channel::mpsc::channel::<ExtensionUiRequest>(64);
381 manager.set_ui_sender(extension_ui_tx);
382
383 let out_tx_ui = out_tx.clone();
384 let ui_state = rpc_ui_state
385 .as_ref()
386 .map(Arc::clone)
387 .expect("rpc ui state should exist when extension manager exists");
388 let manager_ui = (*manager).clone();
389 let runtime_handle_ui = options.runtime_handle.clone();
390 options.runtime_handle.spawn(async move {
391 const MAX_UI_PENDING_REQUESTS: usize = 64;
392 let cx = AgentCx::for_request();
393 while let Ok(request) = extension_ui_rx.recv(&cx).await {
394 if request.expects_response() {
395 let emit_now = {
396 let Ok(mut guard) = ui_state.lock(&cx).await else {
397 return;
398 };
399 if guard.active.is_none() {
400 guard.active = Some(request.clone());
401 true
402 } else if guard.queue.len() < MAX_UI_PENDING_REQUESTS {
403 guard.queue.push_back(request.clone());
404 false
405 } else {
406 drop(guard);
407 let _ = manager_ui.respond_ui(ExtensionUiResponse {
408 id: request.id.clone(),
409 value: None,
410 cancelled: true,
411 });
412 false
413 }
414 };
415
416 if emit_now {
417 rpc_emit_extension_ui_request(
418 &runtime_handle_ui,
419 Arc::clone(&ui_state),
420 manager_ui.clone(),
421 out_tx_ui.clone(),
422 request,
423 );
424 }
425 } else {
426 let rpc_event = request.to_rpc_event();
428 let _ = out_tx_ui.send(event(&rpc_event));
429 }
430 }
431 });
432 }
433
434 while let Ok(line) = in_rx.recv(&cx).await {
435 if line.trim().is_empty() {
436 continue;
437 }
438
439 let parsed: Value = match serde_json::from_str(&line) {
440 Ok(v) => v,
441 Err(err) => {
442 let resp = response_error(None, "parse", format!("Failed to parse command: {err}"));
443 let _ = out_tx.send(resp);
444 continue;
445 }
446 };
447
448 let Some(command_type_raw) = parsed.get("type").and_then(Value::as_str) else {
449 let resp = response_error(None, "parse", "Missing command type".to_string());
450 let _ = out_tx.send(resp);
451 continue;
452 };
453 let command_type = normalize_command_type(command_type_raw);
454
455 let id = parsed.get("id").and_then(Value::as_str).map(str::to_string);
456
457 match command_type {
458 "prompt" => {
459 let Some(message) = parsed
460 .get("message")
461 .and_then(Value::as_str)
462 .map(String::from)
463 else {
464 let resp = response_error(id, "prompt", "Missing message".to_string());
465 let _ = out_tx.send(resp);
466 continue;
467 };
468
469 let images = match parse_prompt_images(parsed.get("images")) {
470 Ok(images) => images,
471 Err(err) => {
472 let resp = response_error_with_hints(id, "prompt", &err);
473 let _ = out_tx.send(resp);
474 continue;
475 }
476 };
477
478 let streaming_behavior =
479 match parse_streaming_behavior(parsed.get("streamingBehavior")) {
480 Ok(value) => value,
481 Err(err) => {
482 let resp = response_error_with_hints(id, "prompt", &err);
483 let _ = out_tx.send(resp);
484 continue;
485 }
486 };
487
488 let expanded = options.resources.expand_input(&message);
489
490 if is_streaming.load(Ordering::SeqCst) {
491 if streaming_behavior.is_none() {
492 let resp = response_error(
493 id,
494 "prompt",
495 "Agent is currently streaming; specify streamingBehavior".to_string(),
496 );
497 let _ = out_tx.send(resp);
498 continue;
499 }
500
501 let queued_result = {
502 let mut state = shared_state
503 .lock(&cx)
504 .await
505 .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
506 match streaming_behavior {
507 Some(StreamingBehavior::Steer) => {
508 state.push_steering(build_user_message(&expanded, &images))
509 }
510 Some(StreamingBehavior::FollowUp) => {
511 state.push_follow_up(build_user_message(&expanded, &images))
512 }
513 None => Ok(()), }
515 };
516
517 match queued_result {
518 Ok(()) => {
519 let _ = out_tx.send(response_ok(id, "prompt", None));
520 }
521 Err(err) => {
522 let resp = response_error_with_hints(id, "prompt", &err);
523 let _ = out_tx.send(resp);
524 }
525 }
526 continue;
527 }
528
529 let _ = out_tx.send(response_ok(id, "prompt", None));
531
532 is_streaming.store(true, Ordering::SeqCst);
533
534 let out_tx = out_tx.clone();
535 let session = Arc::clone(&session);
536 let shared_state = Arc::clone(&shared_state);
537 let is_streaming = Arc::clone(&is_streaming);
538 let is_compacting = Arc::clone(&is_compacting);
539 let abort_handle_slot = Arc::clone(&abort_handle);
540 let retry_abort = retry_abort.clone();
541 let options = options.clone();
542 let expanded = expanded.clone();
543 let runtime_handle = options.runtime_handle.clone();
544 runtime_handle.spawn(async move {
545 let cx = AgentCx::for_request();
546 run_prompt_with_retry(
547 session,
548 shared_state,
549 is_streaming,
550 is_compacting,
551 abort_handle_slot,
552 out_tx,
553 retry_abort,
554 options,
555 expanded,
556 images,
557 cx,
558 )
559 .await;
560 });
561 }
562
563 "steer" => {
564 let Some(message) = parsed
565 .get("message")
566 .and_then(Value::as_str)
567 .map(String::from)
568 else {
569 let resp = response_error(id, "steer", "Missing message".to_string());
570 let _ = out_tx.send(resp);
571 continue;
572 };
573
574 let expanded = options.resources.expand_input(&message);
575 if is_extension_command(&message, &expanded) {
576 let resp = response_error(
577 id,
578 "steer",
579 "Extension commands are not allowed with steer".to_string(),
580 );
581 let _ = out_tx.send(resp);
582 continue;
583 }
584
585 if is_streaming.load(Ordering::SeqCst) {
586 let result = shared_state
587 .lock(&cx)
588 .await
589 .map_err(|err| Error::session(format!("state lock failed: {err}")))?
590 .push_steering(build_user_message(&expanded, &[]));
591
592 match result {
593 Ok(()) => {
594 let _ = out_tx.send(response_ok(id, "steer", None));
595 }
596 Err(err) => {
597 let _ = out_tx.send(response_error_with_hints(id, "steer", &err));
598 }
599 }
600 continue;
601 }
602
603 let _ = out_tx.send(response_ok(id, "steer", None));
604
605 is_streaming.store(true, Ordering::SeqCst);
606
607 let out_tx = out_tx.clone();
608 let session = Arc::clone(&session);
609 let shared_state = Arc::clone(&shared_state);
610 let is_streaming = Arc::clone(&is_streaming);
611 let is_compacting = Arc::clone(&is_compacting);
612 let abort_handle_slot = Arc::clone(&abort_handle);
613 let retry_abort = retry_abort.clone();
614 let options = options.clone();
615 let expanded = expanded.clone();
616 let runtime_handle = options.runtime_handle.clone();
617 runtime_handle.spawn(async move {
618 let cx = AgentCx::for_request();
619 run_prompt_with_retry(
620 session,
621 shared_state,
622 is_streaming,
623 is_compacting,
624 abort_handle_slot,
625 out_tx,
626 retry_abort,
627 options,
628 expanded,
629 Vec::new(),
630 cx,
631 )
632 .await;
633 });
634 }
635
636 "follow_up" => {
637 let Some(message) = parsed
638 .get("message")
639 .and_then(Value::as_str)
640 .map(String::from)
641 else {
642 let resp = response_error(id, "follow_up", "Missing message".to_string());
643 let _ = out_tx.send(resp);
644 continue;
645 };
646
647 let expanded = options.resources.expand_input(&message);
648 if is_extension_command(&message, &expanded) {
649 let resp = response_error(
650 id,
651 "follow_up",
652 "Extension commands are not allowed with follow_up".to_string(),
653 );
654 let _ = out_tx.send(resp);
655 continue;
656 }
657
658 if is_streaming.load(Ordering::SeqCst) {
659 let result = shared_state
660 .lock(&cx)
661 .await
662 .map_err(|err| Error::session(format!("state lock failed: {err}")))?
663 .push_follow_up(build_user_message(&expanded, &[]));
664
665 match result {
666 Ok(()) => {
667 let _ = out_tx.send(response_ok(id, "follow_up", None));
668 }
669 Err(err) => {
670 let _ = out_tx.send(response_error_with_hints(id, "follow_up", &err));
671 }
672 }
673 continue;
674 }
675
676 let _ = out_tx.send(response_ok(id, "follow_up", None));
677
678 is_streaming.store(true, Ordering::SeqCst);
679
680 let out_tx = out_tx.clone();
681 let session = Arc::clone(&session);
682 let shared_state = Arc::clone(&shared_state);
683 let is_streaming = Arc::clone(&is_streaming);
684 let is_compacting = Arc::clone(&is_compacting);
685 let abort_handle_slot = Arc::clone(&abort_handle);
686 let retry_abort = retry_abort.clone();
687 let options = options.clone();
688 let expanded = expanded.clone();
689 let runtime_handle = options.runtime_handle.clone();
690 runtime_handle.spawn(async move {
691 let cx = AgentCx::for_request();
692 run_prompt_with_retry(
693 session,
694 shared_state,
695 is_streaming,
696 is_compacting,
697 abort_handle_slot,
698 out_tx,
699 retry_abort,
700 options,
701 expanded,
702 Vec::new(),
703 cx,
704 )
705 .await;
706 });
707 }
708
709 "abort" => {
710 let handle = abort_handle
711 .lock(&cx)
712 .await
713 .map_err(|err| Error::session(format!("abort lock failed: {err}")))?
714 .clone();
715 if let Some(handle) = handle {
716 handle.abort();
717 }
718 let _ = out_tx.send(response_ok(id, "abort", None));
719 }
720
721 "get_state" => {
722 let snapshot = {
723 let state = shared_state
724 .lock(&cx)
725 .await
726 .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
727 RpcStateSnapshot::from(&*state)
728 };
729 let data = {
730 let inner_session = session_handle.lock(&cx).await.map_err(|err| {
731 Error::session(format!("inner session lock failed: {err}"))
732 })?;
733 session_state(
734 &inner_session,
735 &options,
736 &snapshot,
737 is_streaming.load(Ordering::SeqCst),
738 is_compacting.load(Ordering::SeqCst),
739 )
740 };
741 let _ = out_tx.send(response_ok(id, "get_state", Some(data)));
742 }
743
744 "get_session_stats" => {
745 let data = {
746 let inner_session = session_handle.lock(&cx).await.map_err(|err| {
747 Error::session(format!("inner session lock failed: {err}"))
748 })?;
749 session_stats(&inner_session)
750 };
751 let _ = out_tx.send(response_ok(id, "get_session_stats", Some(data)));
752 }
753
754 "get_messages" => {
755 let messages = {
756 let inner_session = session_handle.lock(&cx).await.map_err(|err| {
757 Error::session(format!("inner session lock failed: {err}"))
758 })?;
759 inner_session
760 .entries_for_current_path()
761 .iter()
762 .filter_map(|entry| match entry {
763 crate::session::SessionEntry::Message(msg) => match msg.message {
764 SessionMessage::User { .. }
765 | SessionMessage::Assistant { .. }
766 | SessionMessage::ToolResult { .. }
767 | SessionMessage::BashExecution { .. } => Some(msg.message.clone()),
768 _ => None,
769 },
770 _ => None,
771 })
772 .collect::<Vec<_>>()
773 };
774 let messages = messages
775 .into_iter()
776 .map(rpc_session_message_value)
777 .collect::<Vec<_>>();
778 let _ = out_tx.send(response_ok(
779 id,
780 "get_messages",
781 Some(json!({ "messages": messages })),
782 ));
783 }
784
785 "get_available_models" => {
786 let models = options
787 .available_models
788 .iter()
789 .map(rpc_model_from_entry)
790 .collect::<Vec<_>>();
791 let _ = out_tx.send(response_ok(
792 id,
793 "get_available_models",
794 Some(json!({ "models": models })),
795 ));
796 }
797
798 "set_model" => {
799 let Some(provider) = parsed.get("provider").and_then(Value::as_str) else {
800 let _ = out_tx.send(response_error(
801 id,
802 "set_model",
803 "Missing provider".to_string(),
804 ));
805 continue;
806 };
807 let Some(model_id) = parsed.get("modelId").and_then(Value::as_str) else {
808 let _ = out_tx.send(response_error(
809 id,
810 "set_model",
811 "Missing modelId".to_string(),
812 ));
813 continue;
814 };
815
816 let Some(entry) = options
817 .available_models
818 .iter()
819 .find(|m| {
820 provider_ids_match(&m.model.provider, provider)
821 && m.model.id.eq_ignore_ascii_case(model_id)
822 })
823 .cloned()
824 else {
825 let _ = out_tx.send(response_error(
826 id,
827 "set_model",
828 format!("Model not found: {provider}/{model_id}"),
829 ));
830 continue;
831 };
832
833 let key = resolve_model_key(&options.auth, &entry);
834 if model_requires_configured_credential(&entry) && key.is_none() {
835 let err = Error::auth(format!(
836 "Missing credentials for {}/{}",
837 entry.model.provider, entry.model.id
838 ));
839 let _ = out_tx.send(response_error_with_hints(id, "set_model", &err));
840 continue;
841 }
842
843 let result: Result<()> = async {
844 let mut guard = session
845 .lock(&cx)
846 .await
847 .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
848 let provider_impl = providers::create_provider(
849 &entry,
850 guard
851 .extensions
852 .as_ref()
853 .map(crate::extensions::ExtensionRegion::manager),
854 )?;
855 guard.agent.set_provider(provider_impl);
856 guard.agent.stream_options_mut().api_key.clone_from(&key);
857 guard
858 .agent
859 .stream_options_mut()
860 .headers
861 .clone_from(&entry.headers);
862
863 apply_model_change(&mut guard, &entry).await?;
864
865 let current_thinking = guard
866 .agent
867 .stream_options()
868 .thinking_level
869 .unwrap_or_default();
870 let clamped = entry.clamp_thinking_level(current_thinking);
871 if clamped != current_thinking {
872 apply_thinking_level(&mut guard, clamped).await?;
873 }
874 Ok(())
875 }
876 .await;
877
878 match result {
879 Ok(()) => {
880 let _ = out_tx.send(response_ok(
881 id,
882 "set_model",
883 Some(rpc_model_from_entry(&entry)),
884 ));
885 }
886 Err(err) => {
887 let _ = out_tx.send(response_error_with_hints(id, "set_model", &err));
888 }
889 }
890 }
891
892 "cycle_model" => {
893 let result = async {
894 let mut guard = session
895 .lock(&cx)
896 .await
897 .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
898 cycle_model_for_rpc(&mut guard, &options).await
899 }
900 .await;
901
902 match result {
903 Ok(Some((entry, thinking_level, is_scoped))) => {
904 let _ = out_tx.send(response_ok(
905 id,
906 "cycle_model",
907 Some(json!({
908 "model": rpc_model_from_entry(&entry),
909 "thinkingLevel": thinking_level.to_string(),
910 "isScoped": is_scoped,
911 })),
912 ));
913 }
914 Ok(None) => {
915 let _ =
916 out_tx.send(response_ok(id.clone(), "cycle_model", Some(Value::Null)));
917 }
918 Err(err) => {
919 let _ = out_tx.send(response_error_with_hints(id, "cycle_model", &err));
920 }
921 }
922 }
923
924 "set_thinking_level" => {
925 let Some(level) = parsed.get("level").and_then(Value::as_str) else {
926 let _ = out_tx.send(response_error(
927 id,
928 "set_thinking_level",
929 "Missing level".to_string(),
930 ));
931 continue;
932 };
933 let level = match parse_thinking_level(level) {
934 Ok(level) => level,
935 Err(err) => {
936 let _ =
937 out_tx.send(response_error_with_hints(id, "set_thinking_level", &err));
938 continue;
939 }
940 };
941
942 {
943 let mut guard = session
944 .lock(&cx)
945 .await
946 .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
947 let level = {
948 let inner_session = guard.session.lock(&cx).await.map_err(|err| {
949 Error::session(format!("inner session lock failed: {err}"))
950 })?;
951 current_model_entry(&inner_session, &options)
952 .map_or(level, |entry| entry.clamp_thinking_level(level))
953 };
954 if let Err(err) = apply_thinking_level(&mut guard, level).await {
955 let _ = out_tx.send(response_error_with_hints(
956 id.clone(),
957 "set_thinking_level",
958 &err,
959 ));
960 continue;
961 }
962 }
963 let _ = out_tx.send(response_ok(id, "set_thinking_level", None));
964 }
965
966 "cycle_thinking_level" => {
967 let next = {
968 let mut guard = session
969 .lock(&cx)
970 .await
971 .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
972 let entry = {
973 let inner_session = guard.session.lock(&cx).await.map_err(|err| {
974 Error::session(format!("inner session lock failed: {err}"))
975 })?;
976 current_model_entry(&inner_session, &options).cloned()
977 };
978 let Some(entry) = entry else {
979 let _ =
980 out_tx.send(response_ok(id, "cycle_thinking_level", Some(Value::Null)));
981 continue;
982 };
983 if !entry.model.reasoning {
984 let _ =
985 out_tx.send(response_ok(id, "cycle_thinking_level", Some(Value::Null)));
986 continue;
987 }
988
989 let levels = available_thinking_levels(&entry);
990 let current = guard
991 .agent
992 .stream_options()
993 .thinking_level
994 .unwrap_or_default();
995 let current_index = levels
996 .iter()
997 .position(|level| *level == current)
998 .unwrap_or(0);
999 let next = levels[(current_index + 1) % levels.len()];
1000 if let Err(err) = apply_thinking_level(&mut guard, next).await {
1001 let _ = out_tx.send(response_error_with_hints(
1002 id.clone(),
1003 "cycle_thinking_level",
1004 &err,
1005 ));
1006 continue;
1007 }
1008 next
1009 };
1010 let _ = out_tx.send(response_ok(
1011 id,
1012 "cycle_thinking_level",
1013 Some(json!({ "level": next.to_string() })),
1014 ));
1015 }
1016
1017 "set_steering_mode" => {
1018 let Some(mode) = parsed.get("mode").and_then(Value::as_str) else {
1019 let _ = out_tx.send(response_error(
1020 id,
1021 "set_steering_mode",
1022 "Missing mode".to_string(),
1023 ));
1024 continue;
1025 };
1026 let Some(mode) = parse_queue_mode(Some(mode)) else {
1027 let _ = out_tx.send(response_error(
1028 id,
1029 "set_steering_mode",
1030 "Invalid steering mode".to_string(),
1031 ));
1032 continue;
1033 };
1034 let mut state = shared_state
1035 .lock(&cx)
1036 .await
1037 .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
1038 state.steering_mode = mode;
1039 drop(state);
1040 let _ = out_tx.send(response_ok(id, "set_steering_mode", None));
1041 }
1042
1043 "set_follow_up_mode" => {
1044 let Some(mode) = parsed.get("mode").and_then(Value::as_str) else {
1045 let _ = out_tx.send(response_error(
1046 id,
1047 "set_follow_up_mode",
1048 "Missing mode".to_string(),
1049 ));
1050 continue;
1051 };
1052 let Some(mode) = parse_queue_mode(Some(mode)) else {
1053 let _ = out_tx.send(response_error(
1054 id,
1055 "set_follow_up_mode",
1056 "Invalid follow-up mode".to_string(),
1057 ));
1058 continue;
1059 };
1060 let mut state = shared_state
1061 .lock(&cx)
1062 .await
1063 .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
1064 state.follow_up_mode = mode;
1065 drop(state);
1066 let _ = out_tx.send(response_ok(id, "set_follow_up_mode", None));
1067 }
1068
1069 "set_auto_compaction" => {
1070 let Some(enabled) = parsed.get("enabled").and_then(Value::as_bool) else {
1071 let _ = out_tx.send(response_error(
1072 id,
1073 "set_auto_compaction",
1074 "Missing enabled".to_string(),
1075 ));
1076 continue;
1077 };
1078 let mut state = shared_state
1079 .lock(&cx)
1080 .await
1081 .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
1082 state.auto_compaction_enabled = enabled;
1083 drop(state);
1084 let _ = out_tx.send(response_ok(id, "set_auto_compaction", None));
1085 }
1086
1087 "set_auto_retry" => {
1088 let Some(enabled) = parsed.get("enabled").and_then(Value::as_bool) else {
1089 let _ = out_tx.send(response_error(
1090 id,
1091 "set_auto_retry",
1092 "Missing enabled".to_string(),
1093 ));
1094 continue;
1095 };
1096 let mut state = shared_state
1097 .lock(&cx)
1098 .await
1099 .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
1100 state.auto_retry_enabled = enabled;
1101 drop(state);
1102 let _ = out_tx.send(response_ok(id, "set_auto_retry", None));
1103 }
1104
1105 "abort_retry" => {
1106 retry_abort.store(true, Ordering::SeqCst);
1107 let _ = out_tx.send(response_ok(id, "abort_retry", None));
1108 }
1109
1110 "set_session_name" => {
1111 let Some(name) = parsed.get("name").and_then(Value::as_str) else {
1112 let _ = out_tx.send(response_error(
1113 id,
1114 "set_session_name",
1115 "Missing name".to_string(),
1116 ));
1117 continue;
1118 };
1119 let result: Result<()> = async {
1120 let mut guard = session
1121 .lock(&cx)
1122 .await
1123 .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
1124 {
1125 let mut inner_session = guard.session.lock(&cx).await.map_err(|err| {
1126 Error::session(format!("inner session lock failed: {err}"))
1127 })?;
1128 inner_session.append_session_info(Some(name.to_string()));
1129 }
1130 guard.persist_session().await?;
1131 Ok(())
1132 }
1133 .await;
1134
1135 match result {
1136 Ok(()) => {
1137 let _ = out_tx.send(response_ok(id, "set_session_name", None));
1138 }
1139 Err(err) => {
1140 let _ =
1141 out_tx.send(response_error_with_hints(id, "set_session_name", &err));
1142 }
1143 }
1144 }
1145
1146 "get_last_assistant_text" => {
1147 let text = {
1148 let inner_session = session_handle.lock(&cx).await.map_err(|err| {
1149 Error::session(format!("inner session lock failed: {err}"))
1150 })?;
1151 last_assistant_text(&inner_session)
1152 };
1153 let _ = out_tx.send(response_ok(
1154 id,
1155 "get_last_assistant_text",
1156 Some(json!({ "text": text })),
1157 ));
1158 }
1159
1160 "export_html" => {
1161 let output_path = parsed
1162 .get("outputPath")
1163 .and_then(Value::as_str)
1164 .map(str::to_string);
1165 let snapshot = {
1170 let guard = session
1171 .lock(&cx)
1172 .await
1173 .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
1174 let inner = guard.session.lock(&cx).await.map_err(|err| {
1175 Error::session(format!("inner session lock failed: {err}"))
1176 })?;
1177 inner.export_snapshot()
1178 };
1179 match export_html_snapshot(&snapshot, output_path.as_deref()).await {
1180 Ok(path) => {
1181 let _ = out_tx.send(response_ok(
1182 id,
1183 "export_html",
1184 Some(json!({ "path": path })),
1185 ));
1186 }
1187 Err(err) => {
1188 let _ = out_tx.send(response_error_with_hints(id, "export_html", &err));
1189 }
1190 }
1191 }
1192
1193 "bash" => {
1194 let Some(command) = parsed.get("command").and_then(Value::as_str) else {
1195 let _ = out_tx.send(response_error(id, "bash", "Missing command".to_string()));
1196 continue;
1197 };
1198
1199 let mut running = bash_state
1200 .lock(&cx)
1201 .await
1202 .map_err(|err| Error::session(format!("bash state lock failed: {err}")))?;
1203 if running.is_some() {
1204 let _ = out_tx.send(response_error(
1205 id,
1206 "bash",
1207 "Bash command already running".to_string(),
1208 ));
1209 continue;
1210 }
1211
1212 let run_id = uuid::Uuid::new_v4().to_string();
1213 let (abort_tx, abort_rx) = oneshot::channel();
1214 *running = Some(RunningBash {
1215 id: run_id.clone(),
1216 abort_tx,
1217 });
1218
1219 let out_tx = out_tx.clone();
1220 let session = Arc::clone(&session);
1221 let bash_state = Arc::clone(&bash_state);
1222 let command = command.to_string();
1223 let id_clone = id.clone();
1224 let runtime_handle = options.runtime_handle.clone();
1225
1226 runtime_handle.spawn(async move {
1227 let cx = AgentCx::for_request();
1228 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
1229 let result = run_bash_rpc(&cwd, &command, abort_rx).await;
1230
1231 let response = match result {
1232 Ok(result) => {
1233 if let Ok(mut guard) = session.lock(&cx).await {
1234 if let Ok(mut inner_session) = guard.session.lock(&cx).await {
1235 inner_session.append_message(SessionMessage::BashExecution {
1236 command: command.clone(),
1237 output: result.output.clone(),
1238 exit_code: result.exit_code,
1239 cancelled: Some(result.cancelled),
1240 truncated: Some(result.truncated),
1241 full_output_path: result.full_output_path.clone(),
1242 timestamp: Some(chrono::Utc::now().timestamp_millis()),
1243 extra: std::collections::HashMap::default(),
1244 });
1245 }
1246 let _ = guard.persist_session().await;
1247 }
1248
1249 response_ok(
1250 id_clone,
1251 "bash",
1252 Some(json!({
1253 "output": result.output,
1254 "exitCode": result.exit_code,
1255 "cancelled": result.cancelled,
1256 "truncated": result.truncated,
1257 "fullOutputPath": result.full_output_path,
1258 })),
1259 )
1260 }
1261 Err(err) => response_error_with_hints(id_clone, "bash", &err),
1262 };
1263
1264 let _ = out_tx.send(response);
1265 if let Ok(mut running) = bash_state.lock(&cx).await {
1266 if running.as_ref().is_some_and(|r| r.id == run_id) {
1267 *running = None;
1268 }
1269 }
1270 });
1271 }
1272
1273 "abort_bash" => {
1274 let mut running = bash_state
1275 .lock(&cx)
1276 .await
1277 .map_err(|err| Error::session(format!("bash state lock failed: {err}")))?;
1278 if let Some(running_bash) = running.take() {
1279 let _ = running_bash.abort_tx.send(&cx, ());
1280 }
1281 let _ = out_tx.send(response_ok(id, "abort_bash", None));
1282 }
1283
1284 "compact" => {
1285 let custom_instructions = parsed
1286 .get("customInstructions")
1287 .and_then(Value::as_str)
1288 .map(str::to_string);
1289
1290 let result: Result<Value> = async {
1291 let mut guard = session
1292 .lock(&cx)
1293 .await
1294 .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
1295 let path_entries = {
1296 let mut inner_session = guard.session.lock(&cx).await.map_err(|err| {
1297 Error::session(format!("inner session lock failed: {err}"))
1298 })?;
1299 inner_session.ensure_entry_ids();
1300 inner_session
1301 .entries_for_current_path()
1302 .into_iter()
1303 .cloned()
1304 .collect::<Vec<_>>()
1305 };
1306
1307 let key = guard
1308 .agent
1309 .stream_options()
1310 .api_key
1311 .as_deref()
1312 .ok_or_else(|| Error::auth("Missing API key for compaction"))?;
1313
1314 let provider = guard.agent.provider();
1315
1316 let settings = ResolvedCompactionSettings {
1317 enabled: options.config.compaction_enabled(),
1318 reserve_tokens: options.config.compaction_reserve_tokens(),
1319 keep_recent_tokens: options.config.compaction_keep_recent_tokens(),
1320 ..Default::default()
1321 };
1322
1323 let prep = prepare_compaction(&path_entries, settings).ok_or_else(|| {
1324 Error::session(
1325 "Compaction not available (already compacted or missing IDs)",
1326 )
1327 })?;
1328
1329 is_compacting.store(true, Ordering::SeqCst);
1330 let compact_res =
1331 compact(prep, provider, key, custom_instructions.as_deref()).await;
1332 is_compacting.store(false, Ordering::SeqCst);
1333 let result_data = compact_res?;
1334
1335 let details_value = compaction_details_to_value(&result_data.details)?;
1336
1337 let messages = {
1338 let mut inner_session = guard.session.lock(&cx).await.map_err(|err| {
1339 Error::session(format!("inner session lock failed: {err}"))
1340 })?;
1341 inner_session.append_compaction(
1342 result_data.summary.clone(),
1343 result_data.first_kept_entry_id.clone(),
1344 result_data.tokens_before,
1345 Some(details_value.clone()),
1346 None,
1347 );
1348 inner_session.to_messages_for_current_path()
1349 };
1350 guard.persist_session().await?;
1351 guard.agent.replace_messages(messages);
1352
1353 Ok(json!({
1354 "summary": result_data.summary,
1355 "firstKeptEntryId": result_data.first_kept_entry_id,
1356 "tokensBefore": result_data.tokens_before,
1357 "details": details_value,
1358 }))
1359 }
1360 .await;
1361
1362 match result {
1363 Ok(data) => {
1364 let _ = out_tx.send(response_ok(id, "compact", Some(data)));
1365 }
1366 Err(err) => {
1367 let _ = out_tx.send(response_error_with_hints(id, "compact", &err));
1368 }
1369 }
1370 }
1371
1372 "new_session" => {
1373 let parent = parsed
1374 .get("parentSession")
1375 .and_then(Value::as_str)
1376 .map(str::to_string);
1377 {
1378 let mut guard = session
1379 .lock(&cx)
1380 .await
1381 .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
1382 let (session_dir, provider, model_id, thinking_level) = {
1383 let inner_session = guard.session.lock(&cx).await.map_err(|err| {
1384 Error::session(format!("inner session lock failed: {err}"))
1385 })?;
1386 (
1387 inner_session.session_dir.clone(),
1388 inner_session.header.provider.clone(),
1389 inner_session.header.model_id.clone(),
1390 inner_session.header.thinking_level.clone(),
1391 )
1392 };
1393 let mut new_session = if guard.save_enabled() {
1394 crate::session::Session::create_with_dir(session_dir)
1395 } else {
1396 crate::session::Session::in_memory()
1397 };
1398 new_session.header.parent_session = parent;
1399 new_session.header.provider.clone_from(&provider);
1401 new_session.header.model_id.clone_from(&model_id);
1402 new_session
1403 .header
1404 .thinking_level
1405 .clone_from(&thinking_level);
1406
1407 let session_id = new_session.header.id.clone();
1408 {
1409 let mut inner_session = guard.session.lock(&cx).await.map_err(|err| {
1410 Error::session(format!("inner session lock failed: {err}"))
1411 })?;
1412 *inner_session = new_session;
1413 }
1414 guard.agent.clear_messages();
1415 guard.agent.stream_options_mut().session_id = Some(session_id);
1416 }
1417 {
1418 let mut state = shared_state
1419 .lock(&cx)
1420 .await
1421 .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
1422 state.steering.clear();
1423 state.follow_up.clear();
1424 }
1425 let _ = out_tx.send(response_ok(
1426 id,
1427 "new_session",
1428 Some(json!({ "cancelled": false })),
1429 ));
1430 }
1431
1432 "switch_session" => {
1433 let Some(session_path) = parsed.get("sessionPath").and_then(Value::as_str) else {
1434 let _ = out_tx.send(response_error(
1435 id,
1436 "switch_session",
1437 "Missing sessionPath".to_string(),
1438 ));
1439 continue;
1440 };
1441
1442 let loaded = crate::session::Session::open(session_path).await;
1443 match loaded {
1444 Ok(new_session) => {
1445 let messages = new_session.to_messages_for_current_path();
1446 let session_id = new_session.header.id.clone();
1447 let mut guard = session
1448 .lock(&cx)
1449 .await
1450 .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
1451 {
1452 let mut inner_session =
1453 guard.session.lock(&cx).await.map_err(|err| {
1454 Error::session(format!("inner session lock failed: {err}"))
1455 })?;
1456 *inner_session = new_session;
1457 }
1458 guard.agent.replace_messages(messages);
1459 guard.agent.stream_options_mut().session_id = Some(session_id);
1460 let _ = out_tx.send(response_ok(
1461 id,
1462 "switch_session",
1463 Some(json!({ "cancelled": false })),
1464 ));
1465 let mut state = shared_state
1466 .lock(&cx)
1467 .await
1468 .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
1469 state.steering.clear();
1470 state.follow_up.clear();
1471 }
1472 Err(err) => {
1473 let _ = out_tx.send(response_error_with_hints(id, "switch_session", &err));
1474 }
1475 }
1476 }
1477
1478 "fork" => {
1479 let Some(entry_id) = parsed.get("entryId").and_then(Value::as_str) else {
1480 let _ = out_tx.send(response_error(id, "fork", "Missing entryId".to_string()));
1481 continue;
1482 };
1483
1484 let result: Result<String> =
1485 async {
1486 let (fork_plan, parent_path, session_dir, save_enabled, header_snapshot) = {
1488 let guard = session.lock(&cx).await.map_err(|err| {
1489 Error::session(format!("session lock failed: {err}"))
1490 })?;
1491 let inner = guard.session.lock(&cx).await.map_err(|err| {
1492 Error::session(format!("inner session lock failed: {err}"))
1493 })?;
1494 let plan = inner.plan_fork_from_user_message(entry_id)?;
1495 let parent_path = inner.path.as_ref().map(|p| p.display().to_string());
1496 let session_dir = inner.session_dir.clone();
1497 let header = inner.header.clone();
1498 (plan, parent_path, session_dir, guard.save_enabled(), header)
1499 };
1501
1502 let crate::session::ForkPlan {
1504 entries,
1505 leaf_id,
1506 selected_text,
1507 } = fork_plan;
1508
1509 let mut new_session = if save_enabled {
1510 crate::session::Session::create_with_dir(session_dir)
1511 } else {
1512 crate::session::Session::in_memory()
1513 };
1514 new_session.header.parent_session = parent_path;
1515 new_session
1516 .header
1517 .provider
1518 .clone_from(&header_snapshot.provider);
1519 new_session
1520 .header
1521 .model_id
1522 .clone_from(&header_snapshot.model_id);
1523 new_session
1524 .header
1525 .thinking_level
1526 .clone_from(&header_snapshot.thinking_level);
1527 new_session.entries = entries;
1528 new_session.leaf_id = leaf_id;
1529 new_session.ensure_entry_ids();
1530
1531 let messages = new_session.to_messages_for_current_path();
1532 let session_id = new_session.header.id.clone();
1533
1534 {
1536 let mut guard = session.lock(&cx).await.map_err(|err| {
1537 Error::session(format!("session lock failed: {err}"))
1538 })?;
1539 let mut inner = guard.session.lock(&cx).await.map_err(|err| {
1540 Error::session(format!("inner session lock failed: {err}"))
1541 })?;
1542 *inner = new_session;
1543 drop(inner);
1544 guard.agent.replace_messages(messages);
1545 guard.agent.stream_options_mut().session_id = Some(session_id);
1546 }
1547
1548 {
1549 let mut state = shared_state.lock(&cx).await.map_err(|err| {
1550 Error::session(format!("state lock failed: {err}"))
1551 })?;
1552 state.steering.clear();
1553 state.follow_up.clear();
1554 }
1555
1556 Ok(selected_text)
1557 }
1558 .await;
1559
1560 match result {
1561 Ok(selected_text) => {
1562 let _ = out_tx.send(response_ok(
1563 id,
1564 "fork",
1565 Some(json!({ "text": selected_text, "cancelled": false })),
1566 ));
1567 }
1568 Err(err) => {
1569 let _ = out_tx.send(response_error_with_hints(id, "fork", &err));
1570 }
1571 }
1572 }
1573
1574 "get_fork_messages" => {
1575 let path_entries = {
1577 let guard = session
1578 .lock(&cx)
1579 .await
1580 .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
1581 let inner_session = guard.session.lock(&cx).await.map_err(|err| {
1582 Error::session(format!("inner session lock failed: {err}"))
1583 })?;
1584 inner_session
1585 .entries_for_current_path()
1586 .into_iter()
1587 .cloned()
1588 .collect::<Vec<_>>()
1589 };
1590 let messages = fork_messages_from_entries(&path_entries);
1591 let _ = out_tx.send(response_ok(
1592 id,
1593 "get_fork_messages",
1594 Some(json!({ "messages": messages })),
1595 ));
1596 }
1597
1598 "get_commands" => {
1599 let commands = options.resources.list_commands();
1600 let _ = out_tx.send(response_ok(
1601 id,
1602 "get_commands",
1603 Some(json!({ "commands": commands })),
1604 ));
1605 }
1606
1607 "extension_ui_response" => {
1608 if let (Some(manager), Some(ui_state)) =
1609 (rpc_extension_manager.as_ref(), rpc_ui_state.as_ref())
1610 {
1611 let Some(request_id) = rpc_parse_extension_ui_response_id(&parsed) else {
1612 let _ = out_tx.send(response_error(
1613 id,
1614 "extension_ui_response",
1615 "Missing requestId (or id) field",
1616 ));
1617 continue;
1618 };
1619
1620 let (response, next_request) = {
1621 let Ok(mut guard) = ui_state.lock(&cx).await else {
1622 let _ = out_tx.send(response_error(
1623 id,
1624 "extension_ui_response",
1625 "Extension UI bridge unavailable",
1626 ));
1627 continue;
1628 };
1629
1630 let Some(active) = guard.active.clone() else {
1631 let _ = out_tx.send(response_error(
1632 id,
1633 "extension_ui_response",
1634 "No active extension UI request",
1635 ));
1636 continue;
1637 };
1638
1639 if active.id != request_id {
1640 let _ = out_tx.send(response_error(
1641 id,
1642 "extension_ui_response",
1643 format!(
1644 "Unexpected requestId: {request_id} (active: {})",
1645 active.id
1646 ),
1647 ));
1648 continue;
1649 }
1650
1651 let response = match rpc_parse_extension_ui_response(&parsed, &active) {
1652 Ok(response) => response,
1653 Err(message) => {
1654 let _ = out_tx.send(response_error(
1655 id,
1656 "extension_ui_response",
1657 message,
1658 ));
1659 continue;
1660 }
1661 };
1662
1663 guard.active = None;
1664 let next = guard.queue.pop_front();
1665 if let Some(ref next) = next {
1666 guard.active = Some(next.clone());
1667 }
1668 (response, next)
1669 };
1670
1671 let resolved = manager.respond_ui(response);
1672 let _ = out_tx.send(response_ok(
1673 id,
1674 "extension_ui_response",
1675 Some(json!({ "resolved": resolved })),
1676 ));
1677
1678 if let Some(next) = next_request {
1679 rpc_emit_extension_ui_request(
1680 &options.runtime_handle,
1681 Arc::clone(ui_state),
1682 (*manager).clone(),
1683 out_tx.clone(),
1684 next,
1685 );
1686 }
1687 } else {
1688 let _ = out_tx.send(response_ok(id, "extension_ui_response", None));
1689 }
1690 }
1691
1692 _ => {
1693 let _ = out_tx.send(response_error(
1694 id,
1695 command_type_raw,
1696 format!("Unknown command: {command_type_raw}"),
1697 ));
1698 }
1699 }
1700 }
1701
1702 let extension_region = session
1706 .lock(&cx)
1707 .await
1708 .ok()
1709 .and_then(|mut guard| guard.extensions.take());
1710 if let Some(ext) = extension_region {
1711 ext.shutdown().await;
1712 }
1713
1714 Ok(())
1715}
1716
1717#[allow(clippy::too_many_lines)]
1722async fn run_prompt_with_retry(
1723 session: Arc<Mutex<AgentSession>>,
1724 shared_state: Arc<Mutex<RpcSharedState>>,
1725 is_streaming: Arc<AtomicBool>,
1726 is_compacting: Arc<AtomicBool>,
1727 abort_handle_slot: Arc<Mutex<Option<AbortHandle>>>,
1728 out_tx: std::sync::mpsc::Sender<String>,
1729 retry_abort: Arc<AtomicBool>,
1730 options: RpcOptions,
1731 message: String,
1732 images: Vec<ImageContent>,
1733 cx: AgentCx,
1734) {
1735 retry_abort.store(false, Ordering::SeqCst);
1736 is_streaming.store(true, Ordering::SeqCst);
1737
1738 let max_retries = options.config.retry_max_retries();
1739 let mut retry_count: u32 = 0;
1740 let mut success = false;
1741 let mut final_error: Option<String> = None;
1742 let mut final_error_hints: Option<Value> = None;
1743
1744 loop {
1745 let (abort_handle, abort_signal) = AbortHandle::new();
1746 if let Ok(mut guard) = OwnedMutexGuard::lock(Arc::clone(&abort_handle_slot), &cx).await {
1747 *guard = Some(abort_handle);
1748 } else {
1749 is_streaming.store(false, Ordering::SeqCst);
1750 return;
1751 }
1752
1753 let runtime_for_events = options.runtime_handle.clone();
1754
1755 let result = {
1756 let mut guard = match OwnedMutexGuard::lock(Arc::clone(&session), &cx).await {
1757 Ok(guard) => guard,
1758 Err(err) => {
1759 final_error = Some(format!("session lock failed: {err}"));
1760 final_error_hints = None;
1761 break;
1762 }
1763 };
1764 let extensions = guard.extensions.as_ref().map(|r| r.manager().clone());
1765 let runtime_for_events_handler = runtime_for_events.clone();
1766 let event_tx = out_tx.clone();
1767 let coalescer = extensions
1768 .as_ref()
1769 .map(|m| crate::extensions::EventCoalescer::new(m.clone()));
1770 let event_handler = move |event: AgentEvent| {
1771 let serialized = if let AgentEvent::AgentEnd {
1772 messages, error, ..
1773 } = &event
1774 {
1775 json!({
1776 "type": "agent_end",
1777 "messages": messages,
1778 "error": error,
1779 })
1780 .to_string()
1781 } else {
1782 serde_json::to_string(&event).unwrap_or_else(|err| {
1783 json!({
1784 "type": "event_serialize_error",
1785 "error": err.to_string(),
1786 })
1787 .to_string()
1788 })
1789 };
1790 let _ = event_tx.send(serialized);
1791 if let Some(coal) = &coalescer {
1794 coal.dispatch_agent_event_lazy(&event, &runtime_for_events_handler);
1795 }
1796 };
1797
1798 if images.is_empty() {
1799 guard
1800 .run_text_with_abort(message.clone(), Some(abort_signal), event_handler)
1801 .await
1802 } else {
1803 let mut blocks = vec![ContentBlock::Text(TextContent::new(message.clone()))];
1804 for image in &images {
1805 blocks.push(ContentBlock::Image(image.clone()));
1806 }
1807 guard
1808 .run_with_content_with_abort(blocks, Some(abort_signal), event_handler)
1809 .await
1810 }
1811 };
1812
1813 if let Ok(mut guard) = OwnedMutexGuard::lock(Arc::clone(&abort_handle_slot), &cx).await {
1814 *guard = None;
1815 }
1816
1817 match result {
1818 Ok(message) => {
1819 if matches!(message.stop_reason, StopReason::Error | StopReason::Aborted) {
1820 final_error = message
1821 .error_message
1822 .clone()
1823 .or_else(|| Some("Request error".to_string()));
1824 final_error_hints = None;
1825 if message.stop_reason == StopReason::Aborted {
1826 break;
1827 }
1828 if let Some(ref err_msg) = final_error {
1831 let context_window = if let Ok(guard) =
1832 OwnedMutexGuard::lock(Arc::clone(&session), &cx).await
1833 {
1834 guard.session.lock(&cx).await.map_or(None, |inner| {
1835 current_model_entry(&inner, &options)
1836 .map(|e| e.model.context_window)
1837 })
1838 } else {
1839 None
1840 };
1841 if !crate::error::is_retryable_error(
1842 err_msg,
1843 Some(message.usage.input),
1844 context_window,
1845 ) {
1846 break;
1847 }
1848 }
1849 } else {
1850 success = true;
1851 break;
1852 }
1853 }
1854 Err(err) => {
1855 let err_str = err.to_string();
1856 if !crate::error::is_retryable_error(&err_str, None, None) {
1859 final_error = Some(err_str);
1860 final_error_hints = Some(error_hints_value(&err));
1861 break;
1862 }
1863 final_error = Some(err_str);
1864 final_error_hints = Some(error_hints_value(&err));
1865 }
1866 }
1867
1868 let retry_enabled = OwnedMutexGuard::lock(Arc::clone(&shared_state), &cx)
1869 .await
1870 .is_ok_and(|state| state.auto_retry_enabled);
1871 if !retry_enabled || retry_count >= max_retries {
1872 break;
1873 }
1874
1875 retry_count += 1;
1876 let delay_ms = retry_delay_ms(&options.config, retry_count);
1877 let error_message = final_error
1878 .clone()
1879 .unwrap_or_else(|| "Request error".to_string());
1880 let _ = out_tx.send(event(&json!({
1881 "type": "auto_retry_start",
1882 "attempt": retry_count,
1883 "maxAttempts": max_retries,
1884 "delayMs": delay_ms,
1885 "errorMessage": error_message,
1886 })));
1887
1888 let delay = Duration::from_millis(delay_ms as u64);
1889 let start = std::time::Instant::now();
1890 while start.elapsed() < delay {
1891 if retry_abort.load(Ordering::SeqCst) {
1892 break;
1893 }
1894 sleep(wall_now(), Duration::from_millis(50)).await;
1895 }
1896
1897 if retry_abort.load(Ordering::SeqCst) {
1898 final_error = Some("Retry aborted".to_string());
1899 break;
1900 }
1901 }
1902
1903 if retry_count > 0 {
1904 let _ = out_tx.send(event(&json!({
1905 "type": "auto_retry_end",
1906 "success": success,
1907 "attempt": retry_count,
1908 "finalError": if success { Value::Null } else { json!(final_error.clone()) },
1909 })));
1910 }
1911
1912 is_streaming.store(false, Ordering::SeqCst);
1913
1914 if !success {
1915 if let Some(err) = final_error {
1916 let mut payload = json!({
1917 "type": "agent_end",
1918 "messages": [],
1919 "error": err
1920 });
1921 if let Some(hints) = final_error_hints {
1922 payload["errorHints"] = hints;
1923 }
1924 let _ = out_tx.send(event(&payload));
1925 }
1926 return;
1927 }
1928
1929 let auto_compaction_enabled = OwnedMutexGuard::lock(Arc::clone(&shared_state), &cx)
1930 .await
1931 .is_ok_and(|state| state.auto_compaction_enabled);
1932 if auto_compaction_enabled {
1933 maybe_auto_compact(session, options, is_compacting, out_tx).await;
1934 }
1935}
1936
1937fn response_ok(id: Option<String>, command: &str, data: Option<Value>) -> String {
1942 let mut resp = json!({
1943 "type": "response",
1944 "command": command,
1945 "success": true,
1946 });
1947 if let Some(id) = id {
1948 resp["id"] = Value::String(id);
1949 }
1950 if let Some(data) = data {
1951 resp["data"] = data;
1952 }
1953 resp.to_string()
1954}
1955
1956fn response_error(id: Option<String>, command: &str, error: impl Into<String>) -> String {
1957 let mut resp = json!({
1958 "type": "response",
1959 "command": command,
1960 "success": false,
1961 "error": error.into(),
1962 });
1963 if let Some(id) = id {
1964 resp["id"] = Value::String(id);
1965 }
1966 resp.to_string()
1967}
1968
1969fn response_error_with_hints(id: Option<String>, command: &str, error: &Error) -> String {
1970 let mut resp = json!({
1971 "type": "response",
1972 "command": command,
1973 "success": false,
1974 "error": error.to_string(),
1975 "errorHints": error_hints_value(error),
1976 });
1977 if let Some(id) = id {
1978 resp["id"] = Value::String(id);
1979 }
1980 resp.to_string()
1981}
1982
1983fn event(value: &Value) -> String {
1984 value.to_string()
1985}
1986
1987fn rpc_emit_extension_ui_request(
1988 runtime_handle: &RuntimeHandle,
1989 ui_state: Arc<Mutex<RpcUiBridgeState>>,
1990 manager: ExtensionManager,
1991 out_tx_ui: std::sync::mpsc::Sender<String>,
1992 request: ExtensionUiRequest,
1993) {
1994 let rpc_event = request.to_rpc_event();
1996 let _ = out_tx_ui.send(event(&rpc_event));
1997
1998 if !request.expects_response() {
1999 return;
2000 }
2001
2002 let Some(timeout_ms) = request.effective_timeout_ms() else {
2005 return;
2006 };
2007
2008 let fire_ms = timeout_ms.saturating_sub(10).max(1);
2010 let request_id = request.id;
2011 let ui_state_timeout = Arc::clone(&ui_state);
2012 let manager_timeout = manager;
2013 let out_tx_timeout = out_tx_ui;
2014 let runtime_handle_inner = runtime_handle.clone();
2015
2016 runtime_handle.spawn(async move {
2017 sleep(wall_now(), Duration::from_millis(fire_ms)).await;
2018 let cx = AgentCx::for_request();
2019
2020 let next = {
2021 let Ok(mut guard) = ui_state_timeout.lock(cx.cx()).await else {
2022 return;
2023 };
2024
2025 let Some(active) = guard.active.as_ref() else {
2026 return;
2027 };
2028
2029 if active.id != request_id {
2031 return;
2032 }
2033
2034 let _ = manager_timeout.respond_ui(ExtensionUiResponse {
2036 id: request_id,
2037 value: None,
2038 cancelled: true,
2039 });
2040
2041 guard.active = None;
2042 let next = guard.queue.pop_front();
2043 if let Some(ref next) = next {
2044 guard.active = Some(next.clone());
2045 }
2046 next
2047 };
2048
2049 if let Some(next) = next {
2050 rpc_emit_extension_ui_request(
2051 &runtime_handle_inner,
2052 ui_state_timeout,
2053 manager_timeout,
2054 out_tx_timeout,
2055 next,
2056 );
2057 }
2058 });
2059}
2060
2061fn rpc_parse_extension_ui_response_id(parsed: &Value) -> Option<String> {
2062 let request_id = parsed
2063 .get("requestId")
2064 .and_then(Value::as_str)
2065 .map(str::trim)
2066 .filter(|value| !value.is_empty())
2067 .map(String::from);
2068
2069 request_id.or_else(|| {
2070 parsed
2071 .get("id")
2072 .and_then(Value::as_str)
2073 .map(str::trim)
2074 .filter(|value| !value.is_empty())
2075 .map(String::from)
2076 })
2077}
2078
2079fn rpc_parse_extension_ui_response(
2080 parsed: &Value,
2081 active: &ExtensionUiRequest,
2082) -> std::result::Result<ExtensionUiResponse, String> {
2083 let cancelled = parsed
2084 .get("cancelled")
2085 .and_then(Value::as_bool)
2086 .unwrap_or(false);
2087
2088 if cancelled {
2089 return Ok(ExtensionUiResponse {
2090 id: active.id.clone(),
2091 value: None,
2092 cancelled: true,
2093 });
2094 }
2095
2096 match active.method.as_str() {
2097 "confirm" => {
2098 let value = parsed
2099 .get("confirmed")
2100 .and_then(Value::as_bool)
2101 .or_else(|| parsed.get("value").and_then(Value::as_bool))
2102 .ok_or_else(|| "confirm requires boolean `confirmed` (or `value`)".to_string())?;
2103 Ok(ExtensionUiResponse {
2104 id: active.id.clone(),
2105 value: Some(Value::Bool(value)),
2106 cancelled: false,
2107 })
2108 }
2109 "select" => {
2110 let Some(value) = parsed.get("value") else {
2111 return Err("select requires `value` field".to_string());
2112 };
2113
2114 let options = active
2115 .payload
2116 .get("options")
2117 .and_then(Value::as_array)
2118 .ok_or_else(|| "select request missing `options` array".to_string())?;
2119
2120 let mut allowed = Vec::with_capacity(options.len());
2121 for opt in options {
2122 match opt {
2123 Value::String(s) => allowed.push(Value::String(s.clone())),
2124 Value::Object(map) => {
2125 let label = map
2126 .get("label")
2127 .and_then(Value::as_str)
2128 .unwrap_or("")
2129 .trim();
2130 if label.is_empty() {
2131 continue;
2132 }
2133 if let Some(v) = map.get("value") {
2134 allowed.push(v.clone());
2135 } else {
2136 allowed.push(Value::String(label.to_string()));
2137 }
2138 }
2139 _ => {}
2140 }
2141 }
2142
2143 if !allowed.iter().any(|candidate| candidate == value) {
2144 return Err("select response value did not match any option".to_string());
2145 }
2146
2147 Ok(ExtensionUiResponse {
2148 id: active.id.clone(),
2149 value: Some(value.clone()),
2150 cancelled: false,
2151 })
2152 }
2153 "input" | "editor" => {
2154 let Some(value) = parsed.get("value") else {
2155 return Err(format!("{} requires `value` field", active.method));
2156 };
2157 if !value.is_string() {
2158 return Err(format!("{} requires string `value`", active.method));
2159 }
2160 Ok(ExtensionUiResponse {
2161 id: active.id.clone(),
2162 value: Some(value.clone()),
2163 cancelled: false,
2164 })
2165 }
2166 "notify" => Ok(ExtensionUiResponse {
2167 id: active.id.clone(),
2168 value: None,
2169 cancelled: false,
2170 }),
2171 other => Err(format!("Unsupported extension UI method: {other}")),
2172 }
2173}
2174
2175#[cfg(test)]
2176mod ui_bridge_tests {
2177 use super::*;
2178
2179 #[test]
2180 fn parse_extension_ui_response_id_prefers_request_id() {
2181 let value = json!({"type":"extension_ui_response","id":"legacy","requestId":"canonical"});
2182 assert_eq!(
2183 rpc_parse_extension_ui_response_id(&value),
2184 Some("canonical".to_string())
2185 );
2186 }
2187
2188 #[test]
2189 fn parse_extension_ui_response_id_accepts_id_alias() {
2190 let value = json!({"type":"extension_ui_response","id":"legacy"});
2191 assert_eq!(
2192 rpc_parse_extension_ui_response_id(&value),
2193 Some("legacy".to_string())
2194 );
2195 }
2196
2197 #[test]
2198 fn parse_confirm_response_accepts_confirmed_alias() {
2199 let active = ExtensionUiRequest::new("req-1", "confirm", json!({"title":"t"}));
2200 let value = json!({"type":"extension_ui_response","requestId":"req-1","confirmed":true});
2201 let resp = rpc_parse_extension_ui_response(&value, &active).expect("parse confirm");
2202 assert!(!resp.cancelled);
2203 assert_eq!(resp.value, Some(json!(true)));
2204 }
2205
2206 #[test]
2207 fn parse_confirm_response_accepts_value_bool() {
2208 let active = ExtensionUiRequest::new("req-1", "confirm", json!({"title":"t"}));
2209 let value = json!({"type":"extension_ui_response","requestId":"req-1","value":false});
2210 let resp = rpc_parse_extension_ui_response(&value, &active).expect("parse confirm");
2211 assert!(!resp.cancelled);
2212 assert_eq!(resp.value, Some(json!(false)));
2213 }
2214
2215 #[test]
2216 fn parse_cancelled_response_wins_over_value() {
2217 let active = ExtensionUiRequest::new("req-1", "confirm", json!({"title":"t"}));
2218 let value = json!({"type":"extension_ui_response","requestId":"req-1","cancelled":true,"value":true});
2219 let resp = rpc_parse_extension_ui_response(&value, &active).expect("parse cancel");
2220 assert!(resp.cancelled);
2221 assert_eq!(resp.value, None);
2222 }
2223
2224 #[test]
2225 fn parse_select_response_validates_against_options() {
2226 let active = ExtensionUiRequest::new(
2227 "req-1",
2228 "select",
2229 json!({"title":"pick","options":["A","B"]}),
2230 );
2231 let ok_value = json!({"type":"extension_ui_response","requestId":"req-1","value":"B"});
2232 let ok = rpc_parse_extension_ui_response(&ok_value, &active).expect("parse select ok");
2233 assert_eq!(ok.value, Some(json!("B")));
2234
2235 let bad_value = json!({"type":"extension_ui_response","requestId":"req-1","value":"C"});
2236 assert!(
2237 rpc_parse_extension_ui_response(&bad_value, &active).is_err(),
2238 "invalid selection should error"
2239 );
2240 }
2241
2242 #[test]
2243 fn parse_input_requires_string_value() {
2244 let active = ExtensionUiRequest::new("req-1", "input", json!({"title":"t"}));
2245 let ok_value = json!({"type":"extension_ui_response","requestId":"req-1","value":"hi"});
2246 let ok = rpc_parse_extension_ui_response(&ok_value, &active).expect("parse input ok");
2247 assert_eq!(ok.value, Some(json!("hi")));
2248
2249 let bad_value = json!({"type":"extension_ui_response","requestId":"req-1","value":123});
2250 assert!(
2251 rpc_parse_extension_ui_response(&bad_value, &active).is_err(),
2252 "non-string input should error"
2253 );
2254 }
2255
2256 #[test]
2257 fn parse_editor_requires_string_value() {
2258 let active = ExtensionUiRequest::new("req-1", "editor", json!({"title":"t"}));
2259 let ok = json!({"requestId":"req-1","value":"multi\nline"});
2260 let resp = rpc_parse_extension_ui_response(&ok, &active).expect("editor ok");
2261 assert_eq!(resp.value, Some(json!("multi\nline")));
2262
2263 let bad = json!({"requestId":"req-1","value":42});
2264 assert!(
2265 rpc_parse_extension_ui_response(&bad, &active).is_err(),
2266 "editor needs string"
2267 );
2268 }
2269
2270 #[test]
2271 fn parse_notify_returns_no_value() {
2272 let active = ExtensionUiRequest::new("req-1", "notify", json!({"title":"t"}));
2273 let val = json!({"requestId":"req-1"});
2274 let resp = rpc_parse_extension_ui_response(&val, &active).expect("notify ok");
2275 assert!(!resp.cancelled);
2276 assert!(resp.value.is_none());
2277 }
2278
2279 #[test]
2280 fn parse_unsupported_method_errors() {
2281 let active = ExtensionUiRequest::new("req-1", "custom_method", json!({}));
2282 let val = json!({"requestId":"req-1","value":"x"});
2283 let err = rpc_parse_extension_ui_response(&val, &active).unwrap_err();
2284 assert!(err.contains("Unsupported"), "err={err}");
2285 }
2286
/// A `select` response lacking the `value` field is rejected with an error naming it.
#[test]
fn parse_select_missing_value_field() {
    let select_params = json!({"title":"pick","options":["A"]});
    let request = ExtensionUiRequest::new("req-1", "select", select_params);
    let payload = json!({"requestId":"req-1"});

    let outcome = rpc_parse_extension_ui_response(&payload, &request);

    let err = outcome.unwrap_err();
    assert!(err.contains("value"), "err={err}");
}
2295
/// A `confirm` response with no `value` is rejected with an error mentioning "confirm".
#[test]
fn parse_confirm_missing_value_errors() {
    let request = ExtensionUiRequest::new("req-1", "confirm", json!({"title":"t"}));
    let payload = json!({"requestId":"req-1"});

    let outcome = rpc_parse_extension_ui_response(&payload, &request);

    let err = outcome.unwrap_err();
    assert!(err.contains("confirm"), "err={err}");
}
2303
/// `select` options given as `{label, value}` objects can be answered by their `value`.
#[test]
fn parse_select_with_label_value_objects() {
    let select_params = json!({
        "title": "pick",
        "options": [
            {"label": "Alpha", "value": "a"},
            {"label": "Beta", "value": "b"},
        ]
    });
    let request = ExtensionUiRequest::new("req-1", "select", select_params);

    // Answer with the `value` of the first option rather than its label.
    let payload = json!({"requestId":"req-1","value":"a"});
    let parsed = rpc_parse_extension_ui_response(&payload, &request).expect("select by value");

    assert_eq!(parsed.value, Some(json!("a")));
}
2321
/// No id is extracted when `requestId` is only whitespace and `id` is empty.
#[test]
fn parse_id_rejects_empty_and_whitespace() {
    let blank_ids = json!({"requestId":"   ","id":""});
    let extracted = rpc_parse_extension_ui_response_id(&blank_ids);
    assert!(extracted.is_none());
}
2327
/// A default bridge state has no active request and nothing queued.
#[test]
fn bridge_state_default_is_empty() {
    let fresh = RpcUiBridgeState::default();
    assert!(fresh.active.is_none());
    assert!(fresh.queue.is_empty());
}
2334}
2335
2336fn error_hints_value(error: &Error) -> Value {
2337 let hint = error_hints::hints_for_error(error);
2338 json!({
2339 "summary": hint.summary,
2340 "hints": hint.hints,
2341 "contextFields": hint.context_fields,
2342 })
2343}
2344
2345fn rpc_session_message_value(message: SessionMessage) -> Value {
2346 let mut value =
2347 serde_json::to_value(message).expect("SessionMessage should always serialize to JSON");
2348 rpc_flatten_content_blocks(&mut value);
2349 value
2350}
2351
2352fn rpc_flatten_content_blocks(value: &mut Value) {
2353 let Value::Object(message_obj) = value else {
2354 return;
2355 };
2356 let Some(content) = message_obj.get_mut("content") else {
2357 return;
2358 };
2359 let Value::Array(blocks) = content else {
2360 return;
2361 };
2362
2363 for block in blocks {
2364 let Value::Object(block_obj) = block else {
2365 continue;
2366 };
2367 let Some(inner) = block_obj.remove("0") else {
2368 continue;
2369 };
2370 let Value::Object(inner_obj) = inner else {
2371 block_obj.insert("0".to_string(), inner);
2372 continue;
2373 };
2374 for (key, value) in inner_obj {
2375 block_obj.entry(key).or_insert(value);
2376 }
2377 }
2378}
2379
2380fn retry_delay_ms(config: &Config, attempt: u32) -> u32 {
2381 let base = u64::from(config.retry_base_delay_ms());
2382 let max = u64::from(config.retry_max_delay_ms());
2383 let shift = attempt.saturating_sub(1);
2384 let multiplier = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
2385 let delay = base.saturating_mul(multiplier).min(max);
2386 u32::try_from(delay).unwrap_or(u32::MAX)
2387}
2388
/// Tests for the auto-retry behaviour of `run_prompt_with_retry`, driven by
/// stub providers that fail on demand.
#[cfg(test)]
mod retry_tests {
    use super::*;
    use crate::agent::{Agent, AgentConfig, AgentSession};
    use crate::model::{AssistantMessage, Usage};
    use crate::provider::Provider;
    use crate::resources::ResourceLoader;
    use crate::session::Session;
    use crate::tools::ToolRegistry;
    use async_trait::async_trait;
    use futures::stream;
    use std::path::Path;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Stub provider whose first `stream` call reports an error and whose
    /// later calls complete normally.
    #[derive(Debug)]
    struct FlakyProvider {
        /// Number of `stream` calls made so far.
        calls: AtomicUsize,
    }

    impl FlakyProvider {
        /// Creates a provider that has not been called yet.
        const fn new() -> Self {
            Self {
                calls: AtomicUsize::new(0),
            }
        }
    }

    #[async_trait]
    #[allow(clippy::unnecessary_literal_bound)]
    impl Provider for FlakyProvider {
        fn name(&self) -> &str {
            "test-provider"
        }

        fn api(&self) -> &str {
            "test-api"
        }

        fn model_id(&self) -> &str {
            "test-model"
        }

        // Yields `Start` + `Error` on the first call and `Start` + `Done` afterwards.
        async fn stream(
            &self,
            _context: &crate::provider::Context<'_>,
            _options: &crate::provider::StreamOptions,
        ) -> crate::error::Result<
            Pin<
                Box<
                    dyn futures::Stream<Item = crate::error::Result<crate::model::StreamEvent>>
                        + Send,
                >,
            >,
        > {
            // `fetch_add` returns the previous count, so the first call sees 0.
            let call = self.calls.fetch_add(1, Ordering::SeqCst);

            let mut partial = AssistantMessage {
                content: Vec::new(),
                api: self.api().to_string(),
                provider: self.name().to_string(),
                model: self.model_id().to_string(),
                usage: Usage::default(),
                stop_reason: StopReason::Stop,
                error_message: None,
                timestamp: 0,
            };

            let events = if call == 0 {
                // First call: turn the message into a "server error" failure.
                partial.stop_reason = StopReason::Error;
                partial.error_message = Some("server error".to_string());
                vec![
                    Ok(crate::model::StreamEvent::Start {
                        partial: partial.clone(),
                    }),
                    Ok(crate::model::StreamEvent::Error {
                        reason: StopReason::Error,
                        error: partial,
                    }),
                ]
            } else {
                // Later calls: an empty but successful completion.
                vec![
                    Ok(crate::model::StreamEvent::Start {
                        partial: partial.clone(),
                    }),
                    Ok(crate::model::StreamEvent::Done {
                        reason: StopReason::Stop,
                        message: partial,
                    }),
                ]
            };

            Ok(Box::pin(stream::iter(events)))
        }
    }

    /// Stub provider whose every `stream` call reports a "server error".
    #[derive(Debug)]
    struct AlwaysErrorProvider;

    #[async_trait]
    #[allow(clippy::unnecessary_literal_bound)]
    impl Provider for AlwaysErrorProvider {
        fn name(&self) -> &str {
            "test-provider"
        }

        fn api(&self) -> &str {
            "test-api"
        }

        fn model_id(&self) -> &str {
            "test-model"
        }

        // Always yields `Start` followed by `Error`.
        async fn stream(
            &self,
            _context: &crate::provider::Context<'_>,
            _options: &crate::provider::StreamOptions,
        ) -> crate::error::Result<
            Pin<
                Box<
                    dyn futures::Stream<Item = crate::error::Result<crate::model::StreamEvent>>
                        + Send,
                >,
            >,
        > {
            let mut partial = AssistantMessage {
                content: Vec::new(),
                api: self.api().to_string(),
                provider: self.name().to_string(),
                model: self.model_id().to_string(),
                usage: Usage::default(),
                stop_reason: StopReason::Error,
                error_message: Some("server error".to_string()),
                timestamp: 0,
            };

            let events = vec![
                Ok(crate::model::StreamEvent::Start {
                    partial: partial.clone(),
                }),
                Ok(crate::model::StreamEvent::Error {
                    reason: StopReason::Error,
                    error: {
                        // Already `Error` from construction; set again before the move.
                        partial.stop_reason = StopReason::Error;
                        partial
                    },
                }),
            ];

            Ok(Box::pin(stream::iter(events)))
        }
    }

    /// With retries enabled and a provider that fails once, the emitted
    /// events include `auto_retry_start` and an `auto_retry_end` with `success: true`.
    #[test]
    fn rpc_auto_retry_retries_then_succeeds() {
        let runtime = asupersync::runtime::RuntimeBuilder::new()
            .blocking_threads(1, 8)
            .build()
            .expect("runtime build");
        let runtime_handle = runtime.handle();

        runtime.block_on(async move {
            let provider = Arc::new(FlakyProvider::new());
            let tools = ToolRegistry::new(&[], Path::new("."), None);
            let agent = Agent::new(provider, tools, AgentConfig::default());
            let inner_session = Arc::new(Mutex::new(Session::in_memory()));
            let agent_session = AgentSession::new(
                agent,
                inner_session,
                false,
                crate::compaction::ResolvedCompactionSettings::default(),
            );

            let session = Arc::new(Mutex::new(agent_session));

            // One retry with 1 ms delays keeps the test fast.
            let mut config = Config::default();
            config.retry = Some(crate::config::RetrySettings {
                enabled: Some(true),
                max_retries: Some(1),
                base_delay_ms: Some(1),
                max_delay_ms: Some(1),
            });

            let mut shared = RpcSharedState::new(&config);
            shared.auto_compaction_enabled = false;
            let shared_state = Arc::new(Mutex::new(shared));

            let is_streaming = Arc::new(AtomicBool::new(false));
            let is_compacting = Arc::new(AtomicBool::new(false));
            let abort_handle_slot: Arc<Mutex<Option<AbortHandle>>> = Arc::new(Mutex::new(None));
            let retry_abort = Arc::new(AtomicBool::new(false));
            let (out_tx, out_rx) = std::sync::mpsc::channel::<String>();

            // NOTE(review): the `TempDir` here is a temporary that is dropped at the
            // end of this statement, so the directory has presumably been removed
            // by the time `AuthStorage::load` runs — confirm this is intended.
            let auth_path = tempfile::tempdir()
                .expect("tempdir")
                .path()
                .join("auth.json");
            let auth = AuthStorage::load(auth_path).expect("auth load");

            let options = RpcOptions {
                config,
                resources: ResourceLoader::empty(false),
                available_models: Vec::new(),
                scoped_models: Vec::new(),
                auth,
                runtime_handle,
            };

            run_prompt_with_retry(
                session,
                shared_state,
                is_streaming,
                is_compacting,
                abort_handle_slot,
                out_tx,
                retry_abort,
                options,
                "hello".to_string(),
                Vec::new(),
                AgentCx::for_request(),
            )
            .await;

            let mut saw_retry_start = false;
            let mut saw_retry_end_success = false;

            // Scan the emitted lines; anything that is not JSON with a string
            // `type` field is skipped.
            for line in out_rx.try_iter() {
                let Ok(value) = serde_json::from_str::<Value>(&line) else {
                    continue;
                };
                let Some(kind) = value.get("type").and_then(Value::as_str) else {
                    continue;
                };
                match kind {
                    "auto_retry_start" => {
                        saw_retry_start = true;
                    }
                    "auto_retry_end" => {
                        if value.get("success").and_then(Value::as_bool) == Some(true) {
                            saw_retry_end_success = true;
                        }
                    }
                    _ => {}
                }
            }

            assert!(saw_retry_start, "missing auto_retry_start event");
            assert!(
                saw_retry_end_success,
                "missing successful auto_retry_end event"
            );
        });
    }

    /// When the retry-abort flag is raised while a provider keeps failing, the
    /// events appear in the order `auto_retry_start`, `auto_retry_end`,
    /// `agent_end`, and the last `agent_end` reports "Retry aborted".
    #[test]
    fn rpc_abort_retry_emits_ordered_retry_timeline() {
        let runtime = asupersync::runtime::RuntimeBuilder::new()
            .blocking_threads(1, 8)
            .build()
            .expect("runtime build");
        let runtime_handle = runtime.handle();

        runtime.block_on(async move {
            let provider = Arc::new(AlwaysErrorProvider);
            let tools = ToolRegistry::new(&[], Path::new("."), None);
            let agent = Agent::new(provider, tools, AgentConfig::default());
            let inner_session = Arc::new(Mutex::new(Session::in_memory()));
            let agent_session = AgentSession::new(
                agent,
                inner_session,
                false,
                crate::compaction::ResolvedCompactionSettings::default(),
            );

            let session = Arc::new(Mutex::new(agent_session));

            // The 100 ms retry delay is longer than the 10 ms the abort thread
            // waits before raising the flag.
            let mut config = Config::default();
            config.retry = Some(crate::config::RetrySettings {
                enabled: Some(true),
                max_retries: Some(3),
                base_delay_ms: Some(100),
                max_delay_ms: Some(100),
            });

            let mut shared = RpcSharedState::new(&config);
            shared.auto_compaction_enabled = false;
            let shared_state = Arc::new(Mutex::new(shared));

            let is_streaming = Arc::new(AtomicBool::new(false));
            let is_compacting = Arc::new(AtomicBool::new(false));
            let abort_handle_slot: Arc<Mutex<Option<AbortHandle>>> = Arc::new(Mutex::new(None));
            let retry_abort = Arc::new(AtomicBool::new(false));
            let (out_tx, out_rx) = std::sync::mpsc::channel::<String>();

            // NOTE(review): the `TempDir` here is a temporary that is dropped at the
            // end of this statement, so the directory has presumably been removed
            // by the time `AuthStorage::load` runs — confirm this is intended.
            let auth_path = tempfile::tempdir()
                .expect("tempdir")
                .path()
                .join("auth.json");
            let auth = AuthStorage::load(auth_path).expect("auth load");

            let options = RpcOptions {
                config,
                resources: ResourceLoader::empty(false),
                available_models: Vec::new(),
                scoped_models: Vec::new(),
                auth,
                runtime_handle,
            };

            // Raise the abort flag from another thread after a short pause.
            let retry_abort_for_thread = Arc::clone(&retry_abort);
            let abort_thread = std::thread::spawn(move || {
                std::thread::sleep(std::time::Duration::from_millis(10));
                retry_abort_for_thread.store(true, Ordering::SeqCst);
            });

            run_prompt_with_retry(
                session,
                shared_state,
                is_streaming,
                is_compacting,
                abort_handle_slot,
                out_tx,
                retry_abort,
                options,
                "hello".to_string(),
                Vec::new(),
                AgentCx::for_request(),
            )
            .await;
            abort_thread.join().expect("abort thread join");

            // Event types in emission order, plus the `error` of the most recent `agent_end`.
            let mut timeline = Vec::new();
            let mut last_agent_end_error = None::<String>;

            for line in out_rx.try_iter() {
                let Ok(value) = serde_json::from_str::<Value>(&line) else {
                    continue;
                };
                let Some(kind) = value.get("type").and_then(Value::as_str) else {
                    continue;
                };
                timeline.push(kind.to_string());
                if kind == "agent_end" {
                    last_agent_end_error = value
                        .get("error")
                        .and_then(Value::as_str)
                        .map(str::to_string);
                }
            }

            // First retry start, first retry end, and the last agent end.
            let retry_start_idx = timeline
                .iter()
                .position(|kind| kind == "auto_retry_start")
                .expect("missing auto_retry_start");
            let retry_end_idx = timeline
                .iter()
                .position(|kind| kind == "auto_retry_end")
                .expect("missing auto_retry_end");
            let agent_end_idx = timeline
                .iter()
                .rposition(|kind| kind == "agent_end")
                .expect("missing agent_end");

            assert!(
                retry_start_idx < retry_end_idx && retry_end_idx < agent_end_idx,
                "unexpected retry timeline ordering: {timeline:?}"
            );
            assert_eq!(
                last_agent_end_error.as_deref(),
                Some("Retry aborted"),
                "expected retry-abort terminal error, timeline: {timeline:?}"
            );
        });
    }
}
2767
/// Reports whether the session has outgrown its usable context budget.
///
/// The budget is `context_window` minus `reserve_tokens`, clamped at zero.
/// Compaction is due only when `tokens_before` is strictly greater than it.
fn should_auto_compact(tokens_before: u64, context_window: u32, reserve_tokens: u32) -> bool {
    let usable_budget = u64::from(context_window).saturating_sub(u64::from(reserve_tokens));
    tokens_before > usable_budget
}
2773
2774#[allow(clippy::too_many_lines)]
2775async fn maybe_auto_compact(
2776 session: Arc<Mutex<AgentSession>>,
2777 options: RpcOptions,
2778 is_compacting: Arc<AtomicBool>,
2779 out_tx: std::sync::mpsc::Sender<String>,
2780) {
2781 let cx = AgentCx::for_request();
2782 let (path_entries, context_window, reserve_tokens, settings) = {
2783 let Ok(guard) = session.lock(cx.cx()).await else {
2784 return;
2785 };
2786 let (path_entries, context_window) = {
2787 let Ok(mut inner_session) = guard.session.lock(cx.cx()).await else {
2788 return;
2789 };
2790 inner_session.ensure_entry_ids();
2791 let Some(entry) = current_model_entry(&inner_session, &options) else {
2792 return;
2793 };
2794 let path_entries = inner_session
2795 .entries_for_current_path()
2796 .into_iter()
2797 .cloned()
2798 .collect::<Vec<_>>();
2799 (path_entries, entry.model.context_window)
2800 };
2801
2802 let reserve_tokens = options.config.compaction_reserve_tokens();
2803 let settings = ResolvedCompactionSettings {
2804 enabled: true,
2805 reserve_tokens,
2806 keep_recent_tokens: options.config.compaction_keep_recent_tokens(),
2807 ..Default::default()
2808 };
2809
2810 (path_entries, context_window, reserve_tokens, settings)
2811 };
2812
2813 let Some(prep) = prepare_compaction(&path_entries, settings) else {
2814 return;
2815 };
2816 if !should_auto_compact(prep.tokens_before, context_window, reserve_tokens) {
2817 return;
2818 }
2819
2820 let _ = out_tx.send(event(&json!({
2821 "type": "auto_compaction_start",
2822 "reason": "threshold",
2823 })));
2824 is_compacting.store(true, Ordering::SeqCst);
2825
2826 let (provider, key) = {
2827 let Ok(guard) = session.lock(cx.cx()).await else {
2828 is_compacting.store(false, Ordering::SeqCst);
2829 return;
2830 };
2831 let Some(key) = guard.agent.stream_options().api_key.clone() else {
2832 is_compacting.store(false, Ordering::SeqCst);
2833 let _ = out_tx.send(event(&json!({
2834 "type": "auto_compaction_end",
2835 "result": Value::Null,
2836 "aborted": false,
2837 "willRetry": false,
2838 "errorMessage": "Missing API key for compaction",
2839 })));
2840 return;
2841 };
2842 (guard.agent.provider(), key)
2843 };
2844
2845 let result = compact(prep, provider, &key, None).await;
2846 is_compacting.store(false, Ordering::SeqCst);
2847
2848 match result {
2849 Ok(result) => {
2850 let details_value = match compaction_details_to_value(&result.details) {
2851 Ok(value) => value,
2852 Err(err) => {
2853 let _ = out_tx.send(event(&json!({
2854 "type": "auto_compaction_end",
2855 "result": Value::Null,
2856 "aborted": false,
2857 "willRetry": false,
2858 "errorMessage": err.to_string(),
2859 })));
2860 return;
2861 }
2862 };
2863
2864 let Ok(mut guard) = session.lock(cx.cx()).await else {
2865 return;
2866 };
2867 let messages = {
2868 let Ok(mut inner_session) = guard.session.lock(cx.cx()).await else {
2869 return;
2870 };
2871 inner_session.append_compaction(
2872 result.summary.clone(),
2873 result.first_kept_entry_id.clone(),
2874 result.tokens_before,
2875 Some(details_value.clone()),
2876 None,
2877 );
2878 inner_session.to_messages_for_current_path()
2879 };
2880 let _ = guard.persist_session().await;
2881 guard.agent.replace_messages(messages);
2882 drop(guard);
2883
2884 let _ = out_tx.send(event(&json!({
2885 "type": "auto_compaction_end",
2886 "result": {
2887 "summary": result.summary,
2888 "firstKeptEntryId": result.first_kept_entry_id,
2889 "tokensBefore": result.tokens_before,
2890 "details": details_value,
2891 },
2892 "aborted": false,
2893 "willRetry": false,
2894 })));
2895 }
2896 Err(err) => {
2897 let _ = out_tx.send(event(&json!({
2898 "type": "auto_compaction_end",
2899 "result": Value::Null,
2900 "aborted": false,
2901 "willRetry": false,
2902 "errorMessage": err.to_string(),
2903 })));
2904 }
2905 }
2906}
2907
2908fn rpc_model_from_entry(entry: &ModelEntry) -> Value {
2909 let input = entry
2910 .model
2911 .input
2912 .iter()
2913 .map(|t| match t {
2914 crate::provider::InputType::Text => "text",
2915 crate::provider::InputType::Image => "image",
2916 })
2917 .collect::<Vec<_>>();
2918
2919 json!({
2920 "id": entry.model.id,
2921 "name": entry.model.name,
2922 "api": entry.model.api,
2923 "provider": entry.model.provider,
2924 "baseUrl": entry.model.base_url,
2925 "reasoning": entry.model.reasoning,
2926 "input": input,
2927 "contextWindow": entry.model.context_window,
2928 "maxTokens": entry.model.max_tokens,
2929 "cost": entry.model.cost,
2930 })
2931}
2932
2933fn session_state(
2934 session: &crate::session::Session,
2935 options: &RpcOptions,
2936 snapshot: &RpcStateSnapshot,
2937 is_streaming: bool,
2938 is_compacting: bool,
2939) -> Value {
2940 let model = session
2941 .header
2942 .provider
2943 .as_deref()
2944 .zip(session.header.model_id.as_deref())
2945 .and_then(|(provider, model_id)| {
2946 options.available_models.iter().find(|m| {
2947 provider_ids_match(&m.model.provider, provider)
2948 && m.model.id.eq_ignore_ascii_case(model_id)
2949 })
2950 })
2951 .map(rpc_model_from_entry);
2952
2953 let message_count = session
2954 .entries_for_current_path()
2955 .iter()
2956 .filter(|entry| matches!(entry, crate::session::SessionEntry::Message(_)))
2957 .count();
2958
2959 let session_name = session
2960 .entries_for_current_path()
2961 .iter()
2962 .rev()
2963 .find_map(|entry| {
2964 let crate::session::SessionEntry::SessionInfo(info) = entry else {
2965 return None;
2966 };
2967 info.name.clone()
2968 });
2969
2970 let mut state = serde_json::Map::new();
2971 state.insert("model".to_string(), model.unwrap_or(Value::Null));
2972 state.insert(
2973 "thinkingLevel".to_string(),
2974 Value::String(
2975 session
2976 .header
2977 .thinking_level
2978 .clone()
2979 .unwrap_or_else(|| "off".to_string()),
2980 ),
2981 );
2982 state.insert("isStreaming".to_string(), Value::Bool(is_streaming));
2983 state.insert("isCompacting".to_string(), Value::Bool(is_compacting));
2984 state.insert(
2985 "steeringMode".to_string(),
2986 Value::String(snapshot.steering_mode.as_str().to_string()),
2987 );
2988 state.insert(
2989 "followUpMode".to_string(),
2990 Value::String(snapshot.follow_up_mode.as_str().to_string()),
2991 );
2992 state.insert(
2993 "sessionFile".to_string(),
2994 session
2995 .path
2996 .as_ref()
2997 .map_or(Value::Null, |p| Value::String(p.display().to_string())),
2998 );
2999 state.insert(
3000 "sessionId".to_string(),
3001 Value::String(session.header.id.clone()),
3002 );
3003 state.insert(
3004 "sessionName".to_string(),
3005 session_name.map_or(Value::Null, Value::String),
3006 );
3007 state.insert(
3008 "autoCompactionEnabled".to_string(),
3009 Value::Bool(snapshot.auto_compaction_enabled),
3010 );
3011 state.insert(
3012 "messageCount".to_string(),
3013 Value::Number(message_count.into()),
3014 );
3015 state.insert(
3016 "pendingMessageCount".to_string(),
3017 Value::Number(snapshot.pending_count().into()),
3018 );
3019 state.insert(
3020 "durabilityMode".to_string(),
3021 Value::String(session.autosave_durability_mode().as_str().to_string()),
3022 );
3023 Value::Object(state)
3024}
3025
3026fn session_stats(session: &crate::session::Session) -> Value {
3027 let mut user_messages: u64 = 0;
3028 let mut assistant_messages: u64 = 0;
3029 let mut tool_results: u64 = 0;
3030 let mut tool_calls: u64 = 0;
3031
3032 let mut total_input: u64 = 0;
3033 let mut total_output: u64 = 0;
3034 let mut total_cache_read: u64 = 0;
3035 let mut total_cache_write: u64 = 0;
3036 let mut total_cost: f64 = 0.0;
3037
3038 let messages = session.to_messages_for_current_path();
3039
3040 for message in &messages {
3041 match message {
3042 Message::User(_) | Message::Custom(_) => user_messages += 1,
3043 Message::Assistant(message) => {
3044 assistant_messages += 1;
3045 tool_calls += message
3046 .content
3047 .iter()
3048 .filter(|block| matches!(block, ContentBlock::ToolCall(_)))
3049 .count() as u64;
3050 total_input += message.usage.input;
3051 total_output += message.usage.output;
3052 total_cache_read += message.usage.cache_read;
3053 total_cache_write += message.usage.cache_write;
3054 total_cost += message.usage.cost.total;
3055 }
3056 Message::ToolResult(_) => tool_results += 1,
3057 }
3058 }
3059
3060 let total_messages = messages.len() as u64;
3061
3062 let total_tokens = total_input + total_output + total_cache_read + total_cache_write;
3063 let autosave = session.autosave_metrics();
3064 let pending_message_count = autosave.pending_mutations as u64;
3065 let durability_mode = session.autosave_durability_mode();
3066 let durability_mode_label = match durability_mode {
3067 crate::session::AutosaveDurabilityMode::Strict => "strict",
3068 crate::session::AutosaveDurabilityMode::Balanced => "balanced",
3069 crate::session::AutosaveDurabilityMode::Throughput => "throughput",
3070 };
3071 let (status_event, status_severity, status_summary, status_action, status_sli_ids) =
3072 if pending_message_count == 0 {
3073 (
3074 "session.persistence.healthy",
3075 "ok",
3076 "Persistence queue is clear.",
3077 "No action required.",
3078 vec!["sli_resume_ready_p95_ms"],
3079 )
3080 } else {
3081 let summary = match durability_mode {
3082 crate::session::AutosaveDurabilityMode::Strict => {
3083 "Pending persistence backlog under strict durability mode."
3084 }
3085 crate::session::AutosaveDurabilityMode::Balanced => {
3086 "Pending persistence backlog under balanced durability mode."
3087 }
3088 crate::session::AutosaveDurabilityMode::Throughput => {
3089 "Pending persistence backlog under throughput durability mode."
3090 }
3091 };
3092 let action = match durability_mode {
3093 crate::session::AutosaveDurabilityMode::Throughput => {
3094 "Expect deferred writes; trigger manual save before critical transitions."
3095 }
3096 _ => "Allow autosave flush to complete or trigger manual save before exit.",
3097 };
3098 (
3099 "session.persistence.backlog",
3100 "warning",
3101 summary,
3102 action,
3103 vec![
3104 "sli_resume_ready_p95_ms",
3105 "sli_failure_recovery_success_rate",
3106 ],
3107 )
3108 };
3109
3110 let mut data = serde_json::Map::new();
3111 data.insert(
3112 "sessionFile".to_string(),
3113 session
3114 .path
3115 .as_ref()
3116 .map_or(Value::Null, |p| Value::String(p.display().to_string())),
3117 );
3118 data.insert(
3119 "sessionId".to_string(),
3120 Value::String(session.header.id.clone()),
3121 );
3122 data.insert(
3123 "userMessages".to_string(),
3124 Value::Number(user_messages.into()),
3125 );
3126 data.insert(
3127 "assistantMessages".to_string(),
3128 Value::Number(assistant_messages.into()),
3129 );
3130 data.insert("toolCalls".to_string(), Value::Number(tool_calls.into()));
3131 data.insert(
3132 "toolResults".to_string(),
3133 Value::Number(tool_results.into()),
3134 );
3135 data.insert(
3136 "totalMessages".to_string(),
3137 Value::Number(total_messages.into()),
3138 );
3139 data.insert(
3140 "durabilityMode".to_string(),
3141 Value::String(durability_mode_label.to_string()),
3142 );
3143 data.insert(
3144 "pendingMessageCount".to_string(),
3145 Value::Number(pending_message_count.into()),
3146 );
3147 data.insert(
3148 "tokens".to_string(),
3149 json!({
3150 "input": total_input,
3151 "output": total_output,
3152 "cacheRead": total_cache_read,
3153 "cacheWrite": total_cache_write,
3154 "total": total_tokens,
3155 }),
3156 );
3157 data.insert(
3158 "persistenceStatus".to_string(),
3159 json!({
3160 "event": status_event,
3161 "severity": status_severity,
3162 "summary": status_summary,
3163 "action": status_action,
3164 "sliIds": status_sli_ids,
3165 "pendingMessageCount": pending_message_count,
3166 "flushCounters": {
3167 "started": autosave.flush_started,
3168 "succeeded": autosave.flush_succeeded,
3169 "failed": autosave.flush_failed,
3170 },
3171 }),
3172 );
3173 data.insert(
3174 "uxEventMarkers".to_string(),
3175 json!([
3176 {
3177 "event": status_event,
3178 "severity": status_severity,
3179 "durabilityMode": durability_mode_label,
3180 "pendingMessageCount": pending_message_count,
3181 "sliIds": status_sli_ids,
3182 }
3183 ]),
3184 );
3185 data.insert("cost".to_string(), Value::from(total_cost));
3186 Value::Object(data)
3187}
3188
3189fn last_assistant_text(session: &crate::session::Session) -> Option<String> {
3190 let entries = session.entries_for_current_path();
3191 for entry in entries.into_iter().rev() {
3192 let crate::session::SessionEntry::Message(msg_entry) = entry else {
3193 continue;
3194 };
3195 let SessionMessage::Assistant { message } = &msg_entry.message else {
3196 continue;
3197 };
3198 let mut text = String::new();
3199 for block in &message.content {
3200 if let ContentBlock::Text(t) = block {
3201 text.push_str(&t.text);
3202 }
3203 }
3204 if !text.is_empty() {
3205 return Some(text);
3206 }
3207 }
3208 None
3209}
3210
3211async fn export_html_snapshot(
3216 snapshot: &crate::session::ExportSnapshot,
3217 output_path: Option<&str>,
3218) -> Result<String> {
3219 let html = snapshot.to_html();
3220
3221 let path = output_path.map_or_else(
3222 || {
3223 snapshot.path.as_ref().map_or_else(
3224 || {
3225 let ts = chrono::Utc::now().format("%Y-%m-%dT%H-%M-%S%.3fZ");
3226 PathBuf::from(format!("pi-session-{ts}.html"))
3227 },
3228 |session_path| {
3229 let basename = session_path
3230 .file_stem()
3231 .and_then(|s| s.to_str())
3232 .unwrap_or("session");
3233 PathBuf::from(format!("pi-session-{basename}.html"))
3234 },
3235 )
3236 },
3237 PathBuf::from,
3238 );
3239
3240 if let Some(parent) = path.parent().filter(|p| !p.as_os_str().is_empty()) {
3241 asupersync::fs::create_dir_all(parent).await?;
3242 }
3243 asupersync::fs::write(&path, html).await?;
3244 Ok(path.display().to_string())
3245}
3246
/// Result of a bash command run through `run_bash_rpc`.
///
/// NOTE(review): the code that fills this struct is outside the section
/// reviewed here; the field notes below are inferred from the names and
/// should be confirmed against it.
#[derive(Debug, Clone)]
struct BashRpcResult {
    // Presumably the captured command output, possibly truncated — confirm.
    output: String,
    // Presumably the exit status of the shell process — confirm.
    exit_code: i32,
    // Presumably set when the command was aborted before it finished — confirm.
    cancelled: bool,
    // Presumably set when `output` does not hold everything the command printed — confirm.
    truncated: bool,
    // Presumably the path of a file holding the complete output, when one was written — confirm.
    full_output_path: Option<String>,
}
3255
/// Derives a line count from a newline count.
///
/// * Empty output (`total_bytes == 0`) has zero lines.
/// * Output ending in a newline has exactly `newline_count` lines.
/// * Otherwise the unterminated last line adds one, saturating at `usize::MAX`.
const fn line_count_from_newline_count(
    total_bytes: usize,
    newline_count: usize,
    last_byte_was_newline: bool,
) -> usize {
    match (total_bytes, last_byte_was_newline) {
        (0, _) => 0,
        (_, true) => newline_count,
        (_, false) => newline_count.saturating_add(1),
    }
}
3269
3270async fn ingest_bash_rpc_chunk(
3271 bytes: Vec<u8>,
3272 chunks: &mut VecDeque<Vec<u8>>,
3273 chunks_bytes: &mut usize,
3274 total_bytes: &mut usize,
3275 total_lines: &mut usize,
3276 last_byte_was_newline: &mut bool,
3277 temp_file: &mut Option<asupersync::fs::File>,
3278 temp_file_path: &mut Option<PathBuf>,
3279 spill_failed: &mut bool,
3280 max_chunks_bytes: usize,
3281) {
3282 if bytes.is_empty() {
3283 return;
3284 }
3285
3286 *last_byte_was_newline = bytes.last().is_some_and(|byte| *byte == b'\n');
3287 *total_bytes = total_bytes.saturating_add(bytes.len());
3288 *total_lines = total_lines.saturating_add(memchr_iter(b'\n', &bytes).count());
3289
3290 if *total_bytes > DEFAULT_MAX_BYTES && temp_file.is_none() && !*spill_failed {
3292 let id_full = uuid::Uuid::new_v4().simple().to_string();
3293 let id = &id_full[..16];
3294 let path = std::env::temp_dir().join(format!("pi-rpc-bash-{id}.log"));
3295
3296 let expected_inode: Option<u64> = {
3298 let mut options = std::fs::OpenOptions::new();
3299 options.write(true).create_new(true);
3300 #[cfg(unix)]
3301 {
3302 use std::os::unix::fs::OpenOptionsExt;
3303 options.mode(0o600);
3304 }
3305
3306 match options.open(&path) {
3307 Ok(file) => {
3308 #[cfg(unix)]
3309 {
3310 use std::os::unix::fs::MetadataExt;
3311 file.metadata().ok().map(|m| m.ino())
3312 }
3313 #[cfg(not(unix))]
3314 {
3315 None
3316 }
3317 }
3318 Err(e) => {
3319 tracing::warn!("Failed to create bash temp file: {e}");
3320 None
3321 }
3322 }
3323 };
3324
3325 if expected_inode.is_some() || !cfg!(unix) {
3326 match asupersync::fs::OpenOptions::new()
3328 .append(true)
3329 .open(&path)
3330 .await
3331 {
3332 Ok(mut file) => {
3333 let mut identity_match = true;
3335 #[cfg(unix)]
3336 if let Some(expected) = expected_inode {
3337 use std::os::unix::fs::MetadataExt;
3338 match file.metadata().await {
3339 Ok(meta) => {
3340 if meta.ino() != expected {
3341 tracing::warn!(
3342 "Temp file identity mismatch (possible TOCTOU attack)"
3343 );
3344 identity_match = false;
3345 }
3346 }
3347 Err(e) => {
3348 tracing::warn!("Failed to stat temp file: {e}");
3349 identity_match = false;
3350 }
3351 }
3352 }
3353
3354 if identity_match {
3355 for existing in chunks.iter() {
3357 use asupersync::io::AsyncWriteExt;
3358 if let Err(e) = file.write_all(existing).await {
3359 tracing::warn!("Failed to flush bash chunk to temp file: {e}");
3360 *spill_failed = true;
3361 break;
3362 }
3363 }
3364 if !*spill_failed {
3365 *temp_file = Some(file);
3366 *temp_file_path = Some(path);
3367 }
3368 } else {
3369 let _ = std::fs::remove_file(&path);
3370 *spill_failed = true;
3371 }
3372 }
3373 Err(e) => {
3374 tracing::warn!("Failed to reopen bash temp file async: {e}");
3375 let _ = std::fs::remove_file(&path);
3377 *spill_failed = true;
3378 }
3379 }
3380 } else {
3381 *spill_failed = true;
3382 }
3383 }
3384
3385 if let Some(file) = temp_file.as_mut() {
3387 if *total_bytes <= crate::tools::BASH_FILE_LIMIT_BYTES {
3388 use asupersync::io::AsyncWriteExt;
3389 if let Err(e) = file.write_all(&bytes).await {
3390 tracing::warn!("Failed to write bash chunk to temp file: {e}");
3391 *spill_failed = true;
3392 *temp_file = None;
3393 }
3394 } else {
3395 if !*spill_failed {
3397 tracing::warn!("Bash output exceeded hard limit; stopping file log");
3398 *spill_failed = true;
3399 *temp_file = None;
3400 }
3401 }
3402 }
3403
3404 *chunks_bytes = chunks_bytes.saturating_add(bytes.len());
3406 chunks.push_back(bytes);
3407 while *chunks_bytes > max_chunks_bytes && chunks.len() > 1 {
3408 if let Some(front) = chunks.pop_front() {
3409 *chunks_bytes = chunks_bytes.saturating_sub(front.len());
3410 }
3411 }
3412}
3413
/// Runs `command` through a shell in `cwd` and returns its merged stdout/stderr output.
///
/// The child's stdout and stderr are piped and read by two pump threads that
/// forward 8 KiB-or-smaller chunks over a bounded channel. This function polls
/// that channel, the abort signal, and the child status on a 10 ms tick.
///
/// # Parameters
/// - `cwd`: working directory for the spawned shell.
/// - `command`: script text passed to the shell via `-c`.
/// - `abort_rx`: one-shot signal; once it fires the child is killed through the
///   process guard and the result is flagged as cancelled.
///
/// # Returns
/// A [`BashRpcResult`] holding the (possibly truncated) tail of the output, the
/// exit code (`-1` when the status carries no code), the cancel/truncation
/// flags, and the spill-file path if `ingest_bash_rpc_chunk` recorded one.
///
/// # Errors
/// Returns a `bash` tool error when the shell cannot be spawned, when a pipe
/// handle is missing, or when polling the child status fails.
async fn run_bash_rpc(
    cwd: &std::path::Path,
    command: &str,
    abort_rx: oneshot::Receiver<()>,
) -> Result<BashRpcResult> {
    /// Identifies which pipe a chunk was read from.
    #[derive(Clone, Copy)]
    enum StreamKind {
        Stdout,
        Stderr,
    }

    /// One read from a child pipe. Only `bytes` is consumed in this function;
    /// `kind` is carried along but not inspected below.
    struct StreamChunk {
        kind: StreamKind,
        bytes: Vec<u8>,
    }

    /// Blocking pump: copies `reader` into `tx` until EOF, a read error, or the
    /// receiving side is dropped.
    fn pump_stream(
        mut reader: impl std::io::Read,
        tx: std::sync::mpsc::SyncSender<StreamChunk>,
        kind: StreamKind,
    ) {
        let mut buf = [0u8; 8192];
        loop {
            // Read errors are treated the same as EOF: the pump simply stops.
            let read = match reader.read(&mut buf) {
                Ok(0) | Err(_) => break,
                Ok(read) => read,
            };
            let chunk = StreamChunk {
                kind,
                bytes: buf[..read].to_vec(),
            };
            // `send` blocks while the bounded channel is full and fails once the
            // receiver has been dropped, which is how the pump is told to stop.
            if tx.send(chunk).is_err() {
                break;
            }
        }
    }

    // Use the first bash binary that exists at a well-known path; otherwise fall
    // back to whatever `sh` resolves to.
    let shell = ["/bin/bash", "/usr/bin/bash", "/usr/local/bin/bash"]
        .into_iter()
        .find(|p| std::path::Path::new(p).exists())
        .unwrap_or("sh");

    // Prepend an EXIT trap that saves the script's status, runs `wait`, and then
    // exits with the saved status.
    let command = format!("trap 'code=$?; wait; exit $code' EXIT\n{command}");

    let mut child = std::process::Command::new(shell)
        .arg("-c")
        .arg(&command)
        .current_dir(cwd)
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .spawn()
        .map_err(|e| Error::tool("bash", format!("Failed to spawn shell: {e}")))?;

    let Some(stdout) = child.stdout.take() else {
        return Err(Error::tool("bash", "Missing stdout".to_string()));
    };
    let Some(stderr) = child.stderr.take() else {
        return Err(Error::tool("bash", "Missing stderr".to_string()));
    };

    // NOTE(review): the meaning of the `true` flag is defined by
    // `crate::tools::ProcessGuard::new` and is not visible here — confirm.
    let mut guard = crate::tools::ProcessGuard::new(child, true);

    // Bounded channel: pumps block once 128 chunks are queued and unread.
    let (tx, rx) = std::sync::mpsc::sync_channel::<StreamChunk>(128);
    let tx_stdout = tx.clone();
    // The join handles are never joined; the pumps end on EOF or when `rx` is dropped.
    let _stdout_handle =
        std::thread::spawn(move || pump_stream(stdout, tx_stdout, StreamKind::Stdout));
    let _stderr_handle = std::thread::spawn(move || pump_stream(stderr, tx, StreamKind::Stderr));

    // Polling interval for both the main loop and the post-exit drain loop.
    let tick = Duration::from_millis(10);

    // Accounting state handed to `ingest_bash_rpc_chunk` for every chunk.
    let mut chunks: VecDeque<Vec<u8>> = VecDeque::new();
    let mut chunks_bytes = 0usize;
    let mut total_bytes = 0usize;
    let mut total_lines = 0usize;
    let mut last_byte_was_newline = false;
    let mut temp_file: Option<asupersync::fs::File> = None;
    let mut temp_file_path: Option<PathBuf> = None;
    // Byte budget for the in-memory chunk queue, passed through to the ingest helper.
    let max_chunks_bytes = DEFAULT_MAX_BYTES * 2;

    let mut cancelled = false;
    let mut spill_failed = false;

    // Main loop: ingest pending output, honour the abort signal, poll the child, sleep.
    let exit_code = loop {
        // NOTE(review): this inner loop runs until the channel is momentarily
        // empty, so the abort and exit checks below wait on it — confirm a
        // sustained producer cannot starve them.
        while let Ok(chunk) = rx.try_recv() {
            ingest_bash_rpc_chunk(
                chunk.bytes,
                &mut chunks,
                &mut chunks_bytes,
                &mut total_bytes,
                &mut total_lines,
                &mut last_byte_was_newline,
                &mut temp_file,
                &mut temp_file_path,
                &mut spill_failed,
                max_chunks_bytes,
            )
            .await;
        }

        // The abort signal is acted on once; `cancelled` stops repeated kills.
        if !cancelled && abort_rx.try_recv().is_ok() {
            cancelled = true;
            // If the kill already yields an exit status, finish immediately;
            // otherwise keep polling below.
            if let Ok(Some(status)) = guard.kill() {
                break status.code().unwrap_or(-1);
            }
        }

        match guard.try_wait_child() {
            // A status without a code is reported as -1.
            Ok(Some(status)) => break status.code().unwrap_or(-1),
            Ok(None) => {}
            Err(err) => {
                return Err(Error::tool(
                    "bash",
                    format!("Failed to wait for process: {err}"),
                ));
            }
        }

        sleep(wall_now(), tick).await;
    };

    // After the child exits, keep draining for at most two seconds so output
    // still in flight through the pumps is not lost.
    let drain_deadline = Instant::now() + Duration::from_secs(2);
    let mut drain_timed_out = false;
    loop {
        match rx.try_recv() {
            Ok(chunk) => {
                ingest_bash_rpc_chunk(
                    chunk.bytes,
                    &mut chunks,
                    &mut chunks_bytes,
                    &mut total_bytes,
                    &mut total_lines,
                    &mut last_byte_was_newline,
                    &mut temp_file,
                    &mut temp_file_path,
                    &mut spill_failed,
                    max_chunks_bytes,
                )
                .await;
            }
            Err(std::sync::mpsc::TryRecvError::Empty) => {
                // Senders are still alive (a pipe is still open): wait, but only
                // until the deadline.
                if Instant::now() >= drain_deadline {
                    drain_timed_out = true;
                    break;
                }
                sleep(wall_now(), tick).await;
            }
            // Both pumps have finished and dropped their senders.
            Err(std::sync::mpsc::TryRecvError::Disconnected) => break,
        }
    }

    // Dropping the receiver makes any pump still blocked in `send` fail and exit.
    drop(rx);

    // Release the spill-file handle before the result is built.
    drop(temp_file);

    // Stitch the retained chunks into one buffer; invalid UTF-8 is replaced lossily.
    let mut combined = Vec::with_capacity(chunks_bytes);
    for chunk in chunks {
        combined.extend_from_slice(&chunk);
    }
    let tail_output = String::from_utf8_lossy(&combined).to_string();

    let mut truncation = truncate_tail(tail_output, DEFAULT_MAX_LINES, DEFAULT_MAX_BYTES);
    if total_bytes > chunks_bytes {
        // More bytes were seen than are still held in memory, so report the
        // overall totals rather than those of the retained tail.
        truncation.truncated = true;
        truncation.truncated_by = Some(crate::tools::TruncatedBy::Bytes);
        truncation.total_bytes = total_bytes;
        truncation.total_lines =
            line_count_from_newline_count(total_bytes, total_lines, last_byte_was_newline);
    } else if drain_timed_out {
        // Output may have been left unread when the drain deadline expired.
        truncation.truncated = true;
        truncation.truncated_by = Some(crate::tools::TruncatedBy::Bytes);
    }
    let will_truncate = truncation.truncated;

    let mut output_text = if truncation.content.is_empty() {
        "(no output)".to_string()
    } else {
        truncation.content
    };

    if drain_timed_out {
        output_text.push_str("\n... [Output truncated: drain timeout]");
    }

    Ok(BashRpcResult {
        output: output_text,
        exit_code,
        cancelled,
        truncated: will_truncate,
        full_output_path: temp_file_path.map(|p| p.display().to_string()),
    })
}
3617
3618fn parse_prompt_images(value: Option<&Value>) -> Result<Vec<ImageContent>> {
3619 let Some(value) = value else {
3620 return Ok(Vec::new());
3621 };
3622 let Some(arr) = value.as_array() else {
3623 return Err(Error::validation("images must be an array"));
3624 };
3625
3626 let mut images = Vec::new();
3627 for item in arr {
3628 let Some(obj) = item.as_object() else {
3629 continue;
3630 };
3631 let item_type = obj.get("type").and_then(Value::as_str).unwrap_or("");
3632 if item_type != "image" {
3633 continue;
3634 }
3635 let Some(source) = obj.get("source").and_then(Value::as_object) else {
3636 continue;
3637 };
3638 let source_type = source.get("type").and_then(Value::as_str).unwrap_or("");
3639 if source_type != "base64" {
3640 continue;
3641 }
3642 let Some(media_type) = source.get("mediaType").and_then(Value::as_str) else {
3643 continue;
3644 };
3645 let Some(data) = source.get("data").and_then(Value::as_str) else {
3646 continue;
3647 };
3648 images.push(ImageContent {
3649 data: data.to_string(),
3650 mime_type: media_type.to_string(),
3651 });
3652 }
3653 Ok(images)
3654}
3655
3656fn resolve_model_key(auth: &AuthStorage, entry: &ModelEntry) -> Option<String> {
3657 normalize_api_key_opt(auth.resolve_api_key(&entry.model.provider, None))
3658 .or_else(|| normalize_api_key_opt(entry.api_key.clone()))
3659}
3660
/// Trims surrounding whitespace from an optional API key.
///
/// Returns `None` when the key is absent or consists only of whitespace.
fn normalize_api_key_opt(api_key: Option<String>) -> Option<String> {
    let raw = api_key?;
    match raw.trim() {
        "" => None,
        trimmed => Some(trimmed.to_string()),
    }
}
3667
3668fn model_requires_configured_credential(entry: &ModelEntry) -> bool {
3669 let provider = entry.model.provider.as_str();
3670 entry.auth_header
3671 || provider_metadata(provider).is_some_and(|meta| !meta.auth_env_keys.is_empty())
3672 || entry.oauth_config.is_some()
3673}
3674
3675fn parse_thinking_level(level: &str) -> Result<crate::model::ThinkingLevel> {
3676 level.parse().map_err(|err: String| Error::validation(err))
3677}
3678
3679fn current_model_entry<'a>(
3680 session: &crate::session::Session,
3681 options: &'a RpcOptions,
3682) -> Option<&'a ModelEntry> {
3683 let provider = session.header.provider.as_deref()?;
3684 let model_id = session.header.model_id.as_deref()?;
3685 options.available_models.iter().find(|m| {
3686 provider_ids_match(&m.model.provider, provider) && m.model.id.eq_ignore_ascii_case(model_id)
3687 })
3688}
3689
3690async fn apply_thinking_level(
3691 guard: &mut AgentSession,
3692 level: crate::model::ThinkingLevel,
3693) -> Result<()> {
3694 let cx = AgentCx::for_request();
3695 {
3696 let mut inner_session = guard
3697 .session
3698 .lock(cx.cx())
3699 .await
3700 .map_err(|err| Error::session(format!("inner session lock failed: {err}")))?;
3701 inner_session.header.thinking_level = Some(level.to_string());
3702 inner_session.append_thinking_level_change(level.to_string());
3703 }
3704 guard.agent.stream_options_mut().thinking_level = Some(level);
3705 guard.persist_session().await
3706}
3707
3708async fn apply_model_change(guard: &mut AgentSession, entry: &ModelEntry) -> Result<()> {
3709 let cx = AgentCx::for_request();
3710 {
3711 let mut inner_session = guard
3712 .session
3713 .lock(cx.cx())
3714 .await
3715 .map_err(|err| Error::session(format!("inner session lock failed: {err}")))?;
3716 inner_session.header.provider = Some(entry.model.provider.clone());
3717 inner_session.header.model_id = Some(entry.model.id.clone());
3718 inner_session.append_model_change(entry.model.provider.clone(), entry.model.id.clone());
3719 }
3720 guard.persist_session().await
3721}
3722
3723fn fork_messages_from_entries(entries: &[crate::session::SessionEntry]) -> Vec<Value> {
3728 let mut result = Vec::new();
3729
3730 for entry in entries {
3731 let crate::session::SessionEntry::Message(m) = entry else {
3732 continue;
3733 };
3734 let SessionMessage::User { content, .. } = &m.message else {
3735 continue;
3736 };
3737 let entry_id = m.base.id.clone().unwrap_or_default();
3738 let text = extract_user_text(content);
3739 result.push(json!({
3740 "entryId": entry_id,
3741 "text": text,
3742 }));
3743 }
3744
3745 result
3746}
3747
3748fn extract_user_text(content: &crate::model::UserContent) -> Option<String> {
3749 match content {
3750 crate::model::UserContent::Text(text) => Some(text.clone()),
3751 crate::model::UserContent::Blocks(blocks) => blocks.iter().find_map(|b| {
3752 if let ContentBlock::Text(t) = b {
3753 Some(t.text.clone())
3754 } else {
3755 None
3756 }
3757 }),
3758 }
3759}
3760
3761fn available_thinking_levels(entry: &ModelEntry) -> Vec<crate::model::ThinkingLevel> {
3764 use crate::model::ThinkingLevel;
3765 if entry.model.reasoning {
3766 let mut levels = vec![
3767 ThinkingLevel::Off,
3768 ThinkingLevel::Minimal,
3769 ThinkingLevel::Low,
3770 ThinkingLevel::Medium,
3771 ThinkingLevel::High,
3772 ];
3773 if entry.supports_xhigh() {
3774 levels.push(ThinkingLevel::XHigh);
3775 }
3776 levels
3777 } else {
3778 vec![ThinkingLevel::Off]
3779 }
3780}
3781
3782async fn cycle_model_for_rpc(
3785 guard: &mut AgentSession,
3786 options: &RpcOptions,
3787) -> Result<Option<(ModelEntry, crate::model::ThinkingLevel, bool)>> {
3788 let (candidates, is_scoped) = if options.scoped_models.is_empty() {
3789 (options.available_models.clone(), false)
3790 } else {
3791 (
3792 options
3793 .scoped_models
3794 .iter()
3795 .map(|sm| sm.model.clone())
3796 .collect::<Vec<_>>(),
3797 true,
3798 )
3799 };
3800
3801 if candidates.len() <= 1 {
3802 return Ok(None);
3803 }
3804
3805 let cx = AgentCx::for_request();
3806 let (current_provider, current_model_id) = {
3807 let inner_session = guard
3808 .session
3809 .lock(cx.cx())
3810 .await
3811 .map_err(|err| Error::session(format!("inner session lock failed: {err}")))?;
3812 (
3813 inner_session.header.provider.clone(),
3814 inner_session.header.model_id.clone(),
3815 )
3816 };
3817
3818 let current_index = candidates.iter().position(|entry| {
3819 current_provider
3820 .as_deref()
3821 .is_some_and(|provider| provider_ids_match(provider, &entry.model.provider))
3822 && current_model_id
3823 .as_deref()
3824 .is_some_and(|model_id| model_id.eq_ignore_ascii_case(&entry.model.id))
3825 });
3826
3827 let next_index = current_index.map_or(0, |idx| (idx + 1) % candidates.len());
3828
3829 let next_entry = candidates[next_index].clone();
3830 let provider_impl = crate::providers::create_provider(
3831 &next_entry,
3832 guard
3833 .extensions
3834 .as_ref()
3835 .map(crate::extensions::ExtensionRegion::manager),
3836 )?;
3837 guard.agent.set_provider(provider_impl);
3838
3839 let key = resolve_model_key(&options.auth, &next_entry);
3840 if model_requires_configured_credential(&next_entry) && key.is_none() {
3841 return Err(Error::auth(format!(
3842 "Missing credentials for {}/{}",
3843 next_entry.model.provider, next_entry.model.id
3844 )));
3845 }
3846 guard.agent.stream_options_mut().api_key.clone_from(&key);
3847 guard
3848 .agent
3849 .stream_options_mut()
3850 .headers
3851 .clone_from(&next_entry.headers);
3852
3853 apply_model_change(guard, &next_entry).await?;
3854
3855 let desired_thinking = if is_scoped {
3856 options.scoped_models[next_index]
3857 .thinking_level
3858 .unwrap_or(crate::model::ThinkingLevel::Off)
3859 } else {
3860 guard
3861 .agent
3862 .stream_options()
3863 .thinking_level
3864 .unwrap_or_default()
3865 };
3866
3867 let next_thinking = next_entry.clamp_thinking_level(desired_thinking);
3868 apply_thinking_level(guard, next_thinking).await?;
3869
3870 Ok(Some((next_entry, next_thinking, is_scoped)))
3871}
3872
3873#[cfg(test)]
3874mod tests {
3875 use super::*;
3876 use crate::auth::AuthCredential;
3877 use crate::model::{
3878 ContentBlock, ImageContent, TextContent, ThinkingLevel, UserContent, UserMessage,
3879 };
3880 use crate::provider::{InputType, Model, ModelCost};
3881 use crate::session::Session;
3882 use serde_json::json;
3883 use std::collections::HashMap;
3884
3885 fn dummy_model(id: &str, reasoning: bool) -> Model {
3890 Model {
3891 id: id.to_string(),
3892 name: id.to_string(),
3893 api: "anthropic".to_string(),
3894 provider: "anthropic".to_string(),
3895 base_url: "https://api.anthropic.com".to_string(),
3896 reasoning,
3897 input: vec![InputType::Text],
3898 cost: ModelCost {
3899 input: 3.0,
3900 output: 15.0,
3901 cache_read: 0.3,
3902 cache_write: 3.75,
3903 },
3904 context_window: 200_000,
3905 max_tokens: 8192,
3906 headers: HashMap::new(),
3907 }
3908 }
3909
3910 fn dummy_entry(id: &str, reasoning: bool) -> ModelEntry {
3911 ModelEntry {
3912 model: dummy_model(id, reasoning),
3913 api_key: None,
3914 headers: HashMap::new(),
3915 auth_header: false,
3916 compat: None,
3917 oauth_config: None,
3918 }
3919 }
3920
3921 fn rpc_options_with_models(available_models: Vec<ModelEntry>) -> RpcOptions {
3922 let runtime = asupersync::runtime::RuntimeBuilder::new()
3923 .blocking_threads(1, 1)
3924 .build()
3925 .expect("runtime build");
3926 let runtime_handle = runtime.handle();
3927
3928 let auth_path = tempfile::tempdir()
3929 .expect("tempdir")
3930 .path()
3931 .join("auth.json");
3932 let auth = AuthStorage::load(auth_path).expect("auth load");
3933
3934 RpcOptions {
3935 config: Config::default(),
3936 resources: ResourceLoader::empty(false),
3937 available_models,
3938 scoped_models: Vec::new(),
3939 auth,
3940 runtime_handle,
3941 }
3942 }
3943
3944 #[test]
3945 fn line_count_from_newline_count_matches_trailing_newline_semantics() {
3946 assert_eq!(line_count_from_newline_count(0, 0, false), 0);
3947 assert_eq!(line_count_from_newline_count(2, 1, true), 1);
3948 assert_eq!(line_count_from_newline_count(1, 0, false), 1);
3949 assert_eq!(line_count_from_newline_count(3, 1, false), 2);
3950 }
3951
3952 #[test]
3957 fn parse_queue_mode_all() {
3958 assert_eq!(parse_queue_mode(Some("all")), Some(QueueMode::All));
3959 }
3960
3961 #[test]
3962 fn parse_queue_mode_one_at_a_time() {
3963 assert_eq!(
3964 parse_queue_mode(Some("one-at-a-time")),
3965 Some(QueueMode::OneAtATime)
3966 );
3967 }
3968
3969 #[test]
3970 fn parse_queue_mode_none_value() {
3971 assert_eq!(parse_queue_mode(None), None);
3972 }
3973
3974 #[test]
3975 fn parse_queue_mode_unknown_returns_none() {
3976 assert_eq!(parse_queue_mode(Some("batch")), None);
3977 assert_eq!(parse_queue_mode(Some("")), None);
3978 }
3979
3980 #[test]
3981 fn parse_queue_mode_trims_whitespace() {
3982 assert_eq!(parse_queue_mode(Some(" all ")), Some(QueueMode::All));
3983 }
3984
3985 #[test]
3986 fn provider_ids_match_accepts_aliases() {
3987 assert!(provider_ids_match("openrouter", "open-router"));
3988 assert!(provider_ids_match("google-gemini-cli", "gemini-cli"));
3989 assert!(!provider_ids_match("openai", "anthropic"));
3990 }
3991
3992 #[test]
3993 fn resolve_model_key_prefers_stored_auth_key_over_inline_entry_key() {
3994 let mut entry = dummy_entry("gpt-4o-mini", true);
3995 entry.model.provider = "openai".to_string();
3996 entry.auth_header = true;
3997 entry.api_key = Some("inline-model-key".to_string());
3998
3999 let auth_path = tempfile::tempdir()
4000 .expect("tempdir")
4001 .path()
4002 .join("auth.json");
4003 let mut auth = AuthStorage::load(auth_path).expect("auth load");
4004 auth.set(
4005 "openai".to_string(),
4006 AuthCredential::ApiKey {
4007 key: "stored-auth-key".to_string(),
4008 },
4009 );
4010
4011 assert_eq!(
4012 resolve_model_key(&auth, &entry).as_deref(),
4013 Some("stored-auth-key")
4014 );
4015 }
4016
4017 #[test]
4018 fn resolve_model_key_ignores_blank_inline_key_and_falls_back_to_auth_storage() {
4019 let mut entry = dummy_entry("gpt-4o-mini", true);
4020 entry.model.provider = "openai".to_string();
4021 entry.auth_header = true;
4022 entry.api_key = Some(" ".to_string());
4023
4024 let auth_path = tempfile::tempdir()
4025 .expect("tempdir")
4026 .path()
4027 .join("auth.json");
4028 let mut auth = AuthStorage::load(auth_path).expect("auth load");
4029 auth.set(
4030 "openai".to_string(),
4031 AuthCredential::ApiKey {
4032 key: "stored-auth-key".to_string(),
4033 },
4034 );
4035
4036 assert_eq!(
4037 resolve_model_key(&auth, &entry).as_deref(),
4038 Some("stored-auth-key")
4039 );
4040 }
4041
4042 #[test]
4043 fn unknown_keyless_model_does_not_require_credentials() {
4044 let mut entry = dummy_entry("dev-model", false);
4045 entry.model.provider = "acme-local".to_string();
4046 entry.auth_header = false;
4047 entry.oauth_config = None;
4048
4049 assert!(!model_requires_configured_credential(&entry));
4050 }
4051
4052 #[test]
4053 fn anthropic_model_requires_credentials_even_without_auth_header() {
4054 let mut entry = dummy_entry("claude-sonnet-4-6", true);
4055 entry.model.provider = "anthropic".to_string();
4056 entry.auth_header = false;
4057 entry.oauth_config = None;
4058
4059 assert!(model_requires_configured_credential(&entry));
4060 }
4061
4062 #[test]
4067 fn parse_streaming_behavior_steer() {
4068 let val = json!("steer");
4069 let result = parse_streaming_behavior(Some(&val)).unwrap();
4070 assert_eq!(result, Some(StreamingBehavior::Steer));
4071 }
4072
4073 #[test]
4074 fn parse_streaming_behavior_follow_up_hyphenated() {
4075 let val = json!("follow-up");
4076 let result = parse_streaming_behavior(Some(&val)).unwrap();
4077 assert_eq!(result, Some(StreamingBehavior::FollowUp));
4078 }
4079
4080 #[test]
4081 fn parse_streaming_behavior_follow_up_camel() {
4082 let val = json!("followUp");
4083 let result = parse_streaming_behavior(Some(&val)).unwrap();
4084 assert_eq!(result, Some(StreamingBehavior::FollowUp));
4085 }
4086
4087 #[test]
4088 fn parse_streaming_behavior_none() {
4089 let result = parse_streaming_behavior(None).unwrap();
4090 assert_eq!(result, None);
4091 }
4092
4093 #[test]
4094 fn parse_streaming_behavior_invalid_string() {
4095 let val = json!("invalid");
4096 assert!(parse_streaming_behavior(Some(&val)).is_err());
4097 }
4098
4099 #[test]
4100 fn parse_streaming_behavior_non_string_errors() {
4101 let val = json!(42);
4102 assert!(parse_streaming_behavior(Some(&val)).is_err());
4103 }
4104
4105 #[test]
4110 fn normalize_command_type_passthrough() {
4111 assert_eq!(normalize_command_type("prompt"), "prompt");
4112 assert_eq!(normalize_command_type("compact"), "compact");
4113 }
4114
4115 #[test]
4116 fn normalize_command_type_follow_up_aliases() {
4117 assert_eq!(normalize_command_type("follow-up"), "follow_up");
4118 assert_eq!(normalize_command_type("followUp"), "follow_up");
4119 assert_eq!(normalize_command_type("queue-follow-up"), "follow_up");
4120 assert_eq!(normalize_command_type("queueFollowUp"), "follow_up");
4121 }
4122
4123 #[test]
4124 fn normalize_command_type_kebab_and_camel_aliases() {
4125 assert_eq!(normalize_command_type("get-state"), "get_state");
4126 assert_eq!(normalize_command_type("getState"), "get_state");
4127 assert_eq!(normalize_command_type("set-model"), "set_model");
4128 assert_eq!(normalize_command_type("setModel"), "set_model");
4129 assert_eq!(
4130 normalize_command_type("set-steering-mode"),
4131 "set_steering_mode"
4132 );
4133 assert_eq!(
4134 normalize_command_type("setSteeringMode"),
4135 "set_steering_mode"
4136 );
4137 assert_eq!(
4138 normalize_command_type("set-follow-up-mode"),
4139 "set_follow_up_mode"
4140 );
4141 assert_eq!(
4142 normalize_command_type("setFollowUpMode"),
4143 "set_follow_up_mode"
4144 );
4145 assert_eq!(
4146 normalize_command_type("set-auto-compaction"),
4147 "set_auto_compaction"
4148 );
4149 assert_eq!(
4150 normalize_command_type("setAutoCompaction"),
4151 "set_auto_compaction"
4152 );
4153 assert_eq!(normalize_command_type("set-auto-retry"), "set_auto_retry");
4154 assert_eq!(normalize_command_type("setAutoRetry"), "set_auto_retry");
4155 }
4156
4157 #[test]
4162 fn build_user_message_text_only() {
4163 let msg = build_user_message("hello", &[]);
4164 match msg {
4165 Message::User(UserMessage {
4166 content: UserContent::Text(text),
4167 ..
4168 }) => assert_eq!(text, "hello"),
4169 other => panic!("expected text user message, got {other:?}"),
4170 }
4171 }
4172
4173 #[test]
4174 fn build_user_message_with_images() {
4175 let images = vec![ImageContent {
4176 data: "base64data".to_string(),
4177 mime_type: "image/png".to_string(),
4178 }];
4179 let msg = build_user_message("look at this", &images);
4180 match msg {
4181 Message::User(UserMessage {
4182 content: UserContent::Blocks(blocks),
4183 ..
4184 }) => {
4185 assert_eq!(blocks.len(), 2);
4186 assert!(matches!(&blocks[0], ContentBlock::Text(_)));
4187 assert!(matches!(&blocks[1], ContentBlock::Image(_)));
4188 }
4189 other => panic!("expected blocks user message, got {other:?}"),
4190 }
4191 }
4192
4193 #[test]
4198 fn is_extension_command_slash_unchanged() {
4199 assert!(is_extension_command("/mycommand", "/mycommand"));
4200 }
4201
4202 #[test]
4203 fn is_extension_command_expanded_returns_false() {
4204 assert!(!is_extension_command(
4206 "/prompt-name",
4207 "This is the expanded prompt text."
4208 ));
4209 }
4210
4211 #[test]
4212 fn is_extension_command_no_slash() {
4213 assert!(!is_extension_command("hello", "hello"));
4214 }
4215
4216 #[test]
4217 fn is_extension_command_leading_whitespace() {
4218 assert!(is_extension_command(" /cmd", " /cmd"));
4219 }
4220
4221 #[test]
4226 fn try_send_line_with_backpressure_enqueues_when_capacity_available() {
4227 let (tx, _rx) = mpsc::channel::<String>(1);
4228 assert!(try_send_line_with_backpressure(&tx, "line".to_string()));
4229 assert!(matches!(
4230 tx.try_send("next".to_string()),
4231 Err(mpsc::SendError::Full(_))
4232 ));
4233 }
4234
4235 #[test]
4236 fn try_send_line_with_backpressure_stops_when_receiver_closed() {
4237 let (tx, rx) = mpsc::channel::<String>(1);
4238 drop(rx);
4239 assert!(!try_send_line_with_backpressure(&tx, "line".to_string()));
4240 }
4241
4242 #[test]
4243 fn try_send_line_with_backpressure_waits_until_capacity_is_available() {
4244 let (tx, rx) = mpsc::channel::<String>(1);
4245 tx.try_send("occupied".to_string())
4246 .expect("seed initial occupied slot");
4247
4248 let expected = "delayed-line".to_string();
4249 let expected_for_thread = expected.clone();
4250 let recv_handle = std::thread::spawn(move || {
4251 std::thread::sleep(Duration::from_millis(30));
4252 let deadline = Instant::now() + Duration::from_millis(300);
4253 let mut received = Vec::new();
4254 while received.len() < 2 && Instant::now() < deadline {
4255 if let Ok(msg) = rx.try_recv() {
4256 received.push(msg);
4257 } else {
4258 std::thread::sleep(Duration::from_millis(5));
4259 }
4260 }
4261 assert_eq!(received.len(), 2, "should receive both queued lines");
4262 let first = received.remove(0);
4263 let second = received.remove(0);
4264 assert_eq!(first, "occupied");
4265 assert_eq!(second, expected_for_thread);
4266 });
4267
4268 assert!(try_send_line_with_backpressure(&tx, expected));
4269 drop(tx);
4270 recv_handle.join().expect("receiver thread should finish");
4271 }
4272
4273 #[test]
4274 fn try_send_line_with_backpressure_preserves_large_payload() {
4275 let (tx, rx) = mpsc::channel::<String>(1);
4276 tx.try_send("busy".to_string())
4277 .expect("seed initial busy slot");
4278
4279 let large = "x".repeat(256 * 1024);
4280 let large_for_thread = large.clone();
4281 let recv_handle = std::thread::spawn(move || {
4282 std::thread::sleep(Duration::from_millis(30));
4283 let deadline = Instant::now() + Duration::from_millis(500);
4284 let mut received = Vec::new();
4285 while received.len() < 2 && Instant::now() < deadline {
4286 if let Ok(msg) = rx.try_recv() {
4287 received.push(msg);
4288 } else {
4289 std::thread::sleep(Duration::from_millis(5));
4290 }
4291 }
4292 assert_eq!(received.len(), 2, "should receive busy + payload lines");
4293 let payload = received.remove(1);
4294 assert_eq!(payload.len(), large_for_thread.len());
4295 assert_eq!(payload, large_for_thread);
4296 });
4297
4298 assert!(try_send_line_with_backpressure(&tx, large));
4299 drop(tx);
4300 recv_handle.join().expect("receiver thread should finish");
4301 }
4302
4303 #[test]
4304 fn try_send_line_with_backpressure_detects_disconnect_while_waiting() {
4305 let (tx, rx) = mpsc::channel::<String>(1);
4306 tx.try_send("busy".to_string())
4307 .expect("seed initial busy slot");
4308
4309 let drop_handle = std::thread::spawn(move || {
4310 std::thread::sleep(Duration::from_millis(30));
4311 drop(rx);
4312 });
4313
4314 assert!(
4315 !try_send_line_with_backpressure(&tx, "line-after-disconnect".to_string()),
4316 "send should stop after receiver disconnects while channel is full"
4317 );
4318 drop_handle.join().expect("drop thread should finish");
4319 }
4320
4321 #[test]
4322 fn try_send_line_with_backpressure_high_volume_preserves_order_and_count() {
4323 let (tx, rx) = mpsc::channel::<String>(4);
4324 let lines: Vec<String> = (0..256)
4325 .map(|idx| format!("line-{idx:03}: {}", "x".repeat(64)))
4326 .collect();
4327 let expected = lines.clone();
4328
4329 let recv_handle = std::thread::spawn(move || {
4330 let deadline = Instant::now() + Duration::from_secs(4);
4331 let mut received = Vec::new();
4332 while received.len() < expected.len() && Instant::now() < deadline {
4333 if let Ok(msg) = rx.try_recv() {
4334 received.push(msg);
4335 }
4336 std::thread::sleep(Duration::from_millis(1));
4337 }
4338 assert_eq!(
4339 received.len(),
4340 expected.len(),
4341 "should receive every line under sustained backpressure"
4342 );
4343 assert_eq!(received, expected, "line ordering must remain stable");
4344 });
4345
4346 for line in lines {
4347 assert!(try_send_line_with_backpressure(&tx, line));
4348 }
4349 drop(tx);
4350 recv_handle.join().expect("receiver thread should finish");
4351 }
4352
4353 #[test]
4354 fn try_send_line_with_backpressure_preserves_partial_line_without_newline() {
4355 let (tx, rx) = mpsc::channel::<String>(1);
4356 tx.try_send("busy".to_string())
4357 .expect("seed initial busy slot");
4358
4359 let partial_json = "{\"type\":\"prompt\",\"message\":\"tail-fragment-ascii\"".to_string();
4360 let expected = partial_json.clone();
4361
4362 let recv_handle = std::thread::spawn(move || {
4363 std::thread::sleep(Duration::from_millis(25));
4364 let first = rx.try_recv().expect("seeded line should be available");
4365 assert_eq!(first, "busy");
4366 let deadline = Instant::now() + Duration::from_millis(500);
4367 let second = loop {
4368 if let Ok(line) = rx.try_recv() {
4369 break line;
4370 }
4371 assert!(
4372 Instant::now() < deadline,
4373 "partial payload should be available"
4374 );
4375 std::thread::sleep(Duration::from_millis(5));
4376 };
4377 assert_eq!(second, expected);
4378 });
4379
4380 assert!(try_send_line_with_backpressure(&tx, partial_json));
4381 drop(tx);
4382 recv_handle.join().expect("receiver thread should finish");
4383 }
4384
4385 #[test]
4390 fn snapshot_pending_count() {
4391 let snapshot = RpcStateSnapshot {
4392 steering_count: 3,
4393 follow_up_count: 7,
4394 steering_mode: QueueMode::All,
4395 follow_up_mode: QueueMode::OneAtATime,
4396 auto_compaction_enabled: false,
4397 auto_retry_enabled: true,
4398 };
4399 assert_eq!(snapshot.pending_count(), 10);
4400 }
4401
4402 #[test]
4403 fn snapshot_pending_count_zero() {
4404 let snapshot = RpcStateSnapshot {
4405 steering_count: 0,
4406 follow_up_count: 0,
4407 steering_mode: QueueMode::All,
4408 follow_up_mode: QueueMode::All,
4409 auto_compaction_enabled: false,
4410 auto_retry_enabled: false,
4411 };
4412 assert_eq!(snapshot.pending_count(), 0);
4413 }
4414
4415 #[test]
4420 fn retry_delay_first_attempt_is_base() {
4421 let config = Config::default();
4422 assert_eq!(retry_delay_ms(&config, 0), config.retry_base_delay_ms());
4424 assert_eq!(retry_delay_ms(&config, 1), config.retry_base_delay_ms());
4425 }
4426
4427 #[test]
4428 fn retry_delay_doubles_each_attempt() {
4429 let config = Config::default();
4430 let base = config.retry_base_delay_ms();
4431 assert_eq!(retry_delay_ms(&config, 2), base * 2);
4433 assert_eq!(retry_delay_ms(&config, 3), base * 4);
4434 }
4435
4436 #[test]
4437 fn retry_delay_capped_at_max() {
4438 let config = Config::default();
4439 let max = config.retry_max_delay_ms();
4440 let delay = retry_delay_ms(&config, 30);
4442 assert_eq!(delay, max);
4443 }
4444
4445 #[test]
4446 fn retry_delay_saturates_on_overflow() {
4447 let config = Config::default();
4448 let delay = retry_delay_ms(&config, u32::MAX);
4450 assert!(delay <= config.retry_max_delay_ms());
4451 }
4452
4453 #[test]
4458 fn auto_compact_below_threshold() {
4459 assert!(!should_auto_compact(50_000, 200_000, 40_000));
4461 }
4462
4463 #[test]
4464 fn auto_compact_above_threshold() {
4465 assert!(should_auto_compact(170_000, 200_000, 40_000));
4467 }
4468
4469 #[test]
4470 fn auto_compact_exact_threshold() {
4471 assert!(!should_auto_compact(160_000, 200_000, 40_000));
4473 }
4474
4475 #[test]
4476 fn auto_compact_reserve_exceeds_window() {
4477 assert!(should_auto_compact(1, 100, 200));
4479 }
4480
4481 #[test]
4482 fn auto_compact_zero_tokens() {
4483 assert!(!should_auto_compact(0, 200_000, 40_000));
4484 }
4485
4486 #[test]
4491 fn flatten_content_blocks_unwraps_inner_0() {
4492 let mut value = json!({
4493 "content": [
4494 {"0": {"type": "text", "text": "hello"}}
4495 ]
4496 });
4497 rpc_flatten_content_blocks(&mut value);
4498 let blocks = value["content"].as_array().unwrap();
4499 assert_eq!(blocks[0]["type"], "text");
4500 assert_eq!(blocks[0]["text"], "hello");
4501 assert!(blocks[0].get("0").is_none());
4502 }
4503
4504 #[test]
4505 fn flatten_content_blocks_preserves_non_wrapped() {
4506 let mut value = json!({
4507 "content": [
4508 {"type": "text", "text": "already flat"}
4509 ]
4510 });
4511 rpc_flatten_content_blocks(&mut value);
4512 let blocks = value["content"].as_array().unwrap();
4513 assert_eq!(blocks[0]["type"], "text");
4514 assert_eq!(blocks[0]["text"], "already flat");
4515 }
4516
4517 #[test]
4518 fn flatten_content_blocks_no_content_field() {
4519 let mut value = json!({"role": "assistant"});
4520 rpc_flatten_content_blocks(&mut value); assert_eq!(value, json!({"role": "assistant"}));
4522 }
4523
4524 #[test]
4525 fn flatten_content_blocks_non_object() {
4526 let mut value = json!("just a string");
4527 rpc_flatten_content_blocks(&mut value); }
4529
4530 #[test]
4531 fn flatten_content_blocks_existing_keys_not_overwritten() {
4532 let mut value = json!({
4534 "content": [
4535 {"type": "existing", "0": {"type": "inner", "extra": "data"}}
4536 ]
4537 });
4538 rpc_flatten_content_blocks(&mut value);
4539 let blocks = value["content"].as_array().unwrap();
4540 assert_eq!(blocks[0]["type"], "existing");
4542 assert_eq!(blocks[0]["extra"], "data");
4544 }
4545
4546 #[test]
4551 fn parse_prompt_images_none() {
4552 let images = parse_prompt_images(None).unwrap();
4553 assert!(images.is_empty());
4554 }
4555
4556 #[test]
4557 fn parse_prompt_images_empty_array() {
4558 let val = json!([]);
4559 let images = parse_prompt_images(Some(&val)).unwrap();
4560 assert!(images.is_empty());
4561 }
4562
4563 #[test]
4564 fn parse_prompt_images_valid() {
4565 let val = json!([{
4566 "type": "image",
4567 "source": {
4568 "type": "base64",
4569 "mediaType": "image/png",
4570 "data": "iVBORw0KGgo="
4571 }
4572 }]);
4573 let images = parse_prompt_images(Some(&val)).unwrap();
4574 assert_eq!(images.len(), 1);
4575 assert_eq!(images[0].mime_type, "image/png");
4576 assert_eq!(images[0].data, "iVBORw0KGgo=");
4577 }
4578
#[test]
/// Entries whose `type` is not `"image"` are ignored rather than rejected.
fn parse_prompt_images_skips_non_image_type() {
    let payload = json!([{
        "type": "text",
        "text": "hello"
    }]);

    let parsed = parse_prompt_images(Some(&payload)).unwrap();

    assert!(parsed.is_empty());
}
4588
#[test]
/// Image entries whose source is not `"base64"` (here: a URL source) are
/// ignored rather than rejected.
fn parse_prompt_images_skips_non_base64_source() {
    let payload = json!([{
        "type": "image",
        "source": {
            "type": "url",
            "url": "https://example.com/img.png"
        }
    }]);

    let parsed = parse_prompt_images(Some(&payload)).unwrap();

    assert!(parsed.is_empty());
}
4601
#[test]
/// A present `images` value that is not a JSON array is an error.
fn parse_prompt_images_not_array_errors() {
    let payload = json!("not-an-array");
    let outcome = parse_prompt_images(Some(&payload));
    assert!(outcome.is_err());
}
4607
#[test]
/// Several valid image entries are all parsed, and their order is preserved.
fn parse_prompt_images_multiple_valid() {
    let payload = json!([
        {
            "type": "image",
            "source": {"type": "base64", "mediaType": "image/jpeg", "data": "abc"}
        },
        {
            "type": "image",
            "source": {"type": "base64", "mediaType": "image/webp", "data": "def"}
        }
    ]);

    let parsed = parse_prompt_images(Some(&payload)).unwrap();

    assert_eq!(parsed.len(), 2);
    let (first, second) = (&parsed[0], &parsed[1]);
    assert_eq!(first.mime_type, "image/jpeg");
    assert_eq!(second.mime_type, "image/webp");
}
4625
#[test]
/// Plain-text user content yields that text verbatim.
fn extract_user_text_from_text_content() {
    let content = UserContent::Text(String::from("hello world"));
    let extracted = extract_user_text(&content);
    assert_eq!(extracted, Some(String::from("hello world")));
}
4635
#[test]
/// With block content, the text block is found even when an image block
/// precedes it.
fn extract_user_text_from_blocks() {
    let leading_image = ContentBlock::Image(ImageContent {
        data: String::new(),
        mime_type: "image/png".to_string(),
    });
    let text_block = ContentBlock::Text(TextContent::new("found it"));
    let content = UserContent::Blocks(vec![leading_image, text_block]);

    let extracted = extract_user_text(&content);

    assert_eq!(extracted, Some(String::from("found it")));
}
4647
#[test]
/// Block content that contains no text block yields `None`.
fn extract_user_text_blocks_no_text() {
    let image_only = ContentBlock::Image(ImageContent {
        data: String::new(),
        mime_type: "image/png".to_string(),
    });
    let content = UserContent::Blocks(vec![image_only]);

    assert_eq!(extract_user_text(&content), None);
}
4656
#[test]
/// Every accepted spelling (names, abbreviations, and numeric aliases) maps
/// to its expected `ThinkingLevel`.
fn parse_thinking_level_all_variants() {
    let cases = [
        ("off", ThinkingLevel::Off),
        ("none", ThinkingLevel::Off),
        ("0", ThinkingLevel::Off),
        ("minimal", ThinkingLevel::Minimal),
        ("min", ThinkingLevel::Minimal),
        ("low", ThinkingLevel::Low),
        ("1", ThinkingLevel::Low),
        ("medium", ThinkingLevel::Medium),
        ("med", ThinkingLevel::Medium),
        ("2", ThinkingLevel::Medium),
        ("high", ThinkingLevel::High),
        ("3", ThinkingLevel::High),
        ("xhigh", ThinkingLevel::XHigh),
        ("4", ThinkingLevel::XHigh),
    ];

    for (input, expected) in cases {
        // The message names the failing spelling, since the loop hides the line.
        assert_eq!(
            parse_thinking_level(input).unwrap(),
            expected,
            "unexpected level for input {input:?}"
        );
    }
}
4684
#[test]
/// Parsing ignores letter case and surrounding whitespace.
fn parse_thinking_level_case_insensitive() {
    let cases = [
        ("HIGH", ThinkingLevel::High),
        ("Medium", ThinkingLevel::Medium),
        (" Off ", ThinkingLevel::Off),
    ];

    for (input, expected) in cases {
        assert_eq!(
            parse_thinking_level(input).unwrap(),
            expected,
            "unexpected level for input {input:?}"
        );
    }
}
4694
#[test]
/// Unknown names, the empty string, and out-of-range numbers are rejected.
fn parse_thinking_level_invalid() {
    for input in ["invalid", "", "5"] {
        assert!(
            parse_thinking_level(input).is_err(),
            "input {input:?} should be rejected"
        );
    }
}
4701
#[test]
/// Model ids the test expects to report xhigh support.
fn supports_xhigh_known_models() {
    let xhigh_models = [
        "gpt-5.1-codex-max",
        "gpt-5.2",
        "gpt-5.2-codex",
        "gpt-5.3-codex",
    ];

    for model_id in xhigh_models {
        assert!(
            dummy_entry(model_id, true).supports_xhigh(),
            "{model_id:?} should support xhigh"
        );
    }
}
4713
#[test]
/// Model ids outside the xhigh set (including the empty id) report no xhigh
/// support.
fn supports_xhigh_unknown_models() {
    for model_id in ["claude-opus-4-6", "gpt-4o", ""] {
        assert!(
            !dummy_entry(model_id, true).supports_xhigh(),
            "{model_id:?} should not support xhigh"
        );
    }
}
4720
#[test]
/// An entry built with the second `dummy_entry` argument set to `false`
/// (presumably the reasoning flag, going by the test name) clamps `High`
/// down to `Off`.
fn clamp_thinking_non_reasoning_model() {
    let non_reasoning = dummy_entry("claude-3-haiku", false);
    let clamped = non_reasoning.clamp_thinking_level(ThinkingLevel::High);
    assert_eq!(clamped, ThinkingLevel::Off);
}
4729
#[test]
/// For a model that is not expected to support xhigh, a request for `XHigh`
/// is clamped to `High`.
fn clamp_thinking_xhigh_without_support() {
    let model = dummy_entry("claude-opus-4-6", true);
    let clamped = model.clamp_thinking_level(ThinkingLevel::XHigh);
    assert_eq!(clamped, ThinkingLevel::High);
}
4738
#[test]
/// For a model expected to support xhigh, a request for `XHigh` is kept.
fn clamp_thinking_xhigh_with_support() {
    let model = dummy_entry("gpt-5.2", true);
    let clamped = model.clamp_thinking_level(ThinkingLevel::XHigh);
    assert_eq!(clamped, ThinkingLevel::XHigh);
}
4747
#[test]
/// A mid-range level (`Medium`) passes through clamping unchanged.
fn clamp_thinking_normal_level_passthrough() {
    let model = dummy_entry("claude-opus-4-6", true);
    let clamped = model.clamp_thinking_level(ThinkingLevel::Medium);
    assert_eq!(clamped, ThinkingLevel::Medium);
}
4756
#[test]
/// An entry built with the second `dummy_entry` argument set to `false`
/// offers only `Off`.
fn available_thinking_levels_non_reasoning() {
    let model = dummy_entry("gpt-4o-mini", false);
    let expected = vec![ThinkingLevel::Off];
    assert_eq!(available_thinking_levels(&model), expected);
}
4767
#[test]
/// A model not expected to support xhigh offers `Off` through `High`, in
/// ascending order.
fn available_thinking_levels_reasoning_no_xhigh() {
    let model = dummy_entry("claude-opus-4-6", true);
    let expected = vec![
        ThinkingLevel::Off,
        ThinkingLevel::Minimal,
        ThinkingLevel::Low,
        ThinkingLevel::Medium,
        ThinkingLevel::High,
    ];

    assert_eq!(available_thinking_levels(&model), expected);
}
4783
#[test]
/// A model expected to support xhigh offers every level, with `XHigh` last.
fn available_thinking_levels_reasoning_with_xhigh() {
    let model = dummy_entry("gpt-5.2", true);
    let expected = vec![
        ThinkingLevel::Off,
        ThinkingLevel::Minimal,
        ThinkingLevel::Low,
        ThinkingLevel::Medium,
        ThinkingLevel::High,
        ThinkingLevel::XHigh,
    ];

    assert_eq!(available_thinking_levels(&model), expected);
}
4800
#[test]
/// The RPC model JSON exposes the identity, provider, reasoning flag, and
/// token limits of the dummy entry under camelCase keys.
fn rpc_model_from_entry_basic() {
    let rendered = rpc_model_from_entry(&dummy_entry("claude-opus-4-6", true));

    // Identity fields.
    assert_eq!(rendered["id"], "claude-opus-4-6");
    assert_eq!(rendered["name"], "claude-opus-4-6");
    assert_eq!(rendered["provider"], "anthropic");

    // Capability and limit fields.
    assert_eq!(rendered["reasoning"], true);
    assert_eq!(rendered["contextWindow"], 200_000);
    assert_eq!(rendered["maxTokens"], 8192);
}
4816
#[test]
/// Input types are rendered as a JSON array of lowercase strings, in the
/// order they appear on the model.
fn rpc_model_from_entry_input_types() {
    let mut model = dummy_entry("gpt-4o", false);
    model.model.input = vec![InputType::Text, InputType::Image];

    let rendered = rpc_model_from_entry(&model);
    let input_kinds = rendered["input"].as_array().unwrap();

    assert_eq!(input_kinds.len(), 2);
    assert_eq!(input_kinds[0], "text");
    assert_eq!(input_kinds[1], "image");
}
4827
#[test]
/// The RPC model JSON contains a `cost` object carrying the dummy entry's
/// input and output prices.
fn rpc_model_from_entry_cost_present() {
    let rendered = rpc_model_from_entry(&dummy_entry("test-model", false));

    let cost = rendered.get("cost");
    assert!(cost.is_some());

    let cost = cost.unwrap();
    assert_eq!(cost["input"], 3.0);
    assert_eq!(cost["output"], 15.0);
}
4837
#[test]
/// The session header may name the provider by an alias (`open-router`) and
/// the model id in a different case; lookup must still resolve to the
/// registered `openrouter` / `gpt-4o-mini` entry.
fn current_model_entry_matches_provider_alias_and_model_case() {
    let mut registered = dummy_entry("gpt-4o-mini", true);
    registered.model.provider = String::from("openrouter");
    let options = rpc_options_with_models(vec![registered]);

    let mut session = Session::in_memory();
    session.header.provider = Some(String::from("open-router"));
    session.header.model_id = Some(String::from("GPT-4O-MINI"));

    let found = current_model_entry(&session, &options).expect("resolve aliased model");

    assert_eq!(found.model.provider, "openrouter");
    assert_eq!(found.model.id, "gpt-4o-mini");
}
4852
#[test]
/// `session_state` must report the registered model even when the session
/// header refers to the provider by an alias (`open-router`).
fn session_state_resolves_model_for_provider_alias() {
    let mut registered = dummy_entry("gpt-4o-mini", true);
    registered.model.provider = String::from("openrouter");
    let options = rpc_options_with_models(vec![registered]);

    let mut session = Session::in_memory();
    session.header.provider = Some(String::from("open-router"));
    session.header.model_id = Some(String::from("gpt-4o-mini"));

    // Queue/feature settings are irrelevant here; use neutral values.
    let snapshot = RpcStateSnapshot {
        steering_count: 0,
        follow_up_count: 0,
        steering_mode: QueueMode::OneAtATime,
        follow_up_mode: QueueMode::OneAtATime,
        auto_compaction_enabled: false,
        auto_retry_enabled: false,
    };

    let state = session_state(&session, &options, &snapshot, false, false);
    let reported_model = &state["model"];

    assert_eq!(reported_model["provider"], "openrouter");
    assert_eq!(reported_model["id"], "gpt-4o-mini");
}
4876
#[test]
/// The hints JSON for an error contains `summary`, `hints`, and
/// `contextFields`, and `hints` is an array.
fn error_hints_value_produces_expected_shape() {
    let rendered = error_hints_value(&Error::validation("test error"));

    for key in ["summary", "hints", "contextFields"] {
        assert!(rendered.get(key).is_some(), "missing key {key:?}");
    }
    assert!(rendered["hints"].is_array());
}
4890
#[test]
/// An empty `requestId` with no `id` alias yields no id.
fn parse_ui_response_id_empty_string() {
    let payload = json!({"requestId": ""});
    let parsed = rpc_parse_extension_ui_response_id(&payload);
    assert_eq!(parsed, None);
}
4900
#[test]
/// A whitespace-only `requestId` with no `id` alias yields no id.
fn parse_ui_response_id_whitespace_only() {
    let payload = json!({"requestId": " "});
    let parsed = rpc_parse_extension_ui_response_id(&payload);
    assert_eq!(parsed, None);
}
4906
#[test]
/// Surrounding whitespace is stripped from `requestId`.
fn parse_ui_response_id_trims() {
    let payload = json!({"requestId": " req-1 "});
    let parsed = rpc_parse_extension_ui_response_id(&payload);
    assert_eq!(parsed, Some(String::from("req-1")));
}
4915
#[test]
/// When both `requestId` and the `id` alias are usable, `requestId` wins.
fn parse_ui_response_id_prefers_request_id_over_id_alias() {
    let payload = json!({"requestId": "req-1", "id": "legacy-id"});
    let parsed = rpc_parse_extension_ui_response_id(&payload);
    assert_eq!(parsed, Some(String::from("req-1")));
}
4924
#[test]
/// A non-string `requestId` is ignored in favour of the `id` alias.
fn parse_ui_response_id_falls_back_to_id_alias_when_request_id_not_string() {
    let payload = json!({"requestId": 123, "id": "legacy-id"});
    let parsed = rpc_parse_extension_ui_response_id(&payload);
    assert_eq!(parsed, Some(String::from("legacy-id")));
}
4933
#[test]
/// An empty `requestId` is ignored in favour of the `id` alias.
fn parse_ui_response_id_falls_back_to_id_alias_when_request_id_blank() {
    let payload = json!({"requestId": "", "id": "legacy-id"});
    let parsed = rpc_parse_extension_ui_response_id(&payload);
    assert_eq!(parsed, Some(String::from("legacy-id")));
}
4942
#[test]
/// A whitespace-only `requestId` is ignored in favour of the `id` alias.
fn parse_ui_response_id_falls_back_to_id_alias_when_request_id_whitespace() {
    let payload = json!({"requestId": " ", "id": "legacy-id"});
    let parsed = rpc_parse_extension_ui_response_id(&payload);
    assert_eq!(parsed, Some(String::from("legacy-id")));
}
4951
#[test]
/// A payload with neither `requestId` nor `id` yields no id.
fn parse_ui_response_id_neither_field() {
    let payload = json!({"type": "something"});
    let parsed = rpc_parse_extension_ui_response_id(&payload);
    assert_eq!(parsed, None);
}
4957
#[test]
/// For an `editor` request, a string `value` is accepted and a non-string
/// `value` is rejected.
fn parse_editor_response_requires_string() {
    let active = ExtensionUiRequest::new("req-1", "editor", json!({"title": "t"}));

    let string_reply =
        json!({"type": "extension_ui_response", "requestId": "req-1", "value": "code"});
    let numeric_reply =
        json!({"type": "extension_ui_response", "requestId": "req-1", "value": 42});

    assert!(rpc_parse_extension_ui_response(&string_reply, &active).is_ok());
    assert!(rpc_parse_extension_ui_response(&numeric_reply, &active).is_err());
}
4971
#[test]
/// A reply to a `notify` request needs no `value` and parses into a
/// non-cancelled response.
fn parse_notify_response_returns_ack() {
    let active = ExtensionUiRequest::new("req-1", "notify", json!({"title": "t"}));
    let reply = json!({"type": "extension_ui_response", "requestId": "req-1"});

    let parsed = rpc_parse_extension_ui_response(&reply, &active).unwrap();

    assert!(!parsed.cancelled);
}
4979
#[test]
/// A reply to a request whose method is not recognised is an error.
fn parse_unknown_method_errors() {
    let active = ExtensionUiRequest::new("req-1", "unknown_method", json!({}));
    let reply = json!({"type": "extension_ui_response", "requestId": "req-1"});

    let outcome = rpc_parse_extension_ui_response(&reply, &active);

    assert!(outcome.is_err());
}
4986
#[test]
/// For a `select` request with object-shaped options, the test expects two
/// replies to be accepted and echoed back: the explicit `value` of the first
/// option (`"a"`), and the `label` of the second option, which has no
/// `value` (`"Beta"`).
fn parse_select_with_object_options() {
    let active = ExtensionUiRequest::new(
        "req-1",
        "select",
        json!({"title": "pick", "options": [{"label": "Alpha", "value": "a"}, {"label": "Beta"}]}),
    );

    for choice in ["a", "Beta"] {
        let reply =
            json!({"type": "extension_ui_response", "requestId": "req-1", "value": choice});
        let parsed = rpc_parse_extension_ui_response(&reply, &active).unwrap();
        assert_eq!(parsed.value, Some(json!(choice)));
    }
}
5004}