Skip to main content

github_copilot_sdk/
session.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::Mutex as ParkingLotMutex;
7use serde_json::Value;
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tracing::{Instrument, warn};
12
13use crate::canvas::CanvasHandler;
14use crate::generated::api_types::{LogRequest, ModelSwitchToRequest, OpenCanvasInstance};
15use crate::generated::session_events::{
16    CommandExecuteData, ElicitationRequestedData, ExternalToolRequestedData,
17    SessionCanvasClosedData, SessionErrorData, SessionEventType,
18};
19use crate::handler::{
20    AutoModeSwitchHandler, AutoModeSwitchResponse, ElicitationHandler, ExitPlanModeHandler,
21    PermissionHandler, PermissionResult, UserInputHandler, UserInputResponse,
22};
23use crate::hooks::SessionHooks;
24use crate::session_fs::SessionFsProvider;
25use crate::trace_context::inject_trace_context;
26use crate::transforms::SystemMessageTransform;
27use crate::types::{
28    CommandContext, CommandDefinition, CommandHandler, CreateSessionResult, ElicitationRequest,
29    ElicitationResult, ExitPlanModeData, GetMessagesResponse, MessageOptions,
30    PermissionRequestData, RequestId, ResumeSessionConfig, ResumeSessionResult, SectionOverride,
31    SessionCapabilities, SessionConfig, SessionEvent, SessionId, SetModelOptions,
32    SystemMessageConfig, ToolInvocation, ToolResult, ToolResultExpanded, TraceContext,
33    UiInputOptions, ensure_attachment_display_names,
34};
35use crate::{
36    Client, Error, ErrorKind, JsonRpcResponse, SessionErrorKind, SessionEventNotification,
37    error_codes,
38};
39
40/// Bundle of the per-session callbacks the SDK dispatches to. Built from a
41/// [`SessionConfig`] / [`ResumeSessionConfig`] at
42/// [`Client::create_session`] / [`Client::resume_session`] time. Each
43/// field is `None` (or an empty map for tools) when the caller didn't
44/// install a handler -- in that case the SDK skips dispatch for that
45/// event type. The wire flags on `session.create` / `session.resume`
46/// are derived from these fields.
47#[derive(Clone)]
48pub(crate) struct SessionHandlers {
49    pub permission: Option<Arc<dyn PermissionHandler>>,
50    pub elicitation: Option<Arc<dyn ElicitationHandler>>,
51    pub user_input: Option<Arc<dyn UserInputHandler>>,
52    pub exit_plan_mode: Option<Arc<dyn ExitPlanModeHandler>>,
53    pub auto_mode_switch: Option<Arc<dyn AutoModeSwitchHandler>>,
54    pub tools: Arc<HashMap<String, Arc<dyn crate::tool::ToolHandler>>>,
55}
56
57/// Shared state between a [`Session`] and its event loop, used by [`Session::send_and_wait`].
58struct IdleWaiter {
59    tx: oneshot::Sender<Result<Option<SessionEvent>, Error>>,
60    last_assistant_message: Option<SessionEvent>,
61    started_at: Instant,
62    first_assistant_message_seen: bool,
63}
64
65/// RAII guard that clears the [`Session::idle_waiter`] slot on drop. Used
66/// by [`Session::send_and_wait`] to ensure the slot doesn't leak if the
67/// caller's future is cancelled (outer `tokio::time::timeout` / `select!`
68/// / dropped JoinHandle). Synchronous clear via `parking_lot::Mutex` —
69/// no async drop needed.
70///
71/// Without this, an outer cancellation between "install waiter" and
72/// "drain channel" would leave the slot occupied, causing all subsequent
73/// `send` and `send_and_wait` calls on the session to return
74/// [`SendWhileWaiting`](SessionErrorKind::SendWhileWaiting). Closes RFD-400
75/// review finding #2.
76struct WaiterGuard {
77    slot: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
78}
79
80impl Drop for WaiterGuard {
81    fn drop(&mut self) {
82        self.slot.lock().take();
83    }
84}
85
86struct PendingSessionRegistration {
87    client: Client,
88    session_id: SessionId,
89    shutdown: CancellationToken,
90    disarmed: bool,
91}
92
93impl PendingSessionRegistration {
94    fn new(client: Client, session_id: SessionId, shutdown: CancellationToken) -> Self {
95        Self {
96            client,
97            session_id,
98            shutdown,
99            disarmed: false,
100        }
101    }
102
103    async fn cleanup(mut self, event_loop: JoinHandle<()>) {
104        self.shutdown.cancel();
105        let _ = event_loop.await;
106        self.client.unregister_session(&self.session_id);
107        self.disarmed = true;
108    }
109
110    fn disarm(&mut self) {
111        self.disarmed = true;
112    }
113}
114
115impl Drop for PendingSessionRegistration {
116    fn drop(&mut self) {
117        if !self.disarmed {
118            self.shutdown.cancel();
119            self.client.unregister_session(&self.session_id);
120        }
121    }
122}
123
124/// A session on a GitHub Copilot CLI server.
125///
126/// Created via [`Client::create_session`] or [`Client::resume_session`].
127/// Owns an internal event loop that dispatches events to the per-callback
128/// handlers installed on the session config.
129///
130/// Protocol methods (`send`, `get_events`, `abort`, etc.) automatically
131/// inject the session ID into RPC params.
132///
133/// Call [`destroy`](Self::destroy) for graceful cleanup (RPC + local). If dropped
134/// without calling `destroy`, the `Drop` impl aborts the event loop and
135/// unregisters from the router as a best-effort safety net.
136pub struct Session {
137    id: SessionId,
138    cwd: PathBuf,
139    workspace_path: Option<PathBuf>,
140    remote_url: Option<String>,
141    client: Client,
142    /// Handle to the spawned event-loop task. Sync `parking_lot::Mutex`
143    /// because the lock is never held across an `.await` and the `Drop`
144    /// impl needs to take the handle synchronously without `try_lock`
145    /// fallibility.
146    event_loop: ParkingLotMutex<Option<JoinHandle<()>>>,
147    /// Cooperative shutdown signal for the event loop. The loop selects
148    /// on [`shutdown.cancelled()`](CancellationToken::cancelled) alongside
149    /// its inbound channels; [`Session::stop_event_loop`] and [`Drop`]
150    /// both call [`cancel()`](CancellationToken::cancel) to ask the loop
151    /// to exit between iterations rather than aborting the task (which
152    /// can land at any await point and leave the session mid-protocol).
153    /// See RFD-400 review finding #3.
154    ///
155    /// `CancellationToken` is the canonical signalling primitive in
156    /// `tokio_util`; it is what `tonic` uses for the equivalent task-
157    /// coordination case. Advanced consumers can obtain a child token
158    /// via [`Session::cancellation_token`] to bind their own work to
159    /// the session lifetime.
160    shutdown: CancellationToken,
161    /// Only populated while a `send_and_wait` call is in flight.
162    ///
163    /// Sync `parking_lot::Mutex` because the lock is never held across an
164    /// `.await`, and synchronous access lets the `WaiterGuard` RAII helper
165    /// in `send_and_wait` clear the slot from a `Drop` impl on caller-side
166    /// cancellation. See RFD-400 review (cancel-safety hardening).
167    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
168    /// Capabilities negotiated with the CLI, updated on `capabilities.changed` events.
169    capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
170    /// Canvas instances currently known to be open for this session.
171    open_canvases: Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
172    /// Broadcast channel for runtime event subscribers — see [`Session::subscribe`].
173    event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
174}
175
176impl Session {
177    /// Session ID assigned by the CLI.
178    pub fn id(&self) -> &SessionId {
179        &self.id
180    }
181
182    /// Working directory of the CLI process.
183    pub fn cwd(&self) -> &PathBuf {
184        &self.cwd
185    }
186
187    /// Workspace directory for the session (if using infinite sessions).
188    pub fn workspace_path(&self) -> Option<&Path> {
189        self.workspace_path.as_deref()
190    }
191
192    /// Remote session URL, if the session is running remotely.
193    pub fn remote_url(&self) -> Option<&str> {
194        self.remote_url.as_deref()
195    }
196
197    /// Session capabilities negotiated with the CLI.
198    ///
199    /// Capabilities are set during session creation and updated at runtime
200    /// via `capabilities.changed` events.
201    pub fn capabilities(&self) -> SessionCapabilities {
202        self.capabilities.read().clone()
203    }
204
205    /// Open canvas instances reported by the most recent `session.resume`
206    /// response or surfaced by inbound `canvas.opened` events.
207    pub fn open_canvases(&self) -> Vec<OpenCanvasInstance> {
208        self.open_canvases.read().clone()
209    }
210
211    /// Returns a [`CancellationToken`] that fires when this session shuts
212    /// down (via [`Session::stop_event_loop`], [`Session::destroy`], or
213    /// [`Drop`]).
214    ///
215    /// Use this to bind an external task's lifetime to the session — when
216    /// the session shuts down, awaiting [`cancelled()`](CancellationToken::cancelled)
217    /// resolves so cooperative consumers can stop cleanly.
218    ///
219    /// The returned handle is a *child* token: calling
220    /// [`cancel()`](CancellationToken::cancel) on it cancels only the
221    /// caller's child, not the session itself. To cancel the session, call
222    /// [`Session::stop_event_loop`].
223    ///
224    /// # Example
225    ///
226    /// ```no_run
227    /// # async fn example(session: github_copilot_sdk::session::Session) {
228    /// let token = session.cancellation_token();
229    /// tokio::select! {
230    ///     _ = token.cancelled() => println!("session shut down"),
231    ///     _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
232    ///         println!("60s elapsed, session still alive");
233    ///     }
234    /// }
235    /// # }
236    /// ```
237    pub fn cancellation_token(&self) -> CancellationToken {
238        self.shutdown.child_token()
239    }
240
241    /// Subscribe to events for this session.
242    ///
243    /// Returns an [`EventSubscription`](crate::subscription::EventSubscription)
244    /// that yields every [`SessionEvent`] dispatched on this session's
245    /// event loop. Drop the value to unsubscribe; there is no separate
246    /// cancel handle.
247    ///
248    /// **Observe-only.** Subscribers receive a clone of every
249    /// [`SessionEvent`] but cannot influence permission decisions, tool
250    /// results, or anything else that requires returning a value. Those
251    /// remain the responsibility of the per-callback handlers passed via
252    /// [`SessionConfig`]'s `with_*_handler`
253    /// builder methods.
254    ///
255    /// The returned handle implements both an inherent
256    /// [`recv`](crate::subscription::EventSubscription::recv) method and
257    /// [`Stream`](tokio_stream::Stream), so callers can use a `while let`
258    /// loop or any combinator from `tokio_stream::StreamExt` /
259    /// `futures::StreamExt`.
260    ///
261    /// Each subscriber maintains its own queue. If a consumer cannot keep
262    /// up, the oldest events are dropped and `recv` returns
263    /// [`RecvErrorKind::Lagged`](crate::subscription::RecvErrorKind::Lagged)
264    /// reporting the count of skipped events. Slow consumers do not block
265    /// the session's event loop.
266    ///
267    /// # Example
268    ///
269    /// ```no_run
270    /// # async fn example(session: github_copilot_sdk::session::Session) {
271    /// let mut events = session.subscribe();
272    /// tokio::spawn(async move {
273    ///     while let Ok(event) = events.recv().await {
274    ///         println!("[{}] event {}", event.id, event.event_type);
275    ///     }
276    /// });
277    /// # }
278    /// ```
279    pub fn subscribe(&self) -> crate::subscription::EventSubscription {
280        crate::subscription::EventSubscription::new(self.event_tx.subscribe())
281    }
282
283    /// The underlying Client (for advanced use cases).
284    pub fn client(&self) -> &Client {
285        &self.client
286    }
287
288    /// Typed RPC namespace for this session.
289    ///
290    /// Every protocol method lives here under its schema-aligned path —
291    /// e.g. `session.rpc().workspaces().list_files()`. Wire method names
292    /// and request/response types are generated from the protocol schema,
293    /// so the typed namespace can't drift from the wire contract.
294    ///
295    /// The hand-authored helpers on [`Session`] delegate to this namespace
296    /// and remain the recommended entry point for everyday use; reach for
297    /// `rpc()` when you want a method without a hand-written wrapper.
298    pub fn rpc(&self) -> crate::generated::rpc::SessionRpc<'_> {
299        crate::generated::rpc::SessionRpc { session: self }
300    }
301
302    /// Stop the internal event loop. Called automatically on [`destroy`](Self::destroy).
303    ///
304    /// Cooperative: signals shutdown via the session's [`CancellationToken`]
305    /// and awaits the loop's natural exit rather than aborting the task.
306    /// Any in-flight handler (permission callback, tool call, elicitation
307    /// response) completes before the loop exits, so the CLI never sees a
308    /// half-handled request. See RFD-400 review finding #3.
309    pub async fn stop_event_loop(&self) {
310        self.shutdown.cancel();
311        let handle = self.event_loop.lock().take();
312        if let Some(handle) = handle {
313            let _ = handle.await;
314        }
315        // Fail any pending send_and_wait so it returns immediately.
316        if let Some(waiter) = self.idle_waiter.lock().take() {
317            let _ = waiter.tx.send(Err(
318                ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()
319            ));
320        }
321    }
322
323    /// Send a user message to the agent.
324    ///
325    /// Accepts anything convertible to [`MessageOptions`] — pass a `&str` for the
326    /// trivial case, or build a `MessageOptions` for mode/attachments. The
327    /// `wait_timeout` field on `MessageOptions` is ignored here (use
328    /// [`send_and_wait`](Self::send_and_wait) if you need to wait).
329    ///
330    /// Returns the assigned message ID, which can be used to correlate the
331    /// send with later [`SessionEvent`]s emitted in
332    /// response (assistant messages, tool requests, etc.).
333    ///
334    /// Returns an error if a [`send_and_wait`](Self::send_and_wait) call is
335    /// currently in flight, since the plain send would race with the waiter.
336    ///
337    /// # Cancel safety
338    ///
339    /// **Cancel-safe.** The underlying `session.send` RPC is dispatched
340    /// through the writer-actor (see [`Client::call`](crate::Client::call)),
341    /// so dropping this future after the actor has committed to writing
342    /// will not produce a partial frame on the wire. If the caller's
343    /// future is dropped between "frame enqueued" and "response received",
344    /// the message has already landed on the wire — the agent will process
345    /// it and emit events normally; the caller just won't see the returned
346    /// message ID.
347    pub async fn send(&self, opts: impl Into<MessageOptions>) -> Result<String, Error> {
348        if self.idle_waiter.lock().is_some() {
349            return Err(ErrorKind::Session(SessionErrorKind::SendWhileWaiting).into());
350        }
351        self.send_inner(opts.into()).await
352    }
353
354    async fn send_inner(&self, opts: MessageOptions) -> Result<String, Error> {
355        let mut params = serde_json::json!({
356            "sessionId": self.id,
357            "prompt": opts.prompt,
358        });
359        if let Some(m) = opts.mode {
360            params["mode"] = serde_json::to_value(m)?;
361        }
362        if let Some(am) = opts.agent_mode {
363            params["agentMode"] = serde_json::to_value(am)?;
364        }
365        if let Some(mut a) = opts.attachments {
366            ensure_attachment_display_names(&mut a);
367            params["attachments"] = serde_json::to_value(a)?;
368        }
369        if let Some(headers) = opts.request_headers
370            && !headers.is_empty()
371        {
372            params["requestHeaders"] = serde_json::to_value(headers)?;
373        }
374        if let Some(display_prompt) = opts.display_prompt {
375            params["displayPrompt"] = serde_json::to_value(display_prompt)?;
376        }
377        let trace_ctx = if opts.traceparent.is_some() || opts.tracestate.is_some() {
378            TraceContext {
379                traceparent: opts.traceparent,
380                tracestate: opts.tracestate,
381            }
382        } else {
383            self.client.resolve_trace_context().await
384        };
385        inject_trace_context(&mut params, &trace_ctx);
386        let rpc_start = Instant::now();
387        let result = self.client.call("session.send", Some(params)).await?;
388        let message_id = result
389            .get("messageId")
390            .and_then(|v| v.as_str())
391            .map(|s| s.to_string())
392            .unwrap_or_default();
393        tracing::debug!(
394            elapsed_ms = rpc_start.elapsed().as_millis(),
395            session_id = %self.id,
396            message_id = %message_id,
397            "Session::send completed successfully"
398        );
399        Ok(message_id)
400    }
401
402    /// Send a user message and wait for the agent to finish processing.
403    ///
404    /// Accepts anything convertible to [`MessageOptions`] — pass a `&str` for the
405    /// trivial case, or build a `MessageOptions` for mode/attachments/timeout.
406    /// Blocks until `session.idle` (success) or `session.error` (failure),
407    /// returning the last `assistant.message` event captured during streaming.
408    /// Times out after `MessageOptions::wait_timeout` (default 60 seconds).
409    ///
410    /// Only one `send_and_wait` call may be active per session at a time.
411    /// Calling [`send`](Self::send) while a `send_and_wait`
412    /// is in flight will also return an error.
413    ///
414    /// # Cancel safety
415    ///
416    /// **Cancel-safe.** A `WaiterGuard` clears the in-flight slot on every
417    /// exit path (success, internal failure, internal timeout, *and*
418    /// external cancellation via `tokio::time::timeout` / `select!` /
419    /// dropped JoinHandle). Subsequent `send` and `send_and_wait` calls on
420    /// this session will succeed normally — the slot is never leaked.
421    pub async fn send_and_wait(
422        &self,
423        opts: impl Into<MessageOptions>,
424    ) -> Result<Option<SessionEvent>, Error> {
425        let total_start = Instant::now();
426        let opts = opts.into();
427        let timeout_duration = opts.wait_timeout.unwrap_or(Duration::from_secs(60));
428        let (tx, rx) = oneshot::channel();
429
430        {
431            let mut guard = self.idle_waiter.lock();
432            if guard.is_some() {
433                return Err(ErrorKind::Session(SessionErrorKind::SendWhileWaiting).into());
434            }
435            *guard = Some(IdleWaiter {
436                tx,
437                last_assistant_message: None,
438                started_at: total_start,
439                first_assistant_message_seen: false,
440            });
441        }
442
443        // RAII: clears the idle_waiter slot on every exit path, including
444        // external cancellation (caller's outer `select!` / `timeout` /
445        // dropped future). Without this, an outer cancellation would leak
446        // the slot and brick subsequent `send`/`send_and_wait` calls.
447        let _waiter_guard = WaiterGuard {
448            slot: self.idle_waiter.clone(),
449        };
450
451        let result = tokio::time::timeout(timeout_duration, async {
452            self.send_inner(opts).await?;
453            match rx.await {
454                Ok(result) => result,
455                Err(_) => Err(ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()),
456            }
457        })
458        .await;
459
460        match result {
461            Ok(inner) => {
462                tracing::debug!(
463                    elapsed_ms = total_start.elapsed().as_millis(),
464                    session_id = %self.id,
465                    completed_by = if inner.is_ok() { "idle" } else { "error" },
466                    "Session::send_and_wait complete"
467                );
468                inner
469            }
470            Err(_) => {
471                tracing::warn!(
472                    elapsed_ms = total_start.elapsed().as_millis(),
473                    session_id = %self.id,
474                    completed_by = "timeout",
475                    "Session::send_and_wait failed"
476                );
477                Err(ErrorKind::Session(SessionErrorKind::Timeout(timeout_duration)).into())
478            }
479        }
480    }
481
482    /// Retrieve the session's timeline events.
483    pub async fn get_events(&self) -> Result<Vec<SessionEvent>, Error> {
484        let result = self
485            .client
486            .call(
487                "session.getMessages",
488                Some(serde_json::json!({ "sessionId": self.id })),
489            )
490            .await?;
491        let response: GetMessagesResponse = serde_json::from_value(result)?;
492        Ok(response.events)
493    }
494
495    /// Deprecated alias for [`get_events`](Self::get_events).
496    #[deprecated(since = "0.1.0", note = "Use `get_events()` instead")]
497    pub async fn get_messages(&self) -> Result<Vec<SessionEvent>, Error> {
498        self.get_events().await
499    }
500
501    /// Abort the current agent turn.
502    ///
503    /// # Cancel safety
504    ///
505    /// **Cancel-safe.** Single `session.abort` RPC; the underlying
506    /// [`Client::call`](crate::Client::call) is cancel-safe via the
507    /// writer-actor.
508    pub async fn abort(&self) -> Result<(), Error> {
509        self.client
510            .call(
511                "session.abort",
512                Some(serde_json::json!({ "sessionId": self.id })),
513            )
514            .await?;
515        Ok(())
516    }
517
518    /// Switch to a different model.
519    ///
520    /// Pass `None` for `opts` if no extra configuration is needed.
521    pub async fn set_model(&self, model: &str, opts: Option<SetModelOptions>) -> Result<(), Error> {
522        let opts = opts.unwrap_or_default();
523        let request = ModelSwitchToRequest {
524            model_id: model.to_string(),
525            reasoning_effort: opts.reasoning_effort,
526            reasoning_summary: opts.reasoning_summary,
527            context_tier: opts.context_tier,
528            model_capabilities: opts.model_capabilities,
529        };
530        self.rpc().model().switch_to(request).await?;
531        Ok(())
532    }
533
534    /// Disconnect this session from the CLI.
535    ///
536    /// Sends the `session.destroy` RPC, stops the event loop, and unregisters
537    /// the session from the client. **Session state on disk** (conversation
538    /// history, planning state, artifacts) is **preserved**, so the
539    /// conversation can be resumed later via [`Client::resume_session`]
540    /// using this session's ID. To permanently remove all on-disk session
541    /// data, use [`Client::delete_session`] instead.
542    ///
543    /// The caller should ensure the session is idle (e.g. [`send_and_wait`]
544    /// has returned) before disconnecting; in-flight tool or event handlers
545    /// may otherwise observe failures.
546    ///
547    /// [`Client::resume_session`]: crate::Client::resume_session
548    /// [`Client::delete_session`]: crate::Client::delete_session
549    /// [`send_and_wait`]: Self::send_and_wait
550    pub async fn disconnect(&self) -> Result<(), Error> {
551        self.client
552            .call(
553                "session.destroy",
554                Some(serde_json::json!({ "sessionId": self.id })),
555            )
556            .await?;
557        self.stop_event_loop().await;
558        self.client.unregister_session(&self.id);
559        Ok(())
560    }
561
562    /// Deprecated alias for [`disconnect`](Self::disconnect). The
563    /// underlying wire RPC happens to be named `session.destroy`, but it
564    /// only severs the connection — on-disk session state is preserved.
565    /// Prefer `disconnect` in new code.
566    #[deprecated(since = "0.1.0", note = "Use `disconnect()` instead")]
567    pub async fn destroy(&self) -> Result<(), Error> {
568        self.disconnect().await
569    }
570
571    /// Write a log message to the session.
572    ///
573    /// Pass `None` for `opts` to use defaults (info level, persisted).
574    pub async fn log(
575        &self,
576        message: &str,
577        opts: Option<crate::types::LogOptions>,
578    ) -> Result<(), Error> {
579        let opts = opts.unwrap_or_default();
580        let level = match opts.level {
581            Some(level) => Some(serde_json::from_value(serde_json::to_value(level)?)?),
582            None => None,
583        };
584        let request = LogRequest {
585            message: message.to_string(),
586            level,
587            ephemeral: opts.ephemeral,
588            r#type: None,
589            tip: None,
590            url: None,
591        };
592        self.rpc().log(request).await?;
593        Ok(())
594    }
595
596    /// Returns the UI sub-API for elicitation, confirmation, selection, and
597    /// free-form input.
598    ///
599    /// All UI methods route through `session.ui.*` RPCs and require host
600    /// support — check `session.capabilities().ui.elicitation` before use.
601    pub fn ui(&self) -> SessionUi<'_> {
602        SessionUi { session: self }
603    }
604
605    /// Returns an error if the host doesn't support elicitation.
606    fn assert_elicitation(&self) -> Result<(), Error> {
607        if self
608            .capabilities
609            .read()
610            .ui
611            .as_ref()
612            .and_then(|u| u.elicitation)
613            != Some(true)
614        {
615            return Err(ErrorKind::Session(SessionErrorKind::ElicitationNotSupported).into());
616        }
617        Ok(())
618    }
619}
620
621impl Drop for Session {
622    fn drop(&mut self) {
623        // Cooperative shutdown: cancel the event loop's token to signal
624        // exit between iterations. The loop will see the cancellation on
625        // its next select poll and break cleanly without interrupting an
626        // in-flight handler. We do NOT abort the JoinHandle — that would
627        // land at any await point in the loop body, potentially leaving
628        // the CLI with an unanswered request id. RFD-400 review finding
629        // #3.
630        //
631        // The handle itself is left in `event_loop` to be reaped by the
632        // tokio runtime when it next polls; we intentionally don't await
633        // it here because Drop is sync.
634        self.shutdown.cancel();
635        self.client.unregister_session(&self.id);
636    }
637}
638
639/// UI sub-API for a [`Session`] — elicitation, confirmation, selection,
640/// and free-form input.
641///
642/// Acquired via [`Session::ui`]. Methods route to `session.ui.*` RPCs and
643/// require host elicitation support — check
644/// `session.capabilities().ui.elicitation` before use.
645pub struct SessionUi<'a> {
646    session: &'a Session,
647}
648
649impl<'a> SessionUi<'a> {
650    /// Request user input via an interactive UI form (elicitation).
651    ///
652    /// Sends a JSON Schema describing form fields to the CLI host. The host
653    /// renders a form dialog and returns the user's response.
654    ///
655    /// Prefer the typed convenience methods [`confirm`](Self::confirm),
656    /// [`select`](Self::select), and [`input`](Self::input) for common cases.
657    pub async fn elicitation(
658        &self,
659        message: &str,
660        schema: Value,
661    ) -> Result<ElicitationResult, Error> {
662        self.session.assert_elicitation()?;
663        let result = self
664            .session
665            .client
666            .call(
667                "session.ui.elicitation",
668                Some(serde_json::json!({
669                    "sessionId": self.session.id,
670                    "message": message,
671                    "requestedSchema": schema,
672                })),
673            )
674            .await?;
675        let elicitation: ElicitationResult = serde_json::from_value(result)?;
676        Ok(elicitation)
677    }
678
679    /// Ask the user a yes/no confirmation question.
680    ///
681    /// Returns `true` if the user accepted and confirmed, `false` otherwise.
682    pub async fn confirm(&self, message: &str) -> Result<bool, Error> {
683        self.session.assert_elicitation()?;
684        let schema = serde_json::json!({
685            "type": "object",
686            "properties": {
687                "confirmed": {
688                    "type": "boolean",
689                    "default": true,
690                }
691            },
692            "required": ["confirmed"]
693        });
694        let result = self.elicitation(message, schema).await?;
695        Ok(result.action == "accept"
696            && result
697                .content
698                .and_then(|c| c.get("confirmed").and_then(|v| v.as_bool()))
699                == Some(true))
700    }
701
702    /// Ask the user to select from a list of options.
703    ///
704    /// Returns the selected option string on accept, or `None` on decline/cancel.
705    pub async fn select(&self, message: &str, options: &[&str]) -> Result<Option<String>, Error> {
706        self.session.assert_elicitation()?;
707        let schema = serde_json::json!({
708            "type": "object",
709            "properties": {
710                "selection": {
711                    "type": "string",
712                    "enum": options,
713                }
714            },
715            "required": ["selection"]
716        });
717        let result = self.elicitation(message, schema).await?;
718        if result.action != "accept" {
719            return Ok(None);
720        }
721        let selection = result.content.and_then(|c| {
722            c.get("selection")
723                .and_then(|v| v.as_str())
724                .map(String::from)
725        });
726        Ok(selection)
727    }
728
729    /// Ask the user for free-form text input.
730    ///
731    /// Returns the input string on accept, or `None` on decline/cancel.
732    /// Use [`UiInputOptions`] to set validation constraints and field metadata.
733    pub async fn input(
734        &self,
735        message: &str,
736        options: Option<&UiInputOptions<'_>>,
737    ) -> Result<Option<String>, Error> {
738        self.session.assert_elicitation()?;
739        let mut field = serde_json::json!({ "type": "string" });
740        if let Some(opts) = options {
741            if let Some(title) = opts.title {
742                field["title"] = Value::String(title.to_string());
743            }
744            if let Some(desc) = opts.description {
745                field["description"] = Value::String(desc.to_string());
746            }
747            if let Some(min) = opts.min_length {
748                field["minLength"] = Value::Number(min.into());
749            }
750            if let Some(max) = opts.max_length {
751                field["maxLength"] = Value::Number(max.into());
752            }
753            if let Some(fmt) = &opts.format {
754                field["format"] = Value::String(fmt.as_str().to_string());
755            }
756            if let Some(default) = opts.default {
757                field["default"] = Value::String(default.to_string());
758            }
759        }
760        let schema = serde_json::json!({
761            "type": "object",
762            "properties": { "value": field },
763            "required": ["value"]
764        });
765        let result = self.elicitation(message, schema).await?;
766        if result.action != "accept" {
767            return Ok(None);
768        }
769        let value = result
770            .content
771            .and_then(|c| c.get("value").and_then(|v| v.as_str()).map(String::from));
772        Ok(value)
773    }
774}
775
776impl Client {
777    /// Create a new session on the CLI.
778    ///
779    /// Sends `session.create`, registers the session on the router,
780    /// and spawns an internal event loop that dispatches to the handler.
781    ///
782    /// All callbacks (per-event handlers, tool handlers, hooks, transform)
783    /// are configured via [`SessionConfig`] using its `with_*_handler` /
784    /// `with_tools` / `with_hooks` / `with_system_message_transform` builder
785    /// methods.
786    ///
787    /// If [`hooks_handler`](SessionConfig::hooks_handler) is set, the
788    /// wire-level `hooks` flag is automatically enabled.
789    ///
790    /// If [`system_message_transform`](SessionConfig::system_message_transform) is set, the SDK injects
791    /// `action: "transform"` sections into the [`SystemMessageConfig`] wire
792    /// format and handles `systemMessage.transform` RPC callbacks during
793    /// the session.
794    ///
795    /// Each per-event handler is independently optional. If a handler is
796    /// not installed, the SDK signals the runtime not to emit the matching
797    /// broadcast (and silently skips dispatch if one arrives anyway).
798    pub async fn create_session(&self, mut config: SessionConfig) -> Result<Session, Error> {
799        let total_start = Instant::now();
800        // For cloud sessions, let the CLI/server assign the session id and
801        // register the session lazily once the response arrives. For non-cloud
802        // sessions we generate the id client-side (when the caller didn't
803        // supply one) so the session can be registered BEFORE the RPC — the
804        // CLI may issue session-scoped requests (e.g. sessionFs.writeFile for
805        // workspace metadata) during session.create processing, before it has
806        // sent the response.
807        let caller_session_id = config.session_id.clone();
808        let use_server_generated_id = config.cloud.is_some() && caller_session_id.is_none();
809        let local_session_id: Option<SessionId> = if use_server_generated_id {
810            None
811        } else {
812            Some(
813                caller_session_id
814                    .clone()
815                    .unwrap_or_else(|| SessionId::new(uuid::Uuid::new_v4().to_string())),
816            )
817        };
818        if config.hooks_handler.is_some() && config.hooks.is_none() {
819            config.hooks = Some(true);
820        }
821        if let Some(transforms) = config.system_message_transform.clone() {
822            inject_transform_sections(&mut config, transforms.as_ref());
823        }
824        let mode = self.inner.mode;
825        if mode == crate::ClientMode::Empty && config.available_tools.is_none() {
826            return Err(Error::with_message(
827                ErrorKind::InvalidConfig,
828                "ClientMode::Empty requires available_tools to be set on the session config. \
829                 Use ToolSet to specify which tools the session may use (e.g. \
830                 ToolSet::new().add_builtin_many(BUILTIN_TOOLS_ISOLATED)).",
831            ));
832        }
833        crate::mode::validate_tool_filter_list(
834            "available_tools",
835            config.available_tools.as_deref(),
836        )?;
837        crate::mode::validate_tool_filter_list("excluded_tools", config.excluded_tools.as_deref())?;
838        config.system_message =
839            crate::mode::system_message_for_mode(mode, config.system_message.take());
840        config.memory = crate::mode::memory_for_mode(mode, config.memory.take());
841        if mode == crate::ClientMode::Empty {
842            if config.enable_session_telemetry.is_none() {
843                config.enable_session_telemetry = Some(false);
844            }
845            if config.skip_embedding_retrieval.is_none() {
846                config.skip_embedding_retrieval = Some(true);
847            }
848            if config.enable_on_demand_instruction_discovery.is_none() {
849                config.enable_on_demand_instruction_discovery = Some(false);
850            }
851            if config.enable_file_hooks.is_none() {
852                config.enable_file_hooks = Some(false);
853            }
854            if config.enable_host_git_operations.is_none() {
855                config.enable_host_git_operations = Some(false);
856            }
857            if config.enable_session_store.is_none() {
858                config.enable_session_store = Some(false);
859            }
860            if config.enable_skills.is_none() {
861                config.enable_skills = Some(false);
862            }
863        }
864        if mode == crate::ClientMode::Empty && config.mcp_oauth_token_storage.is_none() {
865            config.mcp_oauth_token_storage = Some("in-memory".into());
866        }
867        if mode == crate::ClientMode::Empty && config.embedding_cache_storage.is_none() {
868            config.embedding_cache_storage = Some("in-memory".into());
869        }
870        let opt_skip_custom_instructions = config.skip_custom_instructions;
871        let opt_custom_agents_local_only = config.custom_agents_local_only;
872        let opt_coauthor_enabled = config.coauthor_enabled;
873        let opt_manage_schedule_enabled = config.manage_schedule_enabled;
874        let (wire, mut runtime) = config.into_wire(local_session_id.clone())?;
875
876        let permission_handler = crate::permission::resolve_handler(
877            runtime.permission_handler.take(),
878            runtime.permission_policy.take(),
879        );
880        let handlers = SessionHandlers {
881            permission: permission_handler,
882            elicitation: runtime.elicitation_handler.take(),
883            user_input: runtime.user_input_handler.take(),
884            exit_plan_mode: runtime.exit_plan_mode_handler.take(),
885            auto_mode_switch: runtime.auto_mode_switch_handler.take(),
886            tools: Arc::new(std::mem::take(&mut runtime.tool_handlers)),
887        };
888        let hooks = runtime.hooks_handler.take();
889        let transforms = runtime.system_message_transform.take();
890        let tools_count = wire.tools.as_ref().map_or(0, Vec::len);
891        let commands_count = runtime.commands.as_ref().map_or(0, Vec::len);
892        let has_hooks = hooks.is_some();
893        let command_handlers = build_command_handler_map(runtime.commands.as_deref());
894        let canvas_handler = runtime.canvas_handler.take();
895        let session_fs_provider = runtime.session_fs_provider.take();
896        if self.inner.session_fs_configured && session_fs_provider.is_none() {
897            return Err(ErrorKind::Session(SessionErrorKind::SessionFsProviderRequired).into());
898        }
899        if self.inner.session_fs_sqlite_declared
900            && let Some(ref provider) = session_fs_provider
901            && provider.sqlite().is_none()
902        {
903            return Err(Error::with_message(
904                ErrorKind::InvalidConfig,
905                "SessionFs capabilities declare SQLite support but the provider \
906                 does not implement SessionFsSqliteProvider",
907            ));
908        }
909
910        let mut params = serde_json::to_value(&wire)?;
911        let trace_ctx = self.resolve_trace_context().await;
912        inject_trace_context(&mut params, &trace_ctx);
913
914        let setup_start = Instant::now();
915        let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
916        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
917        let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new()));
918        let shutdown = CancellationToken::new();
919        let (event_tx, _) = tokio::sync::broadcast::channel(512);
920
921        // For cloud sessions (use_server_generated_id), defer session
922        // registration to the inline callback so the read task registers
923        // the session synchronously the instant the response arrives.
924        // For non-cloud sessions, register up-front so the CLI can issue
925        // session-scoped requests during session.create processing.
926        let inline_stash: Arc<
927            ParkingLotMutex<Option<(SessionId, crate::router::SessionChannels)>>,
928        > = Arc::new(ParkingLotMutex::new(None));
929
930        let inline_callback: Option<crate::jsonrpc::InlineResponseCallback> = if let Some(ref sid) =
931            local_session_id
932        {
933            let channels = self.register_session(sid);
934            *inline_stash.lock() = Some((sid.clone(), channels));
935            None
936        } else {
937            let client = self.clone();
938            let stash = inline_stash.clone();
939            let expected = caller_session_id.clone();
940            Some(Box::new(move |response| {
941                let result = response.result.as_ref().ok_or_else(|| {
942                    Error::with_message(ErrorKind::Json, "session.create response had no result")
943                })?;
944                let parsed: CreateSessionResult =
945                    serde_json::from_value(result.clone()).map_err(Error::from)?;
946                if let Some(requested) = expected.as_ref()
947                    && parsed.session_id != *requested
948                {
949                    return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
950                        requested: requested.clone(),
951                        returned: parsed.session_id,
952                    })
953                    .into());
954                }
955                let channels = client.register_session(&parsed.session_id);
956                *stash.lock() = Some((parsed.session_id, channels));
957                Ok(())
958            }))
959        };
960
961        let rpc_start = Instant::now();
962        let result = match self
963            .call_with_inline_callback("session.create", Some(params), inline_callback)
964            .await
965        {
966            Ok(result) => result,
967            Err(error) => {
968                if let Some((id, _channels)) = inline_stash.lock().take() {
969                    self.unregister_session(&id);
970                }
971                return Err(error);
972            }
973        };
974        tracing::debug!(
975            elapsed_ms = rpc_start.elapsed().as_millis(),
976            "Client::create_session session creation request completed successfully"
977        );
978        let create_result: CreateSessionResult = match serde_json::from_value(result) {
979            Ok(result) => result,
980            Err(error) => {
981                if let Some((id, _channels)) = inline_stash.lock().take() {
982                    self.unregister_session(&id);
983                }
984                return Err(error.into());
985            }
986        };
987
988        if let Some(ref requested) = local_session_id
989            && create_result.session_id != *requested
990        {
991            if let Some((id, _channels)) = inline_stash.lock().take() {
992                self.unregister_session(&id);
993            }
994            return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
995                requested: requested.clone(),
996                returned: create_result.session_id.clone(),
997            })
998            .into());
999        }
1000
1001        let (session_id, channels) = inline_stash
1002            .lock()
1003            .take()
1004            .expect("session registration must have populated stash on success");
1005        let event_loop = spawn_event_loop(
1006            session_id.clone(),
1007            self.clone(),
1008            handlers,
1009            hooks,
1010            transforms,
1011            command_handlers,
1012            canvas_handler,
1013            session_fs_provider,
1014            channels,
1015            idle_waiter.clone(),
1016            capabilities.clone(),
1017            open_canvases.clone(),
1018            event_tx.clone(),
1019            shutdown.clone(),
1020        );
1021        tracing::debug!(
1022            elapsed_ms = setup_start.elapsed().as_millis(),
1023            session_id = %session_id,
1024            tools_count,
1025            commands_count,
1026            has_hooks,
1027            "Client::create_session local setup complete"
1028        );
1029        *capabilities.write() = create_result.capabilities.unwrap_or_default();
1030
1031        tracing::debug!(
1032            elapsed_ms = total_start.elapsed().as_millis(),
1033            session_id = %session_id,
1034            "Client::create_session complete"
1035        );
1036        let session = Session {
1037            id: session_id,
1038            cwd: self.cwd().clone(),
1039            workspace_path: create_result.workspace_path,
1040            remote_url: create_result.remote_url,
1041            client: self.clone(),
1042            event_loop: ParkingLotMutex::new(Some(event_loop)),
1043            shutdown,
1044            idle_waiter,
1045            capabilities,
1046            open_canvases,
1047            event_tx,
1048        };
1049        apply_mode_post_create_patch(
1050            &session,
1051            mode,
1052            opt_skip_custom_instructions,
1053            opt_custom_agents_local_only,
1054            opt_coauthor_enabled,
1055            opt_manage_schedule_enabled,
1056        )
1057        .await?;
1058        Ok(session)
1059    }
1060
1061    /// Resume an existing session on the CLI.
1062    ///
1063    /// Sends `session.resume` and `session.skills.reload`, registers the
1064    /// session on the router, and spawns the event loop.
1065    ///
1066    /// All callbacks (event handler, hooks, transform) are configured
1067    /// via [`ResumeSessionConfig`] using its `with_*` builder methods.
1068    ///
1069    /// See [`Self::create_session`] for the defaults applied when callback
1070    /// fields are unset.
1071    pub async fn resume_session(&self, mut config: ResumeSessionConfig) -> Result<Session, Error> {
1072        let total_start = Instant::now();
1073        let session_id = config.session_id.clone();
1074        if config.hooks_handler.is_some() && config.hooks.is_none() {
1075            config.hooks = Some(true);
1076        }
1077        if let Some(transforms) = config.system_message_transform.clone() {
1078            inject_transform_sections_resume(&mut config, transforms.as_ref());
1079        }
1080        let mode = self.inner.mode;
1081        if mode == crate::ClientMode::Empty && config.available_tools.is_none() {
1082            return Err(Error::with_message(
1083                ErrorKind::InvalidConfig,
1084                "ClientMode::Empty requires available_tools to be set on the session config. \
1085                 Use ToolSet to specify which tools the session may use (e.g. \
1086                 ToolSet::new().add_builtin_many(BUILTIN_TOOLS_ISOLATED)).",
1087            ));
1088        }
1089        crate::mode::validate_tool_filter_list(
1090            "available_tools",
1091            config.available_tools.as_deref(),
1092        )?;
1093        crate::mode::validate_tool_filter_list("excluded_tools", config.excluded_tools.as_deref())?;
1094        config.system_message =
1095            crate::mode::system_message_for_mode(mode, config.system_message.take());
1096        config.memory = crate::mode::memory_for_mode(mode, config.memory.take());
1097        if mode == crate::ClientMode::Empty {
1098            if config.enable_session_telemetry.is_none() {
1099                config.enable_session_telemetry = Some(false);
1100            }
1101            if config.skip_embedding_retrieval.is_none() {
1102                config.skip_embedding_retrieval = Some(true);
1103            }
1104            if config.enable_on_demand_instruction_discovery.is_none() {
1105                config.enable_on_demand_instruction_discovery = Some(false);
1106            }
1107            if config.enable_file_hooks.is_none() {
1108                config.enable_file_hooks = Some(false);
1109            }
1110            if config.enable_host_git_operations.is_none() {
1111                config.enable_host_git_operations = Some(false);
1112            }
1113            if config.enable_session_store.is_none() {
1114                config.enable_session_store = Some(false);
1115            }
1116            if config.enable_skills.is_none() {
1117                config.enable_skills = Some(false);
1118            }
1119        }
1120        if mode == crate::ClientMode::Empty && config.mcp_oauth_token_storage.is_none() {
1121            config.mcp_oauth_token_storage = Some("in-memory".into());
1122        }
1123        if mode == crate::ClientMode::Empty && config.embedding_cache_storage.is_none() {
1124            config.embedding_cache_storage = Some("in-memory".into());
1125        }
1126        let opt_skip_custom_instructions = config.skip_custom_instructions;
1127        let opt_custom_agents_local_only = config.custom_agents_local_only;
1128        let opt_coauthor_enabled = config.coauthor_enabled;
1129        let opt_manage_schedule_enabled = config.manage_schedule_enabled;
1130        let (wire, mut runtime) = config.into_wire()?;
1131
1132        let permission_handler = crate::permission::resolve_handler(
1133            runtime.permission_handler.take(),
1134            runtime.permission_policy.take(),
1135        );
1136        let handlers = SessionHandlers {
1137            permission: permission_handler,
1138            elicitation: runtime.elicitation_handler.take(),
1139            user_input: runtime.user_input_handler.take(),
1140            exit_plan_mode: runtime.exit_plan_mode_handler.take(),
1141            auto_mode_switch: runtime.auto_mode_switch_handler.take(),
1142            tools: Arc::new(std::mem::take(&mut runtime.tool_handlers)),
1143        };
1144        let hooks = runtime.hooks_handler.take();
1145        let transforms = runtime.system_message_transform.take();
1146        let tools_count = wire.tools.as_ref().map_or(0, Vec::len);
1147        let commands_count = runtime.commands.as_ref().map_or(0, Vec::len);
1148        let has_hooks = hooks.is_some();
1149        let command_handlers = build_command_handler_map(runtime.commands.as_deref());
1150        let canvas_handler = runtime.canvas_handler.take();
1151        let session_fs_provider = runtime.session_fs_provider.take();
1152        if self.inner.session_fs_configured && session_fs_provider.is_none() {
1153            return Err(ErrorKind::Session(SessionErrorKind::SessionFsProviderRequired).into());
1154        }
1155        if self.inner.session_fs_sqlite_declared
1156            && let Some(ref provider) = session_fs_provider
1157            && provider.sqlite().is_none()
1158        {
1159            return Err(Error::with_message(
1160                ErrorKind::InvalidConfig,
1161                "SessionFs capabilities declare SQLite support but the provider \
1162                 does not implement SessionFsSqliteProvider",
1163            ));
1164        }
1165
1166        let mut params = serde_json::to_value(&wire)?;
1167        let trace_ctx = self.resolve_trace_context().await;
1168        inject_trace_context(&mut params, &trace_ctx);
1169
1170        let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
1171        let setup_start = Instant::now();
1172        let channels = self.register_session(&session_id);
1173        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
1174        let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new()));
1175        let shutdown = CancellationToken::new();
1176        let (event_tx, _) = tokio::sync::broadcast::channel(512);
1177        let event_loop = spawn_event_loop(
1178            session_id.clone(),
1179            self.clone(),
1180            handlers,
1181            hooks,
1182            transforms,
1183            command_handlers,
1184            canvas_handler,
1185            session_fs_provider,
1186            channels,
1187            idle_waiter.clone(),
1188            capabilities.clone(),
1189            open_canvases.clone(),
1190            event_tx.clone(),
1191            shutdown.clone(),
1192        );
1193        let mut registration =
1194            PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
1195        tracing::debug!(
1196            elapsed_ms = setup_start.elapsed().as_millis(),
1197            session_id = %session_id,
1198            tools_count,
1199            commands_count,
1200            has_hooks,
1201            "Client::resume_session local setup complete"
1202        );
1203
1204        let rpc_start = Instant::now();
1205        let result = match self.call("session.resume", Some(params)).await {
1206            Ok(result) => result,
1207            Err(error) => {
1208                registration.cleanup(event_loop).await;
1209                return Err(error);
1210            }
1211        };
1212        tracing::debug!(
1213            elapsed_ms = rpc_start.elapsed().as_millis(),
1214            session_id = %session_id,
1215            "Client::resume_session session resume request completed successfully"
1216        );
1217
1218        let resume_result: ResumeSessionResult = match serde_json::from_value(result) {
1219            Ok(result) => result,
1220            Err(error) => {
1221                registration.cleanup(event_loop).await;
1222                return Err(error.into());
1223            }
1224        };
1225        let cli_session_id = resume_result
1226            .session_id
1227            .clone()
1228            .unwrap_or_else(|| session_id.clone());
1229        if cli_session_id != session_id {
1230            registration.cleanup(event_loop).await;
1231            return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
1232                requested: session_id,
1233                returned: cli_session_id,
1234            })
1235            .into());
1236        }
1237
1238        // Reload skills after resume (best-effort).
1239        let skills_reload_start = Instant::now();
1240        if let Err(e) = self
1241            .call(
1242                "session.skills.reload",
1243                Some(serde_json::json!({ "sessionId": session_id })),
1244            )
1245            .await
1246        {
1247            warn!(
1248                elapsed_ms = skills_reload_start.elapsed().as_millis(),
1249                session_id = %session_id,
1250                error = %e,
1251                "Client::resume_session skills reload request failed"
1252            );
1253        } else {
1254            tracing::debug!(
1255                elapsed_ms = skills_reload_start.elapsed().as_millis(),
1256                session_id = %session_id,
1257                "Client::resume_session skills reload request completed successfully"
1258            );
1259        }
1260
1261        *capabilities.write() = resume_result.capabilities.unwrap_or_default();
1262        // Upsert resume snapshots rather than replacing wholesale. Live
1263        // `session.canvas.opened` notifications can arrive on the event loop
1264        // while `session.resume` is in flight; a wholesale replace would
1265        // discard those updates.
1266        {
1267            let mut snapshots = open_canvases.write();
1268            for snapshot in resume_result.open_canvases.unwrap_or_default() {
1269                upsert_open_canvas_snapshot(&mut snapshots, snapshot);
1270            }
1271        }
1272
1273        tracing::debug!(
1274            elapsed_ms = total_start.elapsed().as_millis(),
1275            session_id = %session_id,
1276            "Client::resume_session complete"
1277        );
1278        registration.disarm();
1279        let session = Session {
1280            id: session_id,
1281            cwd: self.cwd().clone(),
1282            workspace_path: resume_result.workspace_path,
1283            remote_url: resume_result.remote_url,
1284            client: self.clone(),
1285            event_loop: ParkingLotMutex::new(Some(event_loop)),
1286            shutdown,
1287            idle_waiter,
1288            capabilities,
1289            open_canvases,
1290            event_tx,
1291        };
1292        apply_mode_post_create_patch(
1293            &session,
1294            mode,
1295            opt_skip_custom_instructions,
1296            opt_custom_agents_local_only,
1297            opt_coauthor_enabled,
1298            opt_manage_schedule_enabled,
1299        )
1300        .await?;
1301        Ok(session)
1302    }
1303}
1304
/// Command handlers for a session, keyed by command name.
type CommandHandlerMap = HashMap<String, Arc<dyn CommandHandler>>;
1306
1307async fn apply_mode_post_create_patch(
1308    session: &Session,
1309    mode: crate::ClientMode,
1310    opt_skip_custom_instructions: Option<bool>,
1311    opt_custom_agents_local_only: Option<bool>,
1312    opt_coauthor_enabled: Option<bool>,
1313    opt_manage_schedule_enabled: Option<bool>,
1314) -> Result<(), Error> {
1315    use crate::generated::api_types::SessionUpdateOptionsParams;
1316    let mut patch = SessionUpdateOptionsParams::default();
1317    let should_send = if mode == crate::ClientMode::Empty {
1318        patch.skip_custom_instructions = Some(opt_skip_custom_instructions.unwrap_or(true));
1319        patch.custom_agents_local_only = Some(opt_custom_agents_local_only.unwrap_or(true));
1320        patch.coauthor_enabled = Some(opt_coauthor_enabled.unwrap_or(false));
1321        patch.manage_schedule_enabled = Some(opt_manage_schedule_enabled.unwrap_or(false));
1322        patch.installed_plugins = Some(Vec::new());
1323        true
1324    } else {
1325        let mut any = false;
1326        if let Some(v) = opt_skip_custom_instructions {
1327            patch.skip_custom_instructions = Some(v);
1328            any = true;
1329        }
1330        if let Some(v) = opt_custom_agents_local_only {
1331            patch.custom_agents_local_only = Some(v);
1332            any = true;
1333        }
1334        if let Some(v) = opt_coauthor_enabled {
1335            patch.coauthor_enabled = Some(v);
1336            any = true;
1337        }
1338        if let Some(v) = opt_manage_schedule_enabled {
1339            patch.manage_schedule_enabled = Some(v);
1340            any = true;
1341        }
1342        any
1343    };
1344    if !should_send {
1345        return Ok(());
1346    }
1347    if let Err(error) = session.rpc().options().update(patch).await {
1348        let _ = session.disconnect().await;
1349        return Err(error);
1350    }
1351    Ok(())
1352}
1353
1354fn build_command_handler_map(commands: Option<&[CommandDefinition]>) -> Arc<CommandHandlerMap> {
1355    let map = match commands {
1356        Some(commands) => commands
1357            .iter()
1358            .filter(|cmd| !cmd.name.is_empty())
1359            .map(|cmd| (cmd.name.clone(), cmd.handler.clone()))
1360            .collect(),
1361        None => HashMap::new(),
1362    };
1363    Arc::new(map)
1364}
1365
1366fn upsert_open_canvas_snapshot(
1367    snapshots: &mut Vec<OpenCanvasInstance>,
1368    snapshot: OpenCanvasInstance,
1369) {
1370    if let Some(existing) = snapshots
1371        .iter_mut()
1372        .find(|open| open.instance_id == snapshot.instance_id)
1373    {
1374        *existing = snapshot;
1375    } else {
1376        snapshots.push(snapshot);
1377    }
1378}
1379
1380fn remove_open_canvas_snapshot(snapshots: &mut Vec<OpenCanvasInstance>, instance_id: &str) {
1381    snapshots.retain(|open| open.instance_id != instance_id);
1382}
1383
1384#[allow(clippy::too_many_arguments)]
1385fn spawn_event_loop(
1386    session_id: SessionId,
1387    client: Client,
1388    handlers: SessionHandlers,
1389    hooks: Option<Arc<dyn SessionHooks>>,
1390    transforms: Option<Arc<dyn SystemMessageTransform>>,
1391    command_handlers: Arc<CommandHandlerMap>,
1392    canvas_handler: Option<Arc<dyn CanvasHandler>>,
1393    session_fs_provider: Option<Arc<dyn SessionFsProvider>>,
1394    channels: crate::router::SessionChannels,
1395    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1396    capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
1397    open_canvases: Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
1398    event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
1399    shutdown: CancellationToken,
1400) -> JoinHandle<()> {
1401    let crate::router::SessionChannels {
1402        mut notifications,
1403        mut requests,
1404    } = channels;
1405
1406    let span = tracing::error_span!("session_event_loop", session_id = %session_id);
1407    tokio::spawn(
1408        async move {
1409            loop {
1410                // `mpsc::UnboundedReceiver::recv` and
1411                // `CancellationToken::cancelled` are both cancel-safe per
1412                // RFD 400. The selected branch's `await`'d handler is
1413                // *not* mid-cancelled by the select — once a branch fires
1414                // it runs to completion within the loop's iteration.
1415                // Spawned child tasks inside `handle_notification`
1416                // (permission/tool/elicitation callbacks) intentionally
1417                // outlive the parent loop and own their own cleanup;
1418                // this is RFD 400's "spawn background tasks to perform
1419                // cancel-unsafe operations" pattern and is correct as-is.
1420                tokio::select! {
1421                    _ = shutdown.cancelled() => break,
1422                    Some(notification) = notifications.recv() => {
1423                        handle_notification(
1424                            &session_id, &client, &handlers, &command_handlers, notification, &idle_waiter, &capabilities, &open_canvases, &event_tx,
1425                        ).await;
1426                    }
1427                    Some(request) = requests.recv() => {
1428                        let ctx = RequestDispatchContext {
1429                            client: &client,
1430                            handlers: &handlers,
1431                            hooks: hooks.as_deref(),
1432                            transforms: transforms.as_deref(),
1433                            canvas_handler: canvas_handler.as_ref(),
1434                            session_fs_provider: session_fs_provider.as_ref(),
1435                        };
1436                        handle_request(&session_id, ctx, request).await;
1437                    }
1438                    else => break,
1439                }
1440            }
1441            // Channels closed or shutdown signaled — fail any pending
1442            // send_and_wait so the caller observes a clean error.
1443            if let Some(waiter) = idle_waiter.lock().take() {
1444                let _ = waiter
1445                    .tx
1446                    .send(Err(ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()));
1447            }
1448        }
1449        .instrument(span),
1450    )
1451}
1452
1453fn extract_request_id(data: &Value) -> Option<RequestId> {
1454    data.get("requestId")
1455        .and_then(|v| v.as_str())
1456        .filter(|s| !s.is_empty())
1457        .map(RequestId::new)
1458}
1459
1460/// Map a [`PermissionResult`] to the `result` payload sent back to the
1461/// server via `session.permissions.handlePendingPermissionRequest`.
1462///
1463/// Returns `None` when the SDK must not send a response.
1464fn notification_permission_payload(result: &PermissionResult) -> Option<Value> {
1465    match result {
1466        PermissionResult::NoResult => None,
1467        PermissionResult::Decision(decision) => Some(
1468            serde_json::to_value(decision).expect("serializing permission decision should succeed"),
1469        ),
1470    }
1471}
1472
1473fn tool_failure_result(message: impl Into<String>) -> ToolResult {
1474    let message = message.into();
1475    ToolResult::Expanded(ToolResultExpanded {
1476        text_result_for_llm: message.clone(),
1477        result_type: "failure".to_string(),
1478        binary_results_for_llm: None,
1479        session_log: None,
1480        error: Some(message),
1481        tool_telemetry: None,
1482    })
1483}
1484
1485/// Process a notification from the CLI's broadcast channel.
1486#[allow(clippy::too_many_arguments)]
1487async fn handle_notification(
1488    session_id: &SessionId,
1489    client: &Client,
1490    handlers: &SessionHandlers,
1491    command_handlers: &Arc<CommandHandlerMap>,
1492    notification: SessionEventNotification,
1493    idle_waiter: &Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1494    capabilities: &Arc<parking_lot::RwLock<SessionCapabilities>>,
1495    open_canvases: &Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
1496    event_tx: &tokio::sync::broadcast::Sender<SessionEvent>,
1497) {
1498    let dispatch_start = Instant::now();
1499    let event = notification.event.clone();
1500    let event_type = event.parsed_type();
1501    if event_type == SessionEventType::PermissionRequested {
1502        tracing::debug!(
1503            session_id = %session_id,
1504            event_type = %event.event_type,
1505            "Session::handle_notification permission request received"
1506        );
1507    }
1508
1509    // Signal send_and_wait if active. The lock is only contended when
1510    // a send_and_wait call is in flight (idle_waiter is Some).
1511    match event_type {
1512        SessionEventType::AssistantMessage
1513        | SessionEventType::SessionIdle
1514        | SessionEventType::SessionError => {
1515            let mut guard = idle_waiter.lock();
1516            if let Some(waiter) = guard.as_mut() {
1517                match event_type {
1518                    SessionEventType::AssistantMessage => {
1519                        if !waiter.first_assistant_message_seen {
1520                            waiter.first_assistant_message_seen = true;
1521                            tracing::debug!(
1522                                elapsed_ms = waiter.started_at.elapsed().as_millis(),
1523                                session_id = %session_id,
1524                                "Session::send_and_wait first assistant message"
1525                            );
1526                        }
1527                        waiter.last_assistant_message = Some(event.clone());
1528                    }
1529                    SessionEventType::SessionIdle | SessionEventType::SessionError => {
1530                        if let Some(waiter) = guard.take() {
1531                            if event_type == SessionEventType::SessionIdle {
1532                                tracing::debug!(
1533                                    elapsed_ms = waiter.started_at.elapsed().as_millis(),
1534                                    session_id = %session_id,
1535                                    "Session::send_and_wait idle received"
1536                                );
1537                                let _ = waiter.tx.send(Ok(waiter.last_assistant_message));
1538                            } else {
1539                                let error_msg = event
1540                                    .typed_data::<SessionErrorData>()
1541                                    .map(|d| d.message)
1542                                    .or_else(|| {
1543                                        event
1544                                            .data
1545                                            .get("message")
1546                                            .and_then(|v| v.as_str())
1547                                            .map(|s| s.to_string())
1548                                    })
1549                                    .unwrap_or_else(|| "session error".to_string());
1550                                let _ = waiter.tx.send(Err(Error::with_message(
1551                                    ErrorKind::Session(SessionErrorKind::AgentError),
1552                                    error_msg,
1553                                )));
1554                            }
1555                        }
1556                    }
1557                    _ => {}
1558                }
1559            }
1560        }
1561        _ => {}
1562    }
1563
1564    // Update the snapshot caches BEFORE broadcasting so subscribers that
1565    // call `Session::capabilities()` / `Session::open_canvases()` in
1566    // response to the event observe the new state.
1567    if event_type == SessionEventType::CapabilitiesChanged {
1568        match serde_json::from_value::<SessionCapabilities>(notification.event.data.clone()) {
1569            Ok(changed) => *capabilities.write() = changed,
1570            Err(e) => warn!(error = %e, "failed to deserialize capabilities.changed payload"),
1571        }
1572    }
1573    if event_type == SessionEventType::SessionCanvasOpened {
1574        match serde_json::from_value::<OpenCanvasInstance>(notification.event.data.clone()) {
1575            Ok(open_canvas) => {
1576                upsert_open_canvas_snapshot(&mut open_canvases.write(), open_canvas);
1577            }
1578            Err(e) => warn!(error = %e, "failed to deserialize session.canvas.opened payload"),
1579        }
1580    }
1581    if event_type == SessionEventType::SessionCanvasClosed {
1582        match serde_json::from_value::<SessionCanvasClosedData>(notification.event.data.clone()) {
1583            Ok(closed) => {
1584                if closed.instance_id.is_empty() {
1585                    warn!("failed to deserialize session.canvas.closed payload");
1586                } else {
1587                    remove_open_canvas_snapshot(&mut open_canvases.write(), &closed.instance_id);
1588                }
1589            }
1590            Err(e) => warn!(error = %e, "failed to deserialize session.canvas.closed payload"),
1591        }
1592    }
1593
1594    // Fan out the event to runtime subscribers (`Session::subscribe`). `send`
1595    // only errors when there are no receivers, which is the normal case
1596    // before any consumer subscribes.
1597    let _ = event_tx.send(event.clone());
1598
1599    tracing::debug!(
1600        elapsed_ms = dispatch_start.elapsed().as_millis(),
1601        session_id = %session_id,
1602        event_type = %notification.event.event_type,
1603        "Session::handle_notification dispatch"
1604    );
1605
1606    // Notification-based permission/tool/elicitation requests require a
1607    // separate RPC callback. Spawn concurrently since the CLI doesn't block.
1608    match event_type {
1609        SessionEventType::PermissionRequested => {
1610            let Some(request_id) = extract_request_id(&notification.event.data) else {
1611                return;
1612            };
1613            // Honor the runtime's `resolvedByHook` signal — when the
1614            // server has already resolved the permission via a hook,
1615            // clients must not send a second response.
1616            if notification
1617                .event
1618                .data
1619                .get("resolvedByHook")
1620                .and_then(|v| v.as_bool())
1621                .unwrap_or(false)
1622            {
1623                return;
1624            }
1625            // Multi-client safety: if this client has no permission
1626            // handler installed, don't respond — another client on the
1627            // same CLI may handle it.
1628            let Some(permission_handler) = handlers.permission.clone() else {
1629                return;
1630            };
1631            let client = client.clone();
1632            let sid = session_id.clone();
1633            let data: PermissionRequestData =
1634                serde_json::from_value(notification.event.data.clone()).unwrap_or_else(|_| {
1635                    PermissionRequestData {
1636                        kind: None,
1637                        tool_call_id: None,
1638                        extra: notification.event.data.clone(),
1639                    }
1640                });
1641            let span = tracing::error_span!(
1642                "permission_request_handler",
1643                session_id = %sid,
1644                request_id = %request_id
1645            );
1646            tokio::spawn(
1647                async move {
1648                    let handler_start = Instant::now();
1649                    let result = permission_handler
1650                        .handle(sid.clone(), request_id.clone(), data)
1651                        .await;
1652                    tracing::debug!(
1653                        elapsed_ms = handler_start.elapsed().as_millis(),
1654                        session_id = %sid,
1655                        request_id = %request_id,
1656                        "PermissionHandler::handle dispatch"
1657                    );
1658                    let Some(result_value) = notification_permission_payload(&result) else {
1659                        // Handler returned Deferred / NoResult — it will
1660                        // call handlePendingPermissionRequest itself (or
1661                        // leave the request unanswered).
1662                        return;
1663                    };
1664                    let rpc_start = Instant::now();
1665                    let _ = client
1666                        .call(
1667                            "session.permissions.handlePendingPermissionRequest",
1668                            Some(serde_json::json!({
1669                                "sessionId": sid,
1670                                "requestId": request_id,
1671                                "result": result_value,
1672                            })),
1673                        )
1674                        .await;
1675                    tracing::debug!(
1676                        elapsed_ms = rpc_start.elapsed().as_millis(),
1677                        session_id = %sid,
1678                        request_id = %request_id,
1679                        "Session::handle_notification response sent successfully"
1680                    );
1681                }
1682                .instrument(span),
1683            );
1684        }
1685        SessionEventType::ExternalToolRequested => {
1686            let Some(request_id) = extract_request_id(&notification.event.data) else {
1687                return;
1688            };
1689            let data: ExternalToolRequestedData =
1690                match serde_json::from_value(notification.event.data.clone()) {
1691                    Ok(d) => d,
1692                    Err(e) => {
1693                        warn!(error = %e, "failed to deserialize external_tool.requested");
1694                        let client = client.clone();
1695                        let sid = session_id.clone();
1696                        let span = tracing::error_span!(
1697                            "external_tool_deserialize_error",
1698                            session_id = %sid,
1699                            request_id = %request_id
1700                        );
1701                        tokio::spawn(
1702                            async move {
1703                                let rpc_start = Instant::now();
1704                                let _ = client
1705                                .call(
1706                                    "session.tools.handlePendingToolCall",
1707                                    Some(serde_json::json!({
1708                                        "sessionId": sid,
1709                                        "requestId": request_id,
1710                                        "error": format!("Failed to deserialize tool request: {e}"),
1711                                    })),
1712                                )
1713                                .await;
1714                                tracing::debug!(
1715                                    elapsed_ms = rpc_start.elapsed().as_millis(),
1716                                    session_id = %sid,
1717                                    request_id = %request_id,
1718                                    "Session::handle_notification response sent successfully"
1719                                );
1720                            }
1721                            .instrument(span),
1722                        );
1723                        return;
1724                    }
1725                };
1726            // Multi-client safety: look up a handler for the requested
1727            // tool name. If this client has no handler installed for that
1728            // tool, don't respond — another connected client may have one.
1729            let tool_handler = if data.tool_name.is_empty() {
1730                None
1731            } else {
1732                handlers.tools.get(&data.tool_name).cloned()
1733            };
1734            let Some(tool_handler) = tool_handler else {
1735                return;
1736            };
1737            let client = client.clone();
1738            let sid = session_id.clone();
1739            let span = tracing::error_span!(
1740                "external_tool_handler",
1741                session_id = %sid,
1742                request_id = %request_id
1743            );
1744            tokio::spawn(
1745                async move {
1746                    // `tool_name.is_empty()` would have produced a `None`
1747                    // lookup in `handlers.tools` and short-circuited at the
1748                    // outer guard above, so only the tool_call_id check is
1749                    // reachable here.
1750                    if data.tool_call_id.is_empty() {
1751                        let error_msg = "Missing toolCallId";
1752                        let rpc_start = Instant::now();
1753                        let _ = client
1754                            .call(
1755                                "session.tools.handlePendingToolCall",
1756                                Some(serde_json::json!({
1757                                    "sessionId": sid,
1758                                    "requestId": request_id,
1759                                    "error": error_msg,
1760                                })),
1761                            )
1762                            .await;
1763                        tracing::debug!(
1764                            elapsed_ms = rpc_start.elapsed().as_millis(),
1765                            session_id = %sid,
1766                            request_id = %request_id,
1767                            "Session::handle_notification response sent successfully"
1768                        );
1769                        return;
1770                    }
1771                    let tool_call_id = data.tool_call_id.clone();
1772                    let tool_name = data.tool_name.clone();
1773                    let invocation = ToolInvocation {
1774                        session_id: sid.clone(),
1775                        tool_call_id: data.tool_call_id,
1776                        tool_name: data.tool_name,
1777                        arguments: data
1778                            .arguments
1779                            .unwrap_or(Value::Object(serde_json::Map::new())),
1780                        traceparent: data.traceparent,
1781                        tracestate: data.tracestate,
1782                    };
1783                    let handler_start = Instant::now();
1784                    let tool_result = match tool_handler.call(invocation).await {
1785                        Ok(r) => r,
1786                        Err(e) => tool_failure_result(e.to_string()),
1787                    };
1788                    tracing::debug!(
1789                        elapsed_ms = handler_start.elapsed().as_millis(),
1790                        session_id = %sid,
1791                        request_id = %request_id,
1792                        tool_call_id = %tool_call_id,
1793                        tool_name = %tool_name,
1794                        "ToolHandler::call dispatch"
1795                    );
1796                    let result_value = serde_json::to_value(tool_result).unwrap_or(Value::Null);
1797                    let rpc_start = Instant::now();
1798                    let _ = client
1799                        .call(
1800                            "session.tools.handlePendingToolCall",
1801                            Some(serde_json::json!({
1802                                "sessionId": sid,
1803                                "requestId": request_id,
1804                                "result": result_value,
1805                            })),
1806                        )
1807                        .await;
1808                    tracing::debug!(
1809                        elapsed_ms = rpc_start.elapsed().as_millis(),
1810                        session_id = %sid,
1811                        request_id = %request_id,
1812                        tool_call_id = %tool_call_id,
1813                        tool_name = %tool_name,
1814                        "Session::handle_notification response sent successfully"
1815                    );
1816                }
1817                .instrument(span),
1818            );
1819        }
1820        SessionEventType::UserInputRequested => {
1821            // Notification-only signal for observers (UI, telemetry).
1822            // The CLI follows up with a `userInput.request` JSON-RPC call
1823            // that drives the `UserInputHandler` dispatch — handling
1824            // the notification here too would double-fire the handler
1825            // and produce duplicate prompts on the consumer side. See
1826            // github/github-app#4249.
1827        }
1828        SessionEventType::ElicitationRequested => {
1829            let Some(request_id) = extract_request_id(&notification.event.data) else {
1830                return;
1831            };
1832            // Multi-client safety: if this client has no elicitation
1833            // handler installed, don't respond — another client on the
1834            // same CLI may handle it.
1835            let Some(elicitation_handler) = handlers.elicitation.clone() else {
1836                return;
1837            };
1838            let elicitation_data: ElicitationRequestedData =
1839                match serde_json::from_value(notification.event.data.clone()) {
1840                    Ok(d) => d,
1841                    Err(e) => {
1842                        warn!(error = %e, "failed to deserialize elicitation request");
1843                        return;
1844                    }
1845                };
1846            let request = ElicitationRequest {
1847                message: elicitation_data.message,
1848                requested_schema: elicitation_data
1849                    .requested_schema
1850                    .map(|s| serde_json::to_value(s).unwrap_or(Value::Null)),
1851                mode: elicitation_data.mode.map(|m| match m {
1852                    crate::generated::session_events::ElicitationRequestedMode::Form => {
1853                        crate::types::ElicitationMode::Form
1854                    }
1855                    crate::generated::session_events::ElicitationRequestedMode::Url => {
1856                        crate::types::ElicitationMode::Url
1857                    }
1858                    _ => crate::types::ElicitationMode::Unknown,
1859                }),
1860                elicitation_source: elicitation_data.elicitation_source,
1861                url: elicitation_data.url,
1862            };
1863            let client = client.clone();
1864            let sid = session_id.clone();
1865            let span = tracing::error_span!(
1866                "elicitation_request_handler",
1867                session_id = %sid,
1868                request_id = %request_id
1869            );
1870            tokio::spawn(
1871                async move {
1872                    let cancel = ElicitationResult {
1873                        action: "cancel".to_string(),
1874                        content: None,
1875                    };
1876                    // Dispatch to a nested task so panics are caught as JoinErrors.
1877                    let handler_task = tokio::spawn({
1878                        let sid = sid.clone();
1879                        let request_id = request_id.clone();
1880                        let span = tracing::error_span!(
1881                            "elicitation_callback",
1882                            session_id = %sid,
1883                            request_id = %request_id
1884                        );
1885                        async move {
1886                            let handler_start = Instant::now();
1887                            let response = elicitation_handler
1888                                .handle(sid.clone(), request_id.clone(), request)
1889                                .await;
1890                            tracing::debug!(
1891                                elapsed_ms = handler_start.elapsed().as_millis(),
1892                                session_id = %sid,
1893                                request_id = %request_id,
1894                                "ElicitationHandler::handle dispatch"
1895                            );
1896                            response
1897                        }
1898                        .instrument(span)
1899                    });
1900                    let result = match handler_task.await {
1901                        Ok(r) => r,
1902                        Err(_) => cancel.clone(),
1903                    };
1904                    let rpc_start = Instant::now();
1905                    if let Err(e) = client
1906                        .call(
1907                            "session.ui.handlePendingElicitation",
1908                            Some(serde_json::json!({
1909                                "sessionId": sid,
1910                                "requestId": request_id,
1911                                "result": result,
1912                            })),
1913                        )
1914                        .await
1915                    {
1916                        // RPC failed — attempt cancel as last resort
1917                        warn!(error = %e, "handlePendingElicitation failed, sending cancel");
1918                        let _ = client
1919                            .call(
1920                                "session.ui.handlePendingElicitation",
1921                                Some(serde_json::json!({
1922                                    "sessionId": sid,
1923                                    "requestId": request_id,
1924                                    "result": cancel,
1925                                })),
1926                            )
1927                            .await;
1928                    } else {
1929                        tracing::debug!(
1930                            elapsed_ms = rpc_start.elapsed().as_millis(),
1931                            session_id = %sid,
1932                            request_id = %request_id,
1933                            "Session::handle_notification response sent successfully"
1934                        );
1935                    }
1936                }
1937                .instrument(span),
1938            );
1939        }
1940        SessionEventType::CommandExecute => {
1941            let data: CommandExecuteData =
1942                match serde_json::from_value(notification.event.data.clone()) {
1943                    Ok(d) => d,
1944                    Err(e) => {
1945                        warn!(error = %e, "failed to deserialize command.execute");
1946                        return;
1947                    }
1948                };
1949            let client = client.clone();
1950            let command_handlers = command_handlers.clone();
1951            let sid = session_id.clone();
1952            let span = tracing::error_span!("command_handler", session_id = %sid);
1953            tokio::spawn(
1954                async move {
1955                    let request_id = data.request_id;
1956                    let ack_error = match command_handlers.get(&data.command_name).cloned() {
1957                        None => Some(format!("Unknown command: {}", data.command_name)),
1958                        Some(handler) => {
1959                            let command_name = data.command_name.clone();
1960                            let ctx = CommandContext {
1961                                session_id: sid.clone(),
1962                                command: data.command,
1963                                command_name: data.command_name,
1964                                args: data.args,
1965                            };
1966                            let handler_start = Instant::now();
1967                            let result = handler.on_command(ctx).await;
1968                            tracing::debug!(
1969                                elapsed_ms = handler_start.elapsed().as_millis(),
1970                                session_id = %sid,
1971                                request_id = %request_id,
1972                                command_name = %command_name,
1973                                "CommandHandler::call dispatch"
1974                            );
1975                            match result {
1976                                Ok(()) => None,
1977                                Err(e) => Some(e.to_string()),
1978                            }
1979                        }
1980                    };
1981                    let mut params = serde_json::json!({
1982                        "sessionId": sid,
1983                        "requestId": request_id,
1984                    });
1985                    if let Some(error_msg) = ack_error {
1986                        params["error"] = serde_json::Value::String(error_msg);
1987                    }
1988                    let rpc_start = Instant::now();
1989                    let _ = client
1990                        .call("session.commands.handlePendingCommand", Some(params))
1991                        .await;
1992                    tracing::debug!(
1993                        elapsed_ms = rpc_start.elapsed().as_millis(),
1994                        session_id = %sid,
1995                        request_id = %request_id,
1996                        "Session::handle_notification response sent successfully"
1997                    );
1998                }
1999                .instrument(span),
2000            );
2001        }
2002        _ => {}
2003    }
2004}
2005
2006struct RequestDispatchContext<'a> {
2007    client: &'a Client,
2008    handlers: &'a SessionHandlers,
2009    hooks: Option<&'a dyn SessionHooks>,
2010    transforms: Option<&'a dyn SystemMessageTransform>,
2011    canvas_handler: Option<&'a Arc<dyn CanvasHandler>>,
2012    session_fs_provider: Option<&'a Arc<dyn SessionFsProvider>>,
2013}
2014
2015/// Process a JSON-RPC request from the CLI.
2016async fn handle_request(
2017    session_id: &SessionId,
2018    ctx: RequestDispatchContext<'_>,
2019    request: crate::JsonRpcRequest,
2020) {
2021    let sid = session_id.clone();
2022    let client = ctx.client;
2023    let handlers = ctx.handlers;
2024    let hooks = ctx.hooks;
2025    let transforms = ctx.transforms;
2026    let canvas_handler = ctx.canvas_handler;
2027    let session_fs_provider = ctx.session_fs_provider;
2028
2029    if request.method.starts_with("sessionFs.") {
2030        crate::session_fs_dispatch::dispatch(client, session_fs_provider, request).await;
2031        return;
2032    }
2033
2034    if request.method.starts_with("canvas.") {
2035        crate::canvas_dispatch::dispatch(client, canvas_handler, request).await;
2036        return;
2037    }
2038
2039    match request.method.as_str() {
2040        "hooks.invoke" => {
2041            let params = request.params.as_ref();
2042            let hook_type = params
2043                .and_then(|p| p.get("hookType"))
2044                .and_then(|v| v.as_str())
2045                .unwrap_or("");
2046            let input = params
2047                .and_then(|p| p.get("input"))
2048                .cloned()
2049                .unwrap_or(Value::Object(Default::default()));
2050
2051            let rpc_result = if let Some(hooks) = hooks {
2052                match crate::hooks::dispatch_hook(hooks, &sid, hook_type, input).await {
2053                    Ok(output) => output,
2054                    Err(e) => {
2055                        warn!(error = %e, hook_type = hook_type, "hook dispatch failed");
2056                        serde_json::json!({ "output": {} })
2057                    }
2058                }
2059            } else {
2060                serde_json::json!({ "output": {} })
2061            };
2062
2063            let rpc_response = JsonRpcResponse {
2064                jsonrpc: "2.0".to_string(),
2065                id: request.id,
2066                result: Some(rpc_result),
2067                error: None,
2068            };
2069            let _ = client.send_response(&rpc_response).await;
2070        }
2071
2072        "userInput.request" => {
2073            let params = request.params.as_ref();
2074            let Some(question) = params
2075                .and_then(|p| p.get("question"))
2076                .and_then(|v| v.as_str())
2077            else {
2078                warn!("userInput.request missing 'question' field");
2079                let rpc_response = JsonRpcResponse {
2080                    jsonrpc: "2.0".to_string(),
2081                    id: request.id,
2082                    result: None,
2083                    error: Some(crate::JsonRpcError {
2084                        code: error_codes::INVALID_PARAMS,
2085                        message: "missing required field: question".to_string(),
2086                        data: None,
2087                    }),
2088                };
2089                let _ = client.send_response(&rpc_response).await;
2090                return;
2091            };
2092            let question = question.to_string();
2093            let choices = params
2094                .and_then(|p| p.get("choices"))
2095                .and_then(|v| v.as_array())
2096                .map(|arr| {
2097                    arr.iter()
2098                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
2099                        .collect()
2100                });
2101            let allow_freeform = params
2102                .and_then(|p| p.get("allowFreeform"))
2103                .and_then(|v| v.as_bool());
2104
2105            let handler_start = Instant::now();
2106            let response = if let Some(user_input_handler) = handlers.user_input.as_ref() {
2107                user_input_handler
2108                    .handle(sid.clone(), question, choices, allow_freeform)
2109                    .await
2110            } else {
2111                None
2112            };
2113            tracing::debug!(
2114                elapsed_ms = handler_start.elapsed().as_millis(),
2115                session_id = %sid,
2116                "UserInputHandler::handle dispatch"
2117            );
2118
2119            let rpc_result = match response {
2120                Some(UserInputResponse {
2121                    answer,
2122                    was_freeform,
2123                }) => serde_json::json!({
2124                    "answer": answer,
2125                    "wasFreeform": was_freeform,
2126                }),
2127                None => serde_json::json!({ "noResponse": true }),
2128            };
2129            let rpc_response = JsonRpcResponse {
2130                jsonrpc: "2.0".to_string(),
2131                id: request.id,
2132                result: Some(rpc_result),
2133                error: None,
2134            };
2135            let _ = client.send_response(&rpc_response).await;
2136        }
2137
2138        "exitPlanMode.request" => {
2139            let params = request
2140                .params
2141                .as_ref()
2142                .cloned()
2143                .unwrap_or(Value::Object(serde_json::Map::new()));
2144            let data: ExitPlanModeData = match serde_json::from_value(params) {
2145                Ok(d) => d,
2146                Err(e) => {
2147                    warn!(error = %e, "failed to deserialize exitPlanMode.request params, using defaults");
2148                    ExitPlanModeData::default()
2149                }
2150            };
2151
2152            let rpc_result = if let Some(exit_plan_handler) = handlers.exit_plan_mode.as_ref() {
2153                let result = exit_plan_handler.handle(sid, data).await;
2154                serde_json::to_value(result).expect("ExitPlanModeResult serialization cannot fail")
2155            } else {
2156                serde_json::json!({ "approved": true })
2157            };
2158            let rpc_response = JsonRpcResponse {
2159                jsonrpc: "2.0".to_string(),
2160                id: request.id,
2161                result: Some(rpc_result),
2162                error: None,
2163            };
2164            let _ = client.send_response(&rpc_response).await;
2165        }
2166
2167        "autoModeSwitch.request" => {
2168            let error_code = request
2169                .params
2170                .as_ref()
2171                .and_then(|p| p.get("errorCode"))
2172                .and_then(|v| v.as_str())
2173                .map(|s| s.to_string());
2174            let retry_after_seconds = request
2175                .params
2176                .as_ref()
2177                .and_then(|p| p.get("retryAfterSeconds"))
2178                .and_then(|v| v.as_f64());
2179
2180            let answer = if let Some(auto_mode_handler) = handlers.auto_mode_switch.as_ref() {
2181                auto_mode_handler
2182                    .handle(sid, error_code, retry_after_seconds)
2183                    .await
2184            } else {
2185                AutoModeSwitchResponse::No
2186            };
2187            let rpc_response = JsonRpcResponse {
2188                jsonrpc: "2.0".to_string(),
2189                id: request.id,
2190                result: Some(serde_json::json!({ "response": answer })),
2191                error: None,
2192            };
2193            let _ = client.send_response(&rpc_response).await;
2194        }
2195
2196        "systemMessage.transform" => {
2197            let params = request.params.as_ref();
2198            let sections: HashMap<String, crate::transforms::TransformSection> =
2199                match params.and_then(|p| p.get("sections")) {
2200                    Some(v) => match serde_json::from_value(v.clone()) {
2201                        Ok(s) => s,
2202                        Err(e) => {
2203                            let _ = send_error_response(
2204                                client,
2205                                request.id,
2206                                error_codes::INVALID_PARAMS,
2207                                &format!("invalid sections: {e}"),
2208                            )
2209                            .await;
2210                            return;
2211                        }
2212                    },
2213                    None => {
2214                        let _ = send_error_response(
2215                            client,
2216                            request.id,
2217                            error_codes::INVALID_PARAMS,
2218                            "missing sections parameter",
2219                        )
2220                        .await;
2221                        return;
2222                    }
2223                };
2224
2225            let rpc_result = if let Some(transforms) = transforms {
2226                let transform_start = Instant::now();
2227                let response =
2228                    crate::transforms::dispatch_transform(transforms, &sid, sections).await;
2229                tracing::debug!(
2230                    elapsed_ms = transform_start.elapsed().as_millis(),
2231                    session_id = %sid,
2232                    "SystemMessageTransform::transform_section dispatch"
2233                );
2234                match serde_json::to_value(response) {
2235                    Ok(v) => v,
2236                    Err(e) => {
2237                        warn!(error = %e, "failed to serialize transform response");
2238                        serde_json::json!({ "sections": {} })
2239                    }
2240                }
2241            } else {
2242                // No transforms registered — pass through all sections unchanged.
2243                let passthrough: HashMap<String, crate::transforms::TransformSection> = sections;
2244                serde_json::json!({ "sections": passthrough })
2245            };
2246
2247            let rpc_response = JsonRpcResponse {
2248                jsonrpc: "2.0".to_string(),
2249                id: request.id,
2250                result: Some(rpc_result),
2251                error: None,
2252            };
2253            let _ = client.send_response(&rpc_response).await;
2254        }
2255
2256        method => {
2257            warn!(
2258                method = method,
2259                "unhandled request method in session event loop"
2260            );
2261            let _ = send_error_response(
2262                client,
2263                request.id,
2264                error_codes::METHOD_NOT_FOUND,
2265                &format!("unknown method: {method}"),
2266            )
2267            .await;
2268        }
2269    }
2270}
2271
2272async fn send_error_response(
2273    client: &Client,
2274    id: u64,
2275    code: i32,
2276    message: &str,
2277) -> Result<(), Error> {
2278    let response = JsonRpcResponse {
2279        jsonrpc: "2.0".to_string(),
2280        id,
2281        result: None,
2282        error: Some(crate::JsonRpcError {
2283            code,
2284            message: message.to_string(),
2285            data: None,
2286        }),
2287    };
2288    client.send_response(&response).await
2289}
2290
2291/// Inject `action: "transform"` sections into a `SystemMessageConfig`,
2292/// forcing `mode: "customize"` (required by the CLI for transforms to fire).
2293/// Preserves any existing caller-provided section overrides.
2294fn apply_transform_sections(
2295    sys_msg: &mut SystemMessageConfig,
2296    transforms: &dyn SystemMessageTransform,
2297) {
2298    sys_msg.mode = Some("customize".to_string());
2299    let sections = sys_msg.sections.get_or_insert_with(HashMap::new);
2300    for id in transforms.section_ids() {
2301        sections.entry(id).or_insert_with(|| SectionOverride {
2302            action: Some("transform".to_string()),
2303            content: None,
2304        });
2305    }
2306}
2307
2308fn inject_transform_sections(config: &mut SessionConfig, transforms: &dyn SystemMessageTransform) {
2309    let sys_msg = config.system_message.get_or_insert_with(Default::default);
2310    apply_transform_sections(sys_msg, transforms);
2311}
2312
2313fn inject_transform_sections_resume(
2314    config: &mut ResumeSessionConfig,
2315    transforms: &dyn SystemMessageTransform,
2316) {
2317    let sys_msg = config.system_message.get_or_insert_with(Default::default);
2318    apply_transform_sections(sys_msg, transforms);
2319}
2320
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::notification_permission_payload;
    use crate::handler::PermissionResult;

    /// `NoResult` must not yield any notification payload.
    #[test]
    fn notification_payload_suppresses_no_result() {
        let payload = notification_permission_payload(&PermissionResult::NoResult);
        assert!(payload.is_none());
    }

    /// Every concrete decision yields a JSON payload tagged with its `kind`;
    /// a rejection with feedback additionally carries `feedback`.
    #[test]
    fn notification_payload_serializes_decisions() {
        let cases = [
            (
                PermissionResult::approve_once(),
                json!({ "kind": "approve-once" }),
            ),
            (PermissionResult::reject(None), json!({ "kind": "reject" })),
            (
                PermissionResult::reject(Some("bad".to_string())),
                json!({ "kind": "reject", "feedback": "bad" }),
            ),
            (
                PermissionResult::user_not_available(),
                json!({ "kind": "user-not-available" }),
            ),
        ];

        for (decision, expected) in cases {
            assert_eq!(notification_permission_payload(&decision), Some(expected));
        }
    }
}