Skip to main content

github_copilot_sdk/
session.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::Mutex as ParkingLotMutex;
7use serde_json::Value;
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tracing::{Instrument, warn};
12
13use crate::canvas::CanvasHandler;
14use crate::generated::api_types::{
15    LogRequest, ModelSwitchToRequest, OpenCanvasInstance, RegisterEventInterestParams, rpc_methods,
16};
17use crate::generated::session_events::{
18    CommandExecuteData, ElicitationRequestedData, ExternalToolRequestedData, McpOauthRequiredData,
19    SessionCanvasClosedData, SessionErrorData, SessionEventType,
20};
21use crate::handler::{
22    AutoModeSwitchHandler, AutoModeSwitchResponse, ElicitationHandler, ExitPlanModeHandler,
23    McpAuthHandler, McpAuthRequest, McpAuthResult, PermissionHandler, PermissionResult,
24    UserInputHandler, UserInputResponse,
25};
26use crate::hooks::SessionHooks;
27use crate::provider_token::BearerTokenProvider;
28use crate::session_fs::SessionFsProvider;
29use crate::trace_context::inject_trace_context;
30use crate::transforms::SystemMessageTransform;
31use crate::types::{
32    CommandContext, CommandDefinition, CommandHandler, CreateSessionResult, ElicitationRequest,
33    ElicitationResult, ExitPlanModeData, GetMessagesResponse, MessageOptions,
34    PermissionRequestData, RequestId, ResumeSessionConfig, ResumeSessionResult, SectionOverride,
35    SessionCapabilities, SessionConfig, SessionEvent, SessionId, SetModelOptions,
36    SystemMessageConfig, ToolInvocation, ToolResult, ToolResultExpanded, TraceContext,
37    UiInputOptions, ensure_attachment_display_names,
38};
39use crate::{
40    Client, Error, ErrorKind, JsonRpcResponse, SessionErrorKind, SessionEventNotification,
41    error_codes,
42};
43
/// Bundle of the per-session callbacks the SDK dispatches to. Built from a
/// [`SessionConfig`] / [`ResumeSessionConfig`] at
/// [`Client::create_session`] / [`Client::resume_session`] time. Each
/// field is `None` (or an empty map for tools) when the caller didn't
/// install a handler -- in that case the SDK skips dispatch for that
/// event type. The wire flags on `session.create` / `session.resume`
/// are derived from these fields.
///
/// Every handler is stored behind an [`Arc`], so the derived `Clone`
/// copies reference-counted pointers rather than handler state.
#[derive(Clone)]
pub(crate) struct SessionHandlers {
    /// Optional [`PermissionHandler`] callback.
    pub permission: Option<Arc<dyn PermissionHandler>>,
    /// Optional [`ElicitationHandler`] callback.
    pub elicitation: Option<Arc<dyn ElicitationHandler>>,
    /// Optional [`McpAuthHandler`] callback.
    pub mcp_auth: Option<Arc<dyn McpAuthHandler>>,
    /// Optional [`UserInputHandler`] callback.
    pub user_input: Option<Arc<dyn UserInputHandler>>,
    /// Optional [`ExitPlanModeHandler`] callback.
    pub exit_plan_mode: Option<Arc<dyn ExitPlanModeHandler>>,
    /// Optional [`AutoModeSwitchHandler`] callback.
    pub auto_mode_switch: Option<Arc<dyn AutoModeSwitchHandler>>,
    /// Tool handlers keyed by string; empty when no tools were installed.
    /// NOTE(review): the key is presumably the tool name — confirm against
    /// the code that builds this map.
    pub tools: Arc<HashMap<String, Arc<dyn crate::tool::ToolHandler>>>,
}
61
/// Shared state between a [`Session`] and its event loop, used by [`Session::send_and_wait`].
///
/// `send_and_wait` installs one of these in [`Session::idle_waiter`] before
/// sending and then awaits the receiving half of `tx`.
struct IdleWaiter {
    /// Sending half of the oneshot channel that resolves the pending
    /// `send_and_wait` call with either its final event or an error.
    tx: oneshot::Sender<Result<Option<SessionEvent>, Error>>,
    /// Starts as `None`. `send_and_wait` is documented to return the last
    /// `assistant.message` event captured during streaming, so the event
    /// loop presumably records each one here — confirm against the loop.
    last_assistant_message: Option<SessionEvent>,
    /// Instant captured at the start of the `send_and_wait` call that
    /// installed this waiter.
    started_at: Instant,
    /// Starts as `false`.
    /// NOTE(review): presumably set by the event loop once the first
    /// assistant message arrives — confirm against the loop.
    first_assistant_message_seen: bool,
}
69
/// RAII guard that clears the [`Session::idle_waiter`] slot on drop. Used
/// by [`Session::send_and_wait`] to ensure the slot doesn't leak if the
/// caller's future is cancelled (outer `tokio::time::timeout` / `select!`
/// / dropped JoinHandle). Synchronous clear via `parking_lot::Mutex` —
/// no async drop needed.
///
/// Without this, an outer cancellation between "install waiter" and
/// "drain channel" would leave the slot occupied, causing all subsequent
/// `send` and `send_and_wait` calls on the session to return
/// [`SendWhileWaiting`](SessionErrorKind::SendWhileWaiting). Closes RFD-400
/// review finding #2.
struct WaiterGuard {
    /// Clone of the session's `idle_waiter` handle, so the guard can reach
    /// the same slot without borrowing the [`Session`].
    slot: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
}
84
85impl Drop for WaiterGuard {
86    fn drop(&mut self) {
87        self.slot.lock().take();
88    }
89}
90
/// Drop guard for a session that has been registered on the client but is
/// not yet fully set up.
///
/// Unless it has been disarmed, dropping the guard cancels `shutdown` and
/// unregisters `session_id` from `client`.
struct PendingSessionRegistration {
    /// Client the session is unregistered from during cleanup.
    client: Client,
    /// ID passed to `Client::unregister_session` during cleanup.
    session_id: SessionId,
    /// Token cancelled during cleanup.
    /// NOTE(review): presumably the session event loop's shutdown token —
    /// confirm at the construction site.
    shutdown: CancellationToken,
    /// When `true`, `Drop` does nothing.
    disarmed: bool,
}
97
98impl PendingSessionRegistration {
99    fn new(client: Client, session_id: SessionId, shutdown: CancellationToken) -> Self {
100        Self {
101            client,
102            session_id,
103            shutdown,
104            disarmed: false,
105        }
106    }
107
108    async fn cleanup(mut self, event_loop: JoinHandle<()>) {
109        self.shutdown.cancel();
110        let _ = event_loop.await;
111        self.client.unregister_session(&self.session_id);
112        self.disarmed = true;
113    }
114
115    fn disarm(&mut self) {
116        self.disarmed = true;
117    }
118}
119
120impl Drop for PendingSessionRegistration {
121    fn drop(&mut self) {
122        if !self.disarmed {
123            self.shutdown.cancel();
124            self.client.unregister_session(&self.session_id);
125        }
126    }
127}
128
/// A session on a GitHub Copilot CLI server.
///
/// Created via [`Client::create_session`] or [`Client::resume_session`].
/// Owns an internal event loop that dispatches events to the per-callback
/// handlers installed on the session config.
///
/// Protocol methods (`send`, `get_events`, `abort`, etc.) automatically
/// inject the session ID into RPC params.
///
/// Call [`disconnect`](Self::disconnect) (or its deprecated alias
/// [`destroy`](Self::destroy)) for graceful cleanup (RPC + local). If
/// dropped without disconnecting, the `Drop` impl cancels the event
/// loop's shutdown token and unregisters from the router as a best-effort
/// safety net; it does not abort or await the loop task.
pub struct Session {
    /// Session ID assigned by the CLI.
    id: SessionId,
    /// Working directory of the CLI process.
    cwd: PathBuf,
    /// Workspace directory for the session (if using infinite sessions).
    workspace_path: Option<PathBuf>,
    /// Remote session URL, if the session is running remotely.
    remote_url: Option<String>,
    /// Client used for every RPC issued by this session and for
    /// unregistering the session on teardown.
    client: Client,
    /// Handle to the spawned event-loop task. Sync `parking_lot::Mutex`
    /// because the lock is never held across an `.await`:
    /// [`Session::stop_event_loop`] takes the handle out in its own
    /// statement and only then awaits it. `Drop` does not touch this
    /// field.
    event_loop: ParkingLotMutex<Option<JoinHandle<()>>>,
    /// Cooperative shutdown signal for the event loop. The loop selects
    /// on [`shutdown.cancelled()`](CancellationToken::cancelled) alongside
    /// its inbound channels; [`Session::stop_event_loop`] and [`Drop`]
    /// both call [`cancel()`](CancellationToken::cancel) to ask the loop
    /// to exit between iterations rather than aborting the task (which
    /// can land at any await point and leave the session mid-protocol).
    /// See RFD-400 review finding #3.
    ///
    /// `CancellationToken` is the canonical signalling primitive in
    /// `tokio_util`; it is what `tonic` uses for the equivalent task-
    /// coordination case. Advanced consumers can obtain a child token
    /// via [`Session::cancellation_token`] to bind their own work to
    /// the session lifetime.
    shutdown: CancellationToken,
    /// Only populated while a `send_and_wait` call is in flight.
    ///
    /// Sync `parking_lot::Mutex` because the lock is never held across an
    /// `.await`, and synchronous access lets the `WaiterGuard` RAII helper
    /// in `send_and_wait` clear the slot from a `Drop` impl on caller-side
    /// cancellation. See RFD-400 review (cancel-safety hardening).
    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
    /// Capabilities negotiated with the CLI, updated on `capabilities.changed` events.
    capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
    /// Canvas instances currently known to be open for this session.
    open_canvases: Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
    /// Broadcast channel for runtime event subscribers — see [`Session::subscribe`].
    event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
}
180
181impl Session {
    /// Session ID assigned by the CLI.
    ///
    /// Borrows the ID stored on this handle; no RPC is issued.
    pub fn id(&self) -> &SessionId {
        &self.id
    }
186
    /// Working directory of the CLI process.
    ///
    /// Borrows the path stored on this handle; no RPC is issued.
    pub fn cwd(&self) -> &PathBuf {
        &self.cwd
    }
191
    /// Workspace directory for the session (if using infinite sessions).
    ///
    /// Returns `None` when no workspace path is stored on this handle.
    pub fn workspace_path(&self) -> Option<&Path> {
        self.workspace_path.as_deref()
    }
196
    /// Remote session URL, if the session is running remotely.
    ///
    /// Returns `None` when no remote URL is stored on this handle.
    pub fn remote_url(&self) -> Option<&str> {
        self.remote_url.as_deref()
    }
201
202    /// Session capabilities negotiated with the CLI.
203    ///
204    /// Capabilities are set during session creation and updated at runtime
205    /// via `capabilities.changed` events.
206    pub fn capabilities(&self) -> SessionCapabilities {
207        self.capabilities.read().clone()
208    }
209
210    /// Open canvas instances reported by the most recent `session.resume`
211    /// response or surfaced by inbound `canvas.opened` events.
212    pub fn open_canvases(&self) -> Vec<OpenCanvasInstance> {
213        self.open_canvases.read().clone()
214    }
215
    /// Returns a [`CancellationToken`] that fires when this session shuts
    /// down (via [`Session::stop_event_loop`], [`Session::disconnect`] /
    /// [`Session::destroy`], or [`Drop`]).
    ///
    /// Use this to bind an external task's lifetime to the session — when
    /// the session shuts down, awaiting [`cancelled()`](CancellationToken::cancelled)
    /// resolves so cooperative consumers can stop cleanly.
    ///
    /// The returned handle is a *child* token: calling
    /// [`cancel()`](CancellationToken::cancel) on it cancels only the
    /// caller's child, not the session itself. To cancel the session, call
    /// [`Session::stop_event_loop`]. Each call creates a fresh child of the
    /// session's own shutdown token.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # async fn example(session: github_copilot_sdk::session::Session) {
    /// let token = session.cancellation_token();
    /// tokio::select! {
    ///     _ = token.cancelled() => println!("session shut down"),
    ///     _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
    ///         println!("60s elapsed, session still alive");
    ///     }
    /// }
    /// # }
    /// ```
    pub fn cancellation_token(&self) -> CancellationToken {
        self.shutdown.child_token()
    }
245
246    /// Subscribe to events for this session.
247    ///
248    /// Returns an [`EventSubscription`](crate::subscription::EventSubscription)
249    /// that yields every [`SessionEvent`] dispatched on this session's
250    /// event loop. Drop the value to unsubscribe; there is no separate
251    /// cancel handle.
252    ///
253    /// **Observe-only.** Subscribers receive a clone of every
254    /// [`SessionEvent`] but cannot influence permission decisions, tool
255    /// results, or anything else that requires returning a value. Those
256    /// remain the responsibility of the per-callback handlers passed via
257    /// [`SessionConfig`]'s `with_*_handler`
258    /// builder methods.
259    ///
260    /// The returned handle implements both an inherent
261    /// [`recv`](crate::subscription::EventSubscription::recv) method and
262    /// [`Stream`](tokio_stream::Stream), so callers can use a `while let`
263    /// loop or any combinator from `tokio_stream::StreamExt` /
264    /// `futures::StreamExt`.
265    ///
266    /// Each subscriber maintains its own queue. If a consumer cannot keep
267    /// up, the oldest events are dropped and `recv` returns
268    /// [`RecvErrorKind::Lagged`](crate::subscription::RecvErrorKind::Lagged)
269    /// reporting the count of skipped events. Slow consumers do not block
270    /// the session's event loop.
271    ///
272    /// # Example
273    ///
274    /// ```no_run
275    /// # async fn example(session: github_copilot_sdk::session::Session) {
276    /// let mut events = session.subscribe();
277    /// tokio::spawn(async move {
278    ///     while let Ok(event) = events.recv().await {
279    ///         println!("[{}] event {}", event.id, event.event_type);
280    ///     }
281    /// });
282    /// # }
283    /// ```
284    pub fn subscribe(&self) -> crate::subscription::EventSubscription {
285        crate::subscription::EventSubscription::new(self.event_tx.subscribe())
286    }
287
    /// The underlying [`Client`] (for advanced use cases).
    ///
    /// This is the client the session issues its RPCs through.
    pub fn client(&self) -> &Client {
        &self.client
    }
292
    /// Typed RPC namespace for this session.
    ///
    /// Every protocol method lives here under its schema-aligned path —
    /// e.g. `session.rpc().workspaces().list_files()`. Wire method names
    /// and request/response types are generated from the protocol schema,
    /// so the typed namespace can't drift from the wire contract.
    ///
    /// The hand-authored helpers on [`Session`] delegate to this namespace
    /// and remain the recommended entry point for everyday use; reach for
    /// `rpc()` when you want a method without a hand-written wrapper.
    ///
    /// The returned value only borrows the session, so it is cheap to
    /// create on each call.
    pub fn rpc(&self) -> crate::generated::rpc::SessionRpc<'_> {
        crate::generated::rpc::SessionRpc { session: self }
    }
306
307    /// Stop the internal event loop. Called automatically on [`destroy`](Self::destroy).
308    ///
309    /// Cooperative: signals shutdown via the session's [`CancellationToken`]
310    /// and awaits the loop's natural exit rather than aborting the task.
311    /// Any in-flight handler (permission callback, tool call, elicitation
312    /// response) completes before the loop exits, so the CLI never sees a
313    /// half-handled request. See RFD-400 review finding #3.
314    pub async fn stop_event_loop(&self) {
315        self.shutdown.cancel();
316        let handle = self.event_loop.lock().take();
317        if let Some(handle) = handle {
318            let _ = handle.await;
319        }
320        // Fail any pending send_and_wait so it returns immediately.
321        if let Some(waiter) = self.idle_waiter.lock().take() {
322            let _ = waiter.tx.send(Err(
323                ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()
324            ));
325        }
326    }
327
328    /// Send a user message to the agent.
329    ///
330    /// Accepts anything convertible to [`MessageOptions`] — pass a `&str` for the
331    /// trivial case, or build a `MessageOptions` for mode/attachments. The
332    /// `wait_timeout` field on `MessageOptions` is ignored here (use
333    /// [`send_and_wait`](Self::send_and_wait) if you need to wait).
334    ///
335    /// Returns the assigned message ID, which can be used to correlate the
336    /// send with later [`SessionEvent`]s emitted in
337    /// response (assistant messages, tool requests, etc.).
338    ///
339    /// Returns an error if a [`send_and_wait`](Self::send_and_wait) call is
340    /// currently in flight, since the plain send would race with the waiter.
341    ///
342    /// # Cancel safety
343    ///
344    /// **Cancel-safe.** The underlying `session.send` RPC is dispatched
345    /// through the writer-actor (see [`Client::call`](crate::Client::call)),
346    /// so dropping this future after the actor has committed to writing
347    /// will not produce a partial frame on the wire. If the caller's
348    /// future is dropped between "frame enqueued" and "response received",
349    /// the message has already landed on the wire — the agent will process
350    /// it and emit events normally; the caller just won't see the returned
351    /// message ID.
352    pub async fn send(&self, opts: impl Into<MessageOptions>) -> Result<String, Error> {
353        if self.idle_waiter.lock().is_some() {
354            return Err(ErrorKind::Session(SessionErrorKind::SendWhileWaiting).into());
355        }
356        self.send_inner(opts.into()).await
357    }
358
359    async fn send_inner(&self, opts: MessageOptions) -> Result<String, Error> {
360        let mut params = serde_json::json!({
361            "sessionId": self.id,
362            "prompt": opts.prompt,
363        });
364        if let Some(m) = opts.mode {
365            params["mode"] = serde_json::to_value(m)?;
366        }
367        if let Some(am) = opts.agent_mode {
368            params["agentMode"] = serde_json::to_value(am)?;
369        }
370        if let Some(mut a) = opts.attachments {
371            ensure_attachment_display_names(&mut a);
372            params["attachments"] = serde_json::to_value(a)?;
373        }
374        if let Some(headers) = opts.request_headers
375            && !headers.is_empty()
376        {
377            params["requestHeaders"] = serde_json::to_value(headers)?;
378        }
379        if let Some(display_prompt) = opts.display_prompt {
380            params["displayPrompt"] = serde_json::to_value(display_prompt)?;
381        }
382        let trace_ctx = if opts.traceparent.is_some() || opts.tracestate.is_some() {
383            TraceContext {
384                traceparent: opts.traceparent,
385                tracestate: opts.tracestate,
386            }
387        } else {
388            self.client.resolve_trace_context().await
389        };
390        inject_trace_context(&mut params, &trace_ctx);
391        let rpc_start = Instant::now();
392        let result = self.client.call("session.send", Some(params)).await?;
393        let message_id = result
394            .get("messageId")
395            .and_then(|v| v.as_str())
396            .map(|s| s.to_string())
397            .unwrap_or_default();
398        tracing::debug!(
399            elapsed_ms = rpc_start.elapsed().as_millis(),
400            session_id = %self.id,
401            message_id = %message_id,
402            "Session::send completed successfully"
403        );
404        Ok(message_id)
405    }
406
407    /// Send a user message and wait for the agent to finish processing.
408    ///
409    /// Accepts anything convertible to [`MessageOptions`] — pass a `&str` for the
410    /// trivial case, or build a `MessageOptions` for mode/attachments/timeout.
411    /// Blocks until `session.idle` (success) or `session.error` (failure),
412    /// returning the last `assistant.message` event captured during streaming.
413    /// Times out after `MessageOptions::wait_timeout` (default 60 seconds).
414    ///
415    /// Only one `send_and_wait` call may be active per session at a time.
416    /// Calling [`send`](Self::send) while a `send_and_wait`
417    /// is in flight will also return an error.
418    ///
419    /// # Cancel safety
420    ///
421    /// **Cancel-safe.** A `WaiterGuard` clears the in-flight slot on every
422    /// exit path (success, internal failure, internal timeout, *and*
423    /// external cancellation via `tokio::time::timeout` / `select!` /
424    /// dropped JoinHandle). Subsequent `send` and `send_and_wait` calls on
425    /// this session will succeed normally — the slot is never leaked.
426    pub async fn send_and_wait(
427        &self,
428        opts: impl Into<MessageOptions>,
429    ) -> Result<Option<SessionEvent>, Error> {
430        let total_start = Instant::now();
431        let opts = opts.into();
432        let timeout_duration = opts.wait_timeout.unwrap_or(Duration::from_secs(60));
433        let (tx, rx) = oneshot::channel();
434
435        {
436            let mut guard = self.idle_waiter.lock();
437            if guard.is_some() {
438                return Err(ErrorKind::Session(SessionErrorKind::SendWhileWaiting).into());
439            }
440            *guard = Some(IdleWaiter {
441                tx,
442                last_assistant_message: None,
443                started_at: total_start,
444                first_assistant_message_seen: false,
445            });
446        }
447
448        // RAII: clears the idle_waiter slot on every exit path, including
449        // external cancellation (caller's outer `select!` / `timeout` /
450        // dropped future). Without this, an outer cancellation would leak
451        // the slot and brick subsequent `send`/`send_and_wait` calls.
452        let _waiter_guard = WaiterGuard {
453            slot: self.idle_waiter.clone(),
454        };
455
456        let result = tokio::time::timeout(timeout_duration, async {
457            self.send_inner(opts).await?;
458            match rx.await {
459                Ok(result) => result,
460                Err(_) => Err(ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()),
461            }
462        })
463        .await;
464
465        match result {
466            Ok(inner) => {
467                tracing::debug!(
468                    elapsed_ms = total_start.elapsed().as_millis(),
469                    session_id = %self.id,
470                    completed_by = if inner.is_ok() { "idle" } else { "error" },
471                    "Session::send_and_wait complete"
472                );
473                inner
474            }
475            Err(_) => {
476                tracing::warn!(
477                    elapsed_ms = total_start.elapsed().as_millis(),
478                    session_id = %self.id,
479                    completed_by = "timeout",
480                    "Session::send_and_wait failed"
481                );
482                Err(ErrorKind::Session(SessionErrorKind::Timeout(timeout_duration)).into())
483            }
484        }
485    }
486
487    /// Retrieve the session's timeline events.
488    pub async fn get_events(&self) -> Result<Vec<SessionEvent>, Error> {
489        let result = self
490            .client
491            .call(
492                "session.getMessages",
493                Some(serde_json::json!({ "sessionId": self.id })),
494            )
495            .await?;
496        let response: GetMessagesResponse = serde_json::from_value(result)?;
497        Ok(response.events)
498    }
499
    /// Deprecated alias for [`get_events`](Self::get_events).
    ///
    /// Forwards directly to `get_events` and returns its result unchanged.
    #[deprecated(since = "0.1.0", note = "Use `get_events()` instead")]
    pub async fn get_messages(&self) -> Result<Vec<SessionEvent>, Error> {
        self.get_events().await
    }
505
506    /// Abort the current agent turn.
507    ///
508    /// # Cancel safety
509    ///
510    /// **Cancel-safe.** Single `session.abort` RPC; the underlying
511    /// [`Client::call`](crate::Client::call) is cancel-safe via the
512    /// writer-actor.
513    pub async fn abort(&self) -> Result<(), Error> {
514        self.client
515            .call(
516                "session.abort",
517                Some(serde_json::json!({ "sessionId": self.id })),
518            )
519            .await?;
520        Ok(())
521    }
522
523    /// Switch to a different model.
524    ///
525    /// Pass `None` for `opts` if no extra configuration is needed.
526    pub async fn set_model(&self, model: &str, opts: Option<SetModelOptions>) -> Result<(), Error> {
527        let opts = opts.unwrap_or_default();
528        let request = ModelSwitchToRequest {
529            model_id: model.to_string(),
530            reasoning_effort: opts.reasoning_effort,
531            reasoning_summary: opts.reasoning_summary,
532            context_tier: opts.context_tier,
533            model_capabilities: opts.model_capabilities,
534        };
535        self.rpc().model().switch_to(request).await?;
536        Ok(())
537    }
538
539    /// Disconnect this session from the CLI.
540    ///
541    /// Sends the `session.destroy` RPC, stops the event loop, and unregisters
542    /// the session from the client. **Session state on disk** (conversation
543    /// history, planning state, artifacts) is **preserved**, so the
544    /// conversation can be resumed later via [`Client::resume_session`]
545    /// using this session's ID. To permanently remove all on-disk session
546    /// data, use [`Client::delete_session`] instead.
547    ///
548    /// The caller should ensure the session is idle (e.g. [`send_and_wait`]
549    /// has returned) before disconnecting; in-flight tool or event handlers
550    /// may otherwise observe failures.
551    ///
552    /// [`Client::resume_session`]: crate::Client::resume_session
553    /// [`Client::delete_session`]: crate::Client::delete_session
554    /// [`send_and_wait`]: Self::send_and_wait
555    pub async fn disconnect(&self) -> Result<(), Error> {
556        self.client
557            .call(
558                "session.destroy",
559                Some(serde_json::json!({ "sessionId": self.id })),
560            )
561            .await?;
562        self.stop_event_loop().await;
563        self.client.unregister_session(&self.id);
564        Ok(())
565    }
566
    /// Deprecated alias for [`disconnect`](Self::disconnect). The
    /// underlying wire RPC happens to be named `session.destroy`, but it
    /// only severs the connection — on-disk session state is preserved.
    /// Prefer `disconnect` in new code.
    ///
    /// Forwards directly to `disconnect` and returns its result unchanged.
    #[deprecated(since = "0.1.0", note = "Use `disconnect()` instead")]
    pub async fn destroy(&self) -> Result<(), Error> {
        self.disconnect().await
    }
575
576    /// Write a log message to the session.
577    ///
578    /// Pass `None` for `opts` to use defaults (info level, persisted).
579    pub async fn log(
580        &self,
581        message: &str,
582        opts: Option<crate::types::LogOptions>,
583    ) -> Result<(), Error> {
584        let opts = opts.unwrap_or_default();
585        let level = match opts.level {
586            Some(level) => Some(serde_json::from_value(serde_json::to_value(level)?)?),
587            None => None,
588        };
589        let request = LogRequest {
590            message: message.to_string(),
591            level,
592            ephemeral: opts.ephemeral,
593            r#type: None,
594            tip: None,
595            url: None,
596        };
597        self.rpc().log(request).await?;
598        Ok(())
599    }
600
    /// Returns the UI sub-API for elicitation, confirmation, selection, and
    /// free-form input.
    ///
    /// All UI methods route through `session.ui.*` RPCs and require host
    /// support — check `session.capabilities().ui.elicitation` before use.
    ///
    /// The returned [`SessionUi`] only borrows the session, so it is cheap
    /// to create on each call.
    pub fn ui(&self) -> SessionUi<'_> {
        SessionUi { session: self }
    }
609
610    /// Returns an error if the host doesn't support elicitation.
611    fn assert_elicitation(&self) -> Result<(), Error> {
612        if self
613            .capabilities
614            .read()
615            .ui
616            .as_ref()
617            .and_then(|u| u.elicitation)
618            != Some(true)
619        {
620            return Err(ErrorKind::Session(SessionErrorKind::ElicitationNotSupported).into());
621        }
622        Ok(())
623    }
624}
625
/// Best-effort teardown for a session that was dropped without
/// `disconnect` being called. No RPC is sent from here.
impl Drop for Session {
    fn drop(&mut self) {
        // Cooperative shutdown: cancel the event loop's token to signal
        // exit between iterations. The loop will see the cancellation on
        // its next select poll and break cleanly without interrupting an
        // in-flight handler. We do NOT abort the JoinHandle — that would
        // land at any await point in the loop body, potentially leaving
        // the CLI with an unanswered request id. RFD-400 review finding
        // #3.
        //
        // The handle still stored in `event_loop` is not awaited because
        // Drop is sync. It is dropped together with the struct, which in
        // tokio detaches the task rather than cancelling it, so the loop
        // runs on until it observes the cancellation signalled here.
        self.shutdown.cancel();
        // Remove this session's registration from the client.
        self.client.unregister_session(&self.id);
    }
}
643
/// UI sub-API for a [`Session`] — elicitation, confirmation, selection,
/// and free-form input.
///
/// Acquired via [`Session::ui`]. Methods route to `session.ui.*` RPCs and
/// require host elicitation support — check
/// `session.capabilities().ui.elicitation` before use.
pub struct SessionUi<'a> {
    /// Session whose ID, client and capabilities every UI call uses.
    session: &'a Session,
}
653
654impl<'a> SessionUi<'a> {
655    /// Request user input via an interactive UI form (elicitation).
656    ///
657    /// Sends a JSON Schema describing form fields to the CLI host. The host
658    /// renders a form dialog and returns the user's response.
659    ///
660    /// Prefer the typed convenience methods [`confirm`](Self::confirm),
661    /// [`select`](Self::select), and [`input`](Self::input) for common cases.
662    pub async fn elicitation(
663        &self,
664        message: &str,
665        schema: Value,
666    ) -> Result<ElicitationResult, Error> {
667        self.session.assert_elicitation()?;
668        let result = self
669            .session
670            .client
671            .call(
672                "session.ui.elicitation",
673                Some(serde_json::json!({
674                    "sessionId": self.session.id,
675                    "message": message,
676                    "requestedSchema": schema,
677                })),
678            )
679            .await?;
680        let elicitation: ElicitationResult = serde_json::from_value(result)?;
681        Ok(elicitation)
682    }
683
684    /// Ask the user a yes/no confirmation question.
685    ///
686    /// Returns `true` if the user accepted and confirmed, `false` otherwise.
687    pub async fn confirm(&self, message: &str) -> Result<bool, Error> {
688        self.session.assert_elicitation()?;
689        let schema = serde_json::json!({
690            "type": "object",
691            "properties": {
692                "confirmed": {
693                    "type": "boolean",
694                    "default": true,
695                }
696            },
697            "required": ["confirmed"]
698        });
699        let result = self.elicitation(message, schema).await?;
700        Ok(result.action == "accept"
701            && result
702                .content
703                .and_then(|c| c.get("confirmed").and_then(|v| v.as_bool()))
704                == Some(true))
705    }
706
707    /// Ask the user to select from a list of options.
708    ///
709    /// Returns the selected option string on accept, or `None` on decline/cancel.
710    pub async fn select(&self, message: &str, options: &[&str]) -> Result<Option<String>, Error> {
711        self.session.assert_elicitation()?;
712        let schema = serde_json::json!({
713            "type": "object",
714            "properties": {
715                "selection": {
716                    "type": "string",
717                    "enum": options,
718                }
719            },
720            "required": ["selection"]
721        });
722        let result = self.elicitation(message, schema).await?;
723        if result.action != "accept" {
724            return Ok(None);
725        }
726        let selection = result.content.and_then(|c| {
727            c.get("selection")
728                .and_then(|v| v.as_str())
729                .map(String::from)
730        });
731        Ok(selection)
732    }
733
734    /// Ask the user for free-form text input.
735    ///
736    /// Returns the input string on accept, or `None` on decline/cancel.
737    /// Use [`UiInputOptions`] to set validation constraints and field metadata.
738    pub async fn input(
739        &self,
740        message: &str,
741        options: Option<&UiInputOptions<'_>>,
742    ) -> Result<Option<String>, Error> {
743        self.session.assert_elicitation()?;
744        let mut field = serde_json::json!({ "type": "string" });
745        if let Some(opts) = options {
746            if let Some(title) = opts.title {
747                field["title"] = Value::String(title.to_string());
748            }
749            if let Some(desc) = opts.description {
750                field["description"] = Value::String(desc.to_string());
751            }
752            if let Some(min) = opts.min_length {
753                field["minLength"] = Value::Number(min.into());
754            }
755            if let Some(max) = opts.max_length {
756                field["maxLength"] = Value::Number(max.into());
757            }
758            if let Some(fmt) = &opts.format {
759                field["format"] = Value::String(fmt.as_str().to_string());
760            }
761            if let Some(default) = opts.default {
762                field["default"] = Value::String(default.to_string());
763            }
764        }
765        let schema = serde_json::json!({
766            "type": "object",
767            "properties": { "value": field },
768            "required": ["value"]
769        });
770        let result = self.elicitation(message, schema).await?;
771        if result.action != "accept" {
772            return Ok(None);
773        }
774        let value = result
775            .content
776            .and_then(|c| c.get("value").and_then(|v| v.as_str()).map(String::from));
777        Ok(value)
778    }
779}
780
781impl Client {
782    /// Create a new session on the CLI.
783    ///
784    /// Sends `session.create`, registers the session on the router,
785    /// and spawns an internal event loop that dispatches to the handler.
786    ///
787    /// All callbacks (per-event handlers, tool handlers, hooks, transform)
788    /// are configured via [`SessionConfig`] using its `with_*_handler` /
789    /// `with_tools` / `with_hooks` / `with_system_message_transform` builder
790    /// methods.
791    ///
792    /// If [`hooks_handler`](SessionConfig::hooks_handler) is set, the
793    /// wire-level `hooks` flag is automatically enabled.
794    ///
795    /// If [`system_message_transform`](SessionConfig::system_message_transform) is set, the SDK injects
796    /// `action: "transform"` sections into the [`SystemMessageConfig`] wire
797    /// format and handles `systemMessage.transform` RPC callbacks during
798    /// the session.
799    ///
800    /// Each per-event handler is independently optional. If a handler is
801    /// not installed, the SDK signals the runtime not to emit the matching
802    /// broadcast (and silently skips dispatch if one arrives anyway).
803    pub async fn create_session(&self, mut config: SessionConfig) -> Result<Session, Error> {
804        let total_start = Instant::now();
805        // For cloud sessions, let the CLI/server assign the session id and
806        // register the session lazily once the response arrives. For non-cloud
807        // sessions we generate the id client-side (when the caller didn't
808        // supply one) so the session can be registered BEFORE the RPC — the
809        // CLI may issue session-scoped requests (e.g. sessionFs.writeFile for
810        // workspace metadata) during session.create processing, before it has
811        // sent the response.
812        let caller_session_id = config.session_id.clone();
813        let use_server_generated_id = config.cloud.is_some() && caller_session_id.is_none();
814        let local_session_id: Option<SessionId> = if use_server_generated_id {
815            None
816        } else {
817            Some(
818                caller_session_id
819                    .clone()
820                    .unwrap_or_else(|| SessionId::new(uuid::Uuid::new_v4().to_string())),
821            )
822        };
823        if config.hooks_handler.is_some() && config.hooks.is_none() {
824            config.hooks = Some(true);
825        }
826        if let Some(transforms) = config.system_message_transform.clone() {
827            inject_transform_sections(&mut config, transforms.as_ref());
828        }
829        let mode = self.inner.mode;
830        if mode == crate::ClientMode::Empty && config.available_tools.is_none() {
831            return Err(Error::with_message(
832                ErrorKind::InvalidConfig,
833                "ClientMode::Empty requires available_tools to be set on the session config. \
834                 Use ToolSet to specify which tools the session may use (e.g. \
835                 ToolSet::new().add_builtin_many(BUILTIN_TOOLS_ISOLATED)).",
836            ));
837        }
838        crate::mode::validate_tool_filter_list(
839            "available_tools",
840            config.available_tools.as_deref(),
841        )?;
842        crate::mode::validate_tool_filter_list("excluded_tools", config.excluded_tools.as_deref())?;
843        config.system_message =
844            crate::mode::system_message_for_mode(mode, config.system_message.take());
845        config.memory = crate::mode::memory_for_mode(mode, config.memory.take());
846        if mode == crate::ClientMode::Empty {
847            if config.enable_session_telemetry.is_none() {
848                config.enable_session_telemetry = Some(false);
849            }
850            if config.skip_embedding_retrieval.is_none() {
851                config.skip_embedding_retrieval = Some(true);
852            }
853            if config.enable_on_demand_instruction_discovery.is_none() {
854                config.enable_on_demand_instruction_discovery = Some(false);
855            }
856            if config.enable_file_hooks.is_none() {
857                config.enable_file_hooks = Some(false);
858            }
859            if config.enable_host_git_operations.is_none() {
860                config.enable_host_git_operations = Some(false);
861            }
862            if config.enable_session_store.is_none() {
863                config.enable_session_store = Some(false);
864            }
865            if config.enable_skills.is_none() {
866                config.enable_skills = Some(false);
867            }
868        }
869        if mode == crate::ClientMode::Empty && config.mcp_oauth_token_storage.is_none() {
870            config.mcp_oauth_token_storage = Some("in-memory".into());
871        }
872        if mode == crate::ClientMode::Empty && config.embedding_cache_storage.is_none() {
873            config.embedding_cache_storage = Some("in-memory".into());
874        }
875        let opt_skip_custom_instructions = config.skip_custom_instructions;
876        let opt_custom_agents_local_only = config.custom_agents_local_only;
877        let opt_coauthor_enabled = config.coauthor_enabled;
878        let opt_manage_schedule_enabled = config.manage_schedule_enabled;
879        let (wire, mut runtime) = config.into_wire(local_session_id.clone())?;
880
881        let permission_handler = crate::permission::resolve_handler(
882            runtime.permission_handler.take(),
883            runtime.permission_policy.take(),
884        );
885        let handlers = SessionHandlers {
886            permission: permission_handler,
887            elicitation: runtime.elicitation_handler.take(),
888            mcp_auth: runtime.mcp_auth_handler.take(),
889            user_input: runtime.user_input_handler.take(),
890            exit_plan_mode: runtime.exit_plan_mode_handler.take(),
891            auto_mode_switch: runtime.auto_mode_switch_handler.take(),
892            tools: Arc::new(std::mem::take(&mut runtime.tool_handlers)),
893        };
894        let hooks = runtime.hooks_handler.take();
895        let transforms = runtime.system_message_transform.take();
896        let tools_count = wire.tools.as_ref().map_or(0, Vec::len);
897        let commands_count = runtime.commands.as_ref().map_or(0, Vec::len);
898        let has_hooks = hooks.is_some();
899        let command_handlers = build_command_handler_map(runtime.commands.as_deref());
900        let canvas_handler = runtime.canvas_handler.take();
901        let session_fs_provider = runtime.session_fs_provider.take();
902        let bearer_token_providers = std::mem::take(&mut runtime.bearer_token_providers);
903        let has_mcp_auth_handler = handlers.mcp_auth.is_some();
904        if self.inner.session_fs_configured && session_fs_provider.is_none() {
905            return Err(ErrorKind::Session(SessionErrorKind::SessionFsProviderRequired).into());
906        }
907        if self.inner.session_fs_sqlite_declared
908            && let Some(ref provider) = session_fs_provider
909            && provider.sqlite().is_none()
910        {
911            return Err(Error::with_message(
912                ErrorKind::InvalidConfig,
913                "SessionFs capabilities declare SQLite support but the provider \
914                 does not implement SessionFsSqliteProvider",
915            ));
916        }
917
918        let mut params = serde_json::to_value(&wire)?;
919        let trace_ctx = self.resolve_trace_context().await;
920        inject_trace_context(&mut params, &trace_ctx);
921
922        let setup_start = Instant::now();
923        let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
924        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
925        let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new()));
926        let shutdown = CancellationToken::new();
927        let (event_tx, _) = tokio::sync::broadcast::channel(512);
928
929        // For cloud sessions (use_server_generated_id), defer session
930        // registration to the inline callback so the read task registers
931        // the session synchronously the instant the response arrives.
932        // For non-cloud sessions, register up-front so the CLI can issue
933        // session-scoped requests during session.create processing.
934        let inline_stash: Arc<
935            ParkingLotMutex<Option<(SessionId, crate::router::SessionChannels)>>,
936        > = Arc::new(ParkingLotMutex::new(None));
937
938        let inline_callback: Option<crate::jsonrpc::InlineResponseCallback> = if let Some(ref sid) =
939            local_session_id
940        {
941            let channels = self.register_session(sid);
942            *inline_stash.lock() = Some((sid.clone(), channels));
943            None
944        } else {
945            let client = self.clone();
946            let stash = inline_stash.clone();
947            let expected = caller_session_id.clone();
948            Some(Box::new(move |response| {
949                let result = response.result.as_ref().ok_or_else(|| {
950                    Error::with_message(ErrorKind::Json, "session.create response had no result")
951                })?;
952                let parsed: CreateSessionResult =
953                    serde_json::from_value(result.clone()).map_err(Error::from)?;
954                if let Some(requested) = expected.as_ref()
955                    && parsed.session_id != *requested
956                {
957                    return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
958                        requested: requested.clone(),
959                        returned: parsed.session_id,
960                    })
961                    .into());
962                }
963                let channels = client.register_session(&parsed.session_id);
964                *stash.lock() = Some((parsed.session_id, channels));
965                Ok(())
966            }))
967        };
968
969        let rpc_start = Instant::now();
970        let result = match self
971            .call_with_inline_callback("session.create", Some(params), inline_callback)
972            .await
973        {
974            Ok(result) => result,
975            Err(error) => {
976                if let Some((id, _channels)) = inline_stash.lock().take() {
977                    self.unregister_session(&id);
978                }
979                return Err(error);
980            }
981        };
982        tracing::debug!(
983            elapsed_ms = rpc_start.elapsed().as_millis(),
984            "Client::create_session session creation request completed successfully"
985        );
986        let create_result: CreateSessionResult = match serde_json::from_value(result) {
987            Ok(result) => result,
988            Err(error) => {
989                if let Some((id, _channels)) = inline_stash.lock().take() {
990                    self.unregister_session(&id);
991                }
992                return Err(error.into());
993            }
994        };
995
996        if let Some(ref requested) = local_session_id
997            && create_result.session_id != *requested
998        {
999            if let Some((id, _channels)) = inline_stash.lock().take() {
1000                self.unregister_session(&id);
1001            }
1002            return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
1003                requested: requested.clone(),
1004                returned: create_result.session_id.clone(),
1005            })
1006            .into());
1007        }
1008
1009        let (session_id, channels) = inline_stash
1010            .lock()
1011            .take()
1012            .expect("session registration must have populated stash on success");
1013        let event_loop = spawn_event_loop(
1014            session_id.clone(),
1015            self.clone(),
1016            handlers,
1017            hooks,
1018            transforms,
1019            command_handlers,
1020            canvas_handler,
1021            session_fs_provider,
1022            bearer_token_providers,
1023            channels,
1024            idle_waiter.clone(),
1025            capabilities.clone(),
1026            open_canvases.clone(),
1027            event_tx.clone(),
1028            shutdown.clone(),
1029        );
1030        tracing::debug!(
1031            elapsed_ms = setup_start.elapsed().as_millis(),
1032            session_id = %session_id,
1033            tools_count,
1034            commands_count,
1035            has_hooks,
1036            "Client::create_session local setup complete"
1037        );
1038        *capabilities.write() = create_result.capabilities.unwrap_or_default();
1039        if has_mcp_auth_handler {
1040            register_mcp_auth_interest(self, &session_id).await?;
1041        }
1042
1043        tracing::debug!(
1044            elapsed_ms = total_start.elapsed().as_millis(),
1045            session_id = %session_id,
1046            "Client::create_session complete"
1047        );
1048        let session = Session {
1049            id: session_id,
1050            cwd: self.cwd().clone(),
1051            workspace_path: create_result.workspace_path,
1052            remote_url: create_result.remote_url,
1053            client: self.clone(),
1054            event_loop: ParkingLotMutex::new(Some(event_loop)),
1055            shutdown,
1056            idle_waiter,
1057            capabilities,
1058            open_canvases,
1059            event_tx,
1060        };
1061        apply_mode_post_create_patch(
1062            &session,
1063            mode,
1064            opt_skip_custom_instructions,
1065            opt_custom_agents_local_only,
1066            opt_coauthor_enabled,
1067            opt_manage_schedule_enabled,
1068        )
1069        .await?;
1070        Ok(session)
1071    }
1072
1073    /// Resume an existing session on the CLI.
1074    ///
1075    /// Sends `session.resume` and `session.skills.reload`, registers the
1076    /// session on the router, and spawns the event loop.
1077    ///
1078    /// All callbacks (event handler, hooks, transform) are configured
1079    /// via [`ResumeSessionConfig`] using its `with_*` builder methods.
1080    ///
1081    /// See [`Self::create_session`] for the defaults applied when callback
1082    /// fields are unset.
1083    pub async fn resume_session(&self, mut config: ResumeSessionConfig) -> Result<Session, Error> {
1084        let total_start = Instant::now();
1085        let session_id = config.session_id.clone();
1086        if config.hooks_handler.is_some() && config.hooks.is_none() {
1087            config.hooks = Some(true);
1088        }
1089        if let Some(transforms) = config.system_message_transform.clone() {
1090            inject_transform_sections_resume(&mut config, transforms.as_ref());
1091        }
1092        let mode = self.inner.mode;
1093        if mode == crate::ClientMode::Empty && config.available_tools.is_none() {
1094            return Err(Error::with_message(
1095                ErrorKind::InvalidConfig,
1096                "ClientMode::Empty requires available_tools to be set on the session config. \
1097                 Use ToolSet to specify which tools the session may use (e.g. \
1098                 ToolSet::new().add_builtin_many(BUILTIN_TOOLS_ISOLATED)).",
1099            ));
1100        }
1101        crate::mode::validate_tool_filter_list(
1102            "available_tools",
1103            config.available_tools.as_deref(),
1104        )?;
1105        crate::mode::validate_tool_filter_list("excluded_tools", config.excluded_tools.as_deref())?;
1106        config.system_message =
1107            crate::mode::system_message_for_mode(mode, config.system_message.take());
1108        config.memory = crate::mode::memory_for_mode(mode, config.memory.take());
1109        if mode == crate::ClientMode::Empty {
1110            if config.enable_session_telemetry.is_none() {
1111                config.enable_session_telemetry = Some(false);
1112            }
1113            if config.skip_embedding_retrieval.is_none() {
1114                config.skip_embedding_retrieval = Some(true);
1115            }
1116            if config.enable_on_demand_instruction_discovery.is_none() {
1117                config.enable_on_demand_instruction_discovery = Some(false);
1118            }
1119            if config.enable_file_hooks.is_none() {
1120                config.enable_file_hooks = Some(false);
1121            }
1122            if config.enable_host_git_operations.is_none() {
1123                config.enable_host_git_operations = Some(false);
1124            }
1125            if config.enable_session_store.is_none() {
1126                config.enable_session_store = Some(false);
1127            }
1128            if config.enable_skills.is_none() {
1129                config.enable_skills = Some(false);
1130            }
1131        }
1132        if mode == crate::ClientMode::Empty && config.mcp_oauth_token_storage.is_none() {
1133            config.mcp_oauth_token_storage = Some("in-memory".into());
1134        }
1135        if mode == crate::ClientMode::Empty && config.embedding_cache_storage.is_none() {
1136            config.embedding_cache_storage = Some("in-memory".into());
1137        }
1138        let opt_skip_custom_instructions = config.skip_custom_instructions;
1139        let opt_custom_agents_local_only = config.custom_agents_local_only;
1140        let opt_coauthor_enabled = config.coauthor_enabled;
1141        let opt_manage_schedule_enabled = config.manage_schedule_enabled;
1142        let (wire, mut runtime) = config.into_wire()?;
1143
1144        let permission_handler = crate::permission::resolve_handler(
1145            runtime.permission_handler.take(),
1146            runtime.permission_policy.take(),
1147        );
1148        let handlers = SessionHandlers {
1149            permission: permission_handler,
1150            elicitation: runtime.elicitation_handler.take(),
1151            mcp_auth: runtime.mcp_auth_handler.take(),
1152            user_input: runtime.user_input_handler.take(),
1153            exit_plan_mode: runtime.exit_plan_mode_handler.take(),
1154            auto_mode_switch: runtime.auto_mode_switch_handler.take(),
1155            tools: Arc::new(std::mem::take(&mut runtime.tool_handlers)),
1156        };
1157        let hooks = runtime.hooks_handler.take();
1158        let transforms = runtime.system_message_transform.take();
1159        let tools_count = wire.tools.as_ref().map_or(0, Vec::len);
1160        let commands_count = runtime.commands.as_ref().map_or(0, Vec::len);
1161        let has_hooks = hooks.is_some();
1162        let command_handlers = build_command_handler_map(runtime.commands.as_deref());
1163        let canvas_handler = runtime.canvas_handler.take();
1164        let session_fs_provider = runtime.session_fs_provider.take();
1165        let bearer_token_providers = std::mem::take(&mut runtime.bearer_token_providers);
1166        let has_mcp_auth_handler = handlers.mcp_auth.is_some();
1167        if self.inner.session_fs_configured && session_fs_provider.is_none() {
1168            return Err(ErrorKind::Session(SessionErrorKind::SessionFsProviderRequired).into());
1169        }
1170        if self.inner.session_fs_sqlite_declared
1171            && let Some(ref provider) = session_fs_provider
1172            && provider.sqlite().is_none()
1173        {
1174            return Err(Error::with_message(
1175                ErrorKind::InvalidConfig,
1176                "SessionFs capabilities declare SQLite support but the provider \
1177                 does not implement SessionFsSqliteProvider",
1178            ));
1179        }
1180
1181        let mut params = serde_json::to_value(&wire)?;
1182        let trace_ctx = self.resolve_trace_context().await;
1183        inject_trace_context(&mut params, &trace_ctx);
1184        if has_mcp_auth_handler {
1185            register_mcp_auth_interest(self, &session_id).await?;
1186        }
1187
1188        let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
1189        let setup_start = Instant::now();
1190        let channels = self.register_session(&session_id);
1191        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
1192        let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new()));
1193        let shutdown = CancellationToken::new();
1194        let (event_tx, _) = tokio::sync::broadcast::channel(512);
1195        let event_loop = spawn_event_loop(
1196            session_id.clone(),
1197            self.clone(),
1198            handlers,
1199            hooks,
1200            transforms,
1201            command_handlers,
1202            canvas_handler,
1203            session_fs_provider,
1204            bearer_token_providers,
1205            channels,
1206            idle_waiter.clone(),
1207            capabilities.clone(),
1208            open_canvases.clone(),
1209            event_tx.clone(),
1210            shutdown.clone(),
1211        );
1212        let mut registration =
1213            PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
1214        tracing::debug!(
1215            elapsed_ms = setup_start.elapsed().as_millis(),
1216            session_id = %session_id,
1217            tools_count,
1218            commands_count,
1219            has_hooks,
1220            "Client::resume_session local setup complete"
1221        );
1222
1223        let rpc_start = Instant::now();
1224        let result = match self.call("session.resume", Some(params)).await {
1225            Ok(result) => result,
1226            Err(error) => {
1227                registration.cleanup(event_loop).await;
1228                return Err(error);
1229            }
1230        };
1231        tracing::debug!(
1232            elapsed_ms = rpc_start.elapsed().as_millis(),
1233            session_id = %session_id,
1234            "Client::resume_session session resume request completed successfully"
1235        );
1236
1237        let resume_result: ResumeSessionResult = match serde_json::from_value(result) {
1238            Ok(result) => result,
1239            Err(error) => {
1240                registration.cleanup(event_loop).await;
1241                return Err(error.into());
1242            }
1243        };
1244        let cli_session_id = resume_result
1245            .session_id
1246            .clone()
1247            .unwrap_or_else(|| session_id.clone());
1248        if cli_session_id != session_id {
1249            registration.cleanup(event_loop).await;
1250            return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
1251                requested: session_id,
1252                returned: cli_session_id,
1253            })
1254            .into());
1255        }
1256
1257        // Reload skills after resume (best-effort).
1258        let skills_reload_start = Instant::now();
1259        if let Err(e) = self
1260            .call(
1261                "session.skills.reload",
1262                Some(serde_json::json!({ "sessionId": session_id })),
1263            )
1264            .await
1265        {
1266            warn!(
1267                elapsed_ms = skills_reload_start.elapsed().as_millis(),
1268                session_id = %session_id,
1269                error = %e,
1270                "Client::resume_session skills reload request failed"
1271            );
1272        } else {
1273            tracing::debug!(
1274                elapsed_ms = skills_reload_start.elapsed().as_millis(),
1275                session_id = %session_id,
1276                "Client::resume_session skills reload request completed successfully"
1277            );
1278        }
1279
1280        *capabilities.write() = resume_result.capabilities.unwrap_or_default();
1281        // Upsert resume snapshots rather than replacing wholesale. Live
1282        // `session.canvas.opened` notifications can arrive on the event loop
1283        // while `session.resume` is in flight; a wholesale replace would
1284        // discard those updates.
1285        {
1286            let mut snapshots = open_canvases.write();
1287            for snapshot in resume_result.open_canvases.unwrap_or_default() {
1288                upsert_open_canvas_snapshot(&mut snapshots, snapshot);
1289            }
1290        }
1291
1292        tracing::debug!(
1293            elapsed_ms = total_start.elapsed().as_millis(),
1294            session_id = %session_id,
1295            "Client::resume_session complete"
1296        );
1297        registration.disarm();
1298        let session = Session {
1299            id: session_id,
1300            cwd: self.cwd().clone(),
1301            workspace_path: resume_result.workspace_path,
1302            remote_url: resume_result.remote_url,
1303            client: self.clone(),
1304            event_loop: ParkingLotMutex::new(Some(event_loop)),
1305            shutdown,
1306            idle_waiter,
1307            capabilities,
1308            open_canvases,
1309            event_tx,
1310        };
1311        apply_mode_post_create_patch(
1312            &session,
1313            mode,
1314            opt_skip_custom_instructions,
1315            opt_custom_agents_local_only,
1316            opt_coauthor_enabled,
1317            opt_manage_schedule_enabled,
1318        )
1319        .await?;
1320        Ok(session)
1321    }
1322}
1323
/// Lookup table from a command's name to the handler registered for it.
type CommandHandlerMap = HashMap<String, Arc<dyn CommandHandler>>;
1325
1326async fn apply_mode_post_create_patch(
1327    session: &Session,
1328    mode: crate::ClientMode,
1329    opt_skip_custom_instructions: Option<bool>,
1330    opt_custom_agents_local_only: Option<bool>,
1331    opt_coauthor_enabled: Option<bool>,
1332    opt_manage_schedule_enabled: Option<bool>,
1333) -> Result<(), Error> {
1334    use crate::generated::api_types::SessionUpdateOptionsParams;
1335    let mut patch = SessionUpdateOptionsParams::default();
1336    let should_send = if mode == crate::ClientMode::Empty {
1337        patch.skip_custom_instructions = Some(opt_skip_custom_instructions.unwrap_or(true));
1338        patch.custom_agents_local_only = Some(opt_custom_agents_local_only.unwrap_or(true));
1339        patch.coauthor_enabled = Some(opt_coauthor_enabled.unwrap_or(false));
1340        patch.manage_schedule_enabled = Some(opt_manage_schedule_enabled.unwrap_or(false));
1341        patch.installed_plugins = Some(Vec::new());
1342        true
1343    } else {
1344        let mut any = false;
1345        if let Some(v) = opt_skip_custom_instructions {
1346            patch.skip_custom_instructions = Some(v);
1347            any = true;
1348        }
1349        if let Some(v) = opt_custom_agents_local_only {
1350            patch.custom_agents_local_only = Some(v);
1351            any = true;
1352        }
1353        if let Some(v) = opt_coauthor_enabled {
1354            patch.coauthor_enabled = Some(v);
1355            any = true;
1356        }
1357        if let Some(v) = opt_manage_schedule_enabled {
1358            patch.manage_schedule_enabled = Some(v);
1359            any = true;
1360        }
1361        any
1362    };
1363    if !should_send {
1364        return Ok(());
1365    }
1366    if let Err(error) = session.rpc().options().update(patch).await {
1367        let _ = session.disconnect().await;
1368        return Err(error);
1369    }
1370    Ok(())
1371}
1372
1373fn build_command_handler_map(commands: Option<&[CommandDefinition]>) -> Arc<CommandHandlerMap> {
1374    let map = match commands {
1375        Some(commands) => commands
1376            .iter()
1377            .filter(|cmd| !cmd.name.is_empty())
1378            .map(|cmd| (cmd.name.clone(), cmd.handler.clone()))
1379            .collect(),
1380        None => HashMap::new(),
1381    };
1382    Arc::new(map)
1383}
1384
1385fn upsert_open_canvas_snapshot(
1386    snapshots: &mut Vec<OpenCanvasInstance>,
1387    snapshot: OpenCanvasInstance,
1388) {
1389    if let Some(existing) = snapshots
1390        .iter_mut()
1391        .find(|open| open.instance_id == snapshot.instance_id)
1392    {
1393        *existing = snapshot;
1394    } else {
1395        snapshots.push(snapshot);
1396    }
1397}
1398
1399fn remove_open_canvas_snapshot(snapshots: &mut Vec<OpenCanvasInstance>, instance_id: &str) {
1400    snapshots.retain(|open| open.instance_id != instance_id);
1401}
1402
1403#[allow(clippy::too_many_arguments)]
1404fn spawn_event_loop(
1405    session_id: SessionId,
1406    client: Client,
1407    handlers: SessionHandlers,
1408    hooks: Option<Arc<dyn SessionHooks>>,
1409    transforms: Option<Arc<dyn SystemMessageTransform>>,
1410    command_handlers: Arc<CommandHandlerMap>,
1411    canvas_handler: Option<Arc<dyn CanvasHandler>>,
1412    session_fs_provider: Option<Arc<dyn SessionFsProvider>>,
1413    bearer_token_providers: HashMap<String, Arc<dyn BearerTokenProvider>>,
1414    channels: crate::router::SessionChannels,
1415    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1416    capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
1417    open_canvases: Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
1418    event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
1419    shutdown: CancellationToken,
1420) -> JoinHandle<()> {
1421    let crate::router::SessionChannels {
1422        mut notifications,
1423        mut requests,
1424    } = channels;
1425
1426    let span = tracing::error_span!("session_event_loop", session_id = %session_id);
1427    tokio::spawn(
1428        async move {
1429            loop {
1430                // `mpsc::UnboundedReceiver::recv` and
1431                // `CancellationToken::cancelled` are both cancel-safe per
1432                // RFD 400. The selected branch's `await`'d handler is
1433                // *not* mid-cancelled by the select — once a branch fires
1434                // it runs to completion within the loop's iteration.
1435                // Spawned child tasks inside `handle_notification`
1436                // (permission/tool/elicitation callbacks) intentionally
1437                // outlive the parent loop and own their own cleanup;
1438                // this is RFD 400's "spawn background tasks to perform
1439                // cancel-unsafe operations" pattern and is correct as-is.
1440                tokio::select! {
1441                    _ = shutdown.cancelled() => break,
1442                    Some(notification) = notifications.recv() => {
1443                        handle_notification(
1444                            &session_id, &client, &handlers, &command_handlers, notification, &idle_waiter, &capabilities, &open_canvases, &event_tx,
1445                        ).await;
1446                    }
1447                    Some(request) = requests.recv() => {
1448                        let ctx = RequestDispatchContext {
1449                            client: &client,
1450                            handlers: &handlers,
1451                            hooks: hooks.as_deref(),
1452                            transforms: transforms.as_deref(),
1453                            canvas_handler: canvas_handler.as_ref(),
1454                            session_fs_provider: session_fs_provider.as_ref(),
1455                            bearer_token_providers: &bearer_token_providers,
1456                        };
1457                        handle_request(&session_id, ctx, request).await;
1458                    }
1459                    else => break,
1460                }
1461            }
1462            // Channels closed or shutdown signaled — fail any pending
1463            // send_and_wait so the caller observes a clean error.
1464            if let Some(waiter) = idle_waiter.lock().take() {
1465                let _ = waiter
1466                    .tx
1467                    .send(Err(ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()));
1468            }
1469        }
1470        .instrument(span),
1471    )
1472}
1473
1474fn extract_request_id(data: &Value) -> Option<RequestId> {
1475    data.get("requestId")
1476        .and_then(|v| v.as_str())
1477        .filter(|s| !s.is_empty())
1478        .map(RequestId::new)
1479}
1480
1481/// Map a [`PermissionResult`] to the `result` payload sent back to the
1482/// server via `session.permissions.handlePendingPermissionRequest`.
1483///
1484/// Returns `None` when the SDK must not send a response.
1485fn notification_permission_payload(result: &PermissionResult) -> Option<Value> {
1486    match result {
1487        PermissionResult::NoResult => None,
1488        PermissionResult::Decision(decision) => Some(
1489            serde_json::to_value(decision).expect("serializing permission decision should succeed"),
1490        ),
1491    }
1492}
1493
1494async fn register_mcp_auth_interest(client: &Client, session_id: &SessionId) -> Result<(), Error> {
1495    let mut params = serde_json::to_value(RegisterEventInterestParams {
1496        event_type: "mcp.oauth_required".to_string(),
1497    })?;
1498    params["sessionId"] = Value::String(session_id.to_string());
1499    client
1500        .call(rpc_methods::SESSION_EVENTLOG_REGISTERINTEREST, Some(params))
1501        .await?;
1502    Ok(())
1503}
1504
1505fn tool_failure_result(message: impl Into<String>) -> ToolResult {
1506    let message = message.into();
1507    ToolResult::Expanded(ToolResultExpanded {
1508        text_result_for_llm: message.clone(),
1509        result_type: "failure".to_string(),
1510        binary_results_for_llm: None,
1511        session_log: None,
1512        error: Some(message),
1513        tool_telemetry: None,
1514    })
1515}
1516
1517/// Process a notification from the CLI's broadcast channel.
1518#[allow(clippy::too_many_arguments)]
1519async fn handle_notification(
1520    session_id: &SessionId,
1521    client: &Client,
1522    handlers: &SessionHandlers,
1523    command_handlers: &Arc<CommandHandlerMap>,
1524    notification: SessionEventNotification,
1525    idle_waiter: &Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1526    capabilities: &Arc<parking_lot::RwLock<SessionCapabilities>>,
1527    open_canvases: &Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
1528    event_tx: &tokio::sync::broadcast::Sender<SessionEvent>,
1529) {
1530    let dispatch_start = Instant::now();
1531    let event = notification.event.clone();
1532    let event_type = event.parsed_type();
1533    if event_type == SessionEventType::PermissionRequested {
1534        tracing::debug!(
1535            session_id = %session_id,
1536            event_type = %event.event_type,
1537            "Session::handle_notification permission request received"
1538        );
1539    }
1540
1541    // Signal send_and_wait if active. The lock is only contended when
1542    // a send_and_wait call is in flight (idle_waiter is Some).
1543    match event_type {
1544        SessionEventType::AssistantMessage
1545        | SessionEventType::SessionIdle
1546        | SessionEventType::SessionError => {
1547            let mut guard = idle_waiter.lock();
1548            if let Some(waiter) = guard.as_mut() {
1549                match event_type {
1550                    SessionEventType::AssistantMessage => {
1551                        if !waiter.first_assistant_message_seen {
1552                            waiter.first_assistant_message_seen = true;
1553                            tracing::debug!(
1554                                elapsed_ms = waiter.started_at.elapsed().as_millis(),
1555                                session_id = %session_id,
1556                                "Session::send_and_wait first assistant message"
1557                            );
1558                        }
1559                        waiter.last_assistant_message = Some(event.clone());
1560                    }
1561                    SessionEventType::SessionIdle | SessionEventType::SessionError => {
1562                        if let Some(waiter) = guard.take() {
1563                            if event_type == SessionEventType::SessionIdle {
1564                                tracing::debug!(
1565                                    elapsed_ms = waiter.started_at.elapsed().as_millis(),
1566                                    session_id = %session_id,
1567                                    "Session::send_and_wait idle received"
1568                                );
1569                                let _ = waiter.tx.send(Ok(waiter.last_assistant_message));
1570                            } else {
1571                                let error_msg = event
1572                                    .typed_data::<SessionErrorData>()
1573                                    .map(|d| d.message)
1574                                    .or_else(|| {
1575                                        event
1576                                            .data
1577                                            .get("message")
1578                                            .and_then(|v| v.as_str())
1579                                            .map(|s| s.to_string())
1580                                    })
1581                                    .unwrap_or_else(|| "session error".to_string());
1582                                let _ = waiter.tx.send(Err(Error::with_message(
1583                                    ErrorKind::Session(SessionErrorKind::AgentError),
1584                                    error_msg,
1585                                )));
1586                            }
1587                        }
1588                    }
1589                    _ => {}
1590                }
1591            }
1592        }
1593        _ => {}
1594    }
1595
1596    // Update the snapshot caches BEFORE broadcasting so subscribers that
1597    // call `Session::capabilities()` / `Session::open_canvases()` in
1598    // response to the event observe the new state.
1599    if event_type == SessionEventType::CapabilitiesChanged {
1600        match serde_json::from_value::<SessionCapabilities>(notification.event.data.clone()) {
1601            Ok(changed) => *capabilities.write() = changed,
1602            Err(e) => warn!(error = %e, "failed to deserialize capabilities.changed payload"),
1603        }
1604    }
1605    if event_type == SessionEventType::SessionCanvasOpened {
1606        match serde_json::from_value::<OpenCanvasInstance>(notification.event.data.clone()) {
1607            Ok(open_canvas) => {
1608                upsert_open_canvas_snapshot(&mut open_canvases.write(), open_canvas);
1609            }
1610            Err(e) => warn!(error = %e, "failed to deserialize session.canvas.opened payload"),
1611        }
1612    }
1613    if event_type == SessionEventType::SessionCanvasClosed {
1614        match serde_json::from_value::<SessionCanvasClosedData>(notification.event.data.clone()) {
1615            Ok(closed) => {
1616                if closed.instance_id.is_empty() {
1617                    warn!("failed to deserialize session.canvas.closed payload");
1618                } else {
1619                    remove_open_canvas_snapshot(&mut open_canvases.write(), &closed.instance_id);
1620                }
1621            }
1622            Err(e) => warn!(error = %e, "failed to deserialize session.canvas.closed payload"),
1623        }
1624    }
1625
1626    // Fan out the event to runtime subscribers (`Session::subscribe`). `send`
1627    // only errors when there are no receivers, which is the normal case
1628    // before any consumer subscribes.
1629    let _ = event_tx.send(event.clone());
1630
1631    tracing::debug!(
1632        elapsed_ms = dispatch_start.elapsed().as_millis(),
1633        session_id = %session_id,
1634        event_type = %notification.event.event_type,
1635        "Session::handle_notification dispatch"
1636    );
1637
1638    // Notification-based permission/tool/elicitation requests require a
1639    // separate RPC callback. Spawn concurrently since the CLI doesn't block.
1640    match event_type {
1641        SessionEventType::PermissionRequested => {
1642            let Some(request_id) = extract_request_id(&notification.event.data) else {
1643                return;
1644            };
1645            // Honor the runtime's `resolvedByHook` signal — when the
1646            // server has already resolved the permission via a hook,
1647            // clients must not send a second response.
1648            if notification
1649                .event
1650                .data
1651                .get("resolvedByHook")
1652                .and_then(|v| v.as_bool())
1653                .unwrap_or(false)
1654            {
1655                return;
1656            }
1657            // Multi-client safety: if this client has no permission
1658            // handler installed, don't respond — another client on the
1659            // same CLI may handle it.
1660            let Some(permission_handler) = handlers.permission.clone() else {
1661                return;
1662            };
1663            let client = client.clone();
1664            let sid = session_id.clone();
1665            let data: PermissionRequestData =
1666                serde_json::from_value(notification.event.data.clone()).unwrap_or_else(|_| {
1667                    PermissionRequestData {
1668                        kind: None,
1669                        tool_call_id: None,
1670                        extra: notification.event.data.clone(),
1671                    }
1672                });
1673            let span = tracing::error_span!(
1674                "permission_request_handler",
1675                session_id = %sid,
1676                request_id = %request_id
1677            );
1678            tokio::spawn(
1679                async move {
1680                    let handler_start = Instant::now();
1681                    let result = permission_handler
1682                        .handle(sid.clone(), request_id.clone(), data)
1683                        .await;
1684                    tracing::debug!(
1685                        elapsed_ms = handler_start.elapsed().as_millis(),
1686                        session_id = %sid,
1687                        request_id = %request_id,
1688                        "PermissionHandler::handle dispatch"
1689                    );
1690                    let Some(result_value) = notification_permission_payload(&result) else {
1691                        // Handler returned Deferred / NoResult — it will
1692                        // call handlePendingPermissionRequest itself (or
1693                        // leave the request unanswered).
1694                        return;
1695                    };
1696                    let rpc_start = Instant::now();
1697                    let _ = client
1698                        .call(
1699                            "session.permissions.handlePendingPermissionRequest",
1700                            Some(serde_json::json!({
1701                                "sessionId": sid,
1702                                "requestId": request_id,
1703                                "result": result_value,
1704                            })),
1705                        )
1706                        .await;
1707                    tracing::debug!(
1708                        elapsed_ms = rpc_start.elapsed().as_millis(),
1709                        session_id = %sid,
1710                        request_id = %request_id,
1711                        "Session::handle_notification response sent successfully"
1712                    );
1713                }
1714                .instrument(span),
1715            );
1716        }
1717        SessionEventType::ExternalToolRequested => {
1718            let Some(request_id) = extract_request_id(&notification.event.data) else {
1719                return;
1720            };
1721            let data: ExternalToolRequestedData =
1722                match serde_json::from_value(notification.event.data.clone()) {
1723                    Ok(d) => d,
1724                    Err(e) => {
1725                        warn!(error = %e, "failed to deserialize external_tool.requested");
1726                        let client = client.clone();
1727                        let sid = session_id.clone();
1728                        let span = tracing::error_span!(
1729                            "external_tool_deserialize_error",
1730                            session_id = %sid,
1731                            request_id = %request_id
1732                        );
1733                        tokio::spawn(
1734                            async move {
1735                                let rpc_start = Instant::now();
1736                                let _ = client
1737                                .call(
1738                                    "session.tools.handlePendingToolCall",
1739                                    Some(serde_json::json!({
1740                                        "sessionId": sid,
1741                                        "requestId": request_id,
1742                                        "error": format!("Failed to deserialize tool request: {e}"),
1743                                    })),
1744                                )
1745                                .await;
1746                                tracing::debug!(
1747                                    elapsed_ms = rpc_start.elapsed().as_millis(),
1748                                    session_id = %sid,
1749                                    request_id = %request_id,
1750                                    "Session::handle_notification response sent successfully"
1751                                );
1752                            }
1753                            .instrument(span),
1754                        );
1755                        return;
1756                    }
1757                };
1758            // Multi-client safety: look up a handler for the requested
1759            // tool name. If this client has no handler installed for that
1760            // tool, don't respond — another connected client may have one.
1761            let tool_handler = if data.tool_name.is_empty() {
1762                None
1763            } else {
1764                handlers.tools.get(&data.tool_name).cloned()
1765            };
1766            let Some(tool_handler) = tool_handler else {
1767                return;
1768            };
1769            let client = client.clone();
1770            let sid = session_id.clone();
1771            let span = tracing::error_span!(
1772                "external_tool_handler",
1773                session_id = %sid,
1774                request_id = %request_id
1775            );
1776            tokio::spawn(
1777                async move {
1778                    // `tool_name.is_empty()` would have produced a `None`
1779                    // lookup in `handlers.tools` and short-circuited at the
1780                    // outer guard above, so only the tool_call_id check is
1781                    // reachable here.
1782                    if data.tool_call_id.is_empty() {
1783                        let error_msg = "Missing toolCallId";
1784                        let rpc_start = Instant::now();
1785                        let _ = client
1786                            .call(
1787                                "session.tools.handlePendingToolCall",
1788                                Some(serde_json::json!({
1789                                    "sessionId": sid,
1790                                    "requestId": request_id,
1791                                    "error": error_msg,
1792                                })),
1793                            )
1794                            .await;
1795                        tracing::debug!(
1796                            elapsed_ms = rpc_start.elapsed().as_millis(),
1797                            session_id = %sid,
1798                            request_id = %request_id,
1799                            "Session::handle_notification response sent successfully"
1800                        );
1801                        return;
1802                    }
1803                    let tool_call_id = data.tool_call_id.clone();
1804                    let tool_name = data.tool_name.clone();
1805                    let invocation = ToolInvocation {
1806                        session_id: sid.clone(),
1807                        tool_call_id: data.tool_call_id,
1808                        tool_name: data.tool_name,
1809                        arguments: data
1810                            .arguments
1811                            .unwrap_or(Value::Object(serde_json::Map::new())),
1812                        traceparent: data.traceparent,
1813                        tracestate: data.tracestate,
1814                    };
1815                    let handler_start = Instant::now();
1816                    let tool_result = match tool_handler.call(invocation).await {
1817                        Ok(r) => r,
1818                        Err(e) => tool_failure_result(e.to_string()),
1819                    };
1820                    tracing::debug!(
1821                        elapsed_ms = handler_start.elapsed().as_millis(),
1822                        session_id = %sid,
1823                        request_id = %request_id,
1824                        tool_call_id = %tool_call_id,
1825                        tool_name = %tool_name,
1826                        "ToolHandler::call dispatch"
1827                    );
1828                    let result_value = serde_json::to_value(tool_result).unwrap_or(Value::Null);
1829                    let rpc_start = Instant::now();
1830                    let _ = client
1831                        .call(
1832                            "session.tools.handlePendingToolCall",
1833                            Some(serde_json::json!({
1834                                "sessionId": sid,
1835                                "requestId": request_id,
1836                                "result": result_value,
1837                            })),
1838                        )
1839                        .await;
1840                    tracing::debug!(
1841                        elapsed_ms = rpc_start.elapsed().as_millis(),
1842                        session_id = %sid,
1843                        request_id = %request_id,
1844                        tool_call_id = %tool_call_id,
1845                        tool_name = %tool_name,
1846                        "Session::handle_notification response sent successfully"
1847                    );
1848                }
1849                .instrument(span),
1850            );
1851        }
1852        SessionEventType::UserInputRequested => {
1853            // Notification-only signal for observers (UI, telemetry).
1854            // The CLI follows up with a `userInput.request` JSON-RPC call
1855            // that drives the `UserInputHandler` dispatch — handling
1856            // the notification here too would double-fire the handler
1857            // and produce duplicate prompts on the consumer side. See
1858            // github/github-app#4249.
1859        }
1860        SessionEventType::ElicitationRequested => {
1861            let Some(request_id) = extract_request_id(&notification.event.data) else {
1862                return;
1863            };
1864            // Multi-client safety: if this client has no elicitation
1865            // handler installed, don't respond — another client on the
1866            // same CLI may handle it.
1867            let Some(elicitation_handler) = handlers.elicitation.clone() else {
1868                return;
1869            };
1870            let elicitation_data: ElicitationRequestedData =
1871                match serde_json::from_value(notification.event.data.clone()) {
1872                    Ok(d) => d,
1873                    Err(e) => {
1874                        warn!(error = %e, "failed to deserialize elicitation request");
1875                        return;
1876                    }
1877                };
1878            let request = ElicitationRequest {
1879                message: elicitation_data.message,
1880                requested_schema: elicitation_data
1881                    .requested_schema
1882                    .map(|s| serde_json::to_value(s).unwrap_or(Value::Null)),
1883                mode: elicitation_data.mode.map(|m| match m {
1884                    crate::generated::session_events::ElicitationRequestedMode::Form => {
1885                        crate::types::ElicitationMode::Form
1886                    }
1887                    crate::generated::session_events::ElicitationRequestedMode::Url => {
1888                        crate::types::ElicitationMode::Url
1889                    }
1890                    _ => crate::types::ElicitationMode::Unknown,
1891                }),
1892                elicitation_source: elicitation_data.elicitation_source,
1893                url: elicitation_data.url,
1894            };
1895            let client = client.clone();
1896            let sid = session_id.clone();
1897            let span = tracing::error_span!(
1898                "elicitation_request_handler",
1899                session_id = %sid,
1900                request_id = %request_id
1901            );
1902            tokio::spawn(
1903                async move {
1904                    let cancel = ElicitationResult {
1905                        action: "cancel".to_string(),
1906                        content: None,
1907                    };
1908                    // Dispatch to a nested task so panics are caught as JoinErrors.
1909                    let handler_task = tokio::spawn({
1910                        let sid = sid.clone();
1911                        let request_id = request_id.clone();
1912                        let span = tracing::error_span!(
1913                            "elicitation_callback",
1914                            session_id = %sid,
1915                            request_id = %request_id
1916                        );
1917                        async move {
1918                            let handler_start = Instant::now();
1919                            let response = elicitation_handler
1920                                .handle(sid.clone(), request_id.clone(), request)
1921                                .await;
1922                            tracing::debug!(
1923                                elapsed_ms = handler_start.elapsed().as_millis(),
1924                                session_id = %sid,
1925                                request_id = %request_id,
1926                                "ElicitationHandler::handle dispatch"
1927                            );
1928                            response
1929                        }
1930                        .instrument(span)
1931                    });
1932                    let result = match handler_task.await {
1933                        Ok(r) => r,
1934                        Err(_) => cancel.clone(),
1935                    };
1936                    let rpc_start = Instant::now();
1937                    if let Err(e) = client
1938                        .call(
1939                            "session.ui.handlePendingElicitation",
1940                            Some(serde_json::json!({
1941                                "sessionId": sid,
1942                                "requestId": request_id,
1943                                "result": result,
1944                            })),
1945                        )
1946                        .await
1947                    {
1948                        // RPC failed — attempt cancel as last resort
1949                        warn!(error = %e, "handlePendingElicitation failed, sending cancel");
1950                        let _ = client
1951                            .call(
1952                                "session.ui.handlePendingElicitation",
1953                                Some(serde_json::json!({
1954                                    "sessionId": sid,
1955                                    "requestId": request_id,
1956                                    "result": cancel,
1957                                })),
1958                            )
1959                            .await;
1960                    } else {
1961                        tracing::debug!(
1962                            elapsed_ms = rpc_start.elapsed().as_millis(),
1963                            session_id = %sid,
1964                            request_id = %request_id,
1965                            "Session::handle_notification response sent successfully"
1966                        );
1967                    }
1968                }
1969                .instrument(span),
1970            );
1971        }
1972        SessionEventType::McpOauthRequired => {
1973            let Some(request_id) = extract_request_id(&notification.event.data) else {
1974                return;
1975            };
1976            let Some(mcp_auth_handler) = handlers.mcp_auth.clone() else {
1977                warn!(
1978                    session_id = %session_id,
1979                    request_id = %request_id,
1980                    "received MCP OAuth request without a registered MCP auth handler"
1981                );
1982                return;
1983            };
1984            let data: McpOauthRequiredData =
1985                match serde_json::from_value(notification.event.data.clone()) {
1986                    Ok(d) => d,
1987                    Err(e) => {
1988                        warn!(error = %e, "failed to deserialize MCP OAuth request");
1989                        return;
1990                    }
1991                };
1992            let request = McpAuthRequest {
1993                request_id: request_id.clone(),
1994                server_name: data.server_name,
1995                server_url: data.server_url,
1996                reason: data.reason,
1997                www_authenticate_params: data.www_authenticate_params,
1998                resource_metadata: data.resource_metadata,
1999                static_client_config: data.static_client_config,
2000            };
2001            let client = client.clone();
2002            let sid = session_id.clone();
2003            let span = tracing::error_span!(
2004                "mcp_auth_request_handler",
2005                session_id = %sid,
2006                request_id = %request_id
2007            );
2008            tokio::spawn(
2009                async move {
2010                    let cancel = McpAuthResult::Cancelled;
2011                    let handler_task = tokio::spawn({
2012                        let sid = sid.clone();
2013                        let request_id = request_id.clone();
2014                        let span = tracing::error_span!(
2015                            "mcp_auth_callback",
2016                            session_id = %sid,
2017                            request_id = %request_id
2018                        );
2019                        async move {
2020                            let handler_start = Instant::now();
2021                            let response = mcp_auth_handler
2022                                .handle(sid.clone(), request_id.clone(), request)
2023                                .await;
2024                            tracing::debug!(
2025                                elapsed_ms = handler_start.elapsed().as_millis(),
2026                                session_id = %sid,
2027                                request_id = %request_id,
2028                                "McpAuthHandler::handle dispatch"
2029                            );
2030                            response
2031                        }
2032                        .instrument(span)
2033                    });
2034                    let result = match handler_task.await {
2035                        Ok(result) => result,
2036                        Err(_) => cancel,
2037                    };
2038                    let rpc_start = Instant::now();
2039                    let _ = client
2040                        .call(
2041                            "session.mcp.oauth.handlePendingRequest",
2042                            Some(serde_json::json!({
2043                                "sessionId": sid,
2044                                "requestId": request_id,
2045                                "result": result.into_wire(),
2046                            })),
2047                        )
2048                        .await;
2049                    tracing::debug!(
2050                        elapsed_ms = rpc_start.elapsed().as_millis(),
2051                        "Session::handle_notification MCP auth response sent"
2052                    );
2053                }
2054                .instrument(span),
2055            );
2056        }
2057        SessionEventType::CommandExecute => {
2058            let data: CommandExecuteData =
2059                match serde_json::from_value(notification.event.data.clone()) {
2060                    Ok(d) => d,
2061                    Err(e) => {
2062                        warn!(error = %e, "failed to deserialize command.execute");
2063                        return;
2064                    }
2065                };
2066            let client = client.clone();
2067            let command_handlers = command_handlers.clone();
2068            let sid = session_id.clone();
2069            let span = tracing::error_span!("command_handler", session_id = %sid);
2070            tokio::spawn(
2071                async move {
2072                    let request_id = data.request_id;
2073                    let ack_error = match command_handlers.get(&data.command_name).cloned() {
2074                        None => Some(format!("Unknown command: {}", data.command_name)),
2075                        Some(handler) => {
2076                            let command_name = data.command_name.clone();
2077                            let ctx = CommandContext {
2078                                session_id: sid.clone(),
2079                                command: data.command,
2080                                command_name: data.command_name,
2081                                args: data.args,
2082                            };
2083                            let handler_start = Instant::now();
2084                            let result = handler.on_command(ctx).await;
2085                            tracing::debug!(
2086                                elapsed_ms = handler_start.elapsed().as_millis(),
2087                                session_id = %sid,
2088                                request_id = %request_id,
2089                                command_name = %command_name,
2090                                "CommandHandler::call dispatch"
2091                            );
2092                            match result {
2093                                Ok(()) => None,
2094                                Err(e) => Some(e.to_string()),
2095                            }
2096                        }
2097                    };
2098                    let mut params = serde_json::json!({
2099                        "sessionId": sid,
2100                        "requestId": request_id,
2101                    });
2102                    if let Some(error_msg) = ack_error {
2103                        params["error"] = serde_json::Value::String(error_msg);
2104                    }
2105                    let rpc_start = Instant::now();
2106                    let _ = client
2107                        .call("session.commands.handlePendingCommand", Some(params))
2108                        .await;
2109                    tracing::debug!(
2110                        elapsed_ms = rpc_start.elapsed().as_millis(),
2111                        session_id = %sid,
2112                        request_id = %request_id,
2113                        "Session::handle_notification response sent successfully"
2114                    );
2115                }
2116                .instrument(span),
2117            );
2118        }
2119        _ => {}
2120    }
2121}
2122
2123struct RequestDispatchContext<'a> {
2124    client: &'a Client,
2125    handlers: &'a SessionHandlers,
2126    hooks: Option<&'a dyn SessionHooks>,
2127    transforms: Option<&'a dyn SystemMessageTransform>,
2128    canvas_handler: Option<&'a Arc<dyn CanvasHandler>>,
2129    session_fs_provider: Option<&'a Arc<dyn SessionFsProvider>>,
2130    bearer_token_providers: &'a HashMap<String, Arc<dyn BearerTokenProvider>>,
2131}
2132
2133/// Process a JSON-RPC request from the CLI.
2134async fn handle_request(
2135    session_id: &SessionId,
2136    ctx: RequestDispatchContext<'_>,
2137    request: crate::JsonRpcRequest,
2138) {
2139    let sid = session_id.clone();
2140    let client = ctx.client;
2141    let handlers = ctx.handlers;
2142    let hooks = ctx.hooks;
2143    let transforms = ctx.transforms;
2144    let canvas_handler = ctx.canvas_handler;
2145    let session_fs_provider = ctx.session_fs_provider;
2146    let bearer_token_providers = ctx.bearer_token_providers;
2147
2148    if request.method.starts_with("sessionFs.") {
2149        crate::session_fs_dispatch::dispatch(client, session_fs_provider, request).await;
2150        return;
2151    }
2152
2153    if request.method.starts_with("canvas.") {
2154        crate::canvas_dispatch::dispatch(client, canvas_handler, request).await;
2155        return;
2156    }
2157
2158    if request.method == crate::generated::api_types::rpc_methods::PROVIDERTOKEN_GETTOKEN {
2159        crate::provider_token_dispatch::dispatch(client, bearer_token_providers, request).await;
2160        return;
2161    }
2162
2163    match request.method.as_str() {
2164        "hooks.invoke" => {
2165            let params = request.params.as_ref();
2166            let hook_type = params
2167                .and_then(|p| p.get("hookType"))
2168                .and_then(|v| v.as_str())
2169                .unwrap_or("");
2170            let input = params
2171                .and_then(|p| p.get("input"))
2172                .cloned()
2173                .unwrap_or(Value::Object(Default::default()));
2174
2175            let rpc_result = if let Some(hooks) = hooks {
2176                match crate::hooks::dispatch_hook(hooks, &sid, hook_type, input).await {
2177                    Ok(output) => output,
2178                    Err(e) => {
2179                        warn!(error = %e, hook_type = hook_type, "hook dispatch failed");
2180                        serde_json::json!({ "output": {} })
2181                    }
2182                }
2183            } else {
2184                serde_json::json!({ "output": {} })
2185            };
2186
2187            let rpc_response = JsonRpcResponse {
2188                jsonrpc: "2.0".to_string(),
2189                id: request.id,
2190                result: Some(rpc_result),
2191                error: None,
2192            };
2193            let _ = client.send_response(&rpc_response).await;
2194        }
2195
2196        "userInput.request" => {
2197            let params = request.params.as_ref();
2198            let Some(question) = params
2199                .and_then(|p| p.get("question"))
2200                .and_then(|v| v.as_str())
2201            else {
2202                warn!("userInput.request missing 'question' field");
2203                let rpc_response = JsonRpcResponse {
2204                    jsonrpc: "2.0".to_string(),
2205                    id: request.id,
2206                    result: None,
2207                    error: Some(crate::JsonRpcError {
2208                        code: error_codes::INVALID_PARAMS,
2209                        message: "missing required field: question".to_string(),
2210                        data: None,
2211                    }),
2212                };
2213                let _ = client.send_response(&rpc_response).await;
2214                return;
2215            };
2216            let question = question.to_string();
2217            let choices = params
2218                .and_then(|p| p.get("choices"))
2219                .and_then(|v| v.as_array())
2220                .map(|arr| {
2221                    arr.iter()
2222                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
2223                        .collect()
2224                });
2225            let allow_freeform = params
2226                .and_then(|p| p.get("allowFreeform"))
2227                .and_then(|v| v.as_bool());
2228
2229            let handler_start = Instant::now();
2230            let response = if let Some(user_input_handler) = handlers.user_input.as_ref() {
2231                user_input_handler
2232                    .handle(sid.clone(), question, choices, allow_freeform)
2233                    .await
2234            } else {
2235                None
2236            };
2237            tracing::debug!(
2238                elapsed_ms = handler_start.elapsed().as_millis(),
2239                session_id = %sid,
2240                "UserInputHandler::handle dispatch"
2241            );
2242
2243            let rpc_result = match response {
2244                Some(UserInputResponse {
2245                    answer,
2246                    was_freeform,
2247                }) => serde_json::json!({
2248                    "answer": answer,
2249                    "wasFreeform": was_freeform,
2250                }),
2251                None => serde_json::json!({ "noResponse": true }),
2252            };
2253            let rpc_response = JsonRpcResponse {
2254                jsonrpc: "2.0".to_string(),
2255                id: request.id,
2256                result: Some(rpc_result),
2257                error: None,
2258            };
2259            let _ = client.send_response(&rpc_response).await;
2260        }
2261
2262        "exitPlanMode.request" => {
2263            let params = request
2264                .params
2265                .as_ref()
2266                .cloned()
2267                .unwrap_or(Value::Object(serde_json::Map::new()));
2268            let data: ExitPlanModeData = match serde_json::from_value(params) {
2269                Ok(d) => d,
2270                Err(e) => {
2271                    warn!(error = %e, "failed to deserialize exitPlanMode.request params, using defaults");
2272                    ExitPlanModeData::default()
2273                }
2274            };
2275
2276            let rpc_result = if let Some(exit_plan_handler) = handlers.exit_plan_mode.as_ref() {
2277                let result = exit_plan_handler.handle(sid, data).await;
2278                serde_json::to_value(result).expect("ExitPlanModeResult serialization cannot fail")
2279            } else {
2280                serde_json::json!({ "approved": true })
2281            };
2282            let rpc_response = JsonRpcResponse {
2283                jsonrpc: "2.0".to_string(),
2284                id: request.id,
2285                result: Some(rpc_result),
2286                error: None,
2287            };
2288            let _ = client.send_response(&rpc_response).await;
2289        }
2290
2291        "autoModeSwitch.request" => {
2292            let error_code = request
2293                .params
2294                .as_ref()
2295                .and_then(|p| p.get("errorCode"))
2296                .and_then(|v| v.as_str())
2297                .map(|s| s.to_string());
2298            let retry_after_seconds = request
2299                .params
2300                .as_ref()
2301                .and_then(|p| p.get("retryAfterSeconds"))
2302                .and_then(|v| v.as_f64());
2303
2304            let answer = if let Some(auto_mode_handler) = handlers.auto_mode_switch.as_ref() {
2305                auto_mode_handler
2306                    .handle(sid, error_code, retry_after_seconds)
2307                    .await
2308            } else {
2309                AutoModeSwitchResponse::No
2310            };
2311            let rpc_response = JsonRpcResponse {
2312                jsonrpc: "2.0".to_string(),
2313                id: request.id,
2314                result: Some(serde_json::json!({ "response": answer })),
2315                error: None,
2316            };
2317            let _ = client.send_response(&rpc_response).await;
2318        }
2319
2320        "systemMessage.transform" => {
2321            let params = request.params.as_ref();
2322            let sections: HashMap<String, crate::transforms::TransformSection> =
2323                match params.and_then(|p| p.get("sections")) {
2324                    Some(v) => match serde_json::from_value(v.clone()) {
2325                        Ok(s) => s,
2326                        Err(e) => {
2327                            let _ = send_error_response(
2328                                client,
2329                                request.id,
2330                                error_codes::INVALID_PARAMS,
2331                                &format!("invalid sections: {e}"),
2332                            )
2333                            .await;
2334                            return;
2335                        }
2336                    },
2337                    None => {
2338                        let _ = send_error_response(
2339                            client,
2340                            request.id,
2341                            error_codes::INVALID_PARAMS,
2342                            "missing sections parameter",
2343                        )
2344                        .await;
2345                        return;
2346                    }
2347                };
2348
2349            let rpc_result = if let Some(transforms) = transforms {
2350                let transform_start = Instant::now();
2351                let response =
2352                    crate::transforms::dispatch_transform(transforms, &sid, sections).await;
2353                tracing::debug!(
2354                    elapsed_ms = transform_start.elapsed().as_millis(),
2355                    session_id = %sid,
2356                    "SystemMessageTransform::transform_section dispatch"
2357                );
2358                match serde_json::to_value(response) {
2359                    Ok(v) => v,
2360                    Err(e) => {
2361                        warn!(error = %e, "failed to serialize transform response");
2362                        serde_json::json!({ "sections": {} })
2363                    }
2364                }
2365            } else {
2366                // No transforms registered — pass through all sections unchanged.
2367                let passthrough: HashMap<String, crate::transforms::TransformSection> = sections;
2368                serde_json::json!({ "sections": passthrough })
2369            };
2370
2371            let rpc_response = JsonRpcResponse {
2372                jsonrpc: "2.0".to_string(),
2373                id: request.id,
2374                result: Some(rpc_result),
2375                error: None,
2376            };
2377            let _ = client.send_response(&rpc_response).await;
2378        }
2379
2380        method => {
2381            warn!(
2382                method = method,
2383                "unhandled request method in session event loop"
2384            );
2385            let _ = send_error_response(
2386                client,
2387                request.id,
2388                error_codes::METHOD_NOT_FOUND,
2389                &format!("unknown method: {method}"),
2390            )
2391            .await;
2392        }
2393    }
2394}
2395
2396async fn send_error_response(
2397    client: &Client,
2398    id: u64,
2399    code: i32,
2400    message: &str,
2401) -> Result<(), Error> {
2402    let response = JsonRpcResponse {
2403        jsonrpc: "2.0".to_string(),
2404        id,
2405        result: None,
2406        error: Some(crate::JsonRpcError {
2407            code,
2408            message: message.to_string(),
2409            data: None,
2410        }),
2411    };
2412    client.send_response(&response).await
2413}
2414
2415/// Inject `action: "transform"` sections into a `SystemMessageConfig`,
2416/// forcing `mode: "customize"` (required by the CLI for transforms to fire).
2417/// Preserves any existing caller-provided section overrides.
2418fn apply_transform_sections(
2419    sys_msg: &mut SystemMessageConfig,
2420    transforms: &dyn SystemMessageTransform,
2421) {
2422    sys_msg.mode = Some("customize".to_string());
2423    let sections = sys_msg.sections.get_or_insert_with(HashMap::new);
2424    for id in transforms.section_ids() {
2425        sections.entry(id).or_insert_with(|| SectionOverride {
2426            action: Some("transform".to_string()),
2427            content: None,
2428        });
2429    }
2430}
2431
2432fn inject_transform_sections(config: &mut SessionConfig, transforms: &dyn SystemMessageTransform) {
2433    let sys_msg = config.system_message.get_or_insert_with(Default::default);
2434    apply_transform_sections(sys_msg, transforms);
2435}
2436
2437fn inject_transform_sections_resume(
2438    config: &mut ResumeSessionConfig,
2439    transforms: &dyn SystemMessageTransform,
2440) {
2441    let sys_msg = config.system_message.get_or_insert_with(Default::default);
2442    apply_transform_sections(sys_msg, transforms);
2443}
2444
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::notification_permission_payload;
    use crate::handler::PermissionResult;

    /// `NoResult` produces no payload at all.
    #[test]
    fn notification_payload_suppresses_no_result() {
        let payload = notification_permission_payload(&PermissionResult::NoResult);
        assert!(payload.is_none());
    }

    /// Each concrete decision serializes to its expected JSON payload.
    #[test]
    fn notification_payload_serializes_decisions() {
        let cases = [
            (
                PermissionResult::approve_once(),
                json!({ "kind": "approve-once" }),
            ),
            (PermissionResult::reject(None), json!({ "kind": "reject" })),
            (
                PermissionResult::reject(Some("bad".to_string())),
                json!({ "kind": "reject", "feedback": "bad" }),
            ),
            (
                PermissionResult::user_not_available(),
                json!({ "kind": "user-not-available" }),
            ),
        ];

        for (decision, expected) in cases {
            assert_eq!(notification_permission_payload(&decision), Some(expected));
        }
    }
}