Skip to main content

pi/
rpc.rs

1//! RPC mode: headless JSON protocol over stdin/stdout.
2//!
3//! This implements a compatibility subset of pi-mono's RPC protocol
4//! (see legacy `docs/rpc.md` in `legacy_pi_mono_code`).
5
6#![allow(clippy::significant_drop_tightening)]
7#![allow(clippy::too_many_arguments)]
8#![allow(clippy::too_many_lines)]
9#![allow(clippy::cast_possible_truncation)]
10#![allow(clippy::cast_lossless)]
11#![allow(clippy::ignored_unit_patterns)]
12#![allow(clippy::needless_pass_by_value)]
13
14use crate::agent::{AbortHandle, AgentEvent, AgentSession, QueueMode};
15use crate::agent_cx::AgentCx;
16use crate::auth::AuthStorage;
17use crate::compaction::{
18    ResolvedCompactionSettings, compact, compaction_details_to_value, prepare_compaction,
19};
20use crate::config::Config;
21use crate::error::{Error, Result};
22use crate::error_hints;
23use crate::extensions::{ExtensionManager, ExtensionUiRequest, ExtensionUiResponse};
24use crate::model::{
25    ContentBlock, ImageContent, Message, StopReason, TextContent, UserContent, UserMessage,
26};
27use crate::models::ModelEntry;
28use crate::provider_metadata::{canonical_provider_id, provider_metadata};
29use crate::providers;
30use crate::resources::ResourceLoader;
31use crate::session::SessionMessage;
32use crate::tools::{DEFAULT_MAX_BYTES, DEFAULT_MAX_LINES, truncate_tail};
33use asupersync::channel::{mpsc, oneshot};
34use asupersync::runtime::RuntimeHandle;
35use asupersync::sync::{Mutex, OwnedMutexGuard};
36use asupersync::time::{sleep, wall_now};
37use memchr::memchr_iter;
38use serde_json::{Value, json};
39use std::collections::VecDeque;
40use std::io::{self, BufRead, Write};
41use std::path::PathBuf;
42use std::sync::Arc;
43use std::sync::atomic::{AtomicBool, Ordering};
44use std::time::{Duration, Instant};
45
46fn provider_ids_match(left: &str, right: &str) -> bool {
47    let left = left.trim();
48    let right = right.trim();
49    if left.eq_ignore_ascii_case(right) {
50        return true;
51    }
52
53    let left_canonical = canonical_provider_id(left).unwrap_or(left);
54    let right_canonical = canonical_provider_id(right).unwrap_or(right);
55
56    left_canonical.eq_ignore_ascii_case(right)
57        || right_canonical.eq_ignore_ascii_case(left)
58        || left_canonical.eq_ignore_ascii_case(right_canonical)
59}
60
61#[derive(Clone)]
62pub struct RpcOptions {
63    pub config: Config,
64    pub resources: ResourceLoader,
65    pub available_models: Vec<ModelEntry>,
66    pub scoped_models: Vec<RpcScopedModel>,
67    pub auth: AuthStorage,
68    pub runtime_handle: RuntimeHandle,
69}
70
71#[derive(Debug, Clone)]
72pub struct RpcScopedModel {
73    pub model: ModelEntry,
74    pub thinking_level: Option<crate::model::ThinkingLevel>,
75}
76
/// Where a `prompt` that arrives while the agent is streaming gets queued.
///
/// Parsed from the `streamingBehavior` field of a `prompt` command.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum StreamingBehavior {
    /// Push the message onto the steering queue.
    Steer,
    /// Push the message onto the follow-up queue.
    FollowUp,
}
82
83#[derive(Debug, Clone)]
84struct RpcStateSnapshot {
85    steering_count: usize,
86    follow_up_count: usize,
87    steering_mode: QueueMode,
88    follow_up_mode: QueueMode,
89    auto_compaction_enabled: bool,
90    auto_retry_enabled: bool,
91}
92
93impl From<&RpcSharedState> for RpcStateSnapshot {
94    fn from(state: &RpcSharedState) -> Self {
95        Self {
96            steering_count: state.steering.len(),
97            follow_up_count: state.follow_up.len(),
98            steering_mode: state.steering_mode,
99            follow_up_mode: state.follow_up_mode,
100            auto_compaction_enabled: state.auto_compaction_enabled,
101            auto_retry_enabled: state.auto_retry_enabled,
102        }
103    }
104}
105
106impl RpcStateSnapshot {
107    const fn pending_count(&self) -> usize {
108        self.steering_count + self.follow_up_count
109    }
110}
111
112use crate::config::parse_queue_mode;
113
114fn parse_streaming_behavior(value: Option<&Value>) -> Result<Option<StreamingBehavior>> {
115    let Some(value) = value else {
116        return Ok(None);
117    };
118    let Some(s) = value.as_str() else {
119        return Err(Error::validation("streamingBehavior must be a string"));
120    };
121    match s {
122        "steer" => Ok(Some(StreamingBehavior::Steer)),
123        "follow-up" | "followUp" => Ok(Some(StreamingBehavior::FollowUp)),
124        _ => Err(Error::validation(format!("Invalid streamingBehavior: {s}"))),
125    }
126}
127
/// Maps alternate spellings of RPC command types onto their canonical
/// snake_case names.
///
/// Kebab-case (`get-state`) and camelCase (`getState`) spellings are accepted
/// for the commands listed below. `queue-follow-up`/`queueFollowUp` are also
/// accepted for `follow_up`. Any other input, including names that are
/// already canonical, is returned unchanged so the caller's dispatch can
/// handle or reject it.
fn normalize_command_type(command_type: &str) -> &str {
    match command_type {
        "follow-up" | "followUp" | "queue-follow-up" | "queueFollowUp" => "follow_up",
        "get-state" | "getState" => "get_state",
        // The other read-only queries get the same aliases as `get_state`.
        "get-session-stats" | "getSessionStats" => "get_session_stats",
        "get-messages" | "getMessages" => "get_messages",
        "get-available-models" | "getAvailableModels" => "get_available_models",
        "set-model" | "setModel" => "set_model",
        "set-steering-mode" | "setSteeringMode" => "set_steering_mode",
        "set-follow-up-mode" | "setFollowUpMode" => "set_follow_up_mode",
        "set-auto-compaction" | "setAutoCompaction" => "set_auto_compaction",
        "set-auto-retry" | "setAutoRetry" => "set_auto_retry",
        _ => command_type,
    }
}
140
141fn build_user_message(text: &str, images: &[ImageContent]) -> Message {
142    let timestamp = chrono::Utc::now().timestamp_millis();
143    if images.is_empty() {
144        return Message::User(UserMessage {
145            content: UserContent::Text(text.to_string()),
146            timestamp,
147        });
148    }
149    let mut blocks = vec![ContentBlock::Text(TextContent::new(text.to_string()))];
150    for image in images {
151        blocks.push(ContentBlock::Image(image.clone()));
152    }
153    Message::User(UserMessage {
154        content: UserContent::Blocks(blocks),
155        timestamp,
156    })
157}
158
/// Reports whether `message` looks like an extension slash-command.
///
/// The resource loader expands skills and prompt templates before a message
/// is queued or sent. So a `/`-prefixed input that comes back unchanged
/// (`message == expanded`) is treated as an extension command.
fn is_extension_command(message: &str, expanded: &str) -> bool {
    let left_unexpanded = message == expanded;
    left_unexpanded && message.trim_start().starts_with('/')
}
164
165fn try_send_line_with_backpressure(tx: &mpsc::Sender<String>, mut line: String) -> bool {
166    loop {
167        match tx.try_send(line) {
168            Ok(()) => return true,
169            Err(mpsc::SendError::Full(unsent)) => {
170                line = unsent;
171                std::thread::sleep(Duration::from_millis(10));
172            }
173            Err(mpsc::SendError::Disconnected(_) | mpsc::SendError::Cancelled(_)) => {
174                return false;
175            }
176        }
177    }
178}
179
180#[derive(Debug)]
181struct RpcSharedState {
182    steering: VecDeque<Message>,
183    follow_up: VecDeque<Message>,
184    steering_mode: QueueMode,
185    follow_up_mode: QueueMode,
186    auto_compaction_enabled: bool,
187    auto_retry_enabled: bool,
188}
189
190const MAX_RPC_PENDING_MESSAGES: usize = 128;
191
192impl RpcSharedState {
193    fn new(config: &Config) -> Self {
194        Self {
195            steering: VecDeque::new(),
196            follow_up: VecDeque::new(),
197            steering_mode: config.steering_queue_mode(),
198            follow_up_mode: config.follow_up_queue_mode(),
199            auto_compaction_enabled: config.compaction_enabled(),
200            auto_retry_enabled: config.retry_enabled(),
201        }
202    }
203
204    fn pending_count(&self) -> usize {
205        self.steering.len() + self.follow_up.len()
206    }
207
208    fn push_steering(&mut self, message: Message) -> Result<()> {
209        if self.steering.len() >= MAX_RPC_PENDING_MESSAGES {
210            return Err(Error::session(
211                "Steering queue is full (Do you have too many pending commands?)",
212            ));
213        }
214        self.steering.push_back(message);
215        Ok(())
216    }
217
218    fn push_follow_up(&mut self, message: Message) -> Result<()> {
219        if self.follow_up.len() >= MAX_RPC_PENDING_MESSAGES {
220            return Err(Error::session("Follow-up queue is full"));
221        }
222        self.follow_up.push_back(message);
223        Ok(())
224    }
225
226    fn pop_steering(&mut self) -> Vec<Message> {
227        match self.steering_mode {
228            QueueMode::All => self.steering.drain(..).collect(),
229            QueueMode::OneAtATime => self.steering.pop_front().into_iter().collect(),
230        }
231    }
232
233    fn pop_follow_up(&mut self) -> Vec<Message> {
234        match self.follow_up_mode {
235            QueueMode::All => self.follow_up.drain(..).collect(),
236            QueueMode::OneAtATime => self.follow_up.pop_front().into_iter().collect(),
237        }
238    }
239}
240
241/// Tracks a running bash command so it can be aborted.
242struct RunningBash {
243    id: String,
244    abort_tx: oneshot::Sender<()>,
245}
246
247#[derive(Debug, Default)]
248struct RpcUiBridgeState {
249    active: Option<ExtensionUiRequest>,
250    queue: VecDeque<ExtensionUiRequest>,
251}
252
253pub async fn run_stdio(mut session: AgentSession, options: RpcOptions) -> Result<()> {
254    session.agent.set_queue_modes(
255        options.config.steering_queue_mode(),
256        options.config.follow_up_queue_mode(),
257    );
258
259    let (in_tx, in_rx) = mpsc::channel::<String>(1024);
260    let (out_tx, out_rx) = std::sync::mpsc::channel::<String>();
261
262    std::thread::spawn(move || {
263        let stdin = io::stdin();
264        let mut reader = io::BufReader::new(stdin.lock());
265        let mut line = String::new();
266        loop {
267            line.clear();
268            match reader.read_line(&mut line) {
269                Ok(0) | Err(_) => break,
270                Ok(_) => {
271                    let line_to_send = std::mem::take(&mut line);
272                    // Retry loop to handle backpressure (channel full) without dropping input.
273                    // Stop when the receiver side has closed so this thread does not spin forever.
274                    if !try_send_line_with_backpressure(&in_tx, line_to_send) {
275                        break;
276                    }
277                }
278            }
279        }
280    });
281
282    std::thread::spawn(move || {
283        let stdout = io::stdout();
284        let mut writer = io::BufWriter::new(stdout.lock());
285        for line in out_rx {
286            if writer.write_all(line.as_bytes()).is_err() {
287                break;
288            }
289            if writer.write_all(b"\n").is_err() {
290                break;
291            }
292            if writer.flush().is_err() {
293                break;
294            }
295        }
296    });
297
298    run(session, options, in_rx, out_tx).await
299}
300
301#[allow(clippy::too_many_lines)]
302#[allow(
303    clippy::significant_drop_tightening,
304    clippy::significant_drop_in_scrutinee
305)]
306pub async fn run(
307    session: AgentSession,
308    options: RpcOptions,
309    in_rx: mpsc::Receiver<String>,
310    out_tx: std::sync::mpsc::Sender<String>,
311) -> Result<()> {
312    let cx = AgentCx::for_request();
313    let session_handle = Arc::clone(&session.session);
314    let session = Arc::new(Mutex::new(session));
315    let shared_state = Arc::new(Mutex::new(RpcSharedState::new(&options.config)));
316    let is_streaming = Arc::new(AtomicBool::new(false));
317    let is_compacting = Arc::new(AtomicBool::new(false));
318    let abort_handle: Arc<Mutex<Option<AbortHandle>>> = Arc::new(Mutex::new(None));
319    let bash_state: Arc<Mutex<Option<RunningBash>>> = Arc::new(Mutex::new(None));
320    let retry_abort = Arc::new(AtomicBool::new(false));
321
322    {
323        use futures::future::BoxFuture;
324        let steering_state = Arc::clone(&shared_state);
325        let follow_state = Arc::clone(&shared_state);
326        let steering_cx = cx.clone();
327        let follow_cx = cx.clone();
328        let mut guard = session
329            .lock(&cx)
330            .await
331            .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
332        let steering_fetcher = move || -> BoxFuture<'static, Vec<Message>> {
333            let steering_state = Arc::clone(&steering_state);
334            let steering_cx = steering_cx.clone();
335            Box::pin(async move {
336                steering_state
337                    .lock(&steering_cx)
338                    .await
339                    .map_or_else(|_| Vec::new(), |mut state| state.pop_steering())
340            })
341        };
342        let follow_fetcher = move || -> BoxFuture<'static, Vec<Message>> {
343            let follow_state = Arc::clone(&follow_state);
344            let follow_cx = follow_cx.clone();
345            Box::pin(async move {
346                follow_state
347                    .lock(&follow_cx)
348                    .await
349                    .map_or_else(|_| Vec::new(), |mut state| state.pop_follow_up())
350            })
351        };
352        guard.agent.register_message_fetchers(
353            Some(Arc::new(steering_fetcher)),
354            Some(Arc::new(follow_fetcher)),
355        );
356    }
357
358    // Set up extension UI channel for RPC mode.
359    // When extensions request UI (capability prompts, etc.), we emit them as
360    // JSON notifications so the RPC client can respond programmatically.
361    let rpc_extension_manager = {
362        let cx_ui = cx.clone();
363        let guard = session
364            .lock(&cx_ui)
365            .await
366            .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
367        guard
368            .extensions
369            .as_ref()
370            .map(crate::extensions::ExtensionRegion::manager)
371            .cloned()
372    };
373
374    let rpc_ui_state: Option<Arc<Mutex<RpcUiBridgeState>>> = rpc_extension_manager
375        .as_ref()
376        .map(|_| Arc::new(Mutex::new(RpcUiBridgeState::default())));
377
378    if let Some(ref manager) = rpc_extension_manager {
379        let (extension_ui_tx, extension_ui_rx) =
380            asupersync::channel::mpsc::channel::<ExtensionUiRequest>(64);
381        manager.set_ui_sender(extension_ui_tx);
382
383        let out_tx_ui = out_tx.clone();
384        let ui_state = rpc_ui_state
385            .as_ref()
386            .map(Arc::clone)
387            .expect("rpc ui state should exist when extension manager exists");
388        let manager_ui = (*manager).clone();
389        let runtime_handle_ui = options.runtime_handle.clone();
390        options.runtime_handle.spawn(async move {
391            const MAX_UI_PENDING_REQUESTS: usize = 64;
392            let cx = AgentCx::for_request();
393            while let Ok(request) = extension_ui_rx.recv(&cx).await {
394                if request.expects_response() {
395                    let emit_now = {
396                        let Ok(mut guard) = ui_state.lock(&cx).await else {
397                            return;
398                        };
399                        if guard.active.is_none() {
400                            guard.active = Some(request.clone());
401                            true
402                        } else if guard.queue.len() < MAX_UI_PENDING_REQUESTS {
403                            guard.queue.push_back(request.clone());
404                            false
405                        } else {
406                            drop(guard);
407                            let _ = manager_ui.respond_ui(ExtensionUiResponse {
408                                id: request.id.clone(),
409                                value: None,
410                                cancelled: true,
411                            });
412                            false
413                        }
414                    };
415
416                    if emit_now {
417                        rpc_emit_extension_ui_request(
418                            &runtime_handle_ui,
419                            Arc::clone(&ui_state),
420                            manager_ui.clone(),
421                            out_tx_ui.clone(),
422                            request,
423                        );
424                    }
425                } else {
426                    // Fire-and-forget UI updates should not be queued.
427                    let rpc_event = request.to_rpc_event();
428                    let _ = out_tx_ui.send(event(&rpc_event));
429                }
430            }
431        });
432    }
433
434    while let Ok(line) = in_rx.recv(&cx).await {
435        if line.trim().is_empty() {
436            continue;
437        }
438
439        let parsed: Value = match serde_json::from_str(&line) {
440            Ok(v) => v,
441            Err(err) => {
442                let resp = response_error(None, "parse", format!("Failed to parse command: {err}"));
443                let _ = out_tx.send(resp);
444                continue;
445            }
446        };
447
448        let Some(command_type_raw) = parsed.get("type").and_then(Value::as_str) else {
449            let resp = response_error(None, "parse", "Missing command type".to_string());
450            let _ = out_tx.send(resp);
451            continue;
452        };
453        let command_type = normalize_command_type(command_type_raw);
454
455        let id = parsed.get("id").and_then(Value::as_str).map(str::to_string);
456
457        match command_type {
458            "prompt" => {
459                let Some(message) = parsed
460                    .get("message")
461                    .and_then(Value::as_str)
462                    .map(String::from)
463                else {
464                    let resp = response_error(id, "prompt", "Missing message".to_string());
465                    let _ = out_tx.send(resp);
466                    continue;
467                };
468
469                let images = match parse_prompt_images(parsed.get("images")) {
470                    Ok(images) => images,
471                    Err(err) => {
472                        let resp = response_error_with_hints(id, "prompt", &err);
473                        let _ = out_tx.send(resp);
474                        continue;
475                    }
476                };
477
478                let streaming_behavior =
479                    match parse_streaming_behavior(parsed.get("streamingBehavior")) {
480                        Ok(value) => value,
481                        Err(err) => {
482                            let resp = response_error_with_hints(id, "prompt", &err);
483                            let _ = out_tx.send(resp);
484                            continue;
485                        }
486                    };
487
488                let expanded = options.resources.expand_input(&message);
489
490                if is_streaming.load(Ordering::SeqCst) {
491                    if streaming_behavior.is_none() {
492                        let resp = response_error(
493                            id,
494                            "prompt",
495                            "Agent is currently streaming; specify streamingBehavior".to_string(),
496                        );
497                        let _ = out_tx.send(resp);
498                        continue;
499                    }
500
501                    let queued_result = {
502                        let mut state = shared_state
503                            .lock(&cx)
504                            .await
505                            .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
506                        match streaming_behavior {
507                            Some(StreamingBehavior::Steer) => {
508                                state.push_steering(build_user_message(&expanded, &images))
509                            }
510                            Some(StreamingBehavior::FollowUp) => {
511                                state.push_follow_up(build_user_message(&expanded, &images))
512                            }
513                            None => Ok(()), // Unreachable due to check above
514                        }
515                    };
516
517                    match queued_result {
518                        Ok(()) => {
519                            let _ = out_tx.send(response_ok(id, "prompt", None));
520                        }
521                        Err(err) => {
522                            let resp = response_error_with_hints(id, "prompt", &err);
523                            let _ = out_tx.send(resp);
524                        }
525                    }
526                    continue;
527                }
528
529                // Ack immediately.
530                let _ = out_tx.send(response_ok(id, "prompt", None));
531
532                is_streaming.store(true, Ordering::SeqCst);
533
534                let out_tx = out_tx.clone();
535                let session = Arc::clone(&session);
536                let shared_state = Arc::clone(&shared_state);
537                let is_streaming = Arc::clone(&is_streaming);
538                let is_compacting = Arc::clone(&is_compacting);
539                let abort_handle_slot = Arc::clone(&abort_handle);
540                let retry_abort = retry_abort.clone();
541                let options = options.clone();
542                let expanded = expanded.clone();
543                let runtime_handle = options.runtime_handle.clone();
544                runtime_handle.spawn(async move {
545                    let cx = AgentCx::for_request();
546                    run_prompt_with_retry(
547                        session,
548                        shared_state,
549                        is_streaming,
550                        is_compacting,
551                        abort_handle_slot,
552                        out_tx,
553                        retry_abort,
554                        options,
555                        expanded,
556                        images,
557                        cx,
558                    )
559                    .await;
560                });
561            }
562
563            "steer" => {
564                let Some(message) = parsed
565                    .get("message")
566                    .and_then(Value::as_str)
567                    .map(String::from)
568                else {
569                    let resp = response_error(id, "steer", "Missing message".to_string());
570                    let _ = out_tx.send(resp);
571                    continue;
572                };
573
574                let expanded = options.resources.expand_input(&message);
575                if is_extension_command(&message, &expanded) {
576                    let resp = response_error(
577                        id,
578                        "steer",
579                        "Extension commands are not allowed with steer".to_string(),
580                    );
581                    let _ = out_tx.send(resp);
582                    continue;
583                }
584
585                if is_streaming.load(Ordering::SeqCst) {
586                    let result = shared_state
587                        .lock(&cx)
588                        .await
589                        .map_err(|err| Error::session(format!("state lock failed: {err}")))?
590                        .push_steering(build_user_message(&expanded, &[]));
591
592                    match result {
593                        Ok(()) => {
594                            let _ = out_tx.send(response_ok(id, "steer", None));
595                        }
596                        Err(err) => {
597                            let _ = out_tx.send(response_error_with_hints(id, "steer", &err));
598                        }
599                    }
600                    continue;
601                }
602
603                let _ = out_tx.send(response_ok(id, "steer", None));
604
605                is_streaming.store(true, Ordering::SeqCst);
606
607                let out_tx = out_tx.clone();
608                let session = Arc::clone(&session);
609                let shared_state = Arc::clone(&shared_state);
610                let is_streaming = Arc::clone(&is_streaming);
611                let is_compacting = Arc::clone(&is_compacting);
612                let abort_handle_slot = Arc::clone(&abort_handle);
613                let retry_abort = retry_abort.clone();
614                let options = options.clone();
615                let expanded = expanded.clone();
616                let runtime_handle = options.runtime_handle.clone();
617                runtime_handle.spawn(async move {
618                    let cx = AgentCx::for_request();
619                    run_prompt_with_retry(
620                        session,
621                        shared_state,
622                        is_streaming,
623                        is_compacting,
624                        abort_handle_slot,
625                        out_tx,
626                        retry_abort,
627                        options,
628                        expanded,
629                        Vec::new(),
630                        cx,
631                    )
632                    .await;
633                });
634            }
635
636            "follow_up" => {
637                let Some(message) = parsed
638                    .get("message")
639                    .and_then(Value::as_str)
640                    .map(String::from)
641                else {
642                    let resp = response_error(id, "follow_up", "Missing message".to_string());
643                    let _ = out_tx.send(resp);
644                    continue;
645                };
646
647                let expanded = options.resources.expand_input(&message);
648                if is_extension_command(&message, &expanded) {
649                    let resp = response_error(
650                        id,
651                        "follow_up",
652                        "Extension commands are not allowed with follow_up".to_string(),
653                    );
654                    let _ = out_tx.send(resp);
655                    continue;
656                }
657
658                if is_streaming.load(Ordering::SeqCst) {
659                    let result = shared_state
660                        .lock(&cx)
661                        .await
662                        .map_err(|err| Error::session(format!("state lock failed: {err}")))?
663                        .push_follow_up(build_user_message(&expanded, &[]));
664
665                    match result {
666                        Ok(()) => {
667                            let _ = out_tx.send(response_ok(id, "follow_up", None));
668                        }
669                        Err(err) => {
670                            let _ = out_tx.send(response_error_with_hints(id, "follow_up", &err));
671                        }
672                    }
673                    continue;
674                }
675
676                let _ = out_tx.send(response_ok(id, "follow_up", None));
677
678                is_streaming.store(true, Ordering::SeqCst);
679
680                let out_tx = out_tx.clone();
681                let session = Arc::clone(&session);
682                let shared_state = Arc::clone(&shared_state);
683                let is_streaming = Arc::clone(&is_streaming);
684                let is_compacting = Arc::clone(&is_compacting);
685                let abort_handle_slot = Arc::clone(&abort_handle);
686                let retry_abort = retry_abort.clone();
687                let options = options.clone();
688                let expanded = expanded.clone();
689                let runtime_handle = options.runtime_handle.clone();
690                runtime_handle.spawn(async move {
691                    let cx = AgentCx::for_request();
692                    run_prompt_with_retry(
693                        session,
694                        shared_state,
695                        is_streaming,
696                        is_compacting,
697                        abort_handle_slot,
698                        out_tx,
699                        retry_abort,
700                        options,
701                        expanded,
702                        Vec::new(),
703                        cx,
704                    )
705                    .await;
706                });
707            }
708
709            "abort" => {
710                let handle = abort_handle
711                    .lock(&cx)
712                    .await
713                    .map_err(|err| Error::session(format!("abort lock failed: {err}")))?
714                    .clone();
715                if let Some(handle) = handle {
716                    handle.abort();
717                }
718                let _ = out_tx.send(response_ok(id, "abort", None));
719            }
720
721            "get_state" => {
722                let snapshot = {
723                    let state = shared_state
724                        .lock(&cx)
725                        .await
726                        .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
727                    RpcStateSnapshot::from(&*state)
728                };
729                let data = {
730                    let inner_session = session_handle.lock(&cx).await.map_err(|err| {
731                        Error::session(format!("inner session lock failed: {err}"))
732                    })?;
733                    session_state(
734                        &inner_session,
735                        &options,
736                        &snapshot,
737                        is_streaming.load(Ordering::SeqCst),
738                        is_compacting.load(Ordering::SeqCst),
739                    )
740                };
741                let _ = out_tx.send(response_ok(id, "get_state", Some(data)));
742            }
743
744            "get_session_stats" => {
745                let data = {
746                    let inner_session = session_handle.lock(&cx).await.map_err(|err| {
747                        Error::session(format!("inner session lock failed: {err}"))
748                    })?;
749                    session_stats(&inner_session)
750                };
751                let _ = out_tx.send(response_ok(id, "get_session_stats", Some(data)));
752            }
753
754            "get_messages" => {
755                let messages = {
756                    let inner_session = session_handle.lock(&cx).await.map_err(|err| {
757                        Error::session(format!("inner session lock failed: {err}"))
758                    })?;
759                    inner_session
760                        .entries_for_current_path()
761                        .iter()
762                        .filter_map(|entry| match entry {
763                            crate::session::SessionEntry::Message(msg) => match msg.message {
764                                SessionMessage::User { .. }
765                                | SessionMessage::Assistant { .. }
766                                | SessionMessage::ToolResult { .. }
767                                | SessionMessage::BashExecution { .. } => Some(msg.message.clone()),
768                                _ => None,
769                            },
770                            _ => None,
771                        })
772                        .collect::<Vec<_>>()
773                };
774                let messages = messages
775                    .into_iter()
776                    .map(rpc_session_message_value)
777                    .collect::<Vec<_>>();
778                let _ = out_tx.send(response_ok(
779                    id,
780                    "get_messages",
781                    Some(json!({ "messages": messages })),
782                ));
783            }
784
785            "get_available_models" => {
786                let models = options
787                    .available_models
788                    .iter()
789                    .map(rpc_model_from_entry)
790                    .collect::<Vec<_>>();
791                let _ = out_tx.send(response_ok(
792                    id,
793                    "get_available_models",
794                    Some(json!({ "models": models })),
795                ));
796            }
797
            "set_model" => {
                // Switch the active model to the requested `provider` / `modelId`.
                //
                // The pair must name an entry in `options.available_models`: the
                // provider is compared with `provider_ids_match` and the model id
                // case-insensitively. Models that require a configured credential are
                // rejected before any state changes when no key resolves for them.
                let Some(provider) = parsed.get("provider").and_then(Value::as_str) else {
                    let _ = out_tx.send(response_error(
                        id,
                        "set_model",
                        "Missing provider".to_string(),
                    ));
                    continue;
                };
                let Some(model_id) = parsed.get("modelId").and_then(Value::as_str) else {
                    let _ = out_tx.send(response_error(
                        id,
                        "set_model",
                        "Missing modelId".to_string(),
                    ));
                    continue;
                };

                let Some(entry) = options
                    .available_models
                    .iter()
                    .find(|m| {
                        provider_ids_match(&m.model.provider, provider)
                            && m.model.id.eq_ignore_ascii_case(model_id)
                    })
                    .cloned()
                else {
                    let _ = out_tx.send(response_error(
                        id,
                        "set_model",
                        format!("Model not found: {provider}/{model_id}"),
                    ));
                    continue;
                };

                let key = resolve_model_key(&options.auth, &entry);
                if model_requires_configured_credential(&entry) && key.is_none() {
                    let err = Error::auth(format!(
                        "Missing credentials for {}/{}",
                        entry.model.provider, entry.model.id
                    ));
                    let _ = out_tx.send(response_error_with_hints(id, "set_model", &err));
                    continue;
                }

                // Everything below runs under the outer session lock:
                // 1. Build the provider, passing the extension manager when extensions
                //    are present.
                // 2. Install it together with the resolved API key and the entry's
                //    headers.
                // 3. Record the model change.
                // Errors (including lock failures) are returned to the client.
                //
                // NOTE(review): if `apply_model_change` or `apply_thinking_level`
                // fails, the provider, API key and headers set here have already been
                // replaced; the error response does not roll them back.
                let result: Result<()> = async {
                    let mut guard = session
                        .lock(&cx)
                        .await
                        .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
                    let provider_impl = providers::create_provider(
                        &entry,
                        guard
                            .extensions
                            .as_ref()
                            .map(crate::extensions::ExtensionRegion::manager),
                    )?;
                    guard.agent.set_provider(provider_impl);
                    guard.agent.stream_options_mut().api_key.clone_from(&key);
                    guard
                        .agent
                        .stream_options_mut()
                        .headers
                        .clone_from(&entry.headers);

                    apply_model_change(&mut guard, &entry).await?;

                    // Re-clamp the current thinking level for the new model and only
                    // re-apply it when clamping actually changed the value.
                    let current_thinking = guard
                        .agent
                        .stream_options()
                        .thinking_level
                        .unwrap_or_default();
                    let clamped = entry.clamp_thinking_level(current_thinking);
                    if clamped != current_thinking {
                        apply_thinking_level(&mut guard, clamped).await?;
                    }
                    Ok(())
                }
                .await;

                match result {
                    Ok(()) => {
                        let _ = out_tx.send(response_ok(
                            id,
                            "set_model",
                            Some(rpc_model_from_entry(&entry)),
                        ));
                    }
                    Err(err) => {
                        let _ = out_tx.send(response_error_with_hints(id, "set_model", &err));
                    }
                }
            }
891
892            "cycle_model" => {
893                let result = async {
894                    let mut guard = session
895                        .lock(&cx)
896                        .await
897                        .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
898                    cycle_model_for_rpc(&mut guard, &options).await
899                }
900                .await;
901
902                match result {
903                    Ok(Some((entry, thinking_level, is_scoped))) => {
904                        let _ = out_tx.send(response_ok(
905                            id,
906                            "cycle_model",
907                            Some(json!({
908                                "model": rpc_model_from_entry(&entry),
909                                "thinkingLevel": thinking_level.to_string(),
910                                "isScoped": is_scoped,
911                            })),
912                        ));
913                    }
914                    Ok(None) => {
915                        let _ =
916                            out_tx.send(response_ok(id.clone(), "cycle_model", Some(Value::Null)));
917                    }
918                    Err(err) => {
919                        let _ = out_tx.send(response_error_with_hints(id, "cycle_model", &err));
920                    }
921                }
922            }
923
924            "set_thinking_level" => {
925                let Some(level) = parsed.get("level").and_then(Value::as_str) else {
926                    let _ = out_tx.send(response_error(
927                        id,
928                        "set_thinking_level",
929                        "Missing level".to_string(),
930                    ));
931                    continue;
932                };
933                let level = match parse_thinking_level(level) {
934                    Ok(level) => level,
935                    Err(err) => {
936                        let _ =
937                            out_tx.send(response_error_with_hints(id, "set_thinking_level", &err));
938                        continue;
939                    }
940                };
941
942                {
943                    let mut guard = session
944                        .lock(&cx)
945                        .await
946                        .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
947                    let level = {
948                        let inner_session = guard.session.lock(&cx).await.map_err(|err| {
949                            Error::session(format!("inner session lock failed: {err}"))
950                        })?;
951                        current_model_entry(&inner_session, &options)
952                            .map_or(level, |entry| entry.clamp_thinking_level(level))
953                    };
954                    if let Err(err) = apply_thinking_level(&mut guard, level).await {
955                        let _ = out_tx.send(response_error_with_hints(
956                            id.clone(),
957                            "set_thinking_level",
958                            &err,
959                        ));
960                        continue;
961                    }
962                }
963                let _ = out_tx.send(response_ok(id, "set_thinking_level", None));
964            }
965
966            "cycle_thinking_level" => {
967                let next = {
968                    let mut guard = session
969                        .lock(&cx)
970                        .await
971                        .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
972                    let entry = {
973                        let inner_session = guard.session.lock(&cx).await.map_err(|err| {
974                            Error::session(format!("inner session lock failed: {err}"))
975                        })?;
976                        current_model_entry(&inner_session, &options).cloned()
977                    };
978                    let Some(entry) = entry else {
979                        let _ =
980                            out_tx.send(response_ok(id, "cycle_thinking_level", Some(Value::Null)));
981                        continue;
982                    };
983                    if !entry.model.reasoning {
984                        let _ =
985                            out_tx.send(response_ok(id, "cycle_thinking_level", Some(Value::Null)));
986                        continue;
987                    }
988
989                    let levels = available_thinking_levels(&entry);
990                    let current = guard
991                        .agent
992                        .stream_options()
993                        .thinking_level
994                        .unwrap_or_default();
995                    let current_index = levels
996                        .iter()
997                        .position(|level| *level == current)
998                        .unwrap_or(0);
999                    let next = levels[(current_index + 1) % levels.len()];
1000                    if let Err(err) = apply_thinking_level(&mut guard, next).await {
1001                        let _ = out_tx.send(response_error_with_hints(
1002                            id.clone(),
1003                            "cycle_thinking_level",
1004                            &err,
1005                        ));
1006                        continue;
1007                    }
1008                    next
1009                };
1010                let _ = out_tx.send(response_ok(
1011                    id,
1012                    "cycle_thinking_level",
1013                    Some(json!({ "level": next.to_string() })),
1014                ));
1015            }
1016
1017            "set_steering_mode" => {
1018                let Some(mode) = parsed.get("mode").and_then(Value::as_str) else {
1019                    let _ = out_tx.send(response_error(
1020                        id,
1021                        "set_steering_mode",
1022                        "Missing mode".to_string(),
1023                    ));
1024                    continue;
1025                };
1026                let Some(mode) = parse_queue_mode(Some(mode)) else {
1027                    let _ = out_tx.send(response_error(
1028                        id,
1029                        "set_steering_mode",
1030                        "Invalid steering mode".to_string(),
1031                    ));
1032                    continue;
1033                };
1034                let mut state = shared_state
1035                    .lock(&cx)
1036                    .await
1037                    .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
1038                state.steering_mode = mode;
1039                drop(state);
1040                let _ = out_tx.send(response_ok(id, "set_steering_mode", None));
1041            }
1042
1043            "set_follow_up_mode" => {
1044                let Some(mode) = parsed.get("mode").and_then(Value::as_str) else {
1045                    let _ = out_tx.send(response_error(
1046                        id,
1047                        "set_follow_up_mode",
1048                        "Missing mode".to_string(),
1049                    ));
1050                    continue;
1051                };
1052                let Some(mode) = parse_queue_mode(Some(mode)) else {
1053                    let _ = out_tx.send(response_error(
1054                        id,
1055                        "set_follow_up_mode",
1056                        "Invalid follow-up mode".to_string(),
1057                    ));
1058                    continue;
1059                };
1060                let mut state = shared_state
1061                    .lock(&cx)
1062                    .await
1063                    .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
1064                state.follow_up_mode = mode;
1065                drop(state);
1066                let _ = out_tx.send(response_ok(id, "set_follow_up_mode", None));
1067            }
1068
1069            "set_auto_compaction" => {
1070                let Some(enabled) = parsed.get("enabled").and_then(Value::as_bool) else {
1071                    let _ = out_tx.send(response_error(
1072                        id,
1073                        "set_auto_compaction",
1074                        "Missing enabled".to_string(),
1075                    ));
1076                    continue;
1077                };
1078                let mut state = shared_state
1079                    .lock(&cx)
1080                    .await
1081                    .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
1082                state.auto_compaction_enabled = enabled;
1083                drop(state);
1084                let _ = out_tx.send(response_ok(id, "set_auto_compaction", None));
1085            }
1086
1087            "set_auto_retry" => {
1088                let Some(enabled) = parsed.get("enabled").and_then(Value::as_bool) else {
1089                    let _ = out_tx.send(response_error(
1090                        id,
1091                        "set_auto_retry",
1092                        "Missing enabled".to_string(),
1093                    ));
1094                    continue;
1095                };
1096                let mut state = shared_state
1097                    .lock(&cx)
1098                    .await
1099                    .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
1100                state.auto_retry_enabled = enabled;
1101                drop(state);
1102                let _ = out_tx.send(response_ok(id, "set_auto_retry", None));
1103            }
1104
1105            "abort_retry" => {
1106                retry_abort.store(true, Ordering::SeqCst);
1107                let _ = out_tx.send(response_ok(id, "abort_retry", None));
1108            }
1109
            "set_session_name" => {
                // Name the session by appending a session-info entry carrying `name`, then
                // persist. Lock and persistence failures are returned to the client
                // instead of ending the request loop.
                let Some(name) = parsed.get("name").and_then(Value::as_str) else {
                    let _ = out_tx.send(response_error(
                        id,
                        "set_session_name",
                        "Missing name".to_string(),
                    ));
                    continue;
                };
                let result: Result<()> = async {
                    let mut guard = session
                        .lock(&cx)
                        .await
                        .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
                    {
                        // The inner guard is dropped before persisting, which takes the
                        // outer guard mutably.
                        let mut inner_session = guard.session.lock(&cx).await.map_err(|err| {
                            Error::session(format!("inner session lock failed: {err}"))
                        })?;
                        inner_session.append_session_info(Some(name.to_string()));
                    }
                    guard.persist_session().await?;
                    Ok(())
                }
                .await;

                match result {
                    Ok(()) => {
                        let _ = out_tx.send(response_ok(id, "set_session_name", None));
                    }
                    Err(err) => {
                        let _ =
                            out_tx.send(response_error_with_hints(id, "set_session_name", &err));
                    }
                }
            }
1145
1146            "get_last_assistant_text" => {
1147                let text = {
1148                    let inner_session = session_handle.lock(&cx).await.map_err(|err| {
1149                        Error::session(format!("inner session lock failed: {err}"))
1150                    })?;
1151                    last_assistant_text(&inner_session)
1152                };
1153                let _ = out_tx.send(response_ok(
1154                    id,
1155                    "get_last_assistant_text",
1156                    Some(json!({ "text": text })),
1157                ));
1158            }
1159
            "export_html" => {
                // Render the session as HTML, optionally to the client-supplied
                // `outputPath`, and respond with the `path` returned by
                // `export_html_snapshot`.
                let output_path = parsed
                    .get("outputPath")
                    .and_then(Value::as_str)
                    .map(str::to_string);
                // Capture a lightweight snapshot under lock, then release immediately.
                // This avoids cloning the full Session (caches, autosave queue, etc.)
                // and allows the HTML rendering + file I/O to proceed without holding
                // any session lock.
                let snapshot = {
                    let guard = session
                        .lock(&cx)
                        .await
                        .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
                    let inner = guard.session.lock(&cx).await.map_err(|err| {
                        Error::session(format!("inner session lock failed: {err}"))
                    })?;
                    inner.export_snapshot()
                };
                match export_html_snapshot(&snapshot, output_path.as_deref()).await {
                    Ok(path) => {
                        let _ = out_tx.send(response_ok(
                            id,
                            "export_html",
                            Some(json!({ "path": path })),
                        ));
                    }
                    Err(err) => {
                        let _ = out_tx.send(response_error_with_hints(id, "export_html", &err));
                    }
                }
            }
1192
            "bash" => {
                // Run a shell command for the client without blocking this request loop:
                // the command executes on a spawned task, which sends the response itself
                // when it finishes.
                let Some(command) = parsed.get("command").and_then(Value::as_str) else {
                    let _ = out_tx.send(response_error(id, "bash", "Missing command".to_string()));
                    continue;
                };

                // Only one command may run at a time; `bash_state` holds the active run.
                let mut running = bash_state
                    .lock(&cx)
                    .await
                    .map_err(|err| Error::session(format!("bash state lock failed: {err}")))?;
                if running.is_some() {
                    let _ = out_tx.send(response_error(
                        id,
                        "bash",
                        "Bash command already running".to_string(),
                    ));
                    continue;
                }

                // Register this run under a fresh id with an abort channel that
                // `abort_bash` can fire.
                let run_id = uuid::Uuid::new_v4().to_string();
                let (abort_tx, abort_rx) = oneshot::channel();
                *running = Some(RunningBash {
                    id: run_id.clone(),
                    abort_tx,
                });

                // Owned handles moved into the spawned task.
                let out_tx = out_tx.clone();
                let session = Arc::clone(&session);
                let bash_state = Arc::clone(&bash_state);
                let command = command.to_string();
                let id_clone = id.clone();
                let runtime_handle = options.runtime_handle.clone();

                runtime_handle.spawn(async move {
                    let cx = AgentCx::for_request();
                    // Run in the process's current directory, falling back to ".".
                    let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
                    let result = run_bash_rpc(&cwd, &command, abort_rx).await;

                    let response = match result {
                        Ok(result) => {
                            // Record the execution in the session transcript. This is
                            // best-effort: lock and persistence failures are ignored, and
                            // the client still receives the command result.
                            if let Ok(mut guard) = session.lock(&cx).await {
                                if let Ok(mut inner_session) = guard.session.lock(&cx).await {
                                    inner_session.append_message(SessionMessage::BashExecution {
                                        command: command.clone(),
                                        output: result.output.clone(),
                                        exit_code: result.exit_code,
                                        cancelled: Some(result.cancelled),
                                        truncated: Some(result.truncated),
                                        full_output_path: result.full_output_path.clone(),
                                        timestamp: Some(chrono::Utc::now().timestamp_millis()),
                                        extra: std::collections::HashMap::default(),
                                    });
                                }
                                let _ = guard.persist_session().await;
                            }

                            response_ok(
                                id_clone,
                                "bash",
                                Some(json!({
                                    "output": result.output,
                                    "exitCode": result.exit_code,
                                    "cancelled": result.cancelled,
                                    "truncated": result.truncated,
                                    "fullOutputPath": result.full_output_path,
                                })),
                            )
                        }
                        Err(err) => response_error_with_hints(id_clone, "bash", &err),
                    };

                    let _ = out_tx.send(response);
                    // Clear the slot only if it still belongs to this run. `abort_bash`
                    // may already have taken it, and a newer command may now occupy it.
                    if let Ok(mut running) = bash_state.lock(&cx).await {
                        if running.as_ref().is_some_and(|r| r.id == run_id) {
                            *running = None;
                        }
                    }
                });
            }
1272
1273            "abort_bash" => {
1274                let mut running = bash_state
1275                    .lock(&cx)
1276                    .await
1277                    .map_err(|err| Error::session(format!("bash state lock failed: {err}")))?;
1278                if let Some(running_bash) = running.take() {
1279                    let _ = running_bash.abort_tx.send(&cx, ());
1280                }
1281                let _ = out_tx.send(response_ok(id, "abort_bash", None));
1282            }
1283
            "compact" => {
                // Manually compact the current branch: summarize older entries with the
                // active provider, append a compaction entry, persist, and reload the
                // agent's messages from the compacted path. An optional
                // `customInstructions` string is forwarded to `compact`.
                let custom_instructions = parsed
                    .get("customInstructions")
                    .and_then(Value::as_str)
                    .map(str::to_string);

                // NOTE(review): the outer session guard is held for this entire block,
                // including the awaited `compact` call, so anything else that needs the
                // session waits until compaction finishes.
                let result: Result<Value> = async {
                    let mut guard = session
                        .lock(&cx)
                        .await
                        .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
                    let path_entries = {
                        let mut inner_session = guard.session.lock(&cx).await.map_err(|err| {
                            Error::session(format!("inner session lock failed: {err}"))
                        })?;
                        // Make sure every entry has an id before planning compaction.
                        inner_session.ensure_entry_ids();
                        inner_session
                            .entries_for_current_path()
                            .into_iter()
                            .cloned()
                            .collect::<Vec<_>>()
                    };

                    let key = guard
                        .agent
                        .stream_options()
                        .api_key
                        .as_deref()
                        .ok_or_else(|| Error::auth("Missing API key for compaction"))?;

                    let provider = guard.agent.provider();

                    let settings = ResolvedCompactionSettings {
                        enabled: options.config.compaction_enabled(),
                        reserve_tokens: options.config.compaction_reserve_tokens(),
                        keep_recent_tokens: options.config.compaction_keep_recent_tokens(),
                        ..Default::default()
                    };

                    let prep = prepare_compaction(&path_entries, settings).ok_or_else(|| {
                        Error::session(
                            "Compaction not available (already compacted or missing IDs)",
                        )
                    })?;

                    // The flag is cleared before `compact_res?` so a failed compaction
                    // does not leave it set.
                    // NOTE(review): if this future is dropped while `compact` is pending,
                    // the store(false) never runs; a drop guard would make this robust.
                    is_compacting.store(true, Ordering::SeqCst);
                    let compact_res =
                        compact(prep, provider, key, custom_instructions.as_deref()).await;
                    is_compacting.store(false, Ordering::SeqCst);
                    let result_data = compact_res?;

                    let details_value = compaction_details_to_value(&result_data.details)?;

                    let messages = {
                        let mut inner_session = guard.session.lock(&cx).await.map_err(|err| {
                            Error::session(format!("inner session lock failed: {err}"))
                        })?;
                        inner_session.append_compaction(
                            result_data.summary.clone(),
                            result_data.first_kept_entry_id.clone(),
                            result_data.tokens_before,
                            Some(details_value.clone()),
                            None,
                        );
                        inner_session.to_messages_for_current_path()
                    };
                    // NOTE(review): if persisting fails, the compaction entry is already
                    // appended in memory but the agent's messages are not replaced.
                    guard.persist_session().await?;
                    guard.agent.replace_messages(messages);

                    Ok(json!({
                        "summary": result_data.summary,
                        "firstKeptEntryId": result_data.first_kept_entry_id,
                        "tokensBefore": result_data.tokens_before,
                        "details": details_value,
                    }))
                }
                .await;

                match result {
                    Ok(data) => {
                        let _ = out_tx.send(response_ok(id, "compact", Some(data)));
                    }
                    Err(err) => {
                        let _ = out_tx.send(response_error_with_hints(id, "compact", &err));
                    }
                }
            }
1371
            "new_session" => {
                // Start a fresh session, optionally linked to `parentSession`, that
                // inherits the current provider, model id and thinking level.
                let parent = parsed
                    .get("parentSession")
                    .and_then(Value::as_str)
                    .map(str::to_string);
                {
                    let mut guard = session
                        .lock(&cx)
                        .await
                        .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
                    // Snapshot the fields the new session inherits from the current one.
                    let (session_dir, provider, model_id, thinking_level) = {
                        let inner_session = guard.session.lock(&cx).await.map_err(|err| {
                            Error::session(format!("inner session lock failed: {err}"))
                        })?;
                        (
                            inner_session.session_dir.clone(),
                            inner_session.header.provider.clone(),
                            inner_session.header.model_id.clone(),
                            inner_session.header.thinking_level.clone(),
                        )
                    };
                    // Use the current session directory only when saving is enabled;
                    // otherwise the new session is in-memory.
                    let mut new_session = if guard.save_enabled() {
                        crate::session::Session::create_with_dir(session_dir)
                    } else {
                        crate::session::Session::in_memory()
                    };
                    new_session.header.parent_session = parent;
                    // Keep model fields in header for clients.
                    new_session.header.provider.clone_from(&provider);
                    new_session.header.model_id.clone_from(&model_id);
                    new_session
                        .header
                        .thinking_level
                        .clone_from(&thinking_level);

                    // Swap the session in place, drop the agent's message history and
                    // point the stream options at the new session id.
                    let session_id = new_session.header.id.clone();
                    {
                        let mut inner_session = guard.session.lock(&cx).await.map_err(|err| {
                            Error::session(format!("inner session lock failed: {err}"))
                        })?;
                        *inner_session = new_session;
                    }
                    guard.agent.clear_messages();
                    guard.agent.stream_options_mut().session_id = Some(session_id);
                }
                // Queued steering / follow-up messages belong to the old conversation.
                // The session guard above is already released at this point.
                {
                    let mut state = shared_state
                        .lock(&cx)
                        .await
                        .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
                    state.steering.clear();
                    state.follow_up.clear();
                }
                // This handler always reports `cancelled: false`.
                let _ = out_tx.send(response_ok(
                    id,
                    "new_session",
                    Some(json!({ "cancelled": false })),
                ));
            }
1431
1432            "switch_session" => {
1433                let Some(session_path) = parsed.get("sessionPath").and_then(Value::as_str) else {
1434                    let _ = out_tx.send(response_error(
1435                        id,
1436                        "switch_session",
1437                        "Missing sessionPath".to_string(),
1438                    ));
1439                    continue;
1440                };
1441
1442                let loaded = crate::session::Session::open(session_path).await;
1443                match loaded {
1444                    Ok(new_session) => {
1445                        let messages = new_session.to_messages_for_current_path();
1446                        let session_id = new_session.header.id.clone();
1447                        let mut guard = session
1448                            .lock(&cx)
1449                            .await
1450                            .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
1451                        {
1452                            let mut inner_session =
1453                                guard.session.lock(&cx).await.map_err(|err| {
1454                                    Error::session(format!("inner session lock failed: {err}"))
1455                                })?;
1456                            *inner_session = new_session;
1457                        }
1458                        guard.agent.replace_messages(messages);
1459                        guard.agent.stream_options_mut().session_id = Some(session_id);
1460                        let _ = out_tx.send(response_ok(
1461                            id,
1462                            "switch_session",
1463                            Some(json!({ "cancelled": false })),
1464                        ));
1465                        let mut state = shared_state
1466                            .lock(&cx)
1467                            .await
1468                            .map_err(|err| Error::session(format!("state lock failed: {err}")))?;
1469                        state.steering.clear();
1470                        state.follow_up.clear();
1471                    }
1472                    Err(err) => {
1473                        let _ = out_tx.send(response_error_with_hints(id, "switch_session", &err));
1474                    }
1475                }
1476            }
1477
1478            "fork" => {
1479                let Some(entry_id) = parsed.get("entryId").and_then(Value::as_str) else {
1480                    let _ = out_tx.send(response_error(id, "fork", "Missing entryId".to_string()));
1481                    continue;
1482                };
1483
1484                let result: Result<String> =
1485                    async {
1486                        // Phase 1: Snapshot — brief lock to compute ForkPlan + extract metadata.
1487                        let (fork_plan, parent_path, session_dir, save_enabled, header_snapshot) = {
1488                            let guard = session.lock(&cx).await.map_err(|err| {
1489                                Error::session(format!("session lock failed: {err}"))
1490                            })?;
1491                            let inner = guard.session.lock(&cx).await.map_err(|err| {
1492                                Error::session(format!("inner session lock failed: {err}"))
1493                            })?;
1494                            let plan = inner.plan_fork_from_user_message(entry_id)?;
1495                            let parent_path = inner.path.as_ref().map(|p| p.display().to_string());
1496                            let session_dir = inner.session_dir.clone();
1497                            let header = inner.header.clone();
1498                            (plan, parent_path, session_dir, guard.save_enabled(), header)
1499                            // Both locks released here.
1500                        };
1501
1502                        // Phase 2: Build new session without holding any lock.
1503                        let crate::session::ForkPlan {
1504                            entries,
1505                            leaf_id,
1506                            selected_text,
1507                        } = fork_plan;
1508
1509                        let mut new_session = if save_enabled {
1510                            crate::session::Session::create_with_dir(session_dir)
1511                        } else {
1512                            crate::session::Session::in_memory()
1513                        };
1514                        new_session.header.parent_session = parent_path;
1515                        new_session
1516                            .header
1517                            .provider
1518                            .clone_from(&header_snapshot.provider);
1519                        new_session
1520                            .header
1521                            .model_id
1522                            .clone_from(&header_snapshot.model_id);
1523                        new_session
1524                            .header
1525                            .thinking_level
1526                            .clone_from(&header_snapshot.thinking_level);
1527                        new_session.entries = entries;
1528                        new_session.leaf_id = leaf_id;
1529                        new_session.ensure_entry_ids();
1530
1531                        let messages = new_session.to_messages_for_current_path();
1532                        let session_id = new_session.header.id.clone();
1533
1534                        // Phase 3: Swap — brief lock to install the new session.
1535                        {
1536                            let mut guard = session.lock(&cx).await.map_err(|err| {
1537                                Error::session(format!("session lock failed: {err}"))
1538                            })?;
1539                            let mut inner = guard.session.lock(&cx).await.map_err(|err| {
1540                                Error::session(format!("inner session lock failed: {err}"))
1541                            })?;
1542                            *inner = new_session;
1543                            drop(inner);
1544                            guard.agent.replace_messages(messages);
1545                            guard.agent.stream_options_mut().session_id = Some(session_id);
1546                        }
1547
1548                        {
1549                            let mut state = shared_state.lock(&cx).await.map_err(|err| {
1550                                Error::session(format!("state lock failed: {err}"))
1551                            })?;
1552                            state.steering.clear();
1553                            state.follow_up.clear();
1554                        }
1555
1556                        Ok(selected_text)
1557                    }
1558                    .await;
1559
1560                match result {
1561                    Ok(selected_text) => {
1562                        let _ = out_tx.send(response_ok(
1563                            id,
1564                            "fork",
1565                            Some(json!({ "text": selected_text, "cancelled": false })),
1566                        ));
1567                    }
1568                    Err(err) => {
1569                        let _ = out_tx.send(response_error_with_hints(id, "fork", &err));
1570                    }
1571                }
1572            }
1573
1574            "get_fork_messages" => {
1575                // Snapshot entries under brief lock, compute messages outside.
1576                let path_entries = {
1577                    let guard = session
1578                        .lock(&cx)
1579                        .await
1580                        .map_err(|err| Error::session(format!("session lock failed: {err}")))?;
1581                    let inner_session = guard.session.lock(&cx).await.map_err(|err| {
1582                        Error::session(format!("inner session lock failed: {err}"))
1583                    })?;
1584                    inner_session
1585                        .entries_for_current_path()
1586                        .into_iter()
1587                        .cloned()
1588                        .collect::<Vec<_>>()
1589                };
1590                let messages = fork_messages_from_entries(&path_entries);
1591                let _ = out_tx.send(response_ok(
1592                    id,
1593                    "get_fork_messages",
1594                    Some(json!({ "messages": messages })),
1595                ));
1596            }
1597
1598            "get_commands" => {
1599                let commands = options.resources.list_commands();
1600                let _ = out_tx.send(response_ok(
1601                    id,
1602                    "get_commands",
1603                    Some(json!({ "commands": commands })),
1604                ));
1605            }
1606
1607            "extension_ui_response" => {
1608                if let (Some(manager), Some(ui_state)) =
1609                    (rpc_extension_manager.as_ref(), rpc_ui_state.as_ref())
1610                {
1611                    let Some(request_id) = rpc_parse_extension_ui_response_id(&parsed) else {
1612                        let _ = out_tx.send(response_error(
1613                            id,
1614                            "extension_ui_response",
1615                            "Missing requestId (or id) field",
1616                        ));
1617                        continue;
1618                    };
1619
1620                    let (response, next_request) = {
1621                        let Ok(mut guard) = ui_state.lock(&cx).await else {
1622                            let _ = out_tx.send(response_error(
1623                                id,
1624                                "extension_ui_response",
1625                                "Extension UI bridge unavailable",
1626                            ));
1627                            continue;
1628                        };
1629
1630                        let Some(active) = guard.active.clone() else {
1631                            let _ = out_tx.send(response_error(
1632                                id,
1633                                "extension_ui_response",
1634                                "No active extension UI request",
1635                            ));
1636                            continue;
1637                        };
1638
1639                        if active.id != request_id {
1640                            let _ = out_tx.send(response_error(
1641                                id,
1642                                "extension_ui_response",
1643                                format!(
1644                                    "Unexpected requestId: {request_id} (active: {})",
1645                                    active.id
1646                                ),
1647                            ));
1648                            continue;
1649                        }
1650
1651                        let response = match rpc_parse_extension_ui_response(&parsed, &active) {
1652                            Ok(response) => response,
1653                            Err(message) => {
1654                                let _ = out_tx.send(response_error(
1655                                    id,
1656                                    "extension_ui_response",
1657                                    message,
1658                                ));
1659                                continue;
1660                            }
1661                        };
1662
1663                        guard.active = None;
1664                        let next = guard.queue.pop_front();
1665                        if let Some(ref next) = next {
1666                            guard.active = Some(next.clone());
1667                        }
1668                        (response, next)
1669                    };
1670
1671                    let resolved = manager.respond_ui(response);
1672                    let _ = out_tx.send(response_ok(
1673                        id,
1674                        "extension_ui_response",
1675                        Some(json!({ "resolved": resolved })),
1676                    ));
1677
1678                    if let Some(next) = next_request {
1679                        rpc_emit_extension_ui_request(
1680                            &options.runtime_handle,
1681                            Arc::clone(ui_state),
1682                            (*manager).clone(),
1683                            out_tx.clone(),
1684                            next,
1685                        );
1686                    }
1687                } else {
1688                    let _ = out_tx.send(response_ok(id, "extension_ui_response", None));
1689                }
1690            }
1691
1692            _ => {
1693                let _ = out_tx.send(response_error(
1694                    id,
1695                    command_type_raw,
1696                    format!("Unknown command: {command_type_raw}"),
1697                ));
1698            }
1699        }
1700    }
1701
1702    // Explicitly shut down extension runtimes before the session drops.
1703    // Move the region out under lock, then await shutdown after releasing
1704    // the lock so we don't hold the session mutex across an async wait.
1705    let extension_region = session
1706        .lock(&cx)
1707        .await
1708        .ok()
1709        .and_then(|mut guard| guard.extensions.take());
1710    if let Some(ext) = extension_region {
1711        ext.shutdown().await;
1712    }
1713
1714    Ok(())
1715}
1716
1717// =============================================================================
1718// Prompt Execution
1719// =============================================================================
1720
1721#[allow(clippy::too_many_lines)]
1722async fn run_prompt_with_retry(
1723    session: Arc<Mutex<AgentSession>>,
1724    shared_state: Arc<Mutex<RpcSharedState>>,
1725    is_streaming: Arc<AtomicBool>,
1726    is_compacting: Arc<AtomicBool>,
1727    abort_handle_slot: Arc<Mutex<Option<AbortHandle>>>,
1728    out_tx: std::sync::mpsc::Sender<String>,
1729    retry_abort: Arc<AtomicBool>,
1730    options: RpcOptions,
1731    message: String,
1732    images: Vec<ImageContent>,
1733    cx: AgentCx,
1734) {
1735    retry_abort.store(false, Ordering::SeqCst);
1736    is_streaming.store(true, Ordering::SeqCst);
1737
1738    let max_retries = options.config.retry_max_retries();
1739    let mut retry_count: u32 = 0;
1740    let mut success = false;
1741    let mut final_error: Option<String> = None;
1742    let mut final_error_hints: Option<Value> = None;
1743
1744    loop {
1745        let (abort_handle, abort_signal) = AbortHandle::new();
1746        if let Ok(mut guard) = OwnedMutexGuard::lock(Arc::clone(&abort_handle_slot), &cx).await {
1747            *guard = Some(abort_handle);
1748        } else {
1749            is_streaming.store(false, Ordering::SeqCst);
1750            return;
1751        }
1752
1753        let runtime_for_events = options.runtime_handle.clone();
1754
1755        let result = {
1756            let mut guard = match OwnedMutexGuard::lock(Arc::clone(&session), &cx).await {
1757                Ok(guard) => guard,
1758                Err(err) => {
1759                    final_error = Some(format!("session lock failed: {err}"));
1760                    final_error_hints = None;
1761                    break;
1762                }
1763            };
1764            let extensions = guard.extensions.as_ref().map(|r| r.manager().clone());
1765            let runtime_for_events_handler = runtime_for_events.clone();
1766            let event_tx = out_tx.clone();
1767            let coalescer = extensions
1768                .as_ref()
1769                .map(|m| crate::extensions::EventCoalescer::new(m.clone()));
1770            let event_handler = move |event: AgentEvent| {
1771                let serialized = if let AgentEvent::AgentEnd {
1772                    messages, error, ..
1773                } = &event
1774                {
1775                    json!({
1776                        "type": "agent_end",
1777                        "messages": messages,
1778                        "error": error,
1779                    })
1780                    .to_string()
1781                } else {
1782                    serde_json::to_string(&event).unwrap_or_else(|err| {
1783                        json!({
1784                            "type": "event_serialize_error",
1785                            "error": err.to_string(),
1786                        })
1787                        .to_string()
1788                    })
1789                };
1790                let _ = event_tx.send(serialized);
1791                // Route non-lifecycle events through the coalescer for
1792                // batched/coalesced dispatch with lazy serialization.
1793                if let Some(coal) = &coalescer {
1794                    coal.dispatch_agent_event_lazy(&event, &runtime_for_events_handler);
1795                }
1796            };
1797
1798            if images.is_empty() {
1799                guard
1800                    .run_text_with_abort(message.clone(), Some(abort_signal), event_handler)
1801                    .await
1802            } else {
1803                let mut blocks = vec![ContentBlock::Text(TextContent::new(message.clone()))];
1804                for image in &images {
1805                    blocks.push(ContentBlock::Image(image.clone()));
1806                }
1807                guard
1808                    .run_with_content_with_abort(blocks, Some(abort_signal), event_handler)
1809                    .await
1810            }
1811        };
1812
1813        if let Ok(mut guard) = OwnedMutexGuard::lock(Arc::clone(&abort_handle_slot), &cx).await {
1814            *guard = None;
1815        }
1816
1817        match result {
1818            Ok(message) => {
1819                if matches!(message.stop_reason, StopReason::Error | StopReason::Aborted) {
1820                    final_error = message
1821                        .error_message
1822                        .clone()
1823                        .or_else(|| Some("Request error".to_string()));
1824                    final_error_hints = None;
1825                    if message.stop_reason == StopReason::Aborted {
1826                        break;
1827                    }
1828                    // Check if this error is retryable. Context overflow and
1829                    // auth failures should NOT be retried.
1830                    if let Some(ref err_msg) = final_error {
1831                        let context_window = if let Ok(guard) =
1832                            OwnedMutexGuard::lock(Arc::clone(&session), &cx).await
1833                        {
1834                            guard.session.lock(&cx).await.map_or(None, |inner| {
1835                                current_model_entry(&inner, &options)
1836                                    .map(|e| e.model.context_window)
1837                            })
1838                        } else {
1839                            None
1840                        };
1841                        if !crate::error::is_retryable_error(
1842                            err_msg,
1843                            Some(message.usage.input),
1844                            context_window,
1845                        ) {
1846                            break;
1847                        }
1848                    }
1849                } else {
1850                    success = true;
1851                    break;
1852                }
1853            }
1854            Err(err) => {
1855                let err_str = err.to_string();
1856                // No usage/context_window from an Err (no response received),
1857                // so pass None for both — text matching alone handles it.
1858                if !crate::error::is_retryable_error(&err_str, None, None) {
1859                    final_error = Some(err_str);
1860                    final_error_hints = Some(error_hints_value(&err));
1861                    break;
1862                }
1863                final_error = Some(err_str);
1864                final_error_hints = Some(error_hints_value(&err));
1865            }
1866        }
1867
1868        let retry_enabled = OwnedMutexGuard::lock(Arc::clone(&shared_state), &cx)
1869            .await
1870            .is_ok_and(|state| state.auto_retry_enabled);
1871        if !retry_enabled || retry_count >= max_retries {
1872            break;
1873        }
1874
1875        retry_count += 1;
1876        let delay_ms = retry_delay_ms(&options.config, retry_count);
1877        let error_message = final_error
1878            .clone()
1879            .unwrap_or_else(|| "Request error".to_string());
1880        let _ = out_tx.send(event(&json!({
1881            "type": "auto_retry_start",
1882            "attempt": retry_count,
1883            "maxAttempts": max_retries,
1884            "delayMs": delay_ms,
1885            "errorMessage": error_message,
1886        })));
1887
1888        let delay = Duration::from_millis(delay_ms as u64);
1889        let start = std::time::Instant::now();
1890        while start.elapsed() < delay {
1891            if retry_abort.load(Ordering::SeqCst) {
1892                break;
1893            }
1894            sleep(wall_now(), Duration::from_millis(50)).await;
1895        }
1896
1897        if retry_abort.load(Ordering::SeqCst) {
1898            final_error = Some("Retry aborted".to_string());
1899            break;
1900        }
1901    }
1902
1903    if retry_count > 0 {
1904        let _ = out_tx.send(event(&json!({
1905            "type": "auto_retry_end",
1906            "success": success,
1907            "attempt": retry_count,
1908            "finalError": if success { Value::Null } else { json!(final_error.clone()) },
1909        })));
1910    }
1911
1912    is_streaming.store(false, Ordering::SeqCst);
1913
1914    if !success {
1915        if let Some(err) = final_error {
1916            let mut payload = json!({
1917                "type": "agent_end",
1918                "messages": [],
1919                "error": err
1920            });
1921            if let Some(hints) = final_error_hints {
1922                payload["errorHints"] = hints;
1923            }
1924            let _ = out_tx.send(event(&payload));
1925        }
1926        return;
1927    }
1928
1929    let auto_compaction_enabled = OwnedMutexGuard::lock(Arc::clone(&shared_state), &cx)
1930        .await
1931        .is_ok_and(|state| state.auto_compaction_enabled);
1932    if auto_compaction_enabled {
1933        maybe_auto_compact(session, options, is_compacting, out_tx).await;
1934    }
1935}
1936
1937// =============================================================================
1938// Helpers
1939// =============================================================================
1940
1941fn response_ok(id: Option<String>, command: &str, data: Option<Value>) -> String {
1942    let mut resp = json!({
1943        "type": "response",
1944        "command": command,
1945        "success": true,
1946    });
1947    if let Some(id) = id {
1948        resp["id"] = Value::String(id);
1949    }
1950    if let Some(data) = data {
1951        resp["data"] = data;
1952    }
1953    resp.to_string()
1954}
1955
1956fn response_error(id: Option<String>, command: &str, error: impl Into<String>) -> String {
1957    let mut resp = json!({
1958        "type": "response",
1959        "command": command,
1960        "success": false,
1961        "error": error.into(),
1962    });
1963    if let Some(id) = id {
1964        resp["id"] = Value::String(id);
1965    }
1966    resp.to_string()
1967}
1968
1969fn response_error_with_hints(id: Option<String>, command: &str, error: &Error) -> String {
1970    let mut resp = json!({
1971        "type": "response",
1972        "command": command,
1973        "success": false,
1974        "error": error.to_string(),
1975        "errorHints": error_hints_value(error),
1976    });
1977    if let Some(id) = id {
1978        resp["id"] = Value::String(id);
1979    }
1980    resp.to_string()
1981}
1982
1983fn event(value: &Value) -> String {
1984    value.to_string()
1985}
1986
1987fn rpc_emit_extension_ui_request(
1988    runtime_handle: &RuntimeHandle,
1989    ui_state: Arc<Mutex<RpcUiBridgeState>>,
1990    manager: ExtensionManager,
1991    out_tx_ui: std::sync::mpsc::Sender<String>,
1992    request: ExtensionUiRequest,
1993) {
1994    // Emit the UI request as a JSON notification to the client.
1995    let rpc_event = request.to_rpc_event();
1996    let _ = out_tx_ui.send(event(&rpc_event));
1997
1998    if !request.expects_response() {
1999        return;
2000    }
2001
2002    // For dialog methods, enforce deterministic ordering (one active request at a time) by
2003    // auto-resolving timeouts as cancellation defaults (per bd-2hz.1).
2004    let Some(timeout_ms) = request.effective_timeout_ms() else {
2005        return;
2006    };
2007
2008    // Fire a little early so ExtensionManager::request_ui doesn't hit its own timeout first.
2009    let fire_ms = timeout_ms.saturating_sub(10).max(1);
2010    let request_id = request.id;
2011    let ui_state_timeout = Arc::clone(&ui_state);
2012    let manager_timeout = manager;
2013    let out_tx_timeout = out_tx_ui;
2014    let runtime_handle_inner = runtime_handle.clone();
2015
2016    runtime_handle.spawn(async move {
2017        sleep(wall_now(), Duration::from_millis(fire_ms)).await;
2018        let cx = AgentCx::for_request();
2019
2020        let next = {
2021            let Ok(mut guard) = ui_state_timeout.lock(cx.cx()).await else {
2022                return;
2023            };
2024
2025            let Some(active) = guard.active.as_ref() else {
2026                return;
2027            };
2028
2029            // No-op if the active request has already advanced.
2030            if active.id != request_id {
2031                return;
2032            }
2033
2034            // Resolve with cancellation defaults (downstream maps method -> default return value).
2035            let _ = manager_timeout.respond_ui(ExtensionUiResponse {
2036                id: request_id,
2037                value: None,
2038                cancelled: true,
2039            });
2040
2041            guard.active = None;
2042            let next = guard.queue.pop_front();
2043            if let Some(ref next) = next {
2044                guard.active = Some(next.clone());
2045            }
2046            next
2047        };
2048
2049        if let Some(next) = next {
2050            rpc_emit_extension_ui_request(
2051                &runtime_handle_inner,
2052                ui_state_timeout,
2053                manager_timeout,
2054                out_tx_timeout,
2055                next,
2056            );
2057        }
2058    });
2059}
2060
2061fn rpc_parse_extension_ui_response_id(parsed: &Value) -> Option<String> {
2062    let request_id = parsed
2063        .get("requestId")
2064        .and_then(Value::as_str)
2065        .map(str::trim)
2066        .filter(|value| !value.is_empty())
2067        .map(String::from);
2068
2069    request_id.or_else(|| {
2070        parsed
2071            .get("id")
2072            .and_then(Value::as_str)
2073            .map(str::trim)
2074            .filter(|value| !value.is_empty())
2075            .map(String::from)
2076    })
2077}
2078
2079fn rpc_parse_extension_ui_response(
2080    parsed: &Value,
2081    active: &ExtensionUiRequest,
2082) -> std::result::Result<ExtensionUiResponse, String> {
2083    let cancelled = parsed
2084        .get("cancelled")
2085        .and_then(Value::as_bool)
2086        .unwrap_or(false);
2087
2088    if cancelled {
2089        return Ok(ExtensionUiResponse {
2090            id: active.id.clone(),
2091            value: None,
2092            cancelled: true,
2093        });
2094    }
2095
2096    match active.method.as_str() {
2097        "confirm" => {
2098            let value = parsed
2099                .get("confirmed")
2100                .and_then(Value::as_bool)
2101                .or_else(|| parsed.get("value").and_then(Value::as_bool))
2102                .ok_or_else(|| "confirm requires boolean `confirmed` (or `value`)".to_string())?;
2103            Ok(ExtensionUiResponse {
2104                id: active.id.clone(),
2105                value: Some(Value::Bool(value)),
2106                cancelled: false,
2107            })
2108        }
2109        "select" => {
2110            let Some(value) = parsed.get("value") else {
2111                return Err("select requires `value` field".to_string());
2112            };
2113
2114            let options = active
2115                .payload
2116                .get("options")
2117                .and_then(Value::as_array)
2118                .ok_or_else(|| "select request missing `options` array".to_string())?;
2119
2120            let mut allowed = Vec::with_capacity(options.len());
2121            for opt in options {
2122                match opt {
2123                    Value::String(s) => allowed.push(Value::String(s.clone())),
2124                    Value::Object(map) => {
2125                        let label = map
2126                            .get("label")
2127                            .and_then(Value::as_str)
2128                            .unwrap_or("")
2129                            .trim();
2130                        if label.is_empty() {
2131                            continue;
2132                        }
2133                        if let Some(v) = map.get("value") {
2134                            allowed.push(v.clone());
2135                        } else {
2136                            allowed.push(Value::String(label.to_string()));
2137                        }
2138                    }
2139                    _ => {}
2140                }
2141            }
2142
2143            if !allowed.iter().any(|candidate| candidate == value) {
2144                return Err("select response value did not match any option".to_string());
2145            }
2146
2147            Ok(ExtensionUiResponse {
2148                id: active.id.clone(),
2149                value: Some(value.clone()),
2150                cancelled: false,
2151            })
2152        }
2153        "input" | "editor" => {
2154            let Some(value) = parsed.get("value") else {
2155                return Err(format!("{} requires `value` field", active.method));
2156            };
2157            if !value.is_string() {
2158                return Err(format!("{} requires string `value`", active.method));
2159            }
2160            Ok(ExtensionUiResponse {
2161                id: active.id.clone(),
2162                value: Some(value.clone()),
2163                cancelled: false,
2164            })
2165        }
2166        "notify" => Ok(ExtensionUiResponse {
2167            id: active.id.clone(),
2168            value: None,
2169            cancelled: false,
2170        }),
2171        other => Err(format!("Unsupported extension UI method: {other}")),
2172    }
2173}
2174
2175#[cfg(test)]
2176mod ui_bridge_tests {
2177    use super::*;
2178
2179    #[test]
2180    fn parse_extension_ui_response_id_prefers_request_id() {
2181        let value = json!({"type":"extension_ui_response","id":"legacy","requestId":"canonical"});
2182        assert_eq!(
2183            rpc_parse_extension_ui_response_id(&value),
2184            Some("canonical".to_string())
2185        );
2186    }
2187
2188    #[test]
2189    fn parse_extension_ui_response_id_accepts_id_alias() {
2190        let value = json!({"type":"extension_ui_response","id":"legacy"});
2191        assert_eq!(
2192            rpc_parse_extension_ui_response_id(&value),
2193            Some("legacy".to_string())
2194        );
2195    }
2196
2197    #[test]
2198    fn parse_confirm_response_accepts_confirmed_alias() {
2199        let active = ExtensionUiRequest::new("req-1", "confirm", json!({"title":"t"}));
2200        let value = json!({"type":"extension_ui_response","requestId":"req-1","confirmed":true});
2201        let resp = rpc_parse_extension_ui_response(&value, &active).expect("parse confirm");
2202        assert!(!resp.cancelled);
2203        assert_eq!(resp.value, Some(json!(true)));
2204    }
2205
2206    #[test]
2207    fn parse_confirm_response_accepts_value_bool() {
2208        let active = ExtensionUiRequest::new("req-1", "confirm", json!({"title":"t"}));
2209        let value = json!({"type":"extension_ui_response","requestId":"req-1","value":false});
2210        let resp = rpc_parse_extension_ui_response(&value, &active).expect("parse confirm");
2211        assert!(!resp.cancelled);
2212        assert_eq!(resp.value, Some(json!(false)));
2213    }
2214
2215    #[test]
2216    fn parse_cancelled_response_wins_over_value() {
2217        let active = ExtensionUiRequest::new("req-1", "confirm", json!({"title":"t"}));
2218        let value = json!({"type":"extension_ui_response","requestId":"req-1","cancelled":true,"value":true});
2219        let resp = rpc_parse_extension_ui_response(&value, &active).expect("parse cancel");
2220        assert!(resp.cancelled);
2221        assert_eq!(resp.value, None);
2222    }
2223
2224    #[test]
2225    fn parse_select_response_validates_against_options() {
2226        let active = ExtensionUiRequest::new(
2227            "req-1",
2228            "select",
2229            json!({"title":"pick","options":["A","B"]}),
2230        );
2231        let ok_value = json!({"type":"extension_ui_response","requestId":"req-1","value":"B"});
2232        let ok = rpc_parse_extension_ui_response(&ok_value, &active).expect("parse select ok");
2233        assert_eq!(ok.value, Some(json!("B")));
2234
2235        let bad_value = json!({"type":"extension_ui_response","requestId":"req-1","value":"C"});
2236        assert!(
2237            rpc_parse_extension_ui_response(&bad_value, &active).is_err(),
2238            "invalid selection should error"
2239        );
2240    }
2241
2242    #[test]
2243    fn parse_input_requires_string_value() {
2244        let active = ExtensionUiRequest::new("req-1", "input", json!({"title":"t"}));
2245        let ok_value = json!({"type":"extension_ui_response","requestId":"req-1","value":"hi"});
2246        let ok = rpc_parse_extension_ui_response(&ok_value, &active).expect("parse input ok");
2247        assert_eq!(ok.value, Some(json!("hi")));
2248
2249        let bad_value = json!({"type":"extension_ui_response","requestId":"req-1","value":123});
2250        assert!(
2251            rpc_parse_extension_ui_response(&bad_value, &active).is_err(),
2252            "non-string input should error"
2253        );
2254    }
2255
2256    #[test]
2257    fn parse_editor_requires_string_value() {
2258        let active = ExtensionUiRequest::new("req-1", "editor", json!({"title":"t"}));
2259        let ok = json!({"requestId":"req-1","value":"multi\nline"});
2260        let resp = rpc_parse_extension_ui_response(&ok, &active).expect("editor ok");
2261        assert_eq!(resp.value, Some(json!("multi\nline")));
2262
2263        let bad = json!({"requestId":"req-1","value":42});
2264        assert!(
2265            rpc_parse_extension_ui_response(&bad, &active).is_err(),
2266            "editor needs string"
2267        );
2268    }
2269
2270    #[test]
2271    fn parse_notify_returns_no_value() {
2272        let active = ExtensionUiRequest::new("req-1", "notify", json!({"title":"t"}));
2273        let val = json!({"requestId":"req-1"});
2274        let resp = rpc_parse_extension_ui_response(&val, &active).expect("notify ok");
2275        assert!(!resp.cancelled);
2276        assert!(resp.value.is_none());
2277    }
2278
2279    #[test]
2280    fn parse_unsupported_method_errors() {
2281        let active = ExtensionUiRequest::new("req-1", "custom_method", json!({}));
2282        let val = json!({"requestId":"req-1","value":"x"});
2283        let err = rpc_parse_extension_ui_response(&val, &active).unwrap_err();
2284        assert!(err.contains("Unsupported"), "err={err}");
2285    }
2286
2287    #[test]
2288    fn parse_select_missing_value_field() {
2289        let active =
2290            ExtensionUiRequest::new("req-1", "select", json!({"title":"pick","options":["A"]}));
2291        let val = json!({"requestId":"req-1"});
2292        let err = rpc_parse_extension_ui_response(&val, &active).unwrap_err();
2293        assert!(err.contains("value"), "err={err}");
2294    }
2295
2296    #[test]
2297    fn parse_confirm_missing_value_errors() {
2298        let active = ExtensionUiRequest::new("req-1", "confirm", json!({"title":"t"}));
2299        let val = json!({"requestId":"req-1"});
2300        let err = rpc_parse_extension_ui_response(&val, &active).unwrap_err();
2301        assert!(err.contains("confirm"), "err={err}");
2302    }
2303
2304    #[test]
2305    fn parse_select_with_label_value_objects() {
2306        let active = ExtensionUiRequest::new(
2307            "req-1",
2308            "select",
2309            json!({
2310                "title": "pick",
2311                "options": [
2312                    {"label": "Alpha", "value": "a"},
2313                    {"label": "Beta", "value": "b"},
2314                ]
2315            }),
2316        );
2317        let val = json!({"requestId":"req-1","value":"a"});
2318        let resp = rpc_parse_extension_ui_response(&val, &active).expect("select by value");
2319        assert_eq!(resp.value, Some(json!("a")));
2320    }
2321
2322    #[test]
2323    fn parse_id_rejects_empty_and_whitespace() {
2324        let val = json!({"requestId":"  ","id":""});
2325        assert!(rpc_parse_extension_ui_response_id(&val).is_none());
2326    }
2327
2328    #[test]
2329    fn bridge_state_default_is_empty() {
2330        let state = RpcUiBridgeState::default();
2331        assert!(state.active.is_none());
2332        assert!(state.queue.is_empty());
2333    }
2334}
2335
2336fn error_hints_value(error: &Error) -> Value {
2337    let hint = error_hints::hints_for_error(error);
2338    json!({
2339        "summary": hint.summary,
2340        "hints": hint.hints,
2341        "contextFields": hint.context_fields,
2342    })
2343}
2344
2345fn rpc_session_message_value(message: SessionMessage) -> Value {
2346    let mut value =
2347        serde_json::to_value(message).expect("SessionMessage should always serialize to JSON");
2348    rpc_flatten_content_blocks(&mut value);
2349    value
2350}
2351
2352fn rpc_flatten_content_blocks(value: &mut Value) {
2353    let Value::Object(message_obj) = value else {
2354        return;
2355    };
2356    let Some(content) = message_obj.get_mut("content") else {
2357        return;
2358    };
2359    let Value::Array(blocks) = content else {
2360        return;
2361    };
2362
2363    for block in blocks {
2364        let Value::Object(block_obj) = block else {
2365            continue;
2366        };
2367        let Some(inner) = block_obj.remove("0") else {
2368            continue;
2369        };
2370        let Value::Object(inner_obj) = inner else {
2371            block_obj.insert("0".to_string(), inner);
2372            continue;
2373        };
2374        for (key, value) in inner_obj {
2375            block_obj.entry(key).or_insert(value);
2376        }
2377    }
2378}
2379
2380fn retry_delay_ms(config: &Config, attempt: u32) -> u32 {
2381    let base = u64::from(config.retry_base_delay_ms());
2382    let max = u64::from(config.retry_max_delay_ms());
2383    let shift = attempt.saturating_sub(1);
2384    let multiplier = 1u64.checked_shl(shift).unwrap_or(u64::MAX);
2385    let delay = base.saturating_mul(multiplier).min(max);
2386    u32::try_from(delay).unwrap_or(u32::MAX)
2387}
2388
#[cfg(test)]
mod retry_tests {
    //! End-to-end checks of the auto-retry events emitted by `run_prompt_with_retry`,
    //! driven by in-process fake providers.

    use super::*;
    use crate::agent::{Agent, AgentConfig, AgentSession};
    use crate::model::{AssistantMessage, Usage};
    use crate::provider::Provider;
    use crate::resources::ResourceLoader;
    use crate::session::Session;
    use crate::tools::ToolRegistry;
    use async_trait::async_trait;
    use futures::stream;
    use std::path::Path;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Provider whose first `stream` call ends in an error event and whose later calls succeed.
    #[derive(Debug)]
    struct FlakyProvider {
        calls: AtomicUsize,
    }

    impl FlakyProvider {
        const fn new() -> Self {
            Self {
                calls: AtomicUsize::new(0),
            }
        }
    }

    #[async_trait]
    #[allow(clippy::unnecessary_literal_bound)]
    impl Provider for FlakyProvider {
        fn name(&self) -> &str {
            "test-provider"
        }

        fn api(&self) -> &str {
            "test-api"
        }

        fn model_id(&self) -> &str {
            "test-model"
        }

        async fn stream(
            &self,
            _context: &crate::provider::Context<'_>,
            _options: &crate::provider::StreamOptions,
        ) -> crate::error::Result<
            Pin<
                Box<
                    dyn futures::Stream<Item = crate::error::Result<crate::model::StreamEvent>>
                        + Send,
                >,
            >,
        > {
            let call = self.calls.fetch_add(1, Ordering::SeqCst);

            let mut partial = AssistantMessage {
                content: Vec::new(),
                api: self.api().to_string(),
                provider: self.name().to_string(),
                model: self.model_id().to_string(),
                usage: Usage::default(),
                stop_reason: StopReason::Stop,
                error_message: None,
                timestamp: 0,
            };

            let events = if call == 0 {
                // First call fails with an explicit error event.
                partial.stop_reason = StopReason::Error;
                partial.error_message = Some("server error".to_string());
                vec![
                    Ok(crate::model::StreamEvent::Start {
                        partial: partial.clone(),
                    }),
                    Ok(crate::model::StreamEvent::Error {
                        reason: StopReason::Error,
                        error: partial,
                    }),
                ]
            } else {
                // Second call succeeds.
                vec![
                    Ok(crate::model::StreamEvent::Start {
                        partial: partial.clone(),
                    }),
                    Ok(crate::model::StreamEvent::Done {
                        reason: StopReason::Stop,
                        message: partial,
                    }),
                ]
            };

            Ok(Box::pin(stream::iter(events)))
        }
    }

    /// Provider whose every `stream` call ends in an error event.
    #[derive(Debug)]
    struct AlwaysErrorProvider;

    #[async_trait]
    #[allow(clippy::unnecessary_literal_bound)]
    impl Provider for AlwaysErrorProvider {
        fn name(&self) -> &str {
            "test-provider"
        }

        fn api(&self) -> &str {
            "test-api"
        }

        fn model_id(&self) -> &str {
            "test-model"
        }

        async fn stream(
            &self,
            _context: &crate::provider::Context<'_>,
            _options: &crate::provider::StreamOptions,
        ) -> crate::error::Result<
            Pin<
                Box<
                    dyn futures::Stream<Item = crate::error::Result<crate::model::StreamEvent>>
                        + Send,
                >,
            >,
        > {
            // The message is built with the error stop reason already set, so no later
            // mutation is needed before it is moved into the error event.
            let partial = AssistantMessage {
                content: Vec::new(),
                api: self.api().to_string(),
                provider: self.name().to_string(),
                model: self.model_id().to_string(),
                usage: Usage::default(),
                stop_reason: StopReason::Error,
                error_message: Some("server error".to_string()),
                timestamp: 0,
            };

            let events = vec![
                Ok(crate::model::StreamEvent::Start {
                    partial: partial.clone(),
                }),
                Ok(crate::model::StreamEvent::Error {
                    reason: StopReason::Error,
                    error: partial,
                }),
            ];

            Ok(Box::pin(stream::iter(events)))
        }
    }

    #[test]
    fn rpc_auto_retry_retries_then_succeeds() {
        let runtime = asupersync::runtime::RuntimeBuilder::new()
            .blocking_threads(1, 8)
            .build()
            .expect("runtime build");
        let runtime_handle = runtime.handle();

        runtime.block_on(async move {
            let provider = Arc::new(FlakyProvider::new());
            let tools = ToolRegistry::new(&[], Path::new("."), None);
            let agent = Agent::new(provider, tools, AgentConfig::default());
            let inner_session = Arc::new(Mutex::new(Session::in_memory()));
            let agent_session = AgentSession::new(
                agent,
                inner_session,
                false,
                crate::compaction::ResolvedCompactionSettings::default(),
            );

            let session = Arc::new(Mutex::new(agent_session));

            let mut config = Config::default();
            config.retry = Some(crate::config::RetrySettings {
                enabled: Some(true),
                max_retries: Some(1),
                base_delay_ms: Some(1),
                max_delay_ms: Some(1),
            });

            let mut shared = RpcSharedState::new(&config);
            shared.auto_compaction_enabled = false;
            let shared_state = Arc::new(Mutex::new(shared));

            let is_streaming = Arc::new(AtomicBool::new(false));
            let is_compacting = Arc::new(AtomicBool::new(false));
            let abort_handle_slot: Arc<Mutex<Option<AbortHandle>>> = Arc::new(Mutex::new(None));
            let retry_abort = Arc::new(AtomicBool::new(false));
            let (out_tx, out_rx) = std::sync::mpsc::channel::<String>();

            // Keep the TempDir bound for the whole test. A temporary would be dropped
            // (deleting the directory) before `AuthStorage` ever touched the path.
            let auth_dir = tempfile::tempdir().expect("tempdir");
            let auth = AuthStorage::load(auth_dir.path().join("auth.json")).expect("auth load");

            let options = RpcOptions {
                config,
                resources: ResourceLoader::empty(false),
                available_models: Vec::new(),
                scoped_models: Vec::new(),
                auth,
                runtime_handle,
            };

            run_prompt_with_retry(
                session,
                shared_state,
                is_streaming,
                is_compacting,
                abort_handle_slot,
                out_tx,
                retry_abort,
                options,
                "hello".to_string(),
                Vec::new(),
                AgentCx::for_request(),
            )
            .await;

            let mut saw_retry_start = false;
            let mut saw_retry_end_success = false;

            for line in out_rx.try_iter() {
                let Ok(value) = serde_json::from_str::<Value>(&line) else {
                    continue;
                };
                let Some(kind) = value.get("type").and_then(Value::as_str) else {
                    continue;
                };
                match kind {
                    "auto_retry_start" => {
                        saw_retry_start = true;
                    }
                    "auto_retry_end" => {
                        if value.get("success").and_then(Value::as_bool) == Some(true) {
                            saw_retry_end_success = true;
                        }
                    }
                    _ => {}
                }
            }

            assert!(saw_retry_start, "missing auto_retry_start event");
            assert!(
                saw_retry_end_success,
                "missing successful auto_retry_end event"
            );
        });
    }

    #[test]
    fn rpc_abort_retry_emits_ordered_retry_timeline() {
        let runtime = asupersync::runtime::RuntimeBuilder::new()
            .blocking_threads(1, 8)
            .build()
            .expect("runtime build");
        let runtime_handle = runtime.handle();

        runtime.block_on(async move {
            let provider = Arc::new(AlwaysErrorProvider);
            let tools = ToolRegistry::new(&[], Path::new("."), None);
            let agent = Agent::new(provider, tools, AgentConfig::default());
            let inner_session = Arc::new(Mutex::new(Session::in_memory()));
            let agent_session = AgentSession::new(
                agent,
                inner_session,
                false,
                crate::compaction::ResolvedCompactionSettings::default(),
            );

            let session = Arc::new(Mutex::new(agent_session));

            let mut config = Config::default();
            config.retry = Some(crate::config::RetrySettings {
                enabled: Some(true),
                max_retries: Some(3),
                base_delay_ms: Some(100),
                max_delay_ms: Some(100),
            });

            let mut shared = RpcSharedState::new(&config);
            shared.auto_compaction_enabled = false;
            let shared_state = Arc::new(Mutex::new(shared));

            let is_streaming = Arc::new(AtomicBool::new(false));
            let is_compacting = Arc::new(AtomicBool::new(false));
            let abort_handle_slot: Arc<Mutex<Option<AbortHandle>>> = Arc::new(Mutex::new(None));
            let retry_abort = Arc::new(AtomicBool::new(false));
            let (out_tx, out_rx) = std::sync::mpsc::channel::<String>();

            // Keep the TempDir bound for the whole test so the auth path stays valid.
            let auth_dir = tempfile::tempdir().expect("tempdir");
            let auth = AuthStorage::load(auth_dir.path().join("auth.json")).expect("auth load");

            let options = RpcOptions {
                config,
                resources: ResourceLoader::empty(false),
                available_models: Vec::new(),
                scoped_models: Vec::new(),
                auth,
                runtime_handle,
            };

            // Request a retry abort shortly after start, while the 100 ms back-off is
            // expected to be pending.
            let retry_abort_for_thread = Arc::clone(&retry_abort);
            let abort_thread = std::thread::spawn(move || {
                std::thread::sleep(std::time::Duration::from_millis(10));
                retry_abort_for_thread.store(true, Ordering::SeqCst);
            });

            run_prompt_with_retry(
                session,
                shared_state,
                is_streaming,
                is_compacting,
                abort_handle_slot,
                out_tx,
                retry_abort,
                options,
                "hello".to_string(),
                Vec::new(),
                AgentCx::for_request(),
            )
            .await;
            abort_thread.join().expect("abort thread join");

            let mut timeline = Vec::new();
            let mut last_agent_end_error = None::<String>;

            for line in out_rx.try_iter() {
                let Ok(value) = serde_json::from_str::<Value>(&line) else {
                    continue;
                };
                let Some(kind) = value.get("type").and_then(Value::as_str) else {
                    continue;
                };
                timeline.push(kind.to_string());
                if kind == "agent_end" {
                    last_agent_end_error = value
                        .get("error")
                        .and_then(Value::as_str)
                        .map(str::to_string);
                }
            }

            let retry_start_idx = timeline
                .iter()
                .position(|kind| kind == "auto_retry_start")
                .expect("missing auto_retry_start");
            let retry_end_idx = timeline
                .iter()
                .position(|kind| kind == "auto_retry_end")
                .expect("missing auto_retry_end");
            let agent_end_idx = timeline
                .iter()
                .rposition(|kind| kind == "agent_end")
                .expect("missing agent_end");

            assert!(
                retry_start_idx < retry_end_idx && retry_end_idx < agent_end_idx,
                "unexpected retry timeline ordering: {timeline:?}"
            );
            assert_eq!(
                last_agent_end_error.as_deref(),
                Some("Retry aborted"),
                "expected retry-abort terminal error, timeline: {timeline:?}"
            );
        });
    }
}
2767
/// Returns `true` when `tokens_before` exceeds the usable budget of the context window.
///
/// The budget is `context_window - reserve_tokens`, saturating at zero. When the reserve
/// is at least the whole window, any non-zero token count triggers compaction.
fn should_auto_compact(tokens_before: u64, context_window: u32, reserve_tokens: u32) -> bool {
    let budget = u64::from(context_window).saturating_sub(u64::from(reserve_tokens));
    budget < tokens_before
}
2773
2774#[allow(clippy::too_many_lines)]
2775async fn maybe_auto_compact(
2776    session: Arc<Mutex<AgentSession>>,
2777    options: RpcOptions,
2778    is_compacting: Arc<AtomicBool>,
2779    out_tx: std::sync::mpsc::Sender<String>,
2780) {
2781    let cx = AgentCx::for_request();
2782    let (path_entries, context_window, reserve_tokens, settings) = {
2783        let Ok(guard) = session.lock(cx.cx()).await else {
2784            return;
2785        };
2786        let (path_entries, context_window) = {
2787            let Ok(mut inner_session) = guard.session.lock(cx.cx()).await else {
2788                return;
2789            };
2790            inner_session.ensure_entry_ids();
2791            let Some(entry) = current_model_entry(&inner_session, &options) else {
2792                return;
2793            };
2794            let path_entries = inner_session
2795                .entries_for_current_path()
2796                .into_iter()
2797                .cloned()
2798                .collect::<Vec<_>>();
2799            (path_entries, entry.model.context_window)
2800        };
2801
2802        let reserve_tokens = options.config.compaction_reserve_tokens();
2803        let settings = ResolvedCompactionSettings {
2804            enabled: true,
2805            reserve_tokens,
2806            keep_recent_tokens: options.config.compaction_keep_recent_tokens(),
2807            ..Default::default()
2808        };
2809
2810        (path_entries, context_window, reserve_tokens, settings)
2811    };
2812
2813    let Some(prep) = prepare_compaction(&path_entries, settings) else {
2814        return;
2815    };
2816    if !should_auto_compact(prep.tokens_before, context_window, reserve_tokens) {
2817        return;
2818    }
2819
2820    let _ = out_tx.send(event(&json!({
2821        "type": "auto_compaction_start",
2822        "reason": "threshold",
2823    })));
2824    is_compacting.store(true, Ordering::SeqCst);
2825
2826    let (provider, key) = {
2827        let Ok(guard) = session.lock(cx.cx()).await else {
2828            is_compacting.store(false, Ordering::SeqCst);
2829            return;
2830        };
2831        let Some(key) = guard.agent.stream_options().api_key.clone() else {
2832            is_compacting.store(false, Ordering::SeqCst);
2833            let _ = out_tx.send(event(&json!({
2834                "type": "auto_compaction_end",
2835                "result": Value::Null,
2836                "aborted": false,
2837                "willRetry": false,
2838                "errorMessage": "Missing API key for compaction",
2839            })));
2840            return;
2841        };
2842        (guard.agent.provider(), key)
2843    };
2844
2845    let result = compact(prep, provider, &key, None).await;
2846    is_compacting.store(false, Ordering::SeqCst);
2847
2848    match result {
2849        Ok(result) => {
2850            let details_value = match compaction_details_to_value(&result.details) {
2851                Ok(value) => value,
2852                Err(err) => {
2853                    let _ = out_tx.send(event(&json!({
2854                        "type": "auto_compaction_end",
2855                        "result": Value::Null,
2856                        "aborted": false,
2857                        "willRetry": false,
2858                        "errorMessage": err.to_string(),
2859                    })));
2860                    return;
2861                }
2862            };
2863
2864            let Ok(mut guard) = session.lock(cx.cx()).await else {
2865                return;
2866            };
2867            let messages = {
2868                let Ok(mut inner_session) = guard.session.lock(cx.cx()).await else {
2869                    return;
2870                };
2871                inner_session.append_compaction(
2872                    result.summary.clone(),
2873                    result.first_kept_entry_id.clone(),
2874                    result.tokens_before,
2875                    Some(details_value.clone()),
2876                    None,
2877                );
2878                inner_session.to_messages_for_current_path()
2879            };
2880            let _ = guard.persist_session().await;
2881            guard.agent.replace_messages(messages);
2882            drop(guard);
2883
2884            let _ = out_tx.send(event(&json!({
2885                "type": "auto_compaction_end",
2886                "result": {
2887                    "summary": result.summary,
2888                    "firstKeptEntryId": result.first_kept_entry_id,
2889                    "tokensBefore": result.tokens_before,
2890                    "details": details_value,
2891                },
2892                "aborted": false,
2893                "willRetry": false,
2894            })));
2895        }
2896        Err(err) => {
2897            let _ = out_tx.send(event(&json!({
2898                "type": "auto_compaction_end",
2899                "result": Value::Null,
2900                "aborted": false,
2901                "willRetry": false,
2902                "errorMessage": err.to_string(),
2903            })));
2904        }
2905    }
2906}
2907
2908fn rpc_model_from_entry(entry: &ModelEntry) -> Value {
2909    let input = entry
2910        .model
2911        .input
2912        .iter()
2913        .map(|t| match t {
2914            crate::provider::InputType::Text => "text",
2915            crate::provider::InputType::Image => "image",
2916        })
2917        .collect::<Vec<_>>();
2918
2919    json!({
2920        "id": entry.model.id,
2921        "name": entry.model.name,
2922        "api": entry.model.api,
2923        "provider": entry.model.provider,
2924        "baseUrl": entry.model.base_url,
2925        "reasoning": entry.model.reasoning,
2926        "input": input,
2927        "contextWindow": entry.model.context_window,
2928        "maxTokens": entry.model.max_tokens,
2929        "cost": entry.model.cost,
2930    })
2931}
2932
2933fn session_state(
2934    session: &crate::session::Session,
2935    options: &RpcOptions,
2936    snapshot: &RpcStateSnapshot,
2937    is_streaming: bool,
2938    is_compacting: bool,
2939) -> Value {
2940    let model = session
2941        .header
2942        .provider
2943        .as_deref()
2944        .zip(session.header.model_id.as_deref())
2945        .and_then(|(provider, model_id)| {
2946            options.available_models.iter().find(|m| {
2947                provider_ids_match(&m.model.provider, provider)
2948                    && m.model.id.eq_ignore_ascii_case(model_id)
2949            })
2950        })
2951        .map(rpc_model_from_entry);
2952
2953    let message_count = session
2954        .entries_for_current_path()
2955        .iter()
2956        .filter(|entry| matches!(entry, crate::session::SessionEntry::Message(_)))
2957        .count();
2958
2959    let session_name = session
2960        .entries_for_current_path()
2961        .iter()
2962        .rev()
2963        .find_map(|entry| {
2964            let crate::session::SessionEntry::SessionInfo(info) = entry else {
2965                return None;
2966            };
2967            info.name.clone()
2968        });
2969
2970    let mut state = serde_json::Map::new();
2971    state.insert("model".to_string(), model.unwrap_or(Value::Null));
2972    state.insert(
2973        "thinkingLevel".to_string(),
2974        Value::String(
2975            session
2976                .header
2977                .thinking_level
2978                .clone()
2979                .unwrap_or_else(|| "off".to_string()),
2980        ),
2981    );
2982    state.insert("isStreaming".to_string(), Value::Bool(is_streaming));
2983    state.insert("isCompacting".to_string(), Value::Bool(is_compacting));
2984    state.insert(
2985        "steeringMode".to_string(),
2986        Value::String(snapshot.steering_mode.as_str().to_string()),
2987    );
2988    state.insert(
2989        "followUpMode".to_string(),
2990        Value::String(snapshot.follow_up_mode.as_str().to_string()),
2991    );
2992    state.insert(
2993        "sessionFile".to_string(),
2994        session
2995            .path
2996            .as_ref()
2997            .map_or(Value::Null, |p| Value::String(p.display().to_string())),
2998    );
2999    state.insert(
3000        "sessionId".to_string(),
3001        Value::String(session.header.id.clone()),
3002    );
3003    state.insert(
3004        "sessionName".to_string(),
3005        session_name.map_or(Value::Null, Value::String),
3006    );
3007    state.insert(
3008        "autoCompactionEnabled".to_string(),
3009        Value::Bool(snapshot.auto_compaction_enabled),
3010    );
3011    state.insert(
3012        "messageCount".to_string(),
3013        Value::Number(message_count.into()),
3014    );
3015    state.insert(
3016        "pendingMessageCount".to_string(),
3017        Value::Number(snapshot.pending_count().into()),
3018    );
3019    state.insert(
3020        "durabilityMode".to_string(),
3021        Value::String(session.autosave_durability_mode().as_str().to_string()),
3022    );
3023    Value::Object(state)
3024}
3025
3026fn session_stats(session: &crate::session::Session) -> Value {
3027    let mut user_messages: u64 = 0;
3028    let mut assistant_messages: u64 = 0;
3029    let mut tool_results: u64 = 0;
3030    let mut tool_calls: u64 = 0;
3031
3032    let mut total_input: u64 = 0;
3033    let mut total_output: u64 = 0;
3034    let mut total_cache_read: u64 = 0;
3035    let mut total_cache_write: u64 = 0;
3036    let mut total_cost: f64 = 0.0;
3037
3038    let messages = session.to_messages_for_current_path();
3039
3040    for message in &messages {
3041        match message {
3042            Message::User(_) | Message::Custom(_) => user_messages += 1,
3043            Message::Assistant(message) => {
3044                assistant_messages += 1;
3045                tool_calls += message
3046                    .content
3047                    .iter()
3048                    .filter(|block| matches!(block, ContentBlock::ToolCall(_)))
3049                    .count() as u64;
3050                total_input += message.usage.input;
3051                total_output += message.usage.output;
3052                total_cache_read += message.usage.cache_read;
3053                total_cache_write += message.usage.cache_write;
3054                total_cost += message.usage.cost.total;
3055            }
3056            Message::ToolResult(_) => tool_results += 1,
3057        }
3058    }
3059
3060    let total_messages = messages.len() as u64;
3061
3062    let total_tokens = total_input + total_output + total_cache_read + total_cache_write;
3063    let autosave = session.autosave_metrics();
3064    let pending_message_count = autosave.pending_mutations as u64;
3065    let durability_mode = session.autosave_durability_mode();
3066    let durability_mode_label = match durability_mode {
3067        crate::session::AutosaveDurabilityMode::Strict => "strict",
3068        crate::session::AutosaveDurabilityMode::Balanced => "balanced",
3069        crate::session::AutosaveDurabilityMode::Throughput => "throughput",
3070    };
3071    let (status_event, status_severity, status_summary, status_action, status_sli_ids) =
3072        if pending_message_count == 0 {
3073            (
3074                "session.persistence.healthy",
3075                "ok",
3076                "Persistence queue is clear.",
3077                "No action required.",
3078                vec!["sli_resume_ready_p95_ms"],
3079            )
3080        } else {
3081            let summary = match durability_mode {
3082                crate::session::AutosaveDurabilityMode::Strict => {
3083                    "Pending persistence backlog under strict durability mode."
3084                }
3085                crate::session::AutosaveDurabilityMode::Balanced => {
3086                    "Pending persistence backlog under balanced durability mode."
3087                }
3088                crate::session::AutosaveDurabilityMode::Throughput => {
3089                    "Pending persistence backlog under throughput durability mode."
3090                }
3091            };
3092            let action = match durability_mode {
3093                crate::session::AutosaveDurabilityMode::Throughput => {
3094                    "Expect deferred writes; trigger manual save before critical transitions."
3095                }
3096                _ => "Allow autosave flush to complete or trigger manual save before exit.",
3097            };
3098            (
3099                "session.persistence.backlog",
3100                "warning",
3101                summary,
3102                action,
3103                vec![
3104                    "sli_resume_ready_p95_ms",
3105                    "sli_failure_recovery_success_rate",
3106                ],
3107            )
3108        };
3109
3110    let mut data = serde_json::Map::new();
3111    data.insert(
3112        "sessionFile".to_string(),
3113        session
3114            .path
3115            .as_ref()
3116            .map_or(Value::Null, |p| Value::String(p.display().to_string())),
3117    );
3118    data.insert(
3119        "sessionId".to_string(),
3120        Value::String(session.header.id.clone()),
3121    );
3122    data.insert(
3123        "userMessages".to_string(),
3124        Value::Number(user_messages.into()),
3125    );
3126    data.insert(
3127        "assistantMessages".to_string(),
3128        Value::Number(assistant_messages.into()),
3129    );
3130    data.insert("toolCalls".to_string(), Value::Number(tool_calls.into()));
3131    data.insert(
3132        "toolResults".to_string(),
3133        Value::Number(tool_results.into()),
3134    );
3135    data.insert(
3136        "totalMessages".to_string(),
3137        Value::Number(total_messages.into()),
3138    );
3139    data.insert(
3140        "durabilityMode".to_string(),
3141        Value::String(durability_mode_label.to_string()),
3142    );
3143    data.insert(
3144        "pendingMessageCount".to_string(),
3145        Value::Number(pending_message_count.into()),
3146    );
3147    data.insert(
3148        "tokens".to_string(),
3149        json!({
3150            "input": total_input,
3151            "output": total_output,
3152            "cacheRead": total_cache_read,
3153            "cacheWrite": total_cache_write,
3154            "total": total_tokens,
3155        }),
3156    );
3157    data.insert(
3158        "persistenceStatus".to_string(),
3159        json!({
3160            "event": status_event,
3161            "severity": status_severity,
3162            "summary": status_summary,
3163            "action": status_action,
3164            "sliIds": status_sli_ids,
3165            "pendingMessageCount": pending_message_count,
3166            "flushCounters": {
3167                "started": autosave.flush_started,
3168                "succeeded": autosave.flush_succeeded,
3169                "failed": autosave.flush_failed,
3170            },
3171        }),
3172    );
3173    data.insert(
3174        "uxEventMarkers".to_string(),
3175        json!([
3176            {
3177                "event": status_event,
3178                "severity": status_severity,
3179                "durabilityMode": durability_mode_label,
3180                "pendingMessageCount": pending_message_count,
3181                "sliIds": status_sli_ids,
3182            }
3183        ]),
3184    );
3185    data.insert("cost".to_string(), Value::from(total_cost));
3186    Value::Object(data)
3187}
3188
3189fn last_assistant_text(session: &crate::session::Session) -> Option<String> {
3190    let entries = session.entries_for_current_path();
3191    for entry in entries.into_iter().rev() {
3192        let crate::session::SessionEntry::Message(msg_entry) = entry else {
3193            continue;
3194        };
3195        let SessionMessage::Assistant { message } = &msg_entry.message else {
3196            continue;
3197        };
3198        let mut text = String::new();
3199        for block in &message.content {
3200            if let ContentBlock::Text(t) = block {
3201                text.push_str(&t.text);
3202            }
3203        }
3204        if !text.is_empty() {
3205            return Some(text);
3206        }
3207    }
3208    None
3209}
3210
3211/// Export HTML from a lightweight `ExportSnapshot` (non-blocking path).
3212///
3213/// The snapshot is captured under a brief lock, so the HTML rendering and
3214/// file I/O happen entirely outside any session lock.
3215async fn export_html_snapshot(
3216    snapshot: &crate::session::ExportSnapshot,
3217    output_path: Option<&str>,
3218) -> Result<String> {
3219    let html = snapshot.to_html();
3220
3221    let path = output_path.map_or_else(
3222        || {
3223            snapshot.path.as_ref().map_or_else(
3224                || {
3225                    let ts = chrono::Utc::now().format("%Y-%m-%dT%H-%M-%S%.3fZ");
3226                    PathBuf::from(format!("pi-session-{ts}.html"))
3227                },
3228                |session_path| {
3229                    let basename = session_path
3230                        .file_stem()
3231                        .and_then(|s| s.to_str())
3232                        .unwrap_or("session");
3233                    PathBuf::from(format!("pi-session-{basename}.html"))
3234                },
3235            )
3236        },
3237        PathBuf::from,
3238    );
3239
3240    if let Some(parent) = path.parent().filter(|p| !p.as_os_str().is_empty()) {
3241        asupersync::fs::create_dir_all(parent).await?;
3242    }
3243    asupersync::fs::write(&path, html).await?;
3244    Ok(path.display().to_string())
3245}
3246
/// Outcome of a `run_bash_rpc` invocation, handed back to the RPC layer.
#[derive(Debug, Clone)]
struct BashRpcResult {
    /// Tail of the merged stdout/stderr after `truncate_tail`, or `"(no output)"`.
    /// May end with a drain-timeout marker.
    output: String,
    /// Shell exit status, or `-1` when the process ended without an exit code
    /// (e.g. it was terminated by a signal).
    exit_code: i32,
    /// `true` if an abort request was observed while the command was running.
    cancelled: bool,
    /// `true` if `output` does not contain the complete stream.
    truncated: bool,
    /// Path of the spill file holding the (possibly partial) full log, if one was created.
    full_output_path: Option<String>,
}
3255
/// Convert a newline-byte count into a line count for a stream of `total_bytes`.
///
/// - An empty stream has zero lines.
/// - A stream ending in `\n` has exactly `newline_count` lines.
/// - Otherwise the trailing unterminated line adds one (saturating at `usize::MAX`).
const fn line_count_from_newline_count(
    total_bytes: usize,
    newline_count: usize,
    last_byte_was_newline: bool,
) -> usize {
    match (total_bytes, last_byte_was_newline) {
        (0, _) => 0,
        (_, true) => newline_count,
        (_, false) => newline_count.saturating_add(1),
    }
}
3269
3270async fn ingest_bash_rpc_chunk(
3271    bytes: Vec<u8>,
3272    chunks: &mut VecDeque<Vec<u8>>,
3273    chunks_bytes: &mut usize,
3274    total_bytes: &mut usize,
3275    total_lines: &mut usize,
3276    last_byte_was_newline: &mut bool,
3277    temp_file: &mut Option<asupersync::fs::File>,
3278    temp_file_path: &mut Option<PathBuf>,
3279    spill_failed: &mut bool,
3280    max_chunks_bytes: usize,
3281) {
3282    if bytes.is_empty() {
3283        return;
3284    }
3285
3286    *last_byte_was_newline = bytes.last().is_some_and(|byte| *byte == b'\n');
3287    *total_bytes = total_bytes.saturating_add(bytes.len());
3288    *total_lines = total_lines.saturating_add(memchr_iter(b'\n', &bytes).count());
3289
3290    // Spill to temp file if we exceed the limit
3291    if *total_bytes > DEFAULT_MAX_BYTES && temp_file.is_none() && !*spill_failed {
3292        let id_full = uuid::Uuid::new_v4().simple().to_string();
3293        let id = &id_full[..16];
3294        let path = std::env::temp_dir().join(format!("pi-rpc-bash-{id}.log"));
3295
3296        // Secure synchronous creation
3297        let expected_inode: Option<u64> = {
3298            let mut options = std::fs::OpenOptions::new();
3299            options.write(true).create_new(true);
3300            #[cfg(unix)]
3301            {
3302                use std::os::unix::fs::OpenOptionsExt;
3303                options.mode(0o600);
3304            }
3305
3306            match options.open(&path) {
3307                Ok(file) => {
3308                    #[cfg(unix)]
3309                    {
3310                        use std::os::unix::fs::MetadataExt;
3311                        file.metadata().ok().map(|m| m.ino())
3312                    }
3313                    #[cfg(not(unix))]
3314                    {
3315                        None
3316                    }
3317                }
3318                Err(e) => {
3319                    tracing::warn!("Failed to create bash temp file: {e}");
3320                    None
3321                }
3322            }
3323        };
3324
3325        if expected_inode.is_some() || !cfg!(unix) {
3326            // Re-open async for writing
3327            match asupersync::fs::OpenOptions::new()
3328                .append(true)
3329                .open(&path)
3330                .await
3331            {
3332                Ok(mut file) => {
3333                    // Validate identity to prevent TOCTOU/symlink attacks
3334                    let mut identity_match = true;
3335                    #[cfg(unix)]
3336                    if let Some(expected) = expected_inode {
3337                        use std::os::unix::fs::MetadataExt;
3338                        match file.metadata().await {
3339                            Ok(meta) => {
3340                                if meta.ino() != expected {
3341                                    tracing::warn!(
3342                                        "Temp file identity mismatch (possible TOCTOU attack)"
3343                                    );
3344                                    identity_match = false;
3345                                }
3346                            }
3347                            Err(e) => {
3348                                tracing::warn!("Failed to stat temp file: {e}");
3349                                identity_match = false;
3350                            }
3351                        }
3352                    }
3353
3354                    if identity_match {
3355                        // Flush existing chunks to the new file
3356                        for existing in chunks.iter() {
3357                            use asupersync::io::AsyncWriteExt;
3358                            if let Err(e) = file.write_all(existing).await {
3359                                tracing::warn!("Failed to flush bash chunk to temp file: {e}");
3360                                *spill_failed = true;
3361                                break;
3362                            }
3363                        }
3364                        if !*spill_failed {
3365                            *temp_file = Some(file);
3366                            *temp_file_path = Some(path);
3367                        }
3368                    } else {
3369                        let _ = std::fs::remove_file(&path);
3370                        *spill_failed = true;
3371                    }
3372                }
3373                Err(e) => {
3374                    tracing::warn!("Failed to reopen bash temp file async: {e}");
3375                    // Clean up the empty file we just created
3376                    let _ = std::fs::remove_file(&path);
3377                    *spill_failed = true;
3378                }
3379            }
3380        } else {
3381            *spill_failed = true;
3382        }
3383    }
3384
3385    // Write new chunk to file if we have one
3386    if let Some(file) = temp_file.as_mut() {
3387        if *total_bytes <= crate::tools::BASH_FILE_LIMIT_BYTES {
3388            use asupersync::io::AsyncWriteExt;
3389            if let Err(e) = file.write_all(&bytes).await {
3390                tracing::warn!("Failed to write bash chunk to temp file: {e}");
3391                *spill_failed = true;
3392                *temp_file = None;
3393            }
3394        } else {
3395            // Hard limit reached. Stop writing and close the file to release the FD.
3396            if !*spill_failed {
3397                tracing::warn!("Bash output exceeded hard limit; stopping file log");
3398                *spill_failed = true;
3399                *temp_file = None;
3400            }
3401        }
3402    }
3403
3404    // Update memory buffer
3405    *chunks_bytes = chunks_bytes.saturating_add(bytes.len());
3406    chunks.push_back(bytes);
3407    while *chunks_bytes > max_chunks_bytes && chunks.len() > 1 {
3408        if let Some(front) = chunks.pop_front() {
3409            *chunks_bytes = chunks_bytes.saturating_sub(front.len());
3410        }
3411    }
3412}
3413
/// Run `command` through a shell for the RPC bash request and capture its output.
///
/// The shell is the first existing path among `/bin/bash`, `/usr/bin/bash`, and
/// `/usr/local/bin/bash`; otherwise `sh` is resolved via `PATH`. stdin is null.
/// stdout and stderr are read by two detached pump threads into one bounded
/// channel. Chunks are fed to `ingest_bash_rpc_chunk` in arrival order, so the
/// two streams are interleaved.
///
/// `abort_rx` is polled roughly every 10 ms. When it fires, the process is
/// killed through the `ProcessGuard` and `cancelled` is set in the result.
///
/// # Errors
/// Returns a `bash` tool error if the shell cannot be spawned, a piped handle
/// is missing, or waiting on the child fails.
async fn run_bash_rpc(
    cwd: &std::path::Path,
    command: &str,
    abort_rx: oneshot::Receiver<()>,
) -> Result<BashRpcResult> {
    // Tag for which pipe a chunk came from. It is recorded on each chunk, but
    // the code below merges both streams without reading it.
    #[derive(Clone, Copy)]
    enum StreamKind {
        Stdout,
        Stderr,
    }

    struct StreamChunk {
        kind: StreamKind,
        bytes: Vec<u8>,
    }

    /// Blocking reader loop run on a dedicated thread. It forwards up to 8 KiB
    /// per read and stops on EOF, on a read error, or when the receiver is gone.
    fn pump_stream(
        mut reader: impl std::io::Read,
        tx: std::sync::mpsc::SyncSender<StreamChunk>,
        kind: StreamKind,
    ) {
        let mut buf = [0u8; 8192];
        loop {
            let read = match reader.read(&mut buf) {
                Ok(0) | Err(_) => break,
                Ok(read) => read,
            };
            let chunk = StreamChunk {
                kind,
                bytes: buf[..read].to_vec(),
            };
            if tx.send(chunk).is_err() {
                break;
            }
        }
    }

    let shell = ["/bin/bash", "/usr/bin/bash", "/usr/local/bin/bash"]
        .into_iter()
        .find(|p| std::path::Path::new(p).exists())
        .unwrap_or("sh");

    // On shell exit, wait for any background jobs started by the command, then
    // exit with the status the shell had when the trap fired.
    let command = format!("trap 'code=$?; wait; exit $code' EXIT\n{command}");

    let mut child = std::process::Command::new(shell)
        .arg("-c")
        .arg(&command)
        .current_dir(cwd)
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .spawn()
        .map_err(|e| Error::tool("bash", format!("Failed to spawn shell: {e}")))?;

    let Some(stdout) = child.stdout.take() else {
        return Err(Error::tool("bash", "Missing stdout".to_string()));
    };
    let Some(stderr) = child.stderr.take() else {
        return Err(Error::tool("bash", "Missing stderr".to_string()));
    };

    // NOTE(review): the meaning of the `true` flag is defined in
    // `crate::tools::ProcessGuard` — confirm (presumably kill-on-drop / process-tree handling).
    let mut guard = crate::tools::ProcessGuard::new(child, true);

    // Capacity 128 bounds memory; a full channel blocks the pump threads until we drain.
    let (tx, rx) = std::sync::mpsc::sync_channel::<StreamChunk>(128);
    let tx_stdout = tx.clone();
    let _stdout_handle =
        std::thread::spawn(move || pump_stream(stdout, tx_stdout, StreamKind::Stdout));
    let _stderr_handle = std::thread::spawn(move || pump_stream(stderr, tx, StreamKind::Stderr));

    let tick = Duration::from_millis(10);

    // Bounded buffer state (same logic as BashTool)
    let mut chunks: VecDeque<Vec<u8>> = VecDeque::new();
    let mut chunks_bytes = 0usize;
    let mut total_bytes = 0usize;
    let mut total_lines = 0usize;
    let mut last_byte_was_newline = false;
    let mut temp_file: Option<asupersync::fs::File> = None;
    let mut temp_file_path: Option<PathBuf> = None;
    // In-memory tail budget: twice the final output limit.
    let max_chunks_bytes = DEFAULT_MAX_BYTES * 2;

    let mut cancelled = false;
    let mut spill_failed = false;

    // Poll loop: ingest whatever is queued, honour an abort request, then check
    // whether the child has exited. A child with no exit code reports -1.
    let exit_code = loop {
        while let Ok(chunk) = rx.try_recv() {
            ingest_bash_rpc_chunk(
                chunk.bytes,
                &mut chunks,
                &mut chunks_bytes,
                &mut total_bytes,
                &mut total_lines,
                &mut last_byte_was_newline,
                &mut temp_file,
                &mut temp_file_path,
                &mut spill_failed,
                max_chunks_bytes,
            )
            .await;
        }

        // Only the first abort triggers a kill. If the kill does not report an
        // exit status, keep polling `try_wait_child` below.
        if !cancelled && abort_rx.try_recv().is_ok() {
            cancelled = true;
            if let Ok(Some(status)) = guard.kill() {
                break status.code().unwrap_or(-1);
            }
        }

        match guard.try_wait_child() {
            Ok(Some(status)) => break status.code().unwrap_or(-1),
            Ok(None) => {}
            Err(err) => {
                return Err(Error::tool(
                    "bash",
                    format!("Failed to wait for process: {err}"),
                ));
            }
        }

        sleep(wall_now(), tick).await;
    };

    // Drain remaining output. Wait up to 2 s for the pump threads to finish;
    // `Disconnected` means both senders are gone (both pipes hit EOF).
    let drain_deadline = Instant::now() + Duration::from_secs(2);
    let mut drain_timed_out = false;
    loop {
        match rx.try_recv() {
            Ok(chunk) => {
                ingest_bash_rpc_chunk(
                    chunk.bytes,
                    &mut chunks,
                    &mut chunks_bytes,
                    &mut total_bytes,
                    &mut total_lines,
                    &mut last_byte_was_newline,
                    &mut temp_file,
                    &mut temp_file_path,
                    &mut spill_failed,
                    max_chunks_bytes,
                )
                .await;
            }
            Err(std::sync::mpsc::TryRecvError::Empty) => {
                if Instant::now() >= drain_deadline {
                    drain_timed_out = true;
                    break;
                }
                sleep(wall_now(), tick).await;
            }
            Err(std::sync::mpsc::TryRecvError::Disconnected) => break,
        }
    }

    // Drop the receiver to close the channel.
    // This ensures that any `tx.send()` calls in the pump threads return an error (Disconnected)
    // instead of blocking if the channel is full.
    // We intentionally do NOT join() the pump threads because if a background child process
    // inherits stdout/stderr, the pipe remains open and `read()` blocks indefinitely,
    // which would cause `join()` to hang the entire agent.
    drop(rx);

    // Explicitly drop the temp file handle to ensure any buffered data is flushed to disk
    // before we potentially return the path to the caller.
    drop(temp_file);

    // Construct final output from memory buffer
    let mut combined = Vec::with_capacity(chunks_bytes);
    for chunk in chunks {
        combined.extend_from_slice(&chunk);
    }
    let tail_output = String::from_utf8_lossy(&combined).to_string();

    let mut truncation = truncate_tail(tail_output, DEFAULT_MAX_LINES, DEFAULT_MAX_BYTES);
    // If the in-memory tail dropped older chunks, `truncate_tail` only saw part
    // of the stream, so report the full stream's byte and line totals instead.
    if total_bytes > chunks_bytes {
        truncation.truncated = true;
        truncation.truncated_by = Some(crate::tools::TruncatedBy::Bytes);
        truncation.total_bytes = total_bytes;
        truncation.total_lines =
            line_count_from_newline_count(total_bytes, total_lines, last_byte_was_newline);
    } else if drain_timed_out {
        // NOTE(review): a drain timeout is reported as `TruncatedBy::Bytes`; no
        // dedicated variant is used here.
        truncation.truncated = true;
        truncation.truncated_by = Some(crate::tools::TruncatedBy::Bytes);
    }
    let will_truncate = truncation.truncated;

    let mut output_text = if truncation.content.is_empty() {
        "(no output)".to_string()
    } else {
        truncation.content
    };

    if drain_timed_out {
        output_text.push_str("\n... [Output truncated: drain timeout]");
    }

    // `temp_file_path` is kept even when spilling later stopped (hard limit or
    // write error), so it may point at a partial log.
    Ok(BashRpcResult {
        output: output_text,
        exit_code,
        cancelled,
        truncated: will_truncate,
        full_output_path: temp_file_path.map(|p| p.display().to_string()),
    })
}
3617
3618fn parse_prompt_images(value: Option<&Value>) -> Result<Vec<ImageContent>> {
3619    let Some(value) = value else {
3620        return Ok(Vec::new());
3621    };
3622    let Some(arr) = value.as_array() else {
3623        return Err(Error::validation("images must be an array"));
3624    };
3625
3626    let mut images = Vec::new();
3627    for item in arr {
3628        let Some(obj) = item.as_object() else {
3629            continue;
3630        };
3631        let item_type = obj.get("type").and_then(Value::as_str).unwrap_or("");
3632        if item_type != "image" {
3633            continue;
3634        }
3635        let Some(source) = obj.get("source").and_then(Value::as_object) else {
3636            continue;
3637        };
3638        let source_type = source.get("type").and_then(Value::as_str).unwrap_or("");
3639        if source_type != "base64" {
3640            continue;
3641        }
3642        let Some(media_type) = source.get("mediaType").and_then(Value::as_str) else {
3643            continue;
3644        };
3645        let Some(data) = source.get("data").and_then(Value::as_str) else {
3646            continue;
3647        };
3648        images.push(ImageContent {
3649            data: data.to_string(),
3650            mime_type: media_type.to_string(),
3651        });
3652    }
3653    Ok(images)
3654}
3655
3656fn resolve_model_key(auth: &AuthStorage, entry: &ModelEntry) -> Option<String> {
3657    normalize_api_key_opt(auth.resolve_api_key(&entry.model.provider, None))
3658        .or_else(|| normalize_api_key_opt(entry.api_key.clone()))
3659}
3660
/// Trim an optional API key, treating an empty or whitespace-only key as `None`.
fn normalize_api_key_opt(api_key: Option<String>) -> Option<String> {
    let raw = api_key?;
    let cleaned = raw.trim();
    if cleaned.is_empty() {
        None
    } else {
        Some(cleaned.to_owned())
    }
}
3667
3668fn model_requires_configured_credential(entry: &ModelEntry) -> bool {
3669    let provider = entry.model.provider.as_str();
3670    entry.auth_header
3671        || provider_metadata(provider).is_some_and(|meta| !meta.auth_env_keys.is_empty())
3672        || entry.oauth_config.is_some()
3673}
3674
3675fn parse_thinking_level(level: &str) -> Result<crate::model::ThinkingLevel> {
3676    level.parse().map_err(|err: String| Error::validation(err))
3677}
3678
3679fn current_model_entry<'a>(
3680    session: &crate::session::Session,
3681    options: &'a RpcOptions,
3682) -> Option<&'a ModelEntry> {
3683    let provider = session.header.provider.as_deref()?;
3684    let model_id = session.header.model_id.as_deref()?;
3685    options.available_models.iter().find(|m| {
3686        provider_ids_match(&m.model.provider, provider) && m.model.id.eq_ignore_ascii_case(model_id)
3687    })
3688}
3689
3690async fn apply_thinking_level(
3691    guard: &mut AgentSession,
3692    level: crate::model::ThinkingLevel,
3693) -> Result<()> {
3694    let cx = AgentCx::for_request();
3695    {
3696        let mut inner_session = guard
3697            .session
3698            .lock(cx.cx())
3699            .await
3700            .map_err(|err| Error::session(format!("inner session lock failed: {err}")))?;
3701        inner_session.header.thinking_level = Some(level.to_string());
3702        inner_session.append_thinking_level_change(level.to_string());
3703    }
3704    guard.agent.stream_options_mut().thinking_level = Some(level);
3705    guard.persist_session().await
3706}
3707
3708async fn apply_model_change(guard: &mut AgentSession, entry: &ModelEntry) -> Result<()> {
3709    let cx = AgentCx::for_request();
3710    {
3711        let mut inner_session = guard
3712            .session
3713            .lock(cx.cx())
3714            .await
3715            .map_err(|err| Error::session(format!("inner session lock failed: {err}")))?;
3716        inner_session.header.provider = Some(entry.model.provider.clone());
3717        inner_session.header.model_id = Some(entry.model.id.clone());
3718        inner_session.append_model_change(entry.model.provider.clone(), entry.model.id.clone());
3719    }
3720    guard.persist_session().await
3721}
3722
3723/// Extract user messages from a pre-captured list of session entries.
3724///
3725/// Used by the non-blocking `get_fork_messages` path where entries are
3726/// captured under a brief lock and messages are computed outside the lock.
3727fn fork_messages_from_entries(entries: &[crate::session::SessionEntry]) -> Vec<Value> {
3728    let mut result = Vec::new();
3729
3730    for entry in entries {
3731        let crate::session::SessionEntry::Message(m) = entry else {
3732            continue;
3733        };
3734        let SessionMessage::User { content, .. } = &m.message else {
3735            continue;
3736        };
3737        let entry_id = m.base.id.clone().unwrap_or_default();
3738        let text = extract_user_text(content);
3739        result.push(json!({
3740            "entryId": entry_id,
3741            "text": text,
3742        }));
3743    }
3744
3745    result
3746}
3747
3748fn extract_user_text(content: &crate::model::UserContent) -> Option<String> {
3749    match content {
3750        crate::model::UserContent::Text(text) => Some(text.clone()),
3751        crate::model::UserContent::Blocks(blocks) => blocks.iter().find_map(|b| {
3752            if let ContentBlock::Text(t) = b {
3753                Some(t.text.clone())
3754            } else {
3755                None
3756            }
3757        }),
3758    }
3759}
3760
3761/// Returns the available thinking levels for a model.
3762/// For reasoning models, returns the full range; for non-reasoning, returns only Off.
3763fn available_thinking_levels(entry: &ModelEntry) -> Vec<crate::model::ThinkingLevel> {
3764    use crate::model::ThinkingLevel;
3765    if entry.model.reasoning {
3766        let mut levels = vec![
3767            ThinkingLevel::Off,
3768            ThinkingLevel::Minimal,
3769            ThinkingLevel::Low,
3770            ThinkingLevel::Medium,
3771            ThinkingLevel::High,
3772        ];
3773        if entry.supports_xhigh() {
3774            levels.push(ThinkingLevel::XHigh);
3775        }
3776        levels
3777    } else {
3778        vec![ThinkingLevel::Off]
3779    }
3780}
3781
3782/// Cycles through scoped models (if any) and returns the next model.
3783/// Returns (ModelEntry, ThinkingLevel, is_from_scoped_models).
3784async fn cycle_model_for_rpc(
3785    guard: &mut AgentSession,
3786    options: &RpcOptions,
3787) -> Result<Option<(ModelEntry, crate::model::ThinkingLevel, bool)>> {
3788    let (candidates, is_scoped) = if options.scoped_models.is_empty() {
3789        (options.available_models.clone(), false)
3790    } else {
3791        (
3792            options
3793                .scoped_models
3794                .iter()
3795                .map(|sm| sm.model.clone())
3796                .collect::<Vec<_>>(),
3797            true,
3798        )
3799    };
3800
3801    if candidates.len() <= 1 {
3802        return Ok(None);
3803    }
3804
3805    let cx = AgentCx::for_request();
3806    let (current_provider, current_model_id) = {
3807        let inner_session = guard
3808            .session
3809            .lock(cx.cx())
3810            .await
3811            .map_err(|err| Error::session(format!("inner session lock failed: {err}")))?;
3812        (
3813            inner_session.header.provider.clone(),
3814            inner_session.header.model_id.clone(),
3815        )
3816    };
3817
3818    let current_index = candidates.iter().position(|entry| {
3819        current_provider
3820            .as_deref()
3821            .is_some_and(|provider| provider_ids_match(provider, &entry.model.provider))
3822            && current_model_id
3823                .as_deref()
3824                .is_some_and(|model_id| model_id.eq_ignore_ascii_case(&entry.model.id))
3825    });
3826
3827    let next_index = current_index.map_or(0, |idx| (idx + 1) % candidates.len());
3828
3829    let next_entry = candidates[next_index].clone();
3830    let provider_impl = crate::providers::create_provider(
3831        &next_entry,
3832        guard
3833            .extensions
3834            .as_ref()
3835            .map(crate::extensions::ExtensionRegion::manager),
3836    )?;
3837    guard.agent.set_provider(provider_impl);
3838
3839    let key = resolve_model_key(&options.auth, &next_entry);
3840    if model_requires_configured_credential(&next_entry) && key.is_none() {
3841        return Err(Error::auth(format!(
3842            "Missing credentials for {}/{}",
3843            next_entry.model.provider, next_entry.model.id
3844        )));
3845    }
3846    guard.agent.stream_options_mut().api_key.clone_from(&key);
3847    guard
3848        .agent
3849        .stream_options_mut()
3850        .headers
3851        .clone_from(&next_entry.headers);
3852
3853    apply_model_change(guard, &next_entry).await?;
3854
3855    let desired_thinking = if is_scoped {
3856        options.scoped_models[next_index]
3857            .thinking_level
3858            .unwrap_or(crate::model::ThinkingLevel::Off)
3859    } else {
3860        guard
3861            .agent
3862            .stream_options()
3863            .thinking_level
3864            .unwrap_or_default()
3865    };
3866
3867    let next_thinking = next_entry.clamp_thinking_level(desired_thinking);
3868    apply_thinking_level(guard, next_thinking).await?;
3869
3870    Ok(Some((next_entry, next_thinking, is_scoped)))
3871}
3872
3873#[cfg(test)]
3874mod tests {
3875    use super::*;
3876    use crate::auth::AuthCredential;
3877    use crate::model::{
3878        ContentBlock, ImageContent, TextContent, ThinkingLevel, UserContent, UserMessage,
3879    };
3880    use crate::provider::{InputType, Model, ModelCost};
3881    use crate::session::Session;
3882    use serde_json::json;
3883    use std::collections::HashMap;
3884
3885    // -----------------------------------------------------------------------
3886    // Helper builders
3887    // -----------------------------------------------------------------------
3888
3889    fn dummy_model(id: &str, reasoning: bool) -> Model {
3890        Model {
3891            id: id.to_string(),
3892            name: id.to_string(),
3893            api: "anthropic".to_string(),
3894            provider: "anthropic".to_string(),
3895            base_url: "https://api.anthropic.com".to_string(),
3896            reasoning,
3897            input: vec![InputType::Text],
3898            cost: ModelCost {
3899                input: 3.0,
3900                output: 15.0,
3901                cache_read: 0.3,
3902                cache_write: 3.75,
3903            },
3904            context_window: 200_000,
3905            max_tokens: 8192,
3906            headers: HashMap::new(),
3907        }
3908    }
3909
3910    fn dummy_entry(id: &str, reasoning: bool) -> ModelEntry {
3911        ModelEntry {
3912            model: dummy_model(id, reasoning),
3913            api_key: None,
3914            headers: HashMap::new(),
3915            auth_header: false,
3916            compat: None,
3917            oauth_config: None,
3918        }
3919    }
3920
3921    fn rpc_options_with_models(available_models: Vec<ModelEntry>) -> RpcOptions {
3922        let runtime = asupersync::runtime::RuntimeBuilder::new()
3923            .blocking_threads(1, 1)
3924            .build()
3925            .expect("runtime build");
3926        let runtime_handle = runtime.handle();
3927
3928        let auth_path = tempfile::tempdir()
3929            .expect("tempdir")
3930            .path()
3931            .join("auth.json");
3932        let auth = AuthStorage::load(auth_path).expect("auth load");
3933
3934        RpcOptions {
3935            config: Config::default(),
3936            resources: ResourceLoader::empty(false),
3937            available_models,
3938            scoped_models: Vec::new(),
3939            auth,
3940            runtime_handle,
3941        }
3942    }
3943
3944    #[test]
3945    fn line_count_from_newline_count_matches_trailing_newline_semantics() {
3946        assert_eq!(line_count_from_newline_count(0, 0, false), 0);
3947        assert_eq!(line_count_from_newline_count(2, 1, true), 1);
3948        assert_eq!(line_count_from_newline_count(1, 0, false), 1);
3949        assert_eq!(line_count_from_newline_count(3, 1, false), 2);
3950    }
3951
3952    // -----------------------------------------------------------------------
3953    // parse_queue_mode
3954    // -----------------------------------------------------------------------
3955
3956    #[test]
3957    fn parse_queue_mode_all() {
3958        assert_eq!(parse_queue_mode(Some("all")), Some(QueueMode::All));
3959    }
3960
3961    #[test]
3962    fn parse_queue_mode_one_at_a_time() {
3963        assert_eq!(
3964            parse_queue_mode(Some("one-at-a-time")),
3965            Some(QueueMode::OneAtATime)
3966        );
3967    }
3968
3969    #[test]
3970    fn parse_queue_mode_none_value() {
3971        assert_eq!(parse_queue_mode(None), None);
3972    }
3973
3974    #[test]
3975    fn parse_queue_mode_unknown_returns_none() {
3976        assert_eq!(parse_queue_mode(Some("batch")), None);
3977        assert_eq!(parse_queue_mode(Some("")), None);
3978    }
3979
3980    #[test]
3981    fn parse_queue_mode_trims_whitespace() {
3982        assert_eq!(parse_queue_mode(Some("  all  ")), Some(QueueMode::All));
3983    }
3984
3985    #[test]
3986    fn provider_ids_match_accepts_aliases() {
3987        assert!(provider_ids_match("openrouter", "open-router"));
3988        assert!(provider_ids_match("google-gemini-cli", "gemini-cli"));
3989        assert!(!provider_ids_match("openai", "anthropic"));
3990    }
3991
3992    #[test]
3993    fn resolve_model_key_prefers_stored_auth_key_over_inline_entry_key() {
3994        let mut entry = dummy_entry("gpt-4o-mini", true);
3995        entry.model.provider = "openai".to_string();
3996        entry.auth_header = true;
3997        entry.api_key = Some("inline-model-key".to_string());
3998
3999        let auth_path = tempfile::tempdir()
4000            .expect("tempdir")
4001            .path()
4002            .join("auth.json");
4003        let mut auth = AuthStorage::load(auth_path).expect("auth load");
4004        auth.set(
4005            "openai".to_string(),
4006            AuthCredential::ApiKey {
4007                key: "stored-auth-key".to_string(),
4008            },
4009        );
4010
4011        assert_eq!(
4012            resolve_model_key(&auth, &entry).as_deref(),
4013            Some("stored-auth-key")
4014        );
4015    }
4016
4017    #[test]
4018    fn resolve_model_key_ignores_blank_inline_key_and_falls_back_to_auth_storage() {
4019        let mut entry = dummy_entry("gpt-4o-mini", true);
4020        entry.model.provider = "openai".to_string();
4021        entry.auth_header = true;
4022        entry.api_key = Some("   ".to_string());
4023
4024        let auth_path = tempfile::tempdir()
4025            .expect("tempdir")
4026            .path()
4027            .join("auth.json");
4028        let mut auth = AuthStorage::load(auth_path).expect("auth load");
4029        auth.set(
4030            "openai".to_string(),
4031            AuthCredential::ApiKey {
4032                key: "stored-auth-key".to_string(),
4033            },
4034        );
4035
4036        assert_eq!(
4037            resolve_model_key(&auth, &entry).as_deref(),
4038            Some("stored-auth-key")
4039        );
4040    }
4041
4042    #[test]
4043    fn unknown_keyless_model_does_not_require_credentials() {
4044        let mut entry = dummy_entry("dev-model", false);
4045        entry.model.provider = "acme-local".to_string();
4046        entry.auth_header = false;
4047        entry.oauth_config = None;
4048
4049        assert!(!model_requires_configured_credential(&entry));
4050    }
4051
4052    #[test]
4053    fn anthropic_model_requires_credentials_even_without_auth_header() {
4054        let mut entry = dummy_entry("claude-sonnet-4-6", true);
4055        entry.model.provider = "anthropic".to_string();
4056        entry.auth_header = false;
4057        entry.oauth_config = None;
4058
4059        assert!(model_requires_configured_credential(&entry));
4060    }
4061
4062    // -----------------------------------------------------------------------
4063    // parse_streaming_behavior
4064    // -----------------------------------------------------------------------
4065
4066    #[test]
4067    fn parse_streaming_behavior_steer() {
4068        let val = json!("steer");
4069        let result = parse_streaming_behavior(Some(&val)).unwrap();
4070        assert_eq!(result, Some(StreamingBehavior::Steer));
4071    }
4072
4073    #[test]
4074    fn parse_streaming_behavior_follow_up_hyphenated() {
4075        let val = json!("follow-up");
4076        let result = parse_streaming_behavior(Some(&val)).unwrap();
4077        assert_eq!(result, Some(StreamingBehavior::FollowUp));
4078    }
4079
4080    #[test]
4081    fn parse_streaming_behavior_follow_up_camel() {
4082        let val = json!("followUp");
4083        let result = parse_streaming_behavior(Some(&val)).unwrap();
4084        assert_eq!(result, Some(StreamingBehavior::FollowUp));
4085    }
4086
4087    #[test]
4088    fn parse_streaming_behavior_none() {
4089        let result = parse_streaming_behavior(None).unwrap();
4090        assert_eq!(result, None);
4091    }
4092
4093    #[test]
4094    fn parse_streaming_behavior_invalid_string() {
4095        let val = json!("invalid");
4096        assert!(parse_streaming_behavior(Some(&val)).is_err());
4097    }
4098
4099    #[test]
4100    fn parse_streaming_behavior_non_string_errors() {
4101        let val = json!(42);
4102        assert!(parse_streaming_behavior(Some(&val)).is_err());
4103    }
4104
4105    // -----------------------------------------------------------------------
4106    // normalize_command_type
4107    // -----------------------------------------------------------------------
4108
4109    #[test]
4110    fn normalize_command_type_passthrough() {
4111        assert_eq!(normalize_command_type("prompt"), "prompt");
4112        assert_eq!(normalize_command_type("compact"), "compact");
4113    }
4114
4115    #[test]
4116    fn normalize_command_type_follow_up_aliases() {
4117        assert_eq!(normalize_command_type("follow-up"), "follow_up");
4118        assert_eq!(normalize_command_type("followUp"), "follow_up");
4119        assert_eq!(normalize_command_type("queue-follow-up"), "follow_up");
4120        assert_eq!(normalize_command_type("queueFollowUp"), "follow_up");
4121    }
4122
4123    #[test]
4124    fn normalize_command_type_kebab_and_camel_aliases() {
4125        assert_eq!(normalize_command_type("get-state"), "get_state");
4126        assert_eq!(normalize_command_type("getState"), "get_state");
4127        assert_eq!(normalize_command_type("set-model"), "set_model");
4128        assert_eq!(normalize_command_type("setModel"), "set_model");
4129        assert_eq!(
4130            normalize_command_type("set-steering-mode"),
4131            "set_steering_mode"
4132        );
4133        assert_eq!(
4134            normalize_command_type("setSteeringMode"),
4135            "set_steering_mode"
4136        );
4137        assert_eq!(
4138            normalize_command_type("set-follow-up-mode"),
4139            "set_follow_up_mode"
4140        );
4141        assert_eq!(
4142            normalize_command_type("setFollowUpMode"),
4143            "set_follow_up_mode"
4144        );
4145        assert_eq!(
4146            normalize_command_type("set-auto-compaction"),
4147            "set_auto_compaction"
4148        );
4149        assert_eq!(
4150            normalize_command_type("setAutoCompaction"),
4151            "set_auto_compaction"
4152        );
4153        assert_eq!(normalize_command_type("set-auto-retry"), "set_auto_retry");
4154        assert_eq!(normalize_command_type("setAutoRetry"), "set_auto_retry");
4155    }
4156
4157    // -----------------------------------------------------------------------
4158    // build_user_message
4159    // -----------------------------------------------------------------------
4160
4161    #[test]
4162    fn build_user_message_text_only() {
4163        let msg = build_user_message("hello", &[]);
4164        match msg {
4165            Message::User(UserMessage {
4166                content: UserContent::Text(text),
4167                ..
4168            }) => assert_eq!(text, "hello"),
4169            other => panic!("expected text user message, got {other:?}"),
4170        }
4171    }
4172
4173    #[test]
4174    fn build_user_message_with_images() {
4175        let images = vec![ImageContent {
4176            data: "base64data".to_string(),
4177            mime_type: "image/png".to_string(),
4178        }];
4179        let msg = build_user_message("look at this", &images);
4180        match msg {
4181            Message::User(UserMessage {
4182                content: UserContent::Blocks(blocks),
4183                ..
4184            }) => {
4185                assert_eq!(blocks.len(), 2);
4186                assert!(matches!(&blocks[0], ContentBlock::Text(_)));
4187                assert!(matches!(&blocks[1], ContentBlock::Image(_)));
4188            }
4189            other => panic!("expected blocks user message, got {other:?}"),
4190        }
4191    }
4192
4193    // -----------------------------------------------------------------------
4194    // is_extension_command
4195    // -----------------------------------------------------------------------
4196
4197    #[test]
4198    fn is_extension_command_slash_unchanged() {
4199        assert!(is_extension_command("/mycommand", "/mycommand"));
4200    }
4201
4202    #[test]
4203    fn is_extension_command_expanded_returns_false() {
4204        // If the resource loader expanded it, the expanded text differs from the original.
4205        assert!(!is_extension_command(
4206            "/prompt-name",
4207            "This is the expanded prompt text."
4208        ));
4209    }
4210
4211    #[test]
4212    fn is_extension_command_no_slash() {
4213        assert!(!is_extension_command("hello", "hello"));
4214    }
4215
4216    #[test]
4217    fn is_extension_command_leading_whitespace() {
4218        assert!(is_extension_command("  /cmd", "  /cmd"));
4219    }
4220
4221    // -----------------------------------------------------------------------
4222    // try_send_line_with_backpressure
4223    // -----------------------------------------------------------------------
4224
4225    #[test]
4226    fn try_send_line_with_backpressure_enqueues_when_capacity_available() {
4227        let (tx, _rx) = mpsc::channel::<String>(1);
4228        assert!(try_send_line_with_backpressure(&tx, "line".to_string()));
4229        assert!(matches!(
4230            tx.try_send("next".to_string()),
4231            Err(mpsc::SendError::Full(_))
4232        ));
4233    }
4234
4235    #[test]
4236    fn try_send_line_with_backpressure_stops_when_receiver_closed() {
4237        let (tx, rx) = mpsc::channel::<String>(1);
4238        drop(rx);
4239        assert!(!try_send_line_with_backpressure(&tx, "line".to_string()));
4240    }
4241
4242    #[test]
4243    fn try_send_line_with_backpressure_waits_until_capacity_is_available() {
4244        let (tx, rx) = mpsc::channel::<String>(1);
4245        tx.try_send("occupied".to_string())
4246            .expect("seed initial occupied slot");
4247
4248        let expected = "delayed-line".to_string();
4249        let expected_for_thread = expected.clone();
4250        let recv_handle = std::thread::spawn(move || {
4251            std::thread::sleep(Duration::from_millis(30));
4252            let deadline = Instant::now() + Duration::from_millis(300);
4253            let mut received = Vec::new();
4254            while received.len() < 2 && Instant::now() < deadline {
4255                if let Ok(msg) = rx.try_recv() {
4256                    received.push(msg);
4257                } else {
4258                    std::thread::sleep(Duration::from_millis(5));
4259                }
4260            }
4261            assert_eq!(received.len(), 2, "should receive both queued lines");
4262            let first = received.remove(0);
4263            let second = received.remove(0);
4264            assert_eq!(first, "occupied");
4265            assert_eq!(second, expected_for_thread);
4266        });
4267
4268        assert!(try_send_line_with_backpressure(&tx, expected));
4269        drop(tx);
4270        recv_handle.join().expect("receiver thread should finish");
4271    }
4272
4273    #[test]
4274    fn try_send_line_with_backpressure_preserves_large_payload() {
4275        let (tx, rx) = mpsc::channel::<String>(1);
4276        tx.try_send("busy".to_string())
4277            .expect("seed initial busy slot");
4278
4279        let large = "x".repeat(256 * 1024);
4280        let large_for_thread = large.clone();
4281        let recv_handle = std::thread::spawn(move || {
4282            std::thread::sleep(Duration::from_millis(30));
4283            let deadline = Instant::now() + Duration::from_millis(500);
4284            let mut received = Vec::new();
4285            while received.len() < 2 && Instant::now() < deadline {
4286                if let Ok(msg) = rx.try_recv() {
4287                    received.push(msg);
4288                } else {
4289                    std::thread::sleep(Duration::from_millis(5));
4290                }
4291            }
4292            assert_eq!(received.len(), 2, "should receive busy + payload lines");
4293            let payload = received.remove(1);
4294            assert_eq!(payload.len(), large_for_thread.len());
4295            assert_eq!(payload, large_for_thread);
4296        });
4297
4298        assert!(try_send_line_with_backpressure(&tx, large));
4299        drop(tx);
4300        recv_handle.join().expect("receiver thread should finish");
4301    }
4302
4303    #[test]
4304    fn try_send_line_with_backpressure_detects_disconnect_while_waiting() {
4305        let (tx, rx) = mpsc::channel::<String>(1);
4306        tx.try_send("busy".to_string())
4307            .expect("seed initial busy slot");
4308
4309        let drop_handle = std::thread::spawn(move || {
4310            std::thread::sleep(Duration::from_millis(30));
4311            drop(rx);
4312        });
4313
4314        assert!(
4315            !try_send_line_with_backpressure(&tx, "line-after-disconnect".to_string()),
4316            "send should stop after receiver disconnects while channel is full"
4317        );
4318        drop_handle.join().expect("drop thread should finish");
4319    }
4320
4321    #[test]
4322    fn try_send_line_with_backpressure_high_volume_preserves_order_and_count() {
4323        let (tx, rx) = mpsc::channel::<String>(4);
4324        let lines: Vec<String> = (0..256)
4325            .map(|idx| format!("line-{idx:03}: {}", "x".repeat(64)))
4326            .collect();
4327        let expected = lines.clone();
4328
4329        let recv_handle = std::thread::spawn(move || {
4330            let deadline = Instant::now() + Duration::from_secs(4);
4331            let mut received = Vec::new();
4332            while received.len() < expected.len() && Instant::now() < deadline {
4333                if let Ok(msg) = rx.try_recv() {
4334                    received.push(msg);
4335                }
4336                std::thread::sleep(Duration::from_millis(1));
4337            }
4338            assert_eq!(
4339                received.len(),
4340                expected.len(),
4341                "should receive every line under sustained backpressure"
4342            );
4343            assert_eq!(received, expected, "line ordering must remain stable");
4344        });
4345
4346        for line in lines {
4347            assert!(try_send_line_with_backpressure(&tx, line));
4348        }
4349        drop(tx);
4350        recv_handle.join().expect("receiver thread should finish");
4351    }
4352
4353    #[test]
4354    fn try_send_line_with_backpressure_preserves_partial_line_without_newline() {
4355        let (tx, rx) = mpsc::channel::<String>(1);
4356        tx.try_send("busy".to_string())
4357            .expect("seed initial busy slot");
4358
4359        let partial_json = "{\"type\":\"prompt\",\"message\":\"tail-fragment-ascii\"".to_string();
4360        let expected = partial_json.clone();
4361
4362        let recv_handle = std::thread::spawn(move || {
4363            std::thread::sleep(Duration::from_millis(25));
4364            let first = rx.try_recv().expect("seeded line should be available");
4365            assert_eq!(first, "busy");
4366            let deadline = Instant::now() + Duration::from_millis(500);
4367            let second = loop {
4368                if let Ok(line) = rx.try_recv() {
4369                    break line;
4370                }
4371                assert!(
4372                    Instant::now() < deadline,
4373                    "partial payload should be available"
4374                );
4375                std::thread::sleep(Duration::from_millis(5));
4376            };
4377            assert_eq!(second, expected);
4378        });
4379
4380        assert!(try_send_line_with_backpressure(&tx, partial_json));
4381        drop(tx);
4382        recv_handle.join().expect("receiver thread should finish");
4383    }
4384
4385    // -----------------------------------------------------------------------
4386    // RpcStateSnapshot::pending_count
4387    // -----------------------------------------------------------------------
4388
4389    #[test]
4390    fn snapshot_pending_count() {
4391        let snapshot = RpcStateSnapshot {
4392            steering_count: 3,
4393            follow_up_count: 7,
4394            steering_mode: QueueMode::All,
4395            follow_up_mode: QueueMode::OneAtATime,
4396            auto_compaction_enabled: false,
4397            auto_retry_enabled: true,
4398        };
4399        assert_eq!(snapshot.pending_count(), 10);
4400    }
4401
4402    #[test]
4403    fn snapshot_pending_count_zero() {
4404        let snapshot = RpcStateSnapshot {
4405            steering_count: 0,
4406            follow_up_count: 0,
4407            steering_mode: QueueMode::All,
4408            follow_up_mode: QueueMode::All,
4409            auto_compaction_enabled: false,
4410            auto_retry_enabled: false,
4411        };
4412        assert_eq!(snapshot.pending_count(), 0);
4413    }
4414
4415    // -----------------------------------------------------------------------
4416    // retry_delay_ms
4417    // -----------------------------------------------------------------------
4418
4419    #[test]
4420    fn retry_delay_first_attempt_is_base() {
4421        let config = Config::default();
4422        // attempt 0 and 1 should both use the base delay (shift = attempt - 1 saturating)
4423        assert_eq!(retry_delay_ms(&config, 0), config.retry_base_delay_ms());
4424        assert_eq!(retry_delay_ms(&config, 1), config.retry_base_delay_ms());
4425    }
4426
4427    #[test]
4428    fn retry_delay_doubles_each_attempt() {
4429        let config = Config::default();
4430        let base = config.retry_base_delay_ms();
4431        // attempt 2: base * 2, attempt 3: base * 4
4432        assert_eq!(retry_delay_ms(&config, 2), base * 2);
4433        assert_eq!(retry_delay_ms(&config, 3), base * 4);
4434    }
4435
4436    #[test]
4437    fn retry_delay_capped_at_max() {
4438        let config = Config::default();
4439        let max = config.retry_max_delay_ms();
4440        // Large attempt number should be capped
4441        let delay = retry_delay_ms(&config, 30);
4442        assert_eq!(delay, max);
4443    }
4444
4445    #[test]
4446    fn retry_delay_saturates_on_overflow() {
4447        let config = Config::default();
4448        // u32::MAX attempt should not panic
4449        let delay = retry_delay_ms(&config, u32::MAX);
4450        assert!(delay <= config.retry_max_delay_ms());
4451    }
4452
4453    // -----------------------------------------------------------------------
4454    // should_auto_compact
4455    // -----------------------------------------------------------------------
4456
4457    #[test]
4458    fn auto_compact_below_threshold() {
4459        // 50k tokens used, 200k window, 40k reserve → threshold = 160k → no compact
4460        assert!(!should_auto_compact(50_000, 200_000, 40_000));
4461    }
4462
4463    #[test]
4464    fn auto_compact_above_threshold() {
4465        // 170k tokens used, 200k window, 40k reserve → threshold = 160k → compact
4466        assert!(should_auto_compact(170_000, 200_000, 40_000));
4467    }
4468
4469    #[test]
4470    fn auto_compact_exact_threshold() {
4471        // Exactly at threshold → not above → no compact
4472        assert!(!should_auto_compact(160_000, 200_000, 40_000));
4473    }
4474
4475    #[test]
4476    fn auto_compact_reserve_exceeds_window() {
4477        // reserve > window → window - reserve saturates to 0 → any tokens > 0 triggers compact
4478        assert!(should_auto_compact(1, 100, 200));
4479    }
4480
4481    #[test]
4482    fn auto_compact_zero_tokens() {
4483        assert!(!should_auto_compact(0, 200_000, 40_000));
4484    }
4485
4486    // -----------------------------------------------------------------------
4487    // rpc_flatten_content_blocks
4488    // -----------------------------------------------------------------------
4489
4490    #[test]
4491    fn flatten_content_blocks_unwraps_inner_0() {
4492        let mut value = json!({
4493            "content": [
4494                {"0": {"type": "text", "text": "hello"}}
4495            ]
4496        });
4497        rpc_flatten_content_blocks(&mut value);
4498        let blocks = value["content"].as_array().unwrap();
4499        assert_eq!(blocks[0]["type"], "text");
4500        assert_eq!(blocks[0]["text"], "hello");
4501        assert!(blocks[0].get("0").is_none());
4502    }
4503
4504    #[test]
4505    fn flatten_content_blocks_preserves_non_wrapped() {
4506        let mut value = json!({
4507            "content": [
4508                {"type": "text", "text": "already flat"}
4509            ]
4510        });
4511        rpc_flatten_content_blocks(&mut value);
4512        let blocks = value["content"].as_array().unwrap();
4513        assert_eq!(blocks[0]["type"], "text");
4514        assert_eq!(blocks[0]["text"], "already flat");
4515    }
4516
4517    #[test]
4518    fn flatten_content_blocks_no_content_field() {
4519        let mut value = json!({"role": "assistant"});
4520        rpc_flatten_content_blocks(&mut value); // should not panic
4521        assert_eq!(value, json!({"role": "assistant"}));
4522    }
4523
4524    #[test]
4525    fn flatten_content_blocks_non_object() {
4526        let mut value = json!("just a string");
4527        rpc_flatten_content_blocks(&mut value); // should not panic
4528    }
4529
4530    #[test]
4531    fn flatten_content_blocks_existing_keys_not_overwritten() {
4532        // If a block already has a key that conflicts with inner "0", preserve outer
4533        let mut value = json!({
4534            "content": [
4535                {"type": "existing", "0": {"type": "inner", "extra": "data"}}
4536            ]
4537        });
4538        rpc_flatten_content_blocks(&mut value);
4539        let blocks = value["content"].as_array().unwrap();
4540        // "type" should keep the outer "existing" value, not be overwritten by inner "inner"
4541        assert_eq!(blocks[0]["type"], "existing");
4542        // "extra" from inner should be merged in
4543        assert_eq!(blocks[0]["extra"], "data");
4544    }
4545
4546    // -----------------------------------------------------------------------
4547    // parse_prompt_images
4548    // -----------------------------------------------------------------------
4549
4550    #[test]
4551    fn parse_prompt_images_none() {
4552        let images = parse_prompt_images(None).unwrap();
4553        assert!(images.is_empty());
4554    }
4555
4556    #[test]
4557    fn parse_prompt_images_empty_array() {
4558        let val = json!([]);
4559        let images = parse_prompt_images(Some(&val)).unwrap();
4560        assert!(images.is_empty());
4561    }
4562
4563    #[test]
4564    fn parse_prompt_images_valid() {
4565        let val = json!([{
4566            "type": "image",
4567            "source": {
4568                "type": "base64",
4569                "mediaType": "image/png",
4570                "data": "iVBORw0KGgo="
4571            }
4572        }]);
4573        let images = parse_prompt_images(Some(&val)).unwrap();
4574        assert_eq!(images.len(), 1);
4575        assert_eq!(images[0].mime_type, "image/png");
4576        assert_eq!(images[0].data, "iVBORw0KGgo=");
4577    }
4578
4579    #[test]
4580    fn parse_prompt_images_skips_non_image_type() {
4581        let val = json!([{
4582            "type": "text",
4583            "text": "hello"
4584        }]);
4585        let images = parse_prompt_images(Some(&val)).unwrap();
4586        assert!(images.is_empty());
4587    }
4588
4589    #[test]
4590    fn parse_prompt_images_skips_non_base64_source() {
4591        let val = json!([{
4592            "type": "image",
4593            "source": {
4594                "type": "url",
4595                "url": "https://example.com/img.png"
4596            }
4597        }]);
4598        let images = parse_prompt_images(Some(&val)).unwrap();
4599        assert!(images.is_empty());
4600    }
4601
4602    #[test]
4603    fn parse_prompt_images_not_array_errors() {
4604        let val = json!("not-an-array");
4605        assert!(parse_prompt_images(Some(&val)).is_err());
4606    }
4607
4608    #[test]
4609    fn parse_prompt_images_multiple_valid() {
4610        let val = json!([
4611            {
4612                "type": "image",
4613                "source": {"type": "base64", "mediaType": "image/jpeg", "data": "abc"}
4614            },
4615            {
4616                "type": "image",
4617                "source": {"type": "base64", "mediaType": "image/webp", "data": "def"}
4618            }
4619        ]);
4620        let images = parse_prompt_images(Some(&val)).unwrap();
4621        assert_eq!(images.len(), 2);
4622        assert_eq!(images[0].mime_type, "image/jpeg");
4623        assert_eq!(images[1].mime_type, "image/webp");
4624    }
4625
4626    // -----------------------------------------------------------------------
4627    // extract_user_text
4628    // -----------------------------------------------------------------------
4629
4630    #[test]
4631    fn extract_user_text_from_text_content() {
4632        let content = UserContent::Text("hello world".to_string());
4633        assert_eq!(extract_user_text(&content), Some("hello world".to_string()));
4634    }
4635
4636    #[test]
4637    fn extract_user_text_from_blocks() {
4638        let content = UserContent::Blocks(vec![
4639            ContentBlock::Image(ImageContent {
4640                data: String::new(),
4641                mime_type: "image/png".to_string(),
4642            }),
4643            ContentBlock::Text(TextContent::new("found it")),
4644        ]);
4645        assert_eq!(extract_user_text(&content), Some("found it".to_string()));
4646    }
4647
4648    #[test]
4649    fn extract_user_text_blocks_no_text() {
4650        let content = UserContent::Blocks(vec![ContentBlock::Image(ImageContent {
4651            data: String::new(),
4652            mime_type: "image/png".to_string(),
4653        })]);
4654        assert_eq!(extract_user_text(&content), None);
4655    }
4656
4657    // -----------------------------------------------------------------------
4658    // parse_thinking_level
4659    // -----------------------------------------------------------------------
4660
4661    #[test]
4662    fn parse_thinking_level_all_variants() {
4663        assert_eq!(parse_thinking_level("off").unwrap(), ThinkingLevel::Off);
4664        assert_eq!(parse_thinking_level("none").unwrap(), ThinkingLevel::Off);
4665        assert_eq!(parse_thinking_level("0").unwrap(), ThinkingLevel::Off);
4666        assert_eq!(
4667            parse_thinking_level("minimal").unwrap(),
4668            ThinkingLevel::Minimal
4669        );
4670        assert_eq!(parse_thinking_level("min").unwrap(), ThinkingLevel::Minimal);
4671        assert_eq!(parse_thinking_level("low").unwrap(), ThinkingLevel::Low);
4672        assert_eq!(parse_thinking_level("1").unwrap(), ThinkingLevel::Low);
4673        assert_eq!(
4674            parse_thinking_level("medium").unwrap(),
4675            ThinkingLevel::Medium
4676        );
4677        assert_eq!(parse_thinking_level("med").unwrap(), ThinkingLevel::Medium);
4678        assert_eq!(parse_thinking_level("2").unwrap(), ThinkingLevel::Medium);
4679        assert_eq!(parse_thinking_level("high").unwrap(), ThinkingLevel::High);
4680        assert_eq!(parse_thinking_level("3").unwrap(), ThinkingLevel::High);
4681        assert_eq!(parse_thinking_level("xhigh").unwrap(), ThinkingLevel::XHigh);
4682        assert_eq!(parse_thinking_level("4").unwrap(), ThinkingLevel::XHigh);
4683    }
4684
4685    #[test]
4686    fn parse_thinking_level_case_insensitive() {
4687        assert_eq!(parse_thinking_level("HIGH").unwrap(), ThinkingLevel::High);
4688        assert_eq!(
4689            parse_thinking_level("Medium").unwrap(),
4690            ThinkingLevel::Medium
4691        );
4692        assert_eq!(parse_thinking_level("  Off  ").unwrap(), ThinkingLevel::Off);
4693    }
4694
4695    #[test]
4696    fn parse_thinking_level_invalid() {
4697        assert!(parse_thinking_level("invalid").is_err());
4698        assert!(parse_thinking_level("").is_err());
4699        assert!(parse_thinking_level("5").is_err());
4700    }
4701
4702    // -----------------------------------------------------------------------
4703    // supports_xhigh + clamp_thinking_level
4704    // -----------------------------------------------------------------------
4705
4706    #[test]
4707    fn supports_xhigh_known_models() {
4708        assert!(dummy_entry("gpt-5.1-codex-max", true).supports_xhigh());
4709        assert!(dummy_entry("gpt-5.2", true).supports_xhigh());
4710        assert!(dummy_entry("gpt-5.2-codex", true).supports_xhigh());
4711        assert!(dummy_entry("gpt-5.3-codex", true).supports_xhigh());
4712    }
4713
4714    #[test]
4715    fn supports_xhigh_unknown_models() {
4716        assert!(!dummy_entry("claude-opus-4-6", true).supports_xhigh());
4717        assert!(!dummy_entry("gpt-4o", true).supports_xhigh());
4718        assert!(!dummy_entry("", true).supports_xhigh());
4719    }
4720
4721    #[test]
4722    fn clamp_thinking_non_reasoning_model() {
4723        let entry = dummy_entry("claude-3-haiku", false);
4724        assert_eq!(
4725            entry.clamp_thinking_level(ThinkingLevel::High),
4726            ThinkingLevel::Off
4727        );
4728    }
4729
4730    #[test]
4731    fn clamp_thinking_xhigh_without_support() {
4732        let entry = dummy_entry("claude-opus-4-6", true);
4733        assert_eq!(
4734            entry.clamp_thinking_level(ThinkingLevel::XHigh),
4735            ThinkingLevel::High
4736        );
4737    }
4738
4739    #[test]
4740    fn clamp_thinking_xhigh_with_support() {
4741        let entry = dummy_entry("gpt-5.2", true);
4742        assert_eq!(
4743            entry.clamp_thinking_level(ThinkingLevel::XHigh),
4744            ThinkingLevel::XHigh
4745        );
4746    }
4747
4748    #[test]
4749    fn clamp_thinking_normal_level_passthrough() {
4750        let entry = dummy_entry("claude-opus-4-6", true);
4751        assert_eq!(
4752            entry.clamp_thinking_level(ThinkingLevel::Medium),
4753            ThinkingLevel::Medium
4754        );
4755    }
4756
4757    // -----------------------------------------------------------------------
4758    // available_thinking_levels
4759    // -----------------------------------------------------------------------
4760
4761    #[test]
4762    fn available_thinking_levels_non_reasoning() {
4763        let entry = dummy_entry("gpt-4o-mini", false);
4764        let levels = available_thinking_levels(&entry);
4765        assert_eq!(levels, vec![ThinkingLevel::Off]);
4766    }
4767
4768    #[test]
4769    fn available_thinking_levels_reasoning_no_xhigh() {
4770        let entry = dummy_entry("claude-opus-4-6", true);
4771        let levels = available_thinking_levels(&entry);
4772        assert_eq!(
4773            levels,
4774            vec![
4775                ThinkingLevel::Off,
4776                ThinkingLevel::Minimal,
4777                ThinkingLevel::Low,
4778                ThinkingLevel::Medium,
4779                ThinkingLevel::High,
4780            ]
4781        );
4782    }
4783
4784    #[test]
4785    fn available_thinking_levels_reasoning_with_xhigh() {
4786        let entry = dummy_entry("gpt-5.2", true);
4787        let levels = available_thinking_levels(&entry);
4788        assert_eq!(
4789            levels,
4790            vec![
4791                ThinkingLevel::Off,
4792                ThinkingLevel::Minimal,
4793                ThinkingLevel::Low,
4794                ThinkingLevel::Medium,
4795                ThinkingLevel::High,
4796                ThinkingLevel::XHigh,
4797            ]
4798        );
4799    }
4800
4801    // -----------------------------------------------------------------------
4802    // rpc_model_from_entry
4803    // -----------------------------------------------------------------------
4804
4805    #[test]
4806    fn rpc_model_from_entry_basic() {
4807        let entry = dummy_entry("claude-opus-4-6", true);
4808        let value = rpc_model_from_entry(&entry);
4809        assert_eq!(value["id"], "claude-opus-4-6");
4810        assert_eq!(value["name"], "claude-opus-4-6");
4811        assert_eq!(value["provider"], "anthropic");
4812        assert_eq!(value["reasoning"], true);
4813        assert_eq!(value["contextWindow"], 200_000);
4814        assert_eq!(value["maxTokens"], 8192);
4815    }
4816
4817    #[test]
4818    fn rpc_model_from_entry_input_types() {
4819        let mut entry = dummy_entry("gpt-4o", false);
4820        entry.model.input = vec![InputType::Text, InputType::Image];
4821        let value = rpc_model_from_entry(&entry);
4822        let input = value["input"].as_array().unwrap();
4823        assert_eq!(input.len(), 2);
4824        assert_eq!(input[0], "text");
4825        assert_eq!(input[1], "image");
4826    }
4827
4828    #[test]
4829    fn rpc_model_from_entry_cost_present() {
4830        let entry = dummy_entry("test-model", false);
4831        let value = rpc_model_from_entry(&entry);
4832        assert!(value.get("cost").is_some());
4833        let cost = &value["cost"];
4834        assert_eq!(cost["input"], 3.0);
4835        assert_eq!(cost["output"], 15.0);
4836    }
4837
4838    #[test]
4839    fn current_model_entry_matches_provider_alias_and_model_case() {
4840        let mut model = dummy_entry("gpt-4o-mini", true);
4841        model.model.provider = "openrouter".to_string();
4842        let options = rpc_options_with_models(vec![model]);
4843
4844        let mut session = Session::in_memory();
4845        session.header.provider = Some("open-router".to_string());
4846        session.header.model_id = Some("GPT-4O-MINI".to_string());
4847
4848        let resolved = current_model_entry(&session, &options).expect("resolve aliased model");
4849        assert_eq!(resolved.model.provider, "openrouter");
4850        assert_eq!(resolved.model.id, "gpt-4o-mini");
4851    }
4852
4853    #[test]
4854    fn session_state_resolves_model_for_provider_alias() {
4855        let mut model = dummy_entry("gpt-4o-mini", true);
4856        model.model.provider = "openrouter".to_string();
4857        let options = rpc_options_with_models(vec![model]);
4858
4859        let mut session = Session::in_memory();
4860        session.header.provider = Some("open-router".to_string());
4861        session.header.model_id = Some("gpt-4o-mini".to_string());
4862
4863        let snapshot = RpcStateSnapshot {
4864            steering_count: 0,
4865            follow_up_count: 0,
4866            steering_mode: QueueMode::OneAtATime,
4867            follow_up_mode: QueueMode::OneAtATime,
4868            auto_compaction_enabled: false,
4869            auto_retry_enabled: false,
4870        };
4871
4872        let state = session_state(&session, &options, &snapshot, false, false);
4873        assert_eq!(state["model"]["provider"], "openrouter");
4874        assert_eq!(state["model"]["id"], "gpt-4o-mini");
4875    }
4876
4877    // -----------------------------------------------------------------------
4878    // error_hints_value
4879    // -----------------------------------------------------------------------
4880
4881    #[test]
4882    fn error_hints_value_produces_expected_shape() {
4883        let error = Error::validation("test error");
4884        let value = error_hints_value(&error);
4885        assert!(value.get("summary").is_some());
4886        assert!(value.get("hints").is_some());
4887        assert!(value.get("contextFields").is_some());
4888        assert!(value["hints"].is_array());
4889    }
4890
4891    // -----------------------------------------------------------------------
4892    // rpc_parse_extension_ui_response_id edge cases
4893    // -----------------------------------------------------------------------
4894
4895    #[test]
4896    fn parse_ui_response_id_empty_string() {
4897        let value = json!({"requestId": ""});
4898        assert_eq!(rpc_parse_extension_ui_response_id(&value), None);
4899    }
4900
4901    #[test]
4902    fn parse_ui_response_id_whitespace_only() {
4903        let value = json!({"requestId": "   "});
4904        assert_eq!(rpc_parse_extension_ui_response_id(&value), None);
4905    }
4906
4907    #[test]
4908    fn parse_ui_response_id_trims() {
4909        let value = json!({"requestId": "  req-1  "});
4910        assert_eq!(
4911            rpc_parse_extension_ui_response_id(&value),
4912            Some("req-1".to_string())
4913        );
4914    }
4915
4916    #[test]
4917    fn parse_ui_response_id_prefers_request_id_over_id_alias() {
4918        let value = json!({"requestId": "req-1", "id": "legacy-id"});
4919        assert_eq!(
4920            rpc_parse_extension_ui_response_id(&value),
4921            Some("req-1".to_string())
4922        );
4923    }
4924
4925    #[test]
4926    fn parse_ui_response_id_falls_back_to_id_alias_when_request_id_not_string() {
4927        let value = json!({"requestId": 123, "id": "legacy-id"});
4928        assert_eq!(
4929            rpc_parse_extension_ui_response_id(&value),
4930            Some("legacy-id".to_string())
4931        );
4932    }
4933
4934    #[test]
4935    fn parse_ui_response_id_falls_back_to_id_alias_when_request_id_blank() {
4936        let value = json!({"requestId": "", "id": "legacy-id"});
4937        assert_eq!(
4938            rpc_parse_extension_ui_response_id(&value),
4939            Some("legacy-id".to_string())
4940        );
4941    }
4942
4943    #[test]
4944    fn parse_ui_response_id_falls_back_to_id_alias_when_request_id_whitespace() {
4945        let value = json!({"requestId": "   ", "id": "legacy-id"});
4946        assert_eq!(
4947            rpc_parse_extension_ui_response_id(&value),
4948            Some("legacy-id".to_string())
4949        );
4950    }
4951
4952    #[test]
4953    fn parse_ui_response_id_neither_field() {
4954        let value = json!({"type": "something"});
4955        assert_eq!(rpc_parse_extension_ui_response_id(&value), None);
4956    }
4957
4958    // -----------------------------------------------------------------------
4959    // rpc_parse_extension_ui_response edge cases
4960    // -----------------------------------------------------------------------
4961
4962    #[test]
4963    fn parse_editor_response_requires_string() {
4964        let active = ExtensionUiRequest::new("req-1", "editor", json!({"title": "t"}));
4965        let ok = json!({"type": "extension_ui_response", "requestId": "req-1", "value": "code"});
4966        assert!(rpc_parse_extension_ui_response(&ok, &active).is_ok());
4967
4968        let bad = json!({"type": "extension_ui_response", "requestId": "req-1", "value": 42});
4969        assert!(rpc_parse_extension_ui_response(&bad, &active).is_err());
4970    }
4971
4972    #[test]
4973    fn parse_notify_response_returns_ack() {
4974        let active = ExtensionUiRequest::new("req-1", "notify", json!({"title": "t"}));
4975        let val = json!({"type": "extension_ui_response", "requestId": "req-1"});
4976        let resp = rpc_parse_extension_ui_response(&val, &active).unwrap();
4977        assert!(!resp.cancelled);
4978    }
4979
4980    #[test]
4981    fn parse_unknown_method_errors() {
4982        let active = ExtensionUiRequest::new("req-1", "unknown_method", json!({}));
4983        let val = json!({"type": "extension_ui_response", "requestId": "req-1"});
4984        assert!(rpc_parse_extension_ui_response(&val, &active).is_err());
4985    }
4986
4987    #[test]
4988    fn parse_select_with_object_options() {
4989        let active = ExtensionUiRequest::new(
4990            "req-1",
4991            "select",
4992            json!({"title": "pick", "options": [{"label": "Alpha", "value": "a"}, {"label": "Beta"}]}),
4993        );
4994        // Selecting by value key
4995        let val_a = json!({"type": "extension_ui_response", "requestId": "req-1", "value": "a"});
4996        let resp = rpc_parse_extension_ui_response(&val_a, &active).unwrap();
4997        assert_eq!(resp.value, Some(json!("a")));
4998
4999        // Selecting by label fallback (no value key in option)
5000        let val_b = json!({"type": "extension_ui_response", "requestId": "req-1", "value": "Beta"});
5001        let resp = rpc_parse_extension_ui_response(&val_b, &active).unwrap();
5002        assert_eq!(resp.value, Some(json!("Beta")));
5003    }
5004}