Skip to main content

github_copilot_sdk/
session.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::Mutex as ParkingLotMutex;
7use serde_json::Value;
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tracing::{Instrument, warn};
12
13use crate::canvas::CanvasHandler;
14use crate::generated::api_types::{LogRequest, ModelSwitchToRequest, OpenCanvasInstance};
15use crate::generated::session_events::{
16    CommandExecuteData, ElicitationRequestedData, ExternalToolRequestedData,
17    SessionCanvasClosedData, SessionErrorData, SessionEventType,
18};
19use crate::handler::{
20    AutoModeSwitchHandler, AutoModeSwitchResponse, ElicitationHandler, ExitPlanModeHandler,
21    PermissionHandler, PermissionResult, UserInputHandler, UserInputResponse,
22};
23use crate::hooks::SessionHooks;
24use crate::provider_token::BearerTokenProvider;
25use crate::session_fs::SessionFsProvider;
26use crate::trace_context::inject_trace_context;
27use crate::transforms::SystemMessageTransform;
28use crate::types::{
29    CommandContext, CommandDefinition, CommandHandler, CreateSessionResult, ElicitationRequest,
30    ElicitationResult, ExitPlanModeData, GetMessagesResponse, MessageOptions,
31    PermissionRequestData, RequestId, ResumeSessionConfig, ResumeSessionResult, SectionOverride,
32    SessionCapabilities, SessionConfig, SessionEvent, SessionId, SetModelOptions,
33    SystemMessageConfig, ToolInvocation, ToolResult, ToolResultExpanded, TraceContext,
34    UiInputOptions, ensure_attachment_display_names,
35};
36use crate::{
37    Client, Error, ErrorKind, JsonRpcResponse, SessionErrorKind, SessionEventNotification,
38    error_codes,
39};
40
41/// Bundle of the per-session callbacks the SDK dispatches to. Built from a
42/// [`SessionConfig`] / [`ResumeSessionConfig`] at
43/// [`Client::create_session`] / [`Client::resume_session`] time. Each
44/// field is `None` (or an empty map for tools) when the caller didn't
45/// install a handler -- in that case the SDK skips dispatch for that
46/// event type. The wire flags on `session.create` / `session.resume`
47/// are derived from these fields.
48#[derive(Clone)]
49pub(crate) struct SessionHandlers {
50    pub permission: Option<Arc<dyn PermissionHandler>>,
51    pub elicitation: Option<Arc<dyn ElicitationHandler>>,
52    pub user_input: Option<Arc<dyn UserInputHandler>>,
53    pub exit_plan_mode: Option<Arc<dyn ExitPlanModeHandler>>,
54    pub auto_mode_switch: Option<Arc<dyn AutoModeSwitchHandler>>,
55    pub tools: Arc<HashMap<String, Arc<dyn crate::tool::ToolHandler>>>,
56}
57
58/// Shared state between a [`Session`] and its event loop, used by [`Session::send_and_wait`].
59struct IdleWaiter {
60    tx: oneshot::Sender<Result<Option<SessionEvent>, Error>>,
61    last_assistant_message: Option<SessionEvent>,
62    started_at: Instant,
63    first_assistant_message_seen: bool,
64}
65
66/// RAII guard that clears the [`Session::idle_waiter`] slot on drop. Used
67/// by [`Session::send_and_wait`] to ensure the slot doesn't leak if the
68/// caller's future is cancelled (outer `tokio::time::timeout` / `select!`
69/// / dropped JoinHandle). Synchronous clear via `parking_lot::Mutex` —
70/// no async drop needed.
71///
72/// Without this, an outer cancellation between "install waiter" and
73/// "drain channel" would leave the slot occupied, causing all subsequent
74/// `send` and `send_and_wait` calls on the session to return
75/// [`SendWhileWaiting`](SessionErrorKind::SendWhileWaiting). Closes RFD-400
76/// review finding #2.
77struct WaiterGuard {
78    slot: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
79}
80
81impl Drop for WaiterGuard {
82    fn drop(&mut self) {
83        self.slot.lock().take();
84    }
85}
86
87struct PendingSessionRegistration {
88    client: Client,
89    session_id: SessionId,
90    shutdown: CancellationToken,
91    disarmed: bool,
92}
93
94impl PendingSessionRegistration {
95    fn new(client: Client, session_id: SessionId, shutdown: CancellationToken) -> Self {
96        Self {
97            client,
98            session_id,
99            shutdown,
100            disarmed: false,
101        }
102    }
103
104    async fn cleanup(mut self, event_loop: JoinHandle<()>) {
105        self.shutdown.cancel();
106        let _ = event_loop.await;
107        self.client.unregister_session(&self.session_id);
108        self.disarmed = true;
109    }
110
111    fn disarm(&mut self) {
112        self.disarmed = true;
113    }
114}
115
116impl Drop for PendingSessionRegistration {
117    fn drop(&mut self) {
118        if !self.disarmed {
119            self.shutdown.cancel();
120            self.client.unregister_session(&self.session_id);
121        }
122    }
123}
124
125/// A session on a GitHub Copilot CLI server.
126///
127/// Created via [`Client::create_session`] or [`Client::resume_session`].
128/// Owns an internal event loop that dispatches events to the per-callback
129/// handlers installed on the session config.
130///
131/// Protocol methods (`send`, `get_events`, `abort`, etc.) automatically
132/// inject the session ID into RPC params.
133///
134/// Call [`destroy`](Self::destroy) for graceful cleanup (RPC + local). If dropped
135/// without calling `destroy`, the `Drop` impl aborts the event loop and
136/// unregisters from the router as a best-effort safety net.
137pub struct Session {
138    id: SessionId,
139    cwd: PathBuf,
140    workspace_path: Option<PathBuf>,
141    remote_url: Option<String>,
142    client: Client,
143    /// Handle to the spawned event-loop task. Sync `parking_lot::Mutex`
144    /// because the lock is never held across an `.await` and the `Drop`
145    /// impl needs to take the handle synchronously without `try_lock`
146    /// fallibility.
147    event_loop: ParkingLotMutex<Option<JoinHandle<()>>>,
148    /// Cooperative shutdown signal for the event loop. The loop selects
149    /// on [`shutdown.cancelled()`](CancellationToken::cancelled) alongside
150    /// its inbound channels; [`Session::stop_event_loop`] and [`Drop`]
151    /// both call [`cancel()`](CancellationToken::cancel) to ask the loop
152    /// to exit between iterations rather than aborting the task (which
153    /// can land at any await point and leave the session mid-protocol).
154    /// See RFD-400 review finding #3.
155    ///
156    /// `CancellationToken` is the canonical signalling primitive in
157    /// `tokio_util`; it is what `tonic` uses for the equivalent task-
158    /// coordination case. Advanced consumers can obtain a child token
159    /// via [`Session::cancellation_token`] to bind their own work to
160    /// the session lifetime.
161    shutdown: CancellationToken,
162    /// Only populated while a `send_and_wait` call is in flight.
163    ///
164    /// Sync `parking_lot::Mutex` because the lock is never held across an
165    /// `.await`, and synchronous access lets the `WaiterGuard` RAII helper
166    /// in `send_and_wait` clear the slot from a `Drop` impl on caller-side
167    /// cancellation. See RFD-400 review (cancel-safety hardening).
168    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
169    /// Capabilities negotiated with the CLI, updated on `capabilities.changed` events.
170    capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
171    /// Canvas instances currently known to be open for this session.
172    open_canvases: Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
173    /// Broadcast channel for runtime event subscribers — see [`Session::subscribe`].
174    event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
175}
176
177impl Session {
178    /// Session ID assigned by the CLI.
179    pub fn id(&self) -> &SessionId {
180        &self.id
181    }
182
183    /// Working directory of the CLI process.
184    pub fn cwd(&self) -> &PathBuf {
185        &self.cwd
186    }
187
188    /// Workspace directory for the session (if using infinite sessions).
189    pub fn workspace_path(&self) -> Option<&Path> {
190        self.workspace_path.as_deref()
191    }
192
193    /// Remote session URL, if the session is running remotely.
194    pub fn remote_url(&self) -> Option<&str> {
195        self.remote_url.as_deref()
196    }
197
198    /// Session capabilities negotiated with the CLI.
199    ///
200    /// Capabilities are set during session creation and updated at runtime
201    /// via `capabilities.changed` events.
202    pub fn capabilities(&self) -> SessionCapabilities {
203        self.capabilities.read().clone()
204    }
205
206    /// Open canvas instances reported by the most recent `session.resume`
207    /// response or surfaced by inbound `canvas.opened` events.
208    pub fn open_canvases(&self) -> Vec<OpenCanvasInstance> {
209        self.open_canvases.read().clone()
210    }
211
212    /// Returns a [`CancellationToken`] that fires when this session shuts
213    /// down (via [`Session::stop_event_loop`], [`Session::destroy`], or
214    /// [`Drop`]).
215    ///
216    /// Use this to bind an external task's lifetime to the session — when
217    /// the session shuts down, awaiting [`cancelled()`](CancellationToken::cancelled)
218    /// resolves so cooperative consumers can stop cleanly.
219    ///
220    /// The returned handle is a *child* token: calling
221    /// [`cancel()`](CancellationToken::cancel) on it cancels only the
222    /// caller's child, not the session itself. To cancel the session, call
223    /// [`Session::stop_event_loop`].
224    ///
225    /// # Example
226    ///
227    /// ```no_run
228    /// # async fn example(session: github_copilot_sdk::session::Session) {
229    /// let token = session.cancellation_token();
230    /// tokio::select! {
231    ///     _ = token.cancelled() => println!("session shut down"),
232    ///     _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
233    ///         println!("60s elapsed, session still alive");
234    ///     }
235    /// }
236    /// # }
237    /// ```
238    pub fn cancellation_token(&self) -> CancellationToken {
239        self.shutdown.child_token()
240    }
241
242    /// Subscribe to events for this session.
243    ///
244    /// Returns an [`EventSubscription`](crate::subscription::EventSubscription)
245    /// that yields every [`SessionEvent`] dispatched on this session's
246    /// event loop. Drop the value to unsubscribe; there is no separate
247    /// cancel handle.
248    ///
249    /// **Observe-only.** Subscribers receive a clone of every
250    /// [`SessionEvent`] but cannot influence permission decisions, tool
251    /// results, or anything else that requires returning a value. Those
252    /// remain the responsibility of the per-callback handlers passed via
253    /// [`SessionConfig`]'s `with_*_handler`
254    /// builder methods.
255    ///
256    /// The returned handle implements both an inherent
257    /// [`recv`](crate::subscription::EventSubscription::recv) method and
258    /// [`Stream`](tokio_stream::Stream), so callers can use a `while let`
259    /// loop or any combinator from `tokio_stream::StreamExt` /
260    /// `futures::StreamExt`.
261    ///
262    /// Each subscriber maintains its own queue. If a consumer cannot keep
263    /// up, the oldest events are dropped and `recv` returns
264    /// [`RecvErrorKind::Lagged`](crate::subscription::RecvErrorKind::Lagged)
265    /// reporting the count of skipped events. Slow consumers do not block
266    /// the session's event loop.
267    ///
268    /// # Example
269    ///
270    /// ```no_run
271    /// # async fn example(session: github_copilot_sdk::session::Session) {
272    /// let mut events = session.subscribe();
273    /// tokio::spawn(async move {
274    ///     while let Ok(event) = events.recv().await {
275    ///         println!("[{}] event {}", event.id, event.event_type);
276    ///     }
277    /// });
278    /// # }
279    /// ```
280    pub fn subscribe(&self) -> crate::subscription::EventSubscription {
281        crate::subscription::EventSubscription::new(self.event_tx.subscribe())
282    }
283
284    /// The underlying Client (for advanced use cases).
285    pub fn client(&self) -> &Client {
286        &self.client
287    }
288
289    /// Typed RPC namespace for this session.
290    ///
291    /// Every protocol method lives here under its schema-aligned path —
292    /// e.g. `session.rpc().workspaces().list_files()`. Wire method names
293    /// and request/response types are generated from the protocol schema,
294    /// so the typed namespace can't drift from the wire contract.
295    ///
296    /// The hand-authored helpers on [`Session`] delegate to this namespace
297    /// and remain the recommended entry point for everyday use; reach for
298    /// `rpc()` when you want a method without a hand-written wrapper.
299    pub fn rpc(&self) -> crate::generated::rpc::SessionRpc<'_> {
300        crate::generated::rpc::SessionRpc { session: self }
301    }
302
303    /// Stop the internal event loop. Called automatically on [`destroy`](Self::destroy).
304    ///
305    /// Cooperative: signals shutdown via the session's [`CancellationToken`]
306    /// and awaits the loop's natural exit rather than aborting the task.
307    /// Any in-flight handler (permission callback, tool call, elicitation
308    /// response) completes before the loop exits, so the CLI never sees a
309    /// half-handled request. See RFD-400 review finding #3.
310    pub async fn stop_event_loop(&self) {
311        self.shutdown.cancel();
312        let handle = self.event_loop.lock().take();
313        if let Some(handle) = handle {
314            let _ = handle.await;
315        }
316        // Fail any pending send_and_wait so it returns immediately.
317        if let Some(waiter) = self.idle_waiter.lock().take() {
318            let _ = waiter.tx.send(Err(
319                ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()
320            ));
321        }
322    }
323
324    /// Send a user message to the agent.
325    ///
326    /// Accepts anything convertible to [`MessageOptions`] — pass a `&str` for the
327    /// trivial case, or build a `MessageOptions` for mode/attachments. The
328    /// `wait_timeout` field on `MessageOptions` is ignored here (use
329    /// [`send_and_wait`](Self::send_and_wait) if you need to wait).
330    ///
331    /// Returns the assigned message ID, which can be used to correlate the
332    /// send with later [`SessionEvent`]s emitted in
333    /// response (assistant messages, tool requests, etc.).
334    ///
335    /// Returns an error if a [`send_and_wait`](Self::send_and_wait) call is
336    /// currently in flight, since the plain send would race with the waiter.
337    ///
338    /// # Cancel safety
339    ///
340    /// **Cancel-safe.** The underlying `session.send` RPC is dispatched
341    /// through the writer-actor (see [`Client::call`](crate::Client::call)),
342    /// so dropping this future after the actor has committed to writing
343    /// will not produce a partial frame on the wire. If the caller's
344    /// future is dropped between "frame enqueued" and "response received",
345    /// the message has already landed on the wire — the agent will process
346    /// it and emit events normally; the caller just won't see the returned
347    /// message ID.
348    pub async fn send(&self, opts: impl Into<MessageOptions>) -> Result<String, Error> {
349        if self.idle_waiter.lock().is_some() {
350            return Err(ErrorKind::Session(SessionErrorKind::SendWhileWaiting).into());
351        }
352        self.send_inner(opts.into()).await
353    }
354
355    async fn send_inner(&self, opts: MessageOptions) -> Result<String, Error> {
356        let mut params = serde_json::json!({
357            "sessionId": self.id,
358            "prompt": opts.prompt,
359        });
360        if let Some(m) = opts.mode {
361            params["mode"] = serde_json::to_value(m)?;
362        }
363        if let Some(am) = opts.agent_mode {
364            params["agentMode"] = serde_json::to_value(am)?;
365        }
366        if let Some(mut a) = opts.attachments {
367            ensure_attachment_display_names(&mut a);
368            params["attachments"] = serde_json::to_value(a)?;
369        }
370        if let Some(headers) = opts.request_headers
371            && !headers.is_empty()
372        {
373            params["requestHeaders"] = serde_json::to_value(headers)?;
374        }
375        if let Some(display_prompt) = opts.display_prompt {
376            params["displayPrompt"] = serde_json::to_value(display_prompt)?;
377        }
378        let trace_ctx = if opts.traceparent.is_some() || opts.tracestate.is_some() {
379            TraceContext {
380                traceparent: opts.traceparent,
381                tracestate: opts.tracestate,
382            }
383        } else {
384            self.client.resolve_trace_context().await
385        };
386        inject_trace_context(&mut params, &trace_ctx);
387        let rpc_start = Instant::now();
388        let result = self.client.call("session.send", Some(params)).await?;
389        let message_id = result
390            .get("messageId")
391            .and_then(|v| v.as_str())
392            .map(|s| s.to_string())
393            .unwrap_or_default();
394        tracing::debug!(
395            elapsed_ms = rpc_start.elapsed().as_millis(),
396            session_id = %self.id,
397            message_id = %message_id,
398            "Session::send completed successfully"
399        );
400        Ok(message_id)
401    }
402
403    /// Send a user message and wait for the agent to finish processing.
404    ///
405    /// Accepts anything convertible to [`MessageOptions`] — pass a `&str` for the
406    /// trivial case, or build a `MessageOptions` for mode/attachments/timeout.
407    /// Blocks until `session.idle` (success) or `session.error` (failure),
408    /// returning the last `assistant.message` event captured during streaming.
409    /// Times out after `MessageOptions::wait_timeout` (default 60 seconds).
410    ///
411    /// Only one `send_and_wait` call may be active per session at a time.
412    /// Calling [`send`](Self::send) while a `send_and_wait`
413    /// is in flight will also return an error.
414    ///
415    /// # Cancel safety
416    ///
417    /// **Cancel-safe.** A `WaiterGuard` clears the in-flight slot on every
418    /// exit path (success, internal failure, internal timeout, *and*
419    /// external cancellation via `tokio::time::timeout` / `select!` /
420    /// dropped JoinHandle). Subsequent `send` and `send_and_wait` calls on
421    /// this session will succeed normally — the slot is never leaked.
422    pub async fn send_and_wait(
423        &self,
424        opts: impl Into<MessageOptions>,
425    ) -> Result<Option<SessionEvent>, Error> {
426        let total_start = Instant::now();
427        let opts = opts.into();
428        let timeout_duration = opts.wait_timeout.unwrap_or(Duration::from_secs(60));
429        let (tx, rx) = oneshot::channel();
430
431        {
432            let mut guard = self.idle_waiter.lock();
433            if guard.is_some() {
434                return Err(ErrorKind::Session(SessionErrorKind::SendWhileWaiting).into());
435            }
436            *guard = Some(IdleWaiter {
437                tx,
438                last_assistant_message: None,
439                started_at: total_start,
440                first_assistant_message_seen: false,
441            });
442        }
443
444        // RAII: clears the idle_waiter slot on every exit path, including
445        // external cancellation (caller's outer `select!` / `timeout` /
446        // dropped future). Without this, an outer cancellation would leak
447        // the slot and brick subsequent `send`/`send_and_wait` calls.
448        let _waiter_guard = WaiterGuard {
449            slot: self.idle_waiter.clone(),
450        };
451
452        let result = tokio::time::timeout(timeout_duration, async {
453            self.send_inner(opts).await?;
454            match rx.await {
455                Ok(result) => result,
456                Err(_) => Err(ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()),
457            }
458        })
459        .await;
460
461        match result {
462            Ok(inner) => {
463                tracing::debug!(
464                    elapsed_ms = total_start.elapsed().as_millis(),
465                    session_id = %self.id,
466                    completed_by = if inner.is_ok() { "idle" } else { "error" },
467                    "Session::send_and_wait complete"
468                );
469                inner
470            }
471            Err(_) => {
472                tracing::warn!(
473                    elapsed_ms = total_start.elapsed().as_millis(),
474                    session_id = %self.id,
475                    completed_by = "timeout",
476                    "Session::send_and_wait failed"
477                );
478                Err(ErrorKind::Session(SessionErrorKind::Timeout(timeout_duration)).into())
479            }
480        }
481    }
482
483    /// Retrieve the session's timeline events.
484    pub async fn get_events(&self) -> Result<Vec<SessionEvent>, Error> {
485        let result = self
486            .client
487            .call(
488                "session.getMessages",
489                Some(serde_json::json!({ "sessionId": self.id })),
490            )
491            .await?;
492        let response: GetMessagesResponse = serde_json::from_value(result)?;
493        Ok(response.events)
494    }
495
496    /// Deprecated alias for [`get_events`](Self::get_events).
497    #[deprecated(since = "0.1.0", note = "Use `get_events()` instead")]
498    pub async fn get_messages(&self) -> Result<Vec<SessionEvent>, Error> {
499        self.get_events().await
500    }
501
502    /// Abort the current agent turn.
503    ///
504    /// # Cancel safety
505    ///
506    /// **Cancel-safe.** Single `session.abort` RPC; the underlying
507    /// [`Client::call`](crate::Client::call) is cancel-safe via the
508    /// writer-actor.
509    pub async fn abort(&self) -> Result<(), Error> {
510        self.client
511            .call(
512                "session.abort",
513                Some(serde_json::json!({ "sessionId": self.id })),
514            )
515            .await?;
516        Ok(())
517    }
518
519    /// Switch to a different model.
520    ///
521    /// Pass `None` for `opts` if no extra configuration is needed.
522    pub async fn set_model(&self, model: &str, opts: Option<SetModelOptions>) -> Result<(), Error> {
523        let opts = opts.unwrap_or_default();
524        let request = ModelSwitchToRequest {
525            model_id: model.to_string(),
526            reasoning_effort: opts.reasoning_effort,
527            reasoning_summary: opts.reasoning_summary,
528            context_tier: opts.context_tier,
529            model_capabilities: opts.model_capabilities,
530        };
531        self.rpc().model().switch_to(request).await?;
532        Ok(())
533    }
534
535    /// Disconnect this session from the CLI.
536    ///
537    /// Sends the `session.destroy` RPC, stops the event loop, and unregisters
538    /// the session from the client. **Session state on disk** (conversation
539    /// history, planning state, artifacts) is **preserved**, so the
540    /// conversation can be resumed later via [`Client::resume_session`]
541    /// using this session's ID. To permanently remove all on-disk session
542    /// data, use [`Client::delete_session`] instead.
543    ///
544    /// The caller should ensure the session is idle (e.g. [`send_and_wait`]
545    /// has returned) before disconnecting; in-flight tool or event handlers
546    /// may otherwise observe failures.
547    ///
548    /// [`Client::resume_session`]: crate::Client::resume_session
549    /// [`Client::delete_session`]: crate::Client::delete_session
550    /// [`send_and_wait`]: Self::send_and_wait
551    pub async fn disconnect(&self) -> Result<(), Error> {
552        self.client
553            .call(
554                "session.destroy",
555                Some(serde_json::json!({ "sessionId": self.id })),
556            )
557            .await?;
558        self.stop_event_loop().await;
559        self.client.unregister_session(&self.id);
560        Ok(())
561    }
562
563    /// Deprecated alias for [`disconnect`](Self::disconnect). The
564    /// underlying wire RPC happens to be named `session.destroy`, but it
565    /// only severs the connection — on-disk session state is preserved.
566    /// Prefer `disconnect` in new code.
567    #[deprecated(since = "0.1.0", note = "Use `disconnect()` instead")]
568    pub async fn destroy(&self) -> Result<(), Error> {
569        self.disconnect().await
570    }
571
572    /// Write a log message to the session.
573    ///
574    /// Pass `None` for `opts` to use defaults (info level, persisted).
575    pub async fn log(
576        &self,
577        message: &str,
578        opts: Option<crate::types::LogOptions>,
579    ) -> Result<(), Error> {
580        let opts = opts.unwrap_or_default();
581        let level = match opts.level {
582            Some(level) => Some(serde_json::from_value(serde_json::to_value(level)?)?),
583            None => None,
584        };
585        let request = LogRequest {
586            message: message.to_string(),
587            level,
588            ephemeral: opts.ephemeral,
589            r#type: None,
590            tip: None,
591            url: None,
592        };
593        self.rpc().log(request).await?;
594        Ok(())
595    }
596
597    /// Returns the UI sub-API for elicitation, confirmation, selection, and
598    /// free-form input.
599    ///
600    /// All UI methods route through `session.ui.*` RPCs and require host
601    /// support — check `session.capabilities().ui.elicitation` before use.
602    pub fn ui(&self) -> SessionUi<'_> {
603        SessionUi { session: self }
604    }
605
606    /// Returns an error if the host doesn't support elicitation.
607    fn assert_elicitation(&self) -> Result<(), Error> {
608        if self
609            .capabilities
610            .read()
611            .ui
612            .as_ref()
613            .and_then(|u| u.elicitation)
614            != Some(true)
615        {
616            return Err(ErrorKind::Session(SessionErrorKind::ElicitationNotSupported).into());
617        }
618        Ok(())
619    }
620}
621
622impl Drop for Session {
623    fn drop(&mut self) {
624        // Cooperative shutdown: cancel the event loop's token to signal
625        // exit between iterations. The loop will see the cancellation on
626        // its next select poll and break cleanly without interrupting an
627        // in-flight handler. We do NOT abort the JoinHandle — that would
628        // land at any await point in the loop body, potentially leaving
629        // the CLI with an unanswered request id. RFD-400 review finding
630        // #3.
631        //
632        // The handle itself is left in `event_loop` to be reaped by the
633        // tokio runtime when it next polls; we intentionally don't await
634        // it here because Drop is sync.
635        self.shutdown.cancel();
636        self.client.unregister_session(&self.id);
637    }
638}
639
640/// UI sub-API for a [`Session`] — elicitation, confirmation, selection,
641/// and free-form input.
642///
643/// Acquired via [`Session::ui`]. Methods route to `session.ui.*` RPCs and
644/// require host elicitation support — check
645/// `session.capabilities().ui.elicitation` before use.
646pub struct SessionUi<'a> {
647    session: &'a Session,
648}
649
650impl<'a> SessionUi<'a> {
651    /// Request user input via an interactive UI form (elicitation).
652    ///
653    /// Sends a JSON Schema describing form fields to the CLI host. The host
654    /// renders a form dialog and returns the user's response.
655    ///
656    /// Prefer the typed convenience methods [`confirm`](Self::confirm),
657    /// [`select`](Self::select), and [`input`](Self::input) for common cases.
658    pub async fn elicitation(
659        &self,
660        message: &str,
661        schema: Value,
662    ) -> Result<ElicitationResult, Error> {
663        self.session.assert_elicitation()?;
664        let result = self
665            .session
666            .client
667            .call(
668                "session.ui.elicitation",
669                Some(serde_json::json!({
670                    "sessionId": self.session.id,
671                    "message": message,
672                    "requestedSchema": schema,
673                })),
674            )
675            .await?;
676        let elicitation: ElicitationResult = serde_json::from_value(result)?;
677        Ok(elicitation)
678    }
679
680    /// Ask the user a yes/no confirmation question.
681    ///
682    /// Returns `true` if the user accepted and confirmed, `false` otherwise.
683    pub async fn confirm(&self, message: &str) -> Result<bool, Error> {
684        self.session.assert_elicitation()?;
685        let schema = serde_json::json!({
686            "type": "object",
687            "properties": {
688                "confirmed": {
689                    "type": "boolean",
690                    "default": true,
691                }
692            },
693            "required": ["confirmed"]
694        });
695        let result = self.elicitation(message, schema).await?;
696        Ok(result.action == "accept"
697            && result
698                .content
699                .and_then(|c| c.get("confirmed").and_then(|v| v.as_bool()))
700                == Some(true))
701    }
702
703    /// Ask the user to select from a list of options.
704    ///
705    /// Returns the selected option string on accept, or `None` on decline/cancel.
706    pub async fn select(&self, message: &str, options: &[&str]) -> Result<Option<String>, Error> {
707        self.session.assert_elicitation()?;
708        let schema = serde_json::json!({
709            "type": "object",
710            "properties": {
711                "selection": {
712                    "type": "string",
713                    "enum": options,
714                }
715            },
716            "required": ["selection"]
717        });
718        let result = self.elicitation(message, schema).await?;
719        if result.action != "accept" {
720            return Ok(None);
721        }
722        let selection = result.content.and_then(|c| {
723            c.get("selection")
724                .and_then(|v| v.as_str())
725                .map(String::from)
726        });
727        Ok(selection)
728    }
729
730    /// Ask the user for free-form text input.
731    ///
732    /// Returns the input string on accept, or `None` on decline/cancel.
733    /// Use [`UiInputOptions`] to set validation constraints and field metadata.
734    pub async fn input(
735        &self,
736        message: &str,
737        options: Option<&UiInputOptions<'_>>,
738    ) -> Result<Option<String>, Error> {
739        self.session.assert_elicitation()?;
740        let mut field = serde_json::json!({ "type": "string" });
741        if let Some(opts) = options {
742            if let Some(title) = opts.title {
743                field["title"] = Value::String(title.to_string());
744            }
745            if let Some(desc) = opts.description {
746                field["description"] = Value::String(desc.to_string());
747            }
748            if let Some(min) = opts.min_length {
749                field["minLength"] = Value::Number(min.into());
750            }
751            if let Some(max) = opts.max_length {
752                field["maxLength"] = Value::Number(max.into());
753            }
754            if let Some(fmt) = &opts.format {
755                field["format"] = Value::String(fmt.as_str().to_string());
756            }
757            if let Some(default) = opts.default {
758                field["default"] = Value::String(default.to_string());
759            }
760        }
761        let schema = serde_json::json!({
762            "type": "object",
763            "properties": { "value": field },
764            "required": ["value"]
765        });
766        let result = self.elicitation(message, schema).await?;
767        if result.action != "accept" {
768            return Ok(None);
769        }
770        let value = result
771            .content
772            .and_then(|c| c.get("value").and_then(|v| v.as_str()).map(String::from));
773        Ok(value)
774    }
775}
776
777impl Client {
778    /// Create a new session on the CLI.
779    ///
780    /// Sends `session.create`, registers the session on the router,
781    /// and spawns an internal event loop that dispatches to the handler.
782    ///
783    /// All callbacks (per-event handlers, tool handlers, hooks, transform)
784    /// are configured via [`SessionConfig`] using its `with_*_handler` /
785    /// `with_tools` / `with_hooks` / `with_system_message_transform` builder
786    /// methods.
787    ///
788    /// If [`hooks_handler`](SessionConfig::hooks_handler) is set, the
789    /// wire-level `hooks` flag is automatically enabled.
790    ///
791    /// If [`system_message_transform`](SessionConfig::system_message_transform) is set, the SDK injects
792    /// `action: "transform"` sections into the [`SystemMessageConfig`] wire
793    /// format and handles `systemMessage.transform` RPC callbacks during
794    /// the session.
795    ///
796    /// Each per-event handler is independently optional. If a handler is
797    /// not installed, the SDK signals the runtime not to emit the matching
798    /// broadcast (and silently skips dispatch if one arrives anyway).
799    pub async fn create_session(&self, mut config: SessionConfig) -> Result<Session, Error> {
800        let total_start = Instant::now();
801        // For cloud sessions, let the CLI/server assign the session id and
802        // register the session lazily once the response arrives. For non-cloud
803        // sessions we generate the id client-side (when the caller didn't
804        // supply one) so the session can be registered BEFORE the RPC — the
805        // CLI may issue session-scoped requests (e.g. sessionFs.writeFile for
806        // workspace metadata) during session.create processing, before it has
807        // sent the response.
808        let caller_session_id = config.session_id.clone();
809        let use_server_generated_id = config.cloud.is_some() && caller_session_id.is_none();
810        let local_session_id: Option<SessionId> = if use_server_generated_id {
811            None
812        } else {
813            Some(
814                caller_session_id
815                    .clone()
816                    .unwrap_or_else(|| SessionId::new(uuid::Uuid::new_v4().to_string())),
817            )
818        };
819        if config.hooks_handler.is_some() && config.hooks.is_none() {
820            config.hooks = Some(true);
821        }
822        if let Some(transforms) = config.system_message_transform.clone() {
823            inject_transform_sections(&mut config, transforms.as_ref());
824        }
825        let mode = self.inner.mode;
826        if mode == crate::ClientMode::Empty && config.available_tools.is_none() {
827            return Err(Error::with_message(
828                ErrorKind::InvalidConfig,
829                "ClientMode::Empty requires available_tools to be set on the session config. \
830                 Use ToolSet to specify which tools the session may use (e.g. \
831                 ToolSet::new().add_builtin_many(BUILTIN_TOOLS_ISOLATED)).",
832            ));
833        }
834        crate::mode::validate_tool_filter_list(
835            "available_tools",
836            config.available_tools.as_deref(),
837        )?;
838        crate::mode::validate_tool_filter_list("excluded_tools", config.excluded_tools.as_deref())?;
839        config.system_message =
840            crate::mode::system_message_for_mode(mode, config.system_message.take());
841        config.memory = crate::mode::memory_for_mode(mode, config.memory.take());
842        if mode == crate::ClientMode::Empty {
843            if config.enable_session_telemetry.is_none() {
844                config.enable_session_telemetry = Some(false);
845            }
846            if config.skip_embedding_retrieval.is_none() {
847                config.skip_embedding_retrieval = Some(true);
848            }
849            if config.enable_on_demand_instruction_discovery.is_none() {
850                config.enable_on_demand_instruction_discovery = Some(false);
851            }
852            if config.enable_file_hooks.is_none() {
853                config.enable_file_hooks = Some(false);
854            }
855            if config.enable_host_git_operations.is_none() {
856                config.enable_host_git_operations = Some(false);
857            }
858            if config.enable_session_store.is_none() {
859                config.enable_session_store = Some(false);
860            }
861            if config.enable_skills.is_none() {
862                config.enable_skills = Some(false);
863            }
864        }
865        if mode == crate::ClientMode::Empty && config.mcp_oauth_token_storage.is_none() {
866            config.mcp_oauth_token_storage = Some("in-memory".into());
867        }
868        if mode == crate::ClientMode::Empty && config.embedding_cache_storage.is_none() {
869            config.embedding_cache_storage = Some("in-memory".into());
870        }
871        let opt_skip_custom_instructions = config.skip_custom_instructions;
872        let opt_custom_agents_local_only = config.custom_agents_local_only;
873        let opt_coauthor_enabled = config.coauthor_enabled;
874        let opt_manage_schedule_enabled = config.manage_schedule_enabled;
875        let (wire, mut runtime) = config.into_wire(local_session_id.clone())?;
876
877        let permission_handler = crate::permission::resolve_handler(
878            runtime.permission_handler.take(),
879            runtime.permission_policy.take(),
880        );
881        let handlers = SessionHandlers {
882            permission: permission_handler,
883            elicitation: runtime.elicitation_handler.take(),
884            user_input: runtime.user_input_handler.take(),
885            exit_plan_mode: runtime.exit_plan_mode_handler.take(),
886            auto_mode_switch: runtime.auto_mode_switch_handler.take(),
887            tools: Arc::new(std::mem::take(&mut runtime.tool_handlers)),
888        };
889        let hooks = runtime.hooks_handler.take();
890        let transforms = runtime.system_message_transform.take();
891        let tools_count = wire.tools.as_ref().map_or(0, Vec::len);
892        let commands_count = runtime.commands.as_ref().map_or(0, Vec::len);
893        let has_hooks = hooks.is_some();
894        let command_handlers = build_command_handler_map(runtime.commands.as_deref());
895        let canvas_handler = runtime.canvas_handler.take();
896        let session_fs_provider = runtime.session_fs_provider.take();
897        let bearer_token_providers = std::mem::take(&mut runtime.bearer_token_providers);
898        if self.inner.session_fs_configured && session_fs_provider.is_none() {
899            return Err(ErrorKind::Session(SessionErrorKind::SessionFsProviderRequired).into());
900        }
901        if self.inner.session_fs_sqlite_declared
902            && let Some(ref provider) = session_fs_provider
903            && provider.sqlite().is_none()
904        {
905            return Err(Error::with_message(
906                ErrorKind::InvalidConfig,
907                "SessionFs capabilities declare SQLite support but the provider \
908                 does not implement SessionFsSqliteProvider",
909            ));
910        }
911
912        let mut params = serde_json::to_value(&wire)?;
913        let trace_ctx = self.resolve_trace_context().await;
914        inject_trace_context(&mut params, &trace_ctx);
915
916        let setup_start = Instant::now();
917        let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
918        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
919        let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new()));
920        let shutdown = CancellationToken::new();
921        let (event_tx, _) = tokio::sync::broadcast::channel(512);
922
923        // For cloud sessions (use_server_generated_id), defer session
924        // registration to the inline callback so the read task registers
925        // the session synchronously the instant the response arrives.
926        // For non-cloud sessions, register up-front so the CLI can issue
927        // session-scoped requests during session.create processing.
928        let inline_stash: Arc<
929            ParkingLotMutex<Option<(SessionId, crate::router::SessionChannels)>>,
930        > = Arc::new(ParkingLotMutex::new(None));
931
932        let inline_callback: Option<crate::jsonrpc::InlineResponseCallback> = if let Some(ref sid) =
933            local_session_id
934        {
935            let channels = self.register_session(sid);
936            *inline_stash.lock() = Some((sid.clone(), channels));
937            None
938        } else {
939            let client = self.clone();
940            let stash = inline_stash.clone();
941            let expected = caller_session_id.clone();
942            Some(Box::new(move |response| {
943                let result = response.result.as_ref().ok_or_else(|| {
944                    Error::with_message(ErrorKind::Json, "session.create response had no result")
945                })?;
946                let parsed: CreateSessionResult =
947                    serde_json::from_value(result.clone()).map_err(Error::from)?;
948                if let Some(requested) = expected.as_ref()
949                    && parsed.session_id != *requested
950                {
951                    return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
952                        requested: requested.clone(),
953                        returned: parsed.session_id,
954                    })
955                    .into());
956                }
957                let channels = client.register_session(&parsed.session_id);
958                *stash.lock() = Some((parsed.session_id, channels));
959                Ok(())
960            }))
961        };
962
963        let rpc_start = Instant::now();
964        let result = match self
965            .call_with_inline_callback("session.create", Some(params), inline_callback)
966            .await
967        {
968            Ok(result) => result,
969            Err(error) => {
970                if let Some((id, _channels)) = inline_stash.lock().take() {
971                    self.unregister_session(&id);
972                }
973                return Err(error);
974            }
975        };
976        tracing::debug!(
977            elapsed_ms = rpc_start.elapsed().as_millis(),
978            "Client::create_session session creation request completed successfully"
979        );
980        let create_result: CreateSessionResult = match serde_json::from_value(result) {
981            Ok(result) => result,
982            Err(error) => {
983                if let Some((id, _channels)) = inline_stash.lock().take() {
984                    self.unregister_session(&id);
985                }
986                return Err(error.into());
987            }
988        };
989
990        if let Some(ref requested) = local_session_id
991            && create_result.session_id != *requested
992        {
993            if let Some((id, _channels)) = inline_stash.lock().take() {
994                self.unregister_session(&id);
995            }
996            return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
997                requested: requested.clone(),
998                returned: create_result.session_id.clone(),
999            })
1000            .into());
1001        }
1002
1003        let (session_id, channels) = inline_stash
1004            .lock()
1005            .take()
1006            .expect("session registration must have populated stash on success");
1007        let event_loop = spawn_event_loop(
1008            session_id.clone(),
1009            self.clone(),
1010            handlers,
1011            hooks,
1012            transforms,
1013            command_handlers,
1014            canvas_handler,
1015            session_fs_provider,
1016            bearer_token_providers,
1017            channels,
1018            idle_waiter.clone(),
1019            capabilities.clone(),
1020            open_canvases.clone(),
1021            event_tx.clone(),
1022            shutdown.clone(),
1023        );
1024        tracing::debug!(
1025            elapsed_ms = setup_start.elapsed().as_millis(),
1026            session_id = %session_id,
1027            tools_count,
1028            commands_count,
1029            has_hooks,
1030            "Client::create_session local setup complete"
1031        );
1032        *capabilities.write() = create_result.capabilities.unwrap_or_default();
1033
1034        tracing::debug!(
1035            elapsed_ms = total_start.elapsed().as_millis(),
1036            session_id = %session_id,
1037            "Client::create_session complete"
1038        );
1039        let session = Session {
1040            id: session_id,
1041            cwd: self.cwd().clone(),
1042            workspace_path: create_result.workspace_path,
1043            remote_url: create_result.remote_url,
1044            client: self.clone(),
1045            event_loop: ParkingLotMutex::new(Some(event_loop)),
1046            shutdown,
1047            idle_waiter,
1048            capabilities,
1049            open_canvases,
1050            event_tx,
1051        };
1052        apply_mode_post_create_patch(
1053            &session,
1054            mode,
1055            opt_skip_custom_instructions,
1056            opt_custom_agents_local_only,
1057            opt_coauthor_enabled,
1058            opt_manage_schedule_enabled,
1059        )
1060        .await?;
1061        Ok(session)
1062    }
1063
1064    /// Resume an existing session on the CLI.
1065    ///
1066    /// Sends `session.resume` and `session.skills.reload`, registers the
1067    /// session on the router, and spawns the event loop.
1068    ///
1069    /// All callbacks (event handler, hooks, transform) are configured
1070    /// via [`ResumeSessionConfig`] using its `with_*` builder methods.
1071    ///
1072    /// See [`Self::create_session`] for the defaults applied when callback
1073    /// fields are unset.
1074    pub async fn resume_session(&self, mut config: ResumeSessionConfig) -> Result<Session, Error> {
1075        let total_start = Instant::now();
1076        let session_id = config.session_id.clone();
1077        if config.hooks_handler.is_some() && config.hooks.is_none() {
1078            config.hooks = Some(true);
1079        }
1080        if let Some(transforms) = config.system_message_transform.clone() {
1081            inject_transform_sections_resume(&mut config, transforms.as_ref());
1082        }
1083        let mode = self.inner.mode;
1084        if mode == crate::ClientMode::Empty && config.available_tools.is_none() {
1085            return Err(Error::with_message(
1086                ErrorKind::InvalidConfig,
1087                "ClientMode::Empty requires available_tools to be set on the session config. \
1088                 Use ToolSet to specify which tools the session may use (e.g. \
1089                 ToolSet::new().add_builtin_many(BUILTIN_TOOLS_ISOLATED)).",
1090            ));
1091        }
1092        crate::mode::validate_tool_filter_list(
1093            "available_tools",
1094            config.available_tools.as_deref(),
1095        )?;
1096        crate::mode::validate_tool_filter_list("excluded_tools", config.excluded_tools.as_deref())?;
1097        config.system_message =
1098            crate::mode::system_message_for_mode(mode, config.system_message.take());
1099        config.memory = crate::mode::memory_for_mode(mode, config.memory.take());
1100        if mode == crate::ClientMode::Empty {
1101            if config.enable_session_telemetry.is_none() {
1102                config.enable_session_telemetry = Some(false);
1103            }
1104            if config.skip_embedding_retrieval.is_none() {
1105                config.skip_embedding_retrieval = Some(true);
1106            }
1107            if config.enable_on_demand_instruction_discovery.is_none() {
1108                config.enable_on_demand_instruction_discovery = Some(false);
1109            }
1110            if config.enable_file_hooks.is_none() {
1111                config.enable_file_hooks = Some(false);
1112            }
1113            if config.enable_host_git_operations.is_none() {
1114                config.enable_host_git_operations = Some(false);
1115            }
1116            if config.enable_session_store.is_none() {
1117                config.enable_session_store = Some(false);
1118            }
1119            if config.enable_skills.is_none() {
1120                config.enable_skills = Some(false);
1121            }
1122        }
1123        if mode == crate::ClientMode::Empty && config.mcp_oauth_token_storage.is_none() {
1124            config.mcp_oauth_token_storage = Some("in-memory".into());
1125        }
1126        if mode == crate::ClientMode::Empty && config.embedding_cache_storage.is_none() {
1127            config.embedding_cache_storage = Some("in-memory".into());
1128        }
1129        let opt_skip_custom_instructions = config.skip_custom_instructions;
1130        let opt_custom_agents_local_only = config.custom_agents_local_only;
1131        let opt_coauthor_enabled = config.coauthor_enabled;
1132        let opt_manage_schedule_enabled = config.manage_schedule_enabled;
1133        let (wire, mut runtime) = config.into_wire()?;
1134
1135        let permission_handler = crate::permission::resolve_handler(
1136            runtime.permission_handler.take(),
1137            runtime.permission_policy.take(),
1138        );
1139        let handlers = SessionHandlers {
1140            permission: permission_handler,
1141            elicitation: runtime.elicitation_handler.take(),
1142            user_input: runtime.user_input_handler.take(),
1143            exit_plan_mode: runtime.exit_plan_mode_handler.take(),
1144            auto_mode_switch: runtime.auto_mode_switch_handler.take(),
1145            tools: Arc::new(std::mem::take(&mut runtime.tool_handlers)),
1146        };
1147        let hooks = runtime.hooks_handler.take();
1148        let transforms = runtime.system_message_transform.take();
1149        let tools_count = wire.tools.as_ref().map_or(0, Vec::len);
1150        let commands_count = runtime.commands.as_ref().map_or(0, Vec::len);
1151        let has_hooks = hooks.is_some();
1152        let command_handlers = build_command_handler_map(runtime.commands.as_deref());
1153        let canvas_handler = runtime.canvas_handler.take();
1154        let session_fs_provider = runtime.session_fs_provider.take();
1155        let bearer_token_providers = std::mem::take(&mut runtime.bearer_token_providers);
1156        if self.inner.session_fs_configured && session_fs_provider.is_none() {
1157            return Err(ErrorKind::Session(SessionErrorKind::SessionFsProviderRequired).into());
1158        }
1159        if self.inner.session_fs_sqlite_declared
1160            && let Some(ref provider) = session_fs_provider
1161            && provider.sqlite().is_none()
1162        {
1163            return Err(Error::with_message(
1164                ErrorKind::InvalidConfig,
1165                "SessionFs capabilities declare SQLite support but the provider \
1166                 does not implement SessionFsSqliteProvider",
1167            ));
1168        }
1169
1170        let mut params = serde_json::to_value(&wire)?;
1171        let trace_ctx = self.resolve_trace_context().await;
1172        inject_trace_context(&mut params, &trace_ctx);
1173
1174        let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
1175        let setup_start = Instant::now();
1176        let channels = self.register_session(&session_id);
1177        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
1178        let open_canvases = Arc::new(parking_lot::RwLock::new(Vec::new()));
1179        let shutdown = CancellationToken::new();
1180        let (event_tx, _) = tokio::sync::broadcast::channel(512);
1181        let event_loop = spawn_event_loop(
1182            session_id.clone(),
1183            self.clone(),
1184            handlers,
1185            hooks,
1186            transforms,
1187            command_handlers,
1188            canvas_handler,
1189            session_fs_provider,
1190            bearer_token_providers,
1191            channels,
1192            idle_waiter.clone(),
1193            capabilities.clone(),
1194            open_canvases.clone(),
1195            event_tx.clone(),
1196            shutdown.clone(),
1197        );
1198        let mut registration =
1199            PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
1200        tracing::debug!(
1201            elapsed_ms = setup_start.elapsed().as_millis(),
1202            session_id = %session_id,
1203            tools_count,
1204            commands_count,
1205            has_hooks,
1206            "Client::resume_session local setup complete"
1207        );
1208
1209        let rpc_start = Instant::now();
1210        let result = match self.call("session.resume", Some(params)).await {
1211            Ok(result) => result,
1212            Err(error) => {
1213                registration.cleanup(event_loop).await;
1214                return Err(error);
1215            }
1216        };
1217        tracing::debug!(
1218            elapsed_ms = rpc_start.elapsed().as_millis(),
1219            session_id = %session_id,
1220            "Client::resume_session session resume request completed successfully"
1221        );
1222
1223        let resume_result: ResumeSessionResult = match serde_json::from_value(result) {
1224            Ok(result) => result,
1225            Err(error) => {
1226                registration.cleanup(event_loop).await;
1227                return Err(error.into());
1228            }
1229        };
1230        let cli_session_id = resume_result
1231            .session_id
1232            .clone()
1233            .unwrap_or_else(|| session_id.clone());
1234        if cli_session_id != session_id {
1235            registration.cleanup(event_loop).await;
1236            return Err(ErrorKind::Session(SessionErrorKind::SessionIdMismatch {
1237                requested: session_id,
1238                returned: cli_session_id,
1239            })
1240            .into());
1241        }
1242
1243        // Reload skills after resume (best-effort).
1244        let skills_reload_start = Instant::now();
1245        if let Err(e) = self
1246            .call(
1247                "session.skills.reload",
1248                Some(serde_json::json!({ "sessionId": session_id })),
1249            )
1250            .await
1251        {
1252            warn!(
1253                elapsed_ms = skills_reload_start.elapsed().as_millis(),
1254                session_id = %session_id,
1255                error = %e,
1256                "Client::resume_session skills reload request failed"
1257            );
1258        } else {
1259            tracing::debug!(
1260                elapsed_ms = skills_reload_start.elapsed().as_millis(),
1261                session_id = %session_id,
1262                "Client::resume_session skills reload request completed successfully"
1263            );
1264        }
1265
1266        *capabilities.write() = resume_result.capabilities.unwrap_or_default();
1267        // Upsert resume snapshots rather than replacing wholesale. Live
1268        // `session.canvas.opened` notifications can arrive on the event loop
1269        // while `session.resume` is in flight; a wholesale replace would
1270        // discard those updates.
1271        {
1272            let mut snapshots = open_canvases.write();
1273            for snapshot in resume_result.open_canvases.unwrap_or_default() {
1274                upsert_open_canvas_snapshot(&mut snapshots, snapshot);
1275            }
1276        }
1277
1278        tracing::debug!(
1279            elapsed_ms = total_start.elapsed().as_millis(),
1280            session_id = %session_id,
1281            "Client::resume_session complete"
1282        );
1283        registration.disarm();
1284        let session = Session {
1285            id: session_id,
1286            cwd: self.cwd().clone(),
1287            workspace_path: resume_result.workspace_path,
1288            remote_url: resume_result.remote_url,
1289            client: self.clone(),
1290            event_loop: ParkingLotMutex::new(Some(event_loop)),
1291            shutdown,
1292            idle_waiter,
1293            capabilities,
1294            open_canvases,
1295            event_tx,
1296        };
1297        apply_mode_post_create_patch(
1298            &session,
1299            mode,
1300            opt_skip_custom_instructions,
1301            opt_custom_agents_local_only,
1302            opt_coauthor_enabled,
1303            opt_manage_schedule_enabled,
1304        )
1305        .await?;
1306        Ok(session)
1307    }
1308}
1309
1310type CommandHandlerMap = HashMap<String, Arc<dyn CommandHandler>>;
1311
1312async fn apply_mode_post_create_patch(
1313    session: &Session,
1314    mode: crate::ClientMode,
1315    opt_skip_custom_instructions: Option<bool>,
1316    opt_custom_agents_local_only: Option<bool>,
1317    opt_coauthor_enabled: Option<bool>,
1318    opt_manage_schedule_enabled: Option<bool>,
1319) -> Result<(), Error> {
1320    use crate::generated::api_types::SessionUpdateOptionsParams;
1321    let mut patch = SessionUpdateOptionsParams::default();
1322    let should_send = if mode == crate::ClientMode::Empty {
1323        patch.skip_custom_instructions = Some(opt_skip_custom_instructions.unwrap_or(true));
1324        patch.custom_agents_local_only = Some(opt_custom_agents_local_only.unwrap_or(true));
1325        patch.coauthor_enabled = Some(opt_coauthor_enabled.unwrap_or(false));
1326        patch.manage_schedule_enabled = Some(opt_manage_schedule_enabled.unwrap_or(false));
1327        patch.installed_plugins = Some(Vec::new());
1328        true
1329    } else {
1330        let mut any = false;
1331        if let Some(v) = opt_skip_custom_instructions {
1332            patch.skip_custom_instructions = Some(v);
1333            any = true;
1334        }
1335        if let Some(v) = opt_custom_agents_local_only {
1336            patch.custom_agents_local_only = Some(v);
1337            any = true;
1338        }
1339        if let Some(v) = opt_coauthor_enabled {
1340            patch.coauthor_enabled = Some(v);
1341            any = true;
1342        }
1343        if let Some(v) = opt_manage_schedule_enabled {
1344            patch.manage_schedule_enabled = Some(v);
1345            any = true;
1346        }
1347        any
1348    };
1349    if !should_send {
1350        return Ok(());
1351    }
1352    if let Err(error) = session.rpc().options().update(patch).await {
1353        let _ = session.disconnect().await;
1354        return Err(error);
1355    }
1356    Ok(())
1357}
1358
1359fn build_command_handler_map(commands: Option<&[CommandDefinition]>) -> Arc<CommandHandlerMap> {
1360    let map = match commands {
1361        Some(commands) => commands
1362            .iter()
1363            .filter(|cmd| !cmd.name.is_empty())
1364            .map(|cmd| (cmd.name.clone(), cmd.handler.clone()))
1365            .collect(),
1366        None => HashMap::new(),
1367    };
1368    Arc::new(map)
1369}
1370
1371fn upsert_open_canvas_snapshot(
1372    snapshots: &mut Vec<OpenCanvasInstance>,
1373    snapshot: OpenCanvasInstance,
1374) {
1375    if let Some(existing) = snapshots
1376        .iter_mut()
1377        .find(|open| open.instance_id == snapshot.instance_id)
1378    {
1379        *existing = snapshot;
1380    } else {
1381        snapshots.push(snapshot);
1382    }
1383}
1384
1385fn remove_open_canvas_snapshot(snapshots: &mut Vec<OpenCanvasInstance>, instance_id: &str) {
1386    snapshots.retain(|open| open.instance_id != instance_id);
1387}
1388
1389#[allow(clippy::too_many_arguments)]
1390fn spawn_event_loop(
1391    session_id: SessionId,
1392    client: Client,
1393    handlers: SessionHandlers,
1394    hooks: Option<Arc<dyn SessionHooks>>,
1395    transforms: Option<Arc<dyn SystemMessageTransform>>,
1396    command_handlers: Arc<CommandHandlerMap>,
1397    canvas_handler: Option<Arc<dyn CanvasHandler>>,
1398    session_fs_provider: Option<Arc<dyn SessionFsProvider>>,
1399    bearer_token_providers: HashMap<String, Arc<dyn BearerTokenProvider>>,
1400    channels: crate::router::SessionChannels,
1401    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1402    capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
1403    open_canvases: Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
1404    event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
1405    shutdown: CancellationToken,
1406) -> JoinHandle<()> {
1407    let crate::router::SessionChannels {
1408        mut notifications,
1409        mut requests,
1410    } = channels;
1411
1412    let span = tracing::error_span!("session_event_loop", session_id = %session_id);
1413    tokio::spawn(
1414        async move {
1415            loop {
1416                // `mpsc::UnboundedReceiver::recv` and
1417                // `CancellationToken::cancelled` are both cancel-safe per
1418                // RFD 400. The selected branch's `await`'d handler is
1419                // *not* mid-cancelled by the select — once a branch fires
1420                // it runs to completion within the loop's iteration.
1421                // Spawned child tasks inside `handle_notification`
1422                // (permission/tool/elicitation callbacks) intentionally
1423                // outlive the parent loop and own their own cleanup;
1424                // this is RFD 400's "spawn background tasks to perform
1425                // cancel-unsafe operations" pattern and is correct as-is.
1426                tokio::select! {
1427                    _ = shutdown.cancelled() => break,
1428                    Some(notification) = notifications.recv() => {
1429                        handle_notification(
1430                            &session_id, &client, &handlers, &command_handlers, notification, &idle_waiter, &capabilities, &open_canvases, &event_tx,
1431                        ).await;
1432                    }
1433                    Some(request) = requests.recv() => {
1434                        let ctx = RequestDispatchContext {
1435                            client: &client,
1436                            handlers: &handlers,
1437                            hooks: hooks.as_deref(),
1438                            transforms: transforms.as_deref(),
1439                            canvas_handler: canvas_handler.as_ref(),
1440                            session_fs_provider: session_fs_provider.as_ref(),
1441                            bearer_token_providers: &bearer_token_providers,
1442                        };
1443                        handle_request(&session_id, ctx, request).await;
1444                    }
1445                    else => break,
1446                }
1447            }
1448            // Channels closed or shutdown signaled — fail any pending
1449            // send_and_wait so the caller observes a clean error.
1450            if let Some(waiter) = idle_waiter.lock().take() {
1451                let _ = waiter
1452                    .tx
1453                    .send(Err(ErrorKind::Session(SessionErrorKind::EventLoopClosed).into()));
1454            }
1455        }
1456        .instrument(span),
1457    )
1458}
1459
1460fn extract_request_id(data: &Value) -> Option<RequestId> {
1461    data.get("requestId")
1462        .and_then(|v| v.as_str())
1463        .filter(|s| !s.is_empty())
1464        .map(RequestId::new)
1465}
1466
1467/// Map a [`PermissionResult`] to the `result` payload sent back to the
1468/// server via `session.permissions.handlePendingPermissionRequest`.
1469///
1470/// Returns `None` when the SDK must not send a response.
1471fn notification_permission_payload(result: &PermissionResult) -> Option<Value> {
1472    match result {
1473        PermissionResult::NoResult => None,
1474        PermissionResult::Decision(decision) => Some(
1475            serde_json::to_value(decision).expect("serializing permission decision should succeed"),
1476        ),
1477    }
1478}
1479
1480fn tool_failure_result(message: impl Into<String>) -> ToolResult {
1481    let message = message.into();
1482    ToolResult::Expanded(ToolResultExpanded {
1483        text_result_for_llm: message.clone(),
1484        result_type: "failure".to_string(),
1485        binary_results_for_llm: None,
1486        session_log: None,
1487        error: Some(message),
1488        tool_telemetry: None,
1489    })
1490}
1491
1492/// Process a notification from the CLI's broadcast channel.
1493#[allow(clippy::too_many_arguments)]
1494async fn handle_notification(
1495    session_id: &SessionId,
1496    client: &Client,
1497    handlers: &SessionHandlers,
1498    command_handlers: &Arc<CommandHandlerMap>,
1499    notification: SessionEventNotification,
1500    idle_waiter: &Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1501    capabilities: &Arc<parking_lot::RwLock<SessionCapabilities>>,
1502    open_canvases: &Arc<parking_lot::RwLock<Vec<OpenCanvasInstance>>>,
1503    event_tx: &tokio::sync::broadcast::Sender<SessionEvent>,
1504) {
1505    let dispatch_start = Instant::now();
1506    let event = notification.event.clone();
1507    let event_type = event.parsed_type();
1508    if event_type == SessionEventType::PermissionRequested {
1509        tracing::debug!(
1510            session_id = %session_id,
1511            event_type = %event.event_type,
1512            "Session::handle_notification permission request received"
1513        );
1514    }
1515
1516    // Signal send_and_wait if active. The lock is only contended when
1517    // a send_and_wait call is in flight (idle_waiter is Some).
1518    match event_type {
1519        SessionEventType::AssistantMessage
1520        | SessionEventType::SessionIdle
1521        | SessionEventType::SessionError => {
1522            let mut guard = idle_waiter.lock();
1523            if let Some(waiter) = guard.as_mut() {
1524                match event_type {
1525                    SessionEventType::AssistantMessage => {
1526                        if !waiter.first_assistant_message_seen {
1527                            waiter.first_assistant_message_seen = true;
1528                            tracing::debug!(
1529                                elapsed_ms = waiter.started_at.elapsed().as_millis(),
1530                                session_id = %session_id,
1531                                "Session::send_and_wait first assistant message"
1532                            );
1533                        }
1534                        waiter.last_assistant_message = Some(event.clone());
1535                    }
1536                    SessionEventType::SessionIdle | SessionEventType::SessionError => {
1537                        if let Some(waiter) = guard.take() {
1538                            if event_type == SessionEventType::SessionIdle {
1539                                tracing::debug!(
1540                                    elapsed_ms = waiter.started_at.elapsed().as_millis(),
1541                                    session_id = %session_id,
1542                                    "Session::send_and_wait idle received"
1543                                );
1544                                let _ = waiter.tx.send(Ok(waiter.last_assistant_message));
1545                            } else {
1546                                let error_msg = event
1547                                    .typed_data::<SessionErrorData>()
1548                                    .map(|d| d.message)
1549                                    .or_else(|| {
1550                                        event
1551                                            .data
1552                                            .get("message")
1553                                            .and_then(|v| v.as_str())
1554                                            .map(|s| s.to_string())
1555                                    })
1556                                    .unwrap_or_else(|| "session error".to_string());
1557                                let _ = waiter.tx.send(Err(Error::with_message(
1558                                    ErrorKind::Session(SessionErrorKind::AgentError),
1559                                    error_msg,
1560                                )));
1561                            }
1562                        }
1563                    }
1564                    _ => {}
1565                }
1566            }
1567        }
1568        _ => {}
1569    }
1570
1571    // Update the snapshot caches BEFORE broadcasting so subscribers that
1572    // call `Session::capabilities()` / `Session::open_canvases()` in
1573    // response to the event observe the new state.
1574    if event_type == SessionEventType::CapabilitiesChanged {
1575        match serde_json::from_value::<SessionCapabilities>(notification.event.data.clone()) {
1576            Ok(changed) => *capabilities.write() = changed,
1577            Err(e) => warn!(error = %e, "failed to deserialize capabilities.changed payload"),
1578        }
1579    }
1580    if event_type == SessionEventType::SessionCanvasOpened {
1581        match serde_json::from_value::<OpenCanvasInstance>(notification.event.data.clone()) {
1582            Ok(open_canvas) => {
1583                upsert_open_canvas_snapshot(&mut open_canvases.write(), open_canvas);
1584            }
1585            Err(e) => warn!(error = %e, "failed to deserialize session.canvas.opened payload"),
1586        }
1587    }
1588    if event_type == SessionEventType::SessionCanvasClosed {
1589        match serde_json::from_value::<SessionCanvasClosedData>(notification.event.data.clone()) {
1590            Ok(closed) => {
1591                if closed.instance_id.is_empty() {
1592                    warn!("failed to deserialize session.canvas.closed payload");
1593                } else {
1594                    remove_open_canvas_snapshot(&mut open_canvases.write(), &closed.instance_id);
1595                }
1596            }
1597            Err(e) => warn!(error = %e, "failed to deserialize session.canvas.closed payload"),
1598        }
1599    }
1600
1601    // Fan out the event to runtime subscribers (`Session::subscribe`). `send`
1602    // only errors when there are no receivers, which is the normal case
1603    // before any consumer subscribes.
1604    let _ = event_tx.send(event.clone());
1605
1606    tracing::debug!(
1607        elapsed_ms = dispatch_start.elapsed().as_millis(),
1608        session_id = %session_id,
1609        event_type = %notification.event.event_type,
1610        "Session::handle_notification dispatch"
1611    );
1612
1613    // Notification-based permission/tool/elicitation requests require a
1614    // separate RPC callback. Spawn concurrently since the CLI doesn't block.
1615    match event_type {
1616        SessionEventType::PermissionRequested => {
1617            let Some(request_id) = extract_request_id(&notification.event.data) else {
1618                return;
1619            };
1620            // Honor the runtime's `resolvedByHook` signal — when the
1621            // server has already resolved the permission via a hook,
1622            // clients must not send a second response.
1623            if notification
1624                .event
1625                .data
1626                .get("resolvedByHook")
1627                .and_then(|v| v.as_bool())
1628                .unwrap_or(false)
1629            {
1630                return;
1631            }
1632            // Multi-client safety: if this client has no permission
1633            // handler installed, don't respond — another client on the
1634            // same CLI may handle it.
1635            let Some(permission_handler) = handlers.permission.clone() else {
1636                return;
1637            };
1638            let client = client.clone();
1639            let sid = session_id.clone();
1640            let data: PermissionRequestData =
1641                serde_json::from_value(notification.event.data.clone()).unwrap_or_else(|_| {
1642                    PermissionRequestData {
1643                        kind: None,
1644                        tool_call_id: None,
1645                        extra: notification.event.data.clone(),
1646                    }
1647                });
1648            let span = tracing::error_span!(
1649                "permission_request_handler",
1650                session_id = %sid,
1651                request_id = %request_id
1652            );
1653            tokio::spawn(
1654                async move {
1655                    let handler_start = Instant::now();
1656                    let result = permission_handler
1657                        .handle(sid.clone(), request_id.clone(), data)
1658                        .await;
1659                    tracing::debug!(
1660                        elapsed_ms = handler_start.elapsed().as_millis(),
1661                        session_id = %sid,
1662                        request_id = %request_id,
1663                        "PermissionHandler::handle dispatch"
1664                    );
1665                    let Some(result_value) = notification_permission_payload(&result) else {
1666                        // Handler returned Deferred / NoResult — it will
1667                        // call handlePendingPermissionRequest itself (or
1668                        // leave the request unanswered).
1669                        return;
1670                    };
1671                    let rpc_start = Instant::now();
1672                    let _ = client
1673                        .call(
1674                            "session.permissions.handlePendingPermissionRequest",
1675                            Some(serde_json::json!({
1676                                "sessionId": sid,
1677                                "requestId": request_id,
1678                                "result": result_value,
1679                            })),
1680                        )
1681                        .await;
1682                    tracing::debug!(
1683                        elapsed_ms = rpc_start.elapsed().as_millis(),
1684                        session_id = %sid,
1685                        request_id = %request_id,
1686                        "Session::handle_notification response sent successfully"
1687                    );
1688                }
1689                .instrument(span),
1690            );
1691        }
1692        SessionEventType::ExternalToolRequested => {
1693            let Some(request_id) = extract_request_id(&notification.event.data) else {
1694                return;
1695            };
1696            let data: ExternalToolRequestedData =
1697                match serde_json::from_value(notification.event.data.clone()) {
1698                    Ok(d) => d,
1699                    Err(e) => {
1700                        warn!(error = %e, "failed to deserialize external_tool.requested");
1701                        let client = client.clone();
1702                        let sid = session_id.clone();
1703                        let span = tracing::error_span!(
1704                            "external_tool_deserialize_error",
1705                            session_id = %sid,
1706                            request_id = %request_id
1707                        );
1708                        tokio::spawn(
1709                            async move {
1710                                let rpc_start = Instant::now();
1711                                let _ = client
1712                                .call(
1713                                    "session.tools.handlePendingToolCall",
1714                                    Some(serde_json::json!({
1715                                        "sessionId": sid,
1716                                        "requestId": request_id,
1717                                        "error": format!("Failed to deserialize tool request: {e}"),
1718                                    })),
1719                                )
1720                                .await;
1721                                tracing::debug!(
1722                                    elapsed_ms = rpc_start.elapsed().as_millis(),
1723                                    session_id = %sid,
1724                                    request_id = %request_id,
1725                                    "Session::handle_notification response sent successfully"
1726                                );
1727                            }
1728                            .instrument(span),
1729                        );
1730                        return;
1731                    }
1732                };
1733            // Multi-client safety: look up a handler for the requested
1734            // tool name. If this client has no handler installed for that
1735            // tool, don't respond — another connected client may have one.
1736            let tool_handler = if data.tool_name.is_empty() {
1737                None
1738            } else {
1739                handlers.tools.get(&data.tool_name).cloned()
1740            };
1741            let Some(tool_handler) = tool_handler else {
1742                return;
1743            };
1744            let client = client.clone();
1745            let sid = session_id.clone();
1746            let span = tracing::error_span!(
1747                "external_tool_handler",
1748                session_id = %sid,
1749                request_id = %request_id
1750            );
1751            tokio::spawn(
1752                async move {
1753                    // `tool_name.is_empty()` would have produced a `None`
1754                    // lookup in `handlers.tools` and short-circuited at the
1755                    // outer guard above, so only the tool_call_id check is
1756                    // reachable here.
1757                    if data.tool_call_id.is_empty() {
1758                        let error_msg = "Missing toolCallId";
1759                        let rpc_start = Instant::now();
1760                        let _ = client
1761                            .call(
1762                                "session.tools.handlePendingToolCall",
1763                                Some(serde_json::json!({
1764                                    "sessionId": sid,
1765                                    "requestId": request_id,
1766                                    "error": error_msg,
1767                                })),
1768                            )
1769                            .await;
1770                        tracing::debug!(
1771                            elapsed_ms = rpc_start.elapsed().as_millis(),
1772                            session_id = %sid,
1773                            request_id = %request_id,
1774                            "Session::handle_notification response sent successfully"
1775                        );
1776                        return;
1777                    }
1778                    let tool_call_id = data.tool_call_id.clone();
1779                    let tool_name = data.tool_name.clone();
1780                    let invocation = ToolInvocation {
1781                        session_id: sid.clone(),
1782                        tool_call_id: data.tool_call_id,
1783                        tool_name: data.tool_name,
1784                        arguments: data
1785                            .arguments
1786                            .unwrap_or(Value::Object(serde_json::Map::new())),
1787                        traceparent: data.traceparent,
1788                        tracestate: data.tracestate,
1789                    };
1790                    let handler_start = Instant::now();
1791                    let tool_result = match tool_handler.call(invocation).await {
1792                        Ok(r) => r,
1793                        Err(e) => tool_failure_result(e.to_string()),
1794                    };
1795                    tracing::debug!(
1796                        elapsed_ms = handler_start.elapsed().as_millis(),
1797                        session_id = %sid,
1798                        request_id = %request_id,
1799                        tool_call_id = %tool_call_id,
1800                        tool_name = %tool_name,
1801                        "ToolHandler::call dispatch"
1802                    );
1803                    let result_value = serde_json::to_value(tool_result).unwrap_or(Value::Null);
1804                    let rpc_start = Instant::now();
1805                    let _ = client
1806                        .call(
1807                            "session.tools.handlePendingToolCall",
1808                            Some(serde_json::json!({
1809                                "sessionId": sid,
1810                                "requestId": request_id,
1811                                "result": result_value,
1812                            })),
1813                        )
1814                        .await;
1815                    tracing::debug!(
1816                        elapsed_ms = rpc_start.elapsed().as_millis(),
1817                        session_id = %sid,
1818                        request_id = %request_id,
1819                        tool_call_id = %tool_call_id,
1820                        tool_name = %tool_name,
1821                        "Session::handle_notification response sent successfully"
1822                    );
1823                }
1824                .instrument(span),
1825            );
1826        }
1827        SessionEventType::UserInputRequested => {
1828            // Notification-only signal for observers (UI, telemetry).
1829            // The CLI follows up with a `userInput.request` JSON-RPC call
1830            // that drives the `UserInputHandler` dispatch — handling
1831            // the notification here too would double-fire the handler
1832            // and produce duplicate prompts on the consumer side. See
1833            // github/github-app#4249.
1834        }
1835        SessionEventType::ElicitationRequested => {
1836            let Some(request_id) = extract_request_id(&notification.event.data) else {
1837                return;
1838            };
1839            // Multi-client safety: if this client has no elicitation
1840            // handler installed, don't respond — another client on the
1841            // same CLI may handle it.
1842            let Some(elicitation_handler) = handlers.elicitation.clone() else {
1843                return;
1844            };
1845            let elicitation_data: ElicitationRequestedData =
1846                match serde_json::from_value(notification.event.data.clone()) {
1847                    Ok(d) => d,
1848                    Err(e) => {
1849                        warn!(error = %e, "failed to deserialize elicitation request");
1850                        return;
1851                    }
1852                };
1853            let request = ElicitationRequest {
1854                message: elicitation_data.message,
1855                requested_schema: elicitation_data
1856                    .requested_schema
1857                    .map(|s| serde_json::to_value(s).unwrap_or(Value::Null)),
1858                mode: elicitation_data.mode.map(|m| match m {
1859                    crate::generated::session_events::ElicitationRequestedMode::Form => {
1860                        crate::types::ElicitationMode::Form
1861                    }
1862                    crate::generated::session_events::ElicitationRequestedMode::Url => {
1863                        crate::types::ElicitationMode::Url
1864                    }
1865                    _ => crate::types::ElicitationMode::Unknown,
1866                }),
1867                elicitation_source: elicitation_data.elicitation_source,
1868                url: elicitation_data.url,
1869            };
1870            let client = client.clone();
1871            let sid = session_id.clone();
1872            let span = tracing::error_span!(
1873                "elicitation_request_handler",
1874                session_id = %sid,
1875                request_id = %request_id
1876            );
1877            tokio::spawn(
1878                async move {
1879                    let cancel = ElicitationResult {
1880                        action: "cancel".to_string(),
1881                        content: None,
1882                    };
1883                    // Dispatch to a nested task so panics are caught as JoinErrors.
1884                    let handler_task = tokio::spawn({
1885                        let sid = sid.clone();
1886                        let request_id = request_id.clone();
1887                        let span = tracing::error_span!(
1888                            "elicitation_callback",
1889                            session_id = %sid,
1890                            request_id = %request_id
1891                        );
1892                        async move {
1893                            let handler_start = Instant::now();
1894                            let response = elicitation_handler
1895                                .handle(sid.clone(), request_id.clone(), request)
1896                                .await;
1897                            tracing::debug!(
1898                                elapsed_ms = handler_start.elapsed().as_millis(),
1899                                session_id = %sid,
1900                                request_id = %request_id,
1901                                "ElicitationHandler::handle dispatch"
1902                            );
1903                            response
1904                        }
1905                        .instrument(span)
1906                    });
1907                    let result = match handler_task.await {
1908                        Ok(r) => r,
1909                        Err(_) => cancel.clone(),
1910                    };
1911                    let rpc_start = Instant::now();
1912                    if let Err(e) = client
1913                        .call(
1914                            "session.ui.handlePendingElicitation",
1915                            Some(serde_json::json!({
1916                                "sessionId": sid,
1917                                "requestId": request_id,
1918                                "result": result,
1919                            })),
1920                        )
1921                        .await
1922                    {
1923                        // RPC failed — attempt cancel as last resort
1924                        warn!(error = %e, "handlePendingElicitation failed, sending cancel");
1925                        let _ = client
1926                            .call(
1927                                "session.ui.handlePendingElicitation",
1928                                Some(serde_json::json!({
1929                                    "sessionId": sid,
1930                                    "requestId": request_id,
1931                                    "result": cancel,
1932                                })),
1933                            )
1934                            .await;
1935                    } else {
1936                        tracing::debug!(
1937                            elapsed_ms = rpc_start.elapsed().as_millis(),
1938                            session_id = %sid,
1939                            request_id = %request_id,
1940                            "Session::handle_notification response sent successfully"
1941                        );
1942                    }
1943                }
1944                .instrument(span),
1945            );
1946        }
1947        SessionEventType::CommandExecute => {
1948            let data: CommandExecuteData =
1949                match serde_json::from_value(notification.event.data.clone()) {
1950                    Ok(d) => d,
1951                    Err(e) => {
1952                        warn!(error = %e, "failed to deserialize command.execute");
1953                        return;
1954                    }
1955                };
1956            let client = client.clone();
1957            let command_handlers = command_handlers.clone();
1958            let sid = session_id.clone();
1959            let span = tracing::error_span!("command_handler", session_id = %sid);
1960            tokio::spawn(
1961                async move {
1962                    let request_id = data.request_id;
1963                    let ack_error = match command_handlers.get(&data.command_name).cloned() {
1964                        None => Some(format!("Unknown command: {}", data.command_name)),
1965                        Some(handler) => {
1966                            let command_name = data.command_name.clone();
1967                            let ctx = CommandContext {
1968                                session_id: sid.clone(),
1969                                command: data.command,
1970                                command_name: data.command_name,
1971                                args: data.args,
1972                            };
1973                            let handler_start = Instant::now();
1974                            let result = handler.on_command(ctx).await;
1975                            tracing::debug!(
1976                                elapsed_ms = handler_start.elapsed().as_millis(),
1977                                session_id = %sid,
1978                                request_id = %request_id,
1979                                command_name = %command_name,
1980                                "CommandHandler::call dispatch"
1981                            );
1982                            match result {
1983                                Ok(()) => None,
1984                                Err(e) => Some(e.to_string()),
1985                            }
1986                        }
1987                    };
1988                    let mut params = serde_json::json!({
1989                        "sessionId": sid,
1990                        "requestId": request_id,
1991                    });
1992                    if let Some(error_msg) = ack_error {
1993                        params["error"] = serde_json::Value::String(error_msg);
1994                    }
1995                    let rpc_start = Instant::now();
1996                    let _ = client
1997                        .call("session.commands.handlePendingCommand", Some(params))
1998                        .await;
1999                    tracing::debug!(
2000                        elapsed_ms = rpc_start.elapsed().as_millis(),
2001                        session_id = %sid,
2002                        request_id = %request_id,
2003                        "Session::handle_notification response sent successfully"
2004                    );
2005                }
2006                .instrument(span),
2007            );
2008        }
2009        _ => {}
2010    }
2011}
2012
2013struct RequestDispatchContext<'a> {
2014    client: &'a Client,
2015    handlers: &'a SessionHandlers,
2016    hooks: Option<&'a dyn SessionHooks>,
2017    transforms: Option<&'a dyn SystemMessageTransform>,
2018    canvas_handler: Option<&'a Arc<dyn CanvasHandler>>,
2019    session_fs_provider: Option<&'a Arc<dyn SessionFsProvider>>,
2020    bearer_token_providers: &'a HashMap<String, Arc<dyn BearerTokenProvider>>,
2021}
2022
2023/// Process a JSON-RPC request from the CLI.
2024async fn handle_request(
2025    session_id: &SessionId,
2026    ctx: RequestDispatchContext<'_>,
2027    request: crate::JsonRpcRequest,
2028) {
2029    let sid = session_id.clone();
2030    let client = ctx.client;
2031    let handlers = ctx.handlers;
2032    let hooks = ctx.hooks;
2033    let transforms = ctx.transforms;
2034    let canvas_handler = ctx.canvas_handler;
2035    let session_fs_provider = ctx.session_fs_provider;
2036    let bearer_token_providers = ctx.bearer_token_providers;
2037
2038    if request.method.starts_with("sessionFs.") {
2039        crate::session_fs_dispatch::dispatch(client, session_fs_provider, request).await;
2040        return;
2041    }
2042
2043    if request.method.starts_with("canvas.") {
2044        crate::canvas_dispatch::dispatch(client, canvas_handler, request).await;
2045        return;
2046    }
2047
2048    if request.method == crate::generated::api_types::rpc_methods::PROVIDERTOKEN_GETTOKEN {
2049        crate::provider_token_dispatch::dispatch(client, bearer_token_providers, request).await;
2050        return;
2051    }
2052
2053    match request.method.as_str() {
2054        "hooks.invoke" => {
2055            let params = request.params.as_ref();
2056            let hook_type = params
2057                .and_then(|p| p.get("hookType"))
2058                .and_then(|v| v.as_str())
2059                .unwrap_or("");
2060            let input = params
2061                .and_then(|p| p.get("input"))
2062                .cloned()
2063                .unwrap_or(Value::Object(Default::default()));
2064
2065            let rpc_result = if let Some(hooks) = hooks {
2066                match crate::hooks::dispatch_hook(hooks, &sid, hook_type, input).await {
2067                    Ok(output) => output,
2068                    Err(e) => {
2069                        warn!(error = %e, hook_type = hook_type, "hook dispatch failed");
2070                        serde_json::json!({ "output": {} })
2071                    }
2072                }
2073            } else {
2074                serde_json::json!({ "output": {} })
2075            };
2076
2077            let rpc_response = JsonRpcResponse {
2078                jsonrpc: "2.0".to_string(),
2079                id: request.id,
2080                result: Some(rpc_result),
2081                error: None,
2082            };
2083            let _ = client.send_response(&rpc_response).await;
2084        }
2085
2086        "userInput.request" => {
2087            let params = request.params.as_ref();
2088            let Some(question) = params
2089                .and_then(|p| p.get("question"))
2090                .and_then(|v| v.as_str())
2091            else {
2092                warn!("userInput.request missing 'question' field");
2093                let rpc_response = JsonRpcResponse {
2094                    jsonrpc: "2.0".to_string(),
2095                    id: request.id,
2096                    result: None,
2097                    error: Some(crate::JsonRpcError {
2098                        code: error_codes::INVALID_PARAMS,
2099                        message: "missing required field: question".to_string(),
2100                        data: None,
2101                    }),
2102                };
2103                let _ = client.send_response(&rpc_response).await;
2104                return;
2105            };
2106            let question = question.to_string();
2107            let choices = params
2108                .and_then(|p| p.get("choices"))
2109                .and_then(|v| v.as_array())
2110                .map(|arr| {
2111                    arr.iter()
2112                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
2113                        .collect()
2114                });
2115            let allow_freeform = params
2116                .and_then(|p| p.get("allowFreeform"))
2117                .and_then(|v| v.as_bool());
2118
2119            let handler_start = Instant::now();
2120            let response = if let Some(user_input_handler) = handlers.user_input.as_ref() {
2121                user_input_handler
2122                    .handle(sid.clone(), question, choices, allow_freeform)
2123                    .await
2124            } else {
2125                None
2126            };
2127            tracing::debug!(
2128                elapsed_ms = handler_start.elapsed().as_millis(),
2129                session_id = %sid,
2130                "UserInputHandler::handle dispatch"
2131            );
2132
2133            let rpc_result = match response {
2134                Some(UserInputResponse {
2135                    answer,
2136                    was_freeform,
2137                }) => serde_json::json!({
2138                    "answer": answer,
2139                    "wasFreeform": was_freeform,
2140                }),
2141                None => serde_json::json!({ "noResponse": true }),
2142            };
2143            let rpc_response = JsonRpcResponse {
2144                jsonrpc: "2.0".to_string(),
2145                id: request.id,
2146                result: Some(rpc_result),
2147                error: None,
2148            };
2149            let _ = client.send_response(&rpc_response).await;
2150        }
2151
2152        "exitPlanMode.request" => {
2153            let params = request
2154                .params
2155                .as_ref()
2156                .cloned()
2157                .unwrap_or(Value::Object(serde_json::Map::new()));
2158            let data: ExitPlanModeData = match serde_json::from_value(params) {
2159                Ok(d) => d,
2160                Err(e) => {
2161                    warn!(error = %e, "failed to deserialize exitPlanMode.request params, using defaults");
2162                    ExitPlanModeData::default()
2163                }
2164            };
2165
2166            let rpc_result = if let Some(exit_plan_handler) = handlers.exit_plan_mode.as_ref() {
2167                let result = exit_plan_handler.handle(sid, data).await;
2168                serde_json::to_value(result).expect("ExitPlanModeResult serialization cannot fail")
2169            } else {
2170                serde_json::json!({ "approved": true })
2171            };
2172            let rpc_response = JsonRpcResponse {
2173                jsonrpc: "2.0".to_string(),
2174                id: request.id,
2175                result: Some(rpc_result),
2176                error: None,
2177            };
2178            let _ = client.send_response(&rpc_response).await;
2179        }
2180
2181        "autoModeSwitch.request" => {
2182            let error_code = request
2183                .params
2184                .as_ref()
2185                .and_then(|p| p.get("errorCode"))
2186                .and_then(|v| v.as_str())
2187                .map(|s| s.to_string());
2188            let retry_after_seconds = request
2189                .params
2190                .as_ref()
2191                .and_then(|p| p.get("retryAfterSeconds"))
2192                .and_then(|v| v.as_f64());
2193
2194            let answer = if let Some(auto_mode_handler) = handlers.auto_mode_switch.as_ref() {
2195                auto_mode_handler
2196                    .handle(sid, error_code, retry_after_seconds)
2197                    .await
2198            } else {
2199                AutoModeSwitchResponse::No
2200            };
2201            let rpc_response = JsonRpcResponse {
2202                jsonrpc: "2.0".to_string(),
2203                id: request.id,
2204                result: Some(serde_json::json!({ "response": answer })),
2205                error: None,
2206            };
2207            let _ = client.send_response(&rpc_response).await;
2208        }
2209
2210        "systemMessage.transform" => {
2211            let params = request.params.as_ref();
2212            let sections: HashMap<String, crate::transforms::TransformSection> =
2213                match params.and_then(|p| p.get("sections")) {
2214                    Some(v) => match serde_json::from_value(v.clone()) {
2215                        Ok(s) => s,
2216                        Err(e) => {
2217                            let _ = send_error_response(
2218                                client,
2219                                request.id,
2220                                error_codes::INVALID_PARAMS,
2221                                &format!("invalid sections: {e}"),
2222                            )
2223                            .await;
2224                            return;
2225                        }
2226                    },
2227                    None => {
2228                        let _ = send_error_response(
2229                            client,
2230                            request.id,
2231                            error_codes::INVALID_PARAMS,
2232                            "missing sections parameter",
2233                        )
2234                        .await;
2235                        return;
2236                    }
2237                };
2238
2239            let rpc_result = if let Some(transforms) = transforms {
2240                let transform_start = Instant::now();
2241                let response =
2242                    crate::transforms::dispatch_transform(transforms, &sid, sections).await;
2243                tracing::debug!(
2244                    elapsed_ms = transform_start.elapsed().as_millis(),
2245                    session_id = %sid,
2246                    "SystemMessageTransform::transform_section dispatch"
2247                );
2248                match serde_json::to_value(response) {
2249                    Ok(v) => v,
2250                    Err(e) => {
2251                        warn!(error = %e, "failed to serialize transform response");
2252                        serde_json::json!({ "sections": {} })
2253                    }
2254                }
2255            } else {
2256                // No transforms registered — pass through all sections unchanged.
2257                let passthrough: HashMap<String, crate::transforms::TransformSection> = sections;
2258                serde_json::json!({ "sections": passthrough })
2259            };
2260
2261            let rpc_response = JsonRpcResponse {
2262                jsonrpc: "2.0".to_string(),
2263                id: request.id,
2264                result: Some(rpc_result),
2265                error: None,
2266            };
2267            let _ = client.send_response(&rpc_response).await;
2268        }
2269
2270        method => {
2271            warn!(
2272                method = method,
2273                "unhandled request method in session event loop"
2274            );
2275            let _ = send_error_response(
2276                client,
2277                request.id,
2278                error_codes::METHOD_NOT_FOUND,
2279                &format!("unknown method: {method}"),
2280            )
2281            .await;
2282        }
2283    }
2284}
2285
2286async fn send_error_response(
2287    client: &Client,
2288    id: u64,
2289    code: i32,
2290    message: &str,
2291) -> Result<(), Error> {
2292    let response = JsonRpcResponse {
2293        jsonrpc: "2.0".to_string(),
2294        id,
2295        result: None,
2296        error: Some(crate::JsonRpcError {
2297            code,
2298            message: message.to_string(),
2299            data: None,
2300        }),
2301    };
2302    client.send_response(&response).await
2303}
2304
2305/// Inject `action: "transform"` sections into a `SystemMessageConfig`,
2306/// forcing `mode: "customize"` (required by the CLI for transforms to fire).
2307/// Preserves any existing caller-provided section overrides.
2308fn apply_transform_sections(
2309    sys_msg: &mut SystemMessageConfig,
2310    transforms: &dyn SystemMessageTransform,
2311) {
2312    sys_msg.mode = Some("customize".to_string());
2313    let sections = sys_msg.sections.get_or_insert_with(HashMap::new);
2314    for id in transforms.section_ids() {
2315        sections.entry(id).or_insert_with(|| SectionOverride {
2316            action: Some("transform".to_string()),
2317            content: None,
2318        });
2319    }
2320}
2321
2322fn inject_transform_sections(config: &mut SessionConfig, transforms: &dyn SystemMessageTransform) {
2323    let sys_msg = config.system_message.get_or_insert_with(Default::default);
2324    apply_transform_sections(sys_msg, transforms);
2325}
2326
2327fn inject_transform_sections_resume(
2328    config: &mut ResumeSessionConfig,
2329    transforms: &dyn SystemMessageTransform,
2330) {
2331    let sys_msg = config.system_message.get_or_insert_with(Default::default);
2332    apply_transform_sections(sys_msg, transforms);
2333}
2334
2335#[cfg(test)]
2336mod tests {
2337    use serde_json::json;
2338
2339    use super::notification_permission_payload;
2340    use crate::handler::PermissionResult;
2341
2342    #[test]
2343    fn notification_payload_suppresses_no_result() {
2344        assert!(notification_permission_payload(&PermissionResult::NoResult).is_none());
2345    }
2346
2347    #[test]
2348    fn notification_payload_serializes_decisions() {
2349        assert_eq!(
2350            notification_permission_payload(&PermissionResult::approve_once()),
2351            Some(json!({ "kind": "approve-once" }))
2352        );
2353        assert_eq!(
2354            notification_permission_payload(&PermissionResult::reject(None)),
2355            Some(json!({ "kind": "reject" }))
2356        );
2357        assert_eq!(
2358            notification_permission_payload(&PermissionResult::reject(Some("bad".to_string()))),
2359            Some(json!({ "kind": "reject", "feedback": "bad" }))
2360        );
2361        assert_eq!(
2362            notification_permission_payload(&PermissionResult::user_not_available()),
2363            Some(json!({ "kind": "user-not-available" }))
2364        );
2365    }
2366}