Skip to main content

github_copilot_sdk/
session.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::Mutex as ParkingLotMutex;
7use serde_json::Value;
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tracing::{Instrument, warn};
12
13use crate::generated::api_types::{
14    LogRequest, ModelSwitchToRequest, PermissionDecision, PermissionDecisionApproveOnce,
15    PermissionDecisionApproveOnceKind, PermissionDecisionReject, PermissionDecisionRejectKind,
16};
17use crate::generated::session_events::{
18    CommandExecuteData, ElicitationRequestedData, ExternalToolRequestedData, SessionErrorData,
19    SessionEventType,
20};
21use crate::handler::{
22    AutoModeSwitchResponse, HandlerEvent, HandlerResponse, PermissionResult, SessionHandler,
23    UserInputResponse,
24};
25use crate::hooks::SessionHooks;
26use crate::session_fs::SessionFsProvider;
27use crate::trace_context::inject_trace_context;
28use crate::transforms::SystemMessageTransform;
29use crate::types::{
30    CommandContext, CommandDefinition, CommandHandler, CreateSessionResult, ElicitationRequest,
31    ElicitationResult, ExitPlanModeData, GetMessagesResponse, InputOptions, MessageOptions,
32    PermissionRequestData, RequestId, ResumeSessionConfig, SectionOverride, SessionCapabilities,
33    SessionConfig, SessionEvent, SessionId, SetModelOptions, SystemMessageConfig, ToolInvocation,
34    ToolResult, ToolResultExpanded, ToolResultResponse, TraceContext,
35    ensure_attachment_display_names,
36};
37use crate::{Client, Error, JsonRpcResponse, SessionError, SessionEventNotification, error_codes};
38
39/// Shared state between a [`Session`] and its event loop, used by [`Session::send_and_wait`].
40struct IdleWaiter {
41    tx: oneshot::Sender<Result<Option<SessionEvent>, Error>>,
42    last_assistant_message: Option<SessionEvent>,
43    started_at: Instant,
44    first_assistant_message_seen: bool,
45}
46
47/// RAII guard that clears the [`Session::idle_waiter`] slot on drop. Used
48/// by [`Session::send_and_wait`] to ensure the slot doesn't leak if the
49/// caller's future is cancelled (outer `tokio::time::timeout` / `select!`
50/// / dropped JoinHandle). Synchronous clear via `parking_lot::Mutex` —
51/// no async drop needed.
52///
53/// Without this, an outer cancellation between "install waiter" and
54/// "drain channel" would leave the slot occupied, causing all subsequent
55/// `send` and `send_and_wait` calls on the session to return
56/// [`SendWhileWaiting`](SessionError::SendWhileWaiting). Closes RFD-400
57/// review finding #2.
58struct WaiterGuard {
59    slot: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
60}
61
62impl Drop for WaiterGuard {
63    fn drop(&mut self) {
64        self.slot.lock().take();
65    }
66}
67
68struct PendingSessionRegistration {
69    client: Client,
70    session_id: SessionId,
71    shutdown: CancellationToken,
72    disarmed: bool,
73}
74
75impl PendingSessionRegistration {
76    fn new(client: Client, session_id: SessionId, shutdown: CancellationToken) -> Self {
77        Self {
78            client,
79            session_id,
80            shutdown,
81            disarmed: false,
82        }
83    }
84
85    async fn cleanup(mut self, event_loop: JoinHandle<()>) {
86        self.shutdown.cancel();
87        let _ = event_loop.await;
88        self.client.unregister_session(&self.session_id);
89        self.disarmed = true;
90    }
91
92    fn disarm(&mut self) {
93        self.disarmed = true;
94    }
95}
96
97impl Drop for PendingSessionRegistration {
98    fn drop(&mut self) {
99        if !self.disarmed {
100            self.shutdown.cancel();
101            self.client.unregister_session(&self.session_id);
102        }
103    }
104}
105
106/// A session on a GitHub Copilot CLI server.
107///
108/// Created via [`Client::create_session`] or [`Client::resume_session`].
109/// Owns an internal event loop that dispatches events to the [`SessionHandler`].
110///
111/// Protocol methods (`send`, `get_messages`, `abort`, etc.) automatically
112/// inject the session ID into RPC params.
113///
114/// Call [`destroy`](Self::destroy) for graceful cleanup (RPC + local). If dropped
115/// without calling `destroy`, the `Drop` impl aborts the event loop and
116/// unregisters from the router as a best-effort safety net.
117pub struct Session {
118    id: SessionId,
119    cwd: PathBuf,
120    workspace_path: Option<PathBuf>,
121    remote_url: Option<String>,
122    client: Client,
123    /// Handle to the spawned event-loop task. Sync `parking_lot::Mutex`
124    /// because the lock is never held across an `.await` and the `Drop`
125    /// impl needs to take the handle synchronously without `try_lock`
126    /// fallibility.
127    event_loop: ParkingLotMutex<Option<JoinHandle<()>>>,
128    /// Cooperative shutdown signal for the event loop. The loop selects
129    /// on [`shutdown.cancelled()`](CancellationToken::cancelled) alongside
130    /// its inbound channels; [`Session::stop_event_loop`] and [`Drop`]
131    /// both call [`cancel()`](CancellationToken::cancel) to ask the loop
132    /// to exit between iterations rather than aborting the task (which
133    /// can land at any await point and leave the session mid-protocol).
134    /// See RFD-400 review finding #3.
135    ///
136    /// `CancellationToken` is the canonical signalling primitive in
137    /// `tokio_util`; it is what `tonic` uses for the equivalent task-
138    /// coordination case. Advanced consumers can obtain a child token
139    /// via [`Session::cancellation_token`] to bind their own work to
140    /// the session lifetime.
141    shutdown: CancellationToken,
142    /// Only populated while a `send_and_wait` call is in flight.
143    ///
144    /// Sync `parking_lot::Mutex` because the lock is never held across an
145    /// `.await`, and synchronous access lets the `WaiterGuard` RAII helper
146    /// in `send_and_wait` clear the slot from a `Drop` impl on caller-side
147    /// cancellation. See RFD-400 review (cancel-safety hardening).
148    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
149    /// Capabilities negotiated with the CLI, updated on `capabilities.changed` events.
150    capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
151    /// Broadcast channel for runtime event subscribers — see [`Session::subscribe`].
152    event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
153}
154
155impl Session {
156    /// Session ID assigned by the CLI.
157    pub fn id(&self) -> &SessionId {
158        &self.id
159    }
160
161    /// Working directory of the CLI process.
162    pub fn cwd(&self) -> &PathBuf {
163        &self.cwd
164    }
165
166    /// Workspace directory for the session (if using infinite sessions).
167    pub fn workspace_path(&self) -> Option<&Path> {
168        self.workspace_path.as_deref()
169    }
170
171    /// Remote session URL, if the session is running remotely.
172    pub fn remote_url(&self) -> Option<&str> {
173        self.remote_url.as_deref()
174    }
175
176    /// Session capabilities negotiated with the CLI.
177    ///
178    /// Capabilities are set during session creation and updated at runtime
179    /// via `capabilities.changed` events.
180    pub fn capabilities(&self) -> SessionCapabilities {
181        self.capabilities.read().clone()
182    }
183
184    /// Returns a [`CancellationToken`] that fires when this session shuts
185    /// down (via [`Session::stop_event_loop`], [`Session::destroy`], or
186    /// [`Drop`]).
187    ///
188    /// Use this to bind an external task's lifetime to the session — when
189    /// the session shuts down, awaiting [`cancelled()`](CancellationToken::cancelled)
190    /// resolves so cooperative consumers can stop cleanly.
191    ///
192    /// The returned handle is a *child* token: calling
193    /// [`cancel()`](CancellationToken::cancel) on it cancels only the
194    /// caller's child, not the session itself. To cancel the session, call
195    /// [`Session::stop_event_loop`].
196    ///
197    /// # Example
198    ///
199    /// ```no_run
200    /// # async fn example(session: github_copilot_sdk::session::Session) {
201    /// let token = session.cancellation_token();
202    /// tokio::select! {
203    ///     _ = token.cancelled() => println!("session shut down"),
204    ///     _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
205    ///         println!("60s elapsed, session still alive");
206    ///     }
207    /// }
208    /// # }
209    /// ```
210    pub fn cancellation_token(&self) -> CancellationToken {
211        self.shutdown.child_token()
212    }
213
214    /// Subscribe to events for this session.
215    ///
216    /// Returns an [`EventSubscription`](crate::subscription::EventSubscription)
217    /// that yields every [`SessionEvent`] dispatched on this session's
218    /// event loop. Drop the value to unsubscribe; there is no separate
219    /// cancel handle.
220    ///
221    /// **Observe-only.** Subscribers receive a clone of every
222    /// [`SessionEvent`] but cannot influence permission decisions, tool
223    /// results, or anything else that requires returning a
224    /// [`HandlerResponse`]. Those remain
225    /// the responsibility of the [`SessionHandler`] passed via
226    /// [`SessionConfig::handler`](crate::types::SessionConfig::handler).
227    ///
228    /// The returned handle implements both an inherent
229    /// [`recv`](crate::subscription::EventSubscription::recv) method and
230    /// [`Stream`](tokio_stream::Stream), so callers can use a `while let`
231    /// loop or any combinator from `tokio_stream::StreamExt` /
232    /// `futures::StreamExt`.
233    ///
234    /// Each subscriber maintains its own queue. If a consumer cannot keep
235    /// up, the oldest events are dropped and `recv` returns
236    /// [`RecvError::Lagged`](crate::subscription::RecvError::Lagged)
237    /// reporting the count of skipped events. Slow consumers do not block
238    /// the session's event loop.
239    ///
240    /// # Example
241    ///
242    /// ```no_run
243    /// # async fn example(session: github_copilot_sdk::session::Session) {
244    /// let mut events = session.subscribe();
245    /// tokio::spawn(async move {
246    ///     while let Ok(event) = events.recv().await {
247    ///         println!("[{}] event {}", event.id, event.event_type);
248    ///     }
249    /// });
250    /// # }
251    /// ```
252    pub fn subscribe(&self) -> crate::subscription::EventSubscription {
253        crate::subscription::EventSubscription::new(self.event_tx.subscribe())
254    }
255
256    /// The underlying Client (for advanced use cases).
257    pub fn client(&self) -> &Client {
258        &self.client
259    }
260
261    /// Typed RPC namespace for this session.
262    ///
263    /// Every protocol method lives here under its schema-aligned path —
264    /// e.g. `session.rpc().workspaces().list_files()`. Wire method names
265    /// and request/response types are generated from the protocol schema,
266    /// so the typed namespace can't drift from the wire contract.
267    ///
268    /// The hand-authored helpers on [`Session`] delegate to this namespace
269    /// and remain the recommended entry point for everyday use; reach for
270    /// `rpc()` when you want a method without a hand-written wrapper.
271    pub fn rpc(&self) -> crate::generated::rpc::SessionRpc<'_> {
272        crate::generated::rpc::SessionRpc { session: self }
273    }
274
275    /// Stop the internal event loop. Called automatically on [`destroy`](Self::destroy).
276    ///
277    /// Cooperative: signals shutdown via the session's [`CancellationToken`]
278    /// and awaits the loop's natural exit rather than aborting the task.
279    /// Any in-flight handler (permission callback, tool call, elicitation
280    /// response) completes before the loop exits, so the CLI never sees a
281    /// half-handled request. See RFD-400 review finding #3.
282    pub async fn stop_event_loop(&self) {
283        self.shutdown.cancel();
284        let handle = self.event_loop.lock().take();
285        if let Some(handle) = handle {
286            let _ = handle.await;
287        }
288        // Fail any pending send_and_wait so it returns immediately.
289        if let Some(waiter) = self.idle_waiter.lock().take() {
290            let _ = waiter
291                .tx
292                .send(Err(Error::Session(SessionError::EventLoopClosed)));
293        }
294    }
295
296    /// Send a user message to the agent.
297    ///
298    /// Accepts anything convertible to [`MessageOptions`] — pass a `&str` for the
299    /// trivial case, or build a `MessageOptions` for mode/attachments. The
300    /// `wait_timeout` field on `MessageOptions` is ignored here (use
301    /// [`send_and_wait`](Self::send_and_wait) if you need to wait).
302    ///
303    /// Returns the assigned message ID, which can be used to correlate the
304    /// send with later [`SessionEvent`]s emitted in
305    /// response (assistant messages, tool requests, etc.).
306    ///
307    /// Returns an error if a [`send_and_wait`](Self::send_and_wait) call is
308    /// currently in flight, since the plain send would race with the waiter.
309    ///
310    /// # Cancel safety
311    ///
312    /// **Cancel-safe.** The underlying `session.send` RPC is dispatched
313    /// through the writer-actor (see [`Client::call`](crate::Client::call)),
314    /// so dropping this future after the actor has committed to writing
315    /// will not produce a partial frame on the wire. If the caller's
316    /// future is dropped between "frame enqueued" and "response received",
317    /// the message has already landed on the wire — the agent will process
318    /// it and emit events normally; the caller just won't see the returned
319    /// message ID.
320    pub async fn send(&self, opts: impl Into<MessageOptions>) -> Result<String, Error> {
321        if self.idle_waiter.lock().is_some() {
322            return Err(Error::Session(SessionError::SendWhileWaiting));
323        }
324        self.send_inner(opts.into()).await
325    }
326
327    async fn send_inner(&self, opts: MessageOptions) -> Result<String, Error> {
328        let mut params = serde_json::json!({
329            "sessionId": self.id,
330            "prompt": opts.prompt,
331        });
332        if let Some(m) = opts.mode {
333            params["mode"] = serde_json::to_value(m)?;
334        }
335        if let Some(mut a) = opts.attachments {
336            ensure_attachment_display_names(&mut a);
337            params["attachments"] = serde_json::to_value(a)?;
338        }
339        if let Some(headers) = opts.request_headers
340            && !headers.is_empty()
341        {
342            params["requestHeaders"] = serde_json::to_value(headers)?;
343        }
344        let trace_ctx = if opts.traceparent.is_some() || opts.tracestate.is_some() {
345            TraceContext {
346                traceparent: opts.traceparent,
347                tracestate: opts.tracestate,
348            }
349        } else {
350            self.client.resolve_trace_context().await
351        };
352        inject_trace_context(&mut params, &trace_ctx);
353        let rpc_start = Instant::now();
354        let result = self.client.call("session.send", Some(params)).await?;
355        let message_id = result
356            .get("messageId")
357            .and_then(|v| v.as_str())
358            .map(|s| s.to_string())
359            .unwrap_or_default();
360        tracing::debug!(
361            elapsed_ms = rpc_start.elapsed().as_millis(),
362            session_id = %self.id,
363            message_id = %message_id,
364            "Session::send completed successfully"
365        );
366        Ok(message_id)
367    }
368
369    /// Send a user message and wait for the agent to finish processing.
370    ///
371    /// Accepts anything convertible to [`MessageOptions`] — pass a `&str` for the
372    /// trivial case, or build a `MessageOptions` for mode/attachments/timeout.
373    /// Blocks until `session.idle` (success) or `session.error` (failure),
374    /// returning the last `assistant.message` event captured during streaming.
375    /// Times out after `MessageOptions::wait_timeout` (default 60 seconds).
376    ///
377    /// Only one `send_and_wait` call may be active per session at a time.
378    /// Calling [`send`](Self::send) while a `send_and_wait`
379    /// is in flight will also return an error.
380    ///
381    /// # Cancel safety
382    ///
383    /// **Cancel-safe.** A `WaiterGuard` clears the in-flight slot on every
384    /// exit path (success, internal failure, internal timeout, *and*
385    /// external cancellation via `tokio::time::timeout` / `select!` /
386    /// dropped JoinHandle). Subsequent `send` and `send_and_wait` calls on
387    /// this session will succeed normally — the slot is never leaked.
388    pub async fn send_and_wait(
389        &self,
390        opts: impl Into<MessageOptions>,
391    ) -> Result<Option<SessionEvent>, Error> {
392        let total_start = Instant::now();
393        let opts = opts.into();
394        let timeout_duration = opts.wait_timeout.unwrap_or(Duration::from_secs(60));
395        let (tx, rx) = oneshot::channel();
396
397        {
398            let mut guard = self.idle_waiter.lock();
399            if guard.is_some() {
400                return Err(Error::Session(SessionError::SendWhileWaiting));
401            }
402            *guard = Some(IdleWaiter {
403                tx,
404                last_assistant_message: None,
405                started_at: total_start,
406                first_assistant_message_seen: false,
407            });
408        }
409
410        // RAII: clears the idle_waiter slot on every exit path, including
411        // external cancellation (caller's outer `select!` / `timeout` /
412        // dropped future). Without this, an outer cancellation would leak
413        // the slot and brick subsequent `send`/`send_and_wait` calls.
414        let _waiter_guard = WaiterGuard {
415            slot: self.idle_waiter.clone(),
416        };
417
418        let result = tokio::time::timeout(timeout_duration, async {
419            self.send_inner(opts).await?;
420            match rx.await {
421                Ok(result) => result,
422                Err(_) => Err(Error::Session(SessionError::EventLoopClosed)),
423            }
424        })
425        .await;
426
427        match result {
428            Ok(inner) => {
429                tracing::debug!(
430                    elapsed_ms = total_start.elapsed().as_millis(),
431                    session_id = %self.id,
432                    completed_by = if inner.is_ok() { "idle" } else { "error" },
433                    "Session::send_and_wait complete"
434                );
435                inner
436            }
437            Err(_) => {
438                tracing::warn!(
439                    elapsed_ms = total_start.elapsed().as_millis(),
440                    session_id = %self.id,
441                    completed_by = "timeout",
442                    "Session::send_and_wait failed"
443                );
444                Err(Error::Session(SessionError::Timeout(timeout_duration)))
445            }
446        }
447    }
448
449    /// Retrieve the session's message history.
450    pub async fn get_messages(&self) -> Result<Vec<SessionEvent>, Error> {
451        let result = self
452            .client
453            .call(
454                "session.getMessages",
455                Some(serde_json::json!({ "sessionId": self.id })),
456            )
457            .await?;
458        let response: GetMessagesResponse = serde_json::from_value(result)?;
459        Ok(response.events)
460    }
461
462    /// Abort the current agent turn.
463    ///
464    /// # Cancel safety
465    ///
466    /// **Cancel-safe.** Single `session.abort` RPC; the underlying
467    /// [`Client::call`](crate::Client::call) is cancel-safe via the
468    /// writer-actor.
469    pub async fn abort(&self) -> Result<(), Error> {
470        self.client
471            .call(
472                "session.abort",
473                Some(serde_json::json!({ "sessionId": self.id })),
474            )
475            .await?;
476        Ok(())
477    }
478
479    /// Switch to a different model.
480    ///
481    /// Pass `None` for `opts` if no extra configuration is needed.
482    pub async fn set_model(&self, model: &str, opts: Option<SetModelOptions>) -> Result<(), Error> {
483        let opts = opts.unwrap_or_default();
484        let request = ModelSwitchToRequest {
485            model_id: model.to_string(),
486            reasoning_effort: opts.reasoning_effort,
487            reasoning_summary: None,
488            model_capabilities: opts.model_capabilities,
489        };
490        self.rpc().model().switch_to(request).await?;
491        Ok(())
492    }
493
494    /// Disconnect this session from the CLI.
495    ///
496    /// Sends the `session.destroy` RPC, stops the event loop, and unregisters
497    /// the session from the client. **Session state on disk** (conversation
498    /// history, planning state, artifacts) is **preserved**, so the
499    /// conversation can be resumed later via [`Client::resume_session`]
500    /// using this session's ID. To permanently remove all on-disk session
501    /// data, use [`Client::delete_session`] instead.
502    ///
503    /// The caller should ensure the session is idle (e.g. [`send_and_wait`]
504    /// has returned) before disconnecting; in-flight tool or event handlers
505    /// may otherwise observe failures.
506    ///
507    /// [`Client::resume_session`]: crate::Client::resume_session
508    /// [`Client::delete_session`]: crate::Client::delete_session
509    /// [`send_and_wait`]: Self::send_and_wait
510    pub async fn disconnect(&self) -> Result<(), Error> {
511        self.client
512            .call(
513                "session.destroy",
514                Some(serde_json::json!({ "sessionId": self.id })),
515            )
516            .await?;
517        self.stop_event_loop().await;
518        self.client.unregister_session(&self.id);
519        Ok(())
520    }
521
522    /// Alias for [`disconnect`](Self::disconnect).
523    ///
524    /// Named after the `session.destroy` wire RPC. Prefer `disconnect` in
525    /// new code — the wire-level "destroy" is misleading because on-disk
526    /// state is preserved.
527    pub async fn destroy(&self) -> Result<(), Error> {
528        self.disconnect().await
529    }
530
531    /// Write a log message to the session.
532    ///
533    /// Pass `None` for `opts` to use defaults (info level, persisted).
534    pub async fn log(
535        &self,
536        message: &str,
537        opts: Option<crate::types::LogOptions>,
538    ) -> Result<(), Error> {
539        let opts = opts.unwrap_or_default();
540        let level = match opts.level {
541            Some(level) => Some(serde_json::from_value(serde_json::to_value(level)?)?),
542            None => None,
543        };
544        let request = LogRequest {
545            message: message.to_string(),
546            level,
547            ephemeral: opts.ephemeral,
548            r#type: None,
549            tip: None,
550            url: None,
551        };
552        self.rpc().log(request).await?;
553        Ok(())
554    }
555
556    /// Returns the UI sub-API for elicitation, confirmation, selection, and
557    /// free-form input.
558    ///
559    /// All UI methods route through `session.ui.*` RPCs and require host
560    /// support — check `session.capabilities().ui.elicitation` before use.
561    pub fn ui(&self) -> SessionUi<'_> {
562        SessionUi { session: self }
563    }
564
565    /// Returns an error if the host doesn't support elicitation.
566    fn assert_elicitation(&self) -> Result<(), Error> {
567        if self
568            .capabilities
569            .read()
570            .ui
571            .as_ref()
572            .and_then(|u| u.elicitation)
573            != Some(true)
574        {
575            return Err(Error::Session(SessionError::ElicitationNotSupported));
576        }
577        Ok(())
578    }
579}
580
581impl Drop for Session {
582    fn drop(&mut self) {
583        // Cooperative shutdown: cancel the event loop's token to signal
584        // exit between iterations. The loop will see the cancellation on
585        // its next select poll and break cleanly without interrupting an
586        // in-flight handler. We do NOT abort the JoinHandle — that would
587        // land at any await point in the loop body, potentially leaving
588        // the CLI with an unanswered request id. RFD-400 review finding
589        // #3.
590        //
591        // The handle itself is left in `event_loop` to be reaped by the
592        // tokio runtime when it next polls; we intentionally don't await
593        // it here because Drop is sync.
594        self.shutdown.cancel();
595        self.client.unregister_session(&self.id);
596    }
597}
598
599/// UI sub-API for a [`Session`] — elicitation, confirmation, selection,
600/// and free-form input.
601///
602/// Acquired via [`Session::ui`]. Methods route to `session.ui.*` RPCs and
603/// require host elicitation support — check
604/// `session.capabilities().ui.elicitation` before use.
605pub struct SessionUi<'a> {
606    session: &'a Session,
607}
608
609impl<'a> SessionUi<'a> {
610    /// Request user input via an interactive UI form (elicitation).
611    ///
612    /// Sends a JSON Schema describing form fields to the CLI host. The host
613    /// renders a form dialog and returns the user's response.
614    ///
615    /// Prefer the typed convenience methods [`confirm`](Self::confirm),
616    /// [`select`](Self::select), and [`input`](Self::input) for common cases.
617    pub async fn elicitation(
618        &self,
619        message: &str,
620        schema: Value,
621    ) -> Result<ElicitationResult, Error> {
622        self.session.assert_elicitation()?;
623        let result = self
624            .session
625            .client
626            .call(
627                "session.ui.elicitation",
628                Some(serde_json::json!({
629                    "sessionId": self.session.id,
630                    "message": message,
631                    "requestedSchema": schema,
632                })),
633            )
634            .await?;
635        let elicitation: ElicitationResult = serde_json::from_value(result)?;
636        Ok(elicitation)
637    }
638
639    /// Ask the user a yes/no confirmation question.
640    ///
641    /// Returns `true` if the user accepted and confirmed, `false` otherwise.
642    pub async fn confirm(&self, message: &str) -> Result<bool, Error> {
643        self.session.assert_elicitation()?;
644        let schema = serde_json::json!({
645            "type": "object",
646            "properties": {
647                "confirmed": {
648                    "type": "boolean",
649                    "default": true,
650                }
651            },
652            "required": ["confirmed"]
653        });
654        let result = self.elicitation(message, schema).await?;
655        Ok(result.action == "accept"
656            && result
657                .content
658                .and_then(|c| c.get("confirmed").and_then(|v| v.as_bool()))
659                == Some(true))
660    }
661
662    /// Ask the user to select from a list of options.
663    ///
664    /// Returns the selected option string on accept, or `None` on decline/cancel.
665    pub async fn select(&self, message: &str, options: &[&str]) -> Result<Option<String>, Error> {
666        self.session.assert_elicitation()?;
667        let schema = serde_json::json!({
668            "type": "object",
669            "properties": {
670                "selection": {
671                    "type": "string",
672                    "enum": options,
673                }
674            },
675            "required": ["selection"]
676        });
677        let result = self.elicitation(message, schema).await?;
678        if result.action != "accept" {
679            return Ok(None);
680        }
681        let selection = result.content.and_then(|c| {
682            c.get("selection")
683                .and_then(|v| v.as_str())
684                .map(String::from)
685        });
686        Ok(selection)
687    }
688
689    /// Ask the user for free-form text input.
690    ///
691    /// Returns the input string on accept, or `None` on decline/cancel.
692    /// Use [`InputOptions`] to set validation constraints and field metadata.
693    pub async fn input(
694        &self,
695        message: &str,
696        options: Option<&InputOptions<'_>>,
697    ) -> Result<Option<String>, Error> {
698        self.session.assert_elicitation()?;
699        let mut field = serde_json::json!({ "type": "string" });
700        if let Some(opts) = options {
701            if let Some(title) = opts.title {
702                field["title"] = Value::String(title.to_string());
703            }
704            if let Some(desc) = opts.description {
705                field["description"] = Value::String(desc.to_string());
706            }
707            if let Some(min) = opts.min_length {
708                field["minLength"] = Value::Number(min.into());
709            }
710            if let Some(max) = opts.max_length {
711                field["maxLength"] = Value::Number(max.into());
712            }
713            if let Some(fmt) = &opts.format {
714                field["format"] = Value::String(fmt.as_str().to_string());
715            }
716            if let Some(default) = opts.default {
717                field["default"] = Value::String(default.to_string());
718            }
719        }
720        let schema = serde_json::json!({
721            "type": "object",
722            "properties": { "value": field },
723            "required": ["value"]
724        });
725        let result = self.elicitation(message, schema).await?;
726        if result.action != "accept" {
727            return Ok(None);
728        }
729        let value = result
730            .content
731            .and_then(|c| c.get("value").and_then(|v| v.as_str()).map(String::from));
732        Ok(value)
733    }
734}
735
736impl Client {
737    /// Create a new session on the CLI.
738    ///
739    /// Sends `session.create`, registers the session on the router,
740    /// and spawns an internal event loop that dispatches to the handler.
741    ///
742    /// All callbacks (event handler, hooks, transform) are configured
743    /// via [`SessionConfig`] using [`with_handler`](SessionConfig::with_handler),
744    /// [`with_hooks`](SessionConfig::with_hooks), and
745    /// [`with_transform`](SessionConfig::with_transform).
746    ///
747    /// If [`hooks_handler`](SessionConfig::hooks_handler) is set, the
748    /// wire-level `hooks` flag is automatically enabled.
749    ///
750    /// If [`transform`](SessionConfig::transform) is set, the SDK injects
751    /// `action: "transform"` sections into the [`SystemMessageConfig`] wire
752    /// format and handles `systemMessage.transform` RPC callbacks during
753    /// the session.
754    ///
755    /// If [`handler`](SessionConfig::handler) is `None`, the session uses
756    /// [`NoopHandler`](crate::handler::NoopHandler) — permission requests and
757    /// external tool calls are left pending for the consumer to resolve.
758    pub async fn create_session(&self, mut config: SessionConfig) -> Result<Session, Error> {
759        let total_start = Instant::now();
760        let handler = config
761            .handler
762            .take()
763            .unwrap_or_else(|| Arc::new(crate::handler::NoopHandler));
764        let hooks = config.hooks_handler.take();
765        let transforms = config.transform.take();
766        let tools_count = config.tools.as_ref().map_or(0, Vec::len);
767        let commands_count = config.commands.as_ref().map_or(0, Vec::len);
768        let has_hooks = hooks.is_some();
769        let command_handlers = build_command_handler_map(config.commands.as_deref());
770        let session_fs_provider = config.session_fs_provider.take();
771        if self.inner.session_fs_configured && session_fs_provider.is_none() {
772            return Err(Error::Session(SessionError::SessionFsProviderRequired));
773        }
774        if self.inner.session_fs_sqlite_declared
775            && let Some(ref provider) = session_fs_provider
776            && provider.sqlite().is_none()
777        {
778            return Err(Error::InvalidConfig(
779                "SessionFs capabilities declare SQLite support but the provider \
780                 does not implement SessionFsSqliteProvider"
781                    .to_string(),
782            ));
783        }
784
785        if hooks.is_some() && config.hooks.is_none() {
786            config.hooks = Some(true);
787        }
788        if let Some(ref transforms) = transforms {
789            inject_transform_sections(&mut config, transforms.as_ref());
790        }
791        let session_id = config
792            .session_id
793            .clone()
794            .unwrap_or_else(|| SessionId::from(uuid::Uuid::new_v4().to_string()));
795        config.session_id = Some(session_id.clone());
796        let mut params = serde_json::to_value(&config)?;
797        let trace_ctx = self.resolve_trace_context().await;
798        inject_trace_context(&mut params, &trace_ctx);
799
800        let setup_start = Instant::now();
801        let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
802        let channels = self.register_session(&session_id);
803        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
804        let shutdown = CancellationToken::new();
805        let (event_tx, _) = tokio::sync::broadcast::channel(512);
806        let event_loop = spawn_event_loop(
807            session_id.clone(),
808            self.clone(),
809            handler,
810            hooks,
811            transforms,
812            command_handlers,
813            session_fs_provider,
814            channels,
815            idle_waiter.clone(),
816            capabilities.clone(),
817            event_tx.clone(),
818            shutdown.clone(),
819        );
820        let mut registration =
821            PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
822        tracing::debug!(
823            elapsed_ms = setup_start.elapsed().as_millis(),
824            session_id = %session_id,
825            tools_count,
826            commands_count,
827            has_hooks,
828            "Client::create_session local setup complete"
829        );
830
831        let rpc_start = Instant::now();
832        let result = match self.call("session.create", Some(params)).await {
833            Ok(result) => result,
834            Err(error) => {
835                registration.cleanup(event_loop).await;
836                return Err(error);
837            }
838        };
839        tracing::debug!(
840            elapsed_ms = rpc_start.elapsed().as_millis(),
841            "Client::create_session session creation request completed successfully"
842        );
843        let create_result: CreateSessionResult = match serde_json::from_value(result) {
844            Ok(result) => result,
845            Err(error) => {
846                registration.cleanup(event_loop).await;
847                return Err(error.into());
848            }
849        };
850        if create_result.session_id != session_id {
851            registration.cleanup(event_loop).await;
852            return Err(Error::Session(SessionError::SessionIdMismatch {
853                requested: session_id,
854                returned: create_result.session_id,
855            }));
856        }
857        *capabilities.write() = create_result.capabilities.unwrap_or_default();
858
859        tracing::debug!(
860            elapsed_ms = total_start.elapsed().as_millis(),
861            session_id = %session_id,
862            "Client::create_session complete"
863        );
864        registration.disarm();
865        Ok(Session {
866            id: session_id,
867            cwd: self.cwd().clone(),
868            workspace_path: create_result.workspace_path,
869            remote_url: create_result.remote_url,
870            client: self.clone(),
871            event_loop: ParkingLotMutex::new(Some(event_loop)),
872            shutdown,
873            idle_waiter,
874            capabilities,
875            event_tx,
876        })
877    }
878
879    /// Resume an existing session on the CLI.
880    ///
881    /// Sends `session.resume` and `session.skills.reload`, registers the
882    /// session on the router, and spawns the event loop.
883    ///
884    /// All callbacks (event handler, hooks, transform) are configured
885    /// via [`ResumeSessionConfig`] using its `with_*` builder methods.
886    ///
887    /// See [`Self::create_session`] for the defaults applied when callback
888    /// fields are unset.
889    pub async fn resume_session(&self, mut config: ResumeSessionConfig) -> Result<Session, Error> {
890        let total_start = Instant::now();
891        let handler = config
892            .handler
893            .take()
894            .unwrap_or_else(|| Arc::new(crate::handler::NoopHandler));
895        let hooks = config.hooks_handler.take();
896        let transforms = config.transform.take();
897        let tools_count = config.tools.as_ref().map_or(0, Vec::len);
898        let commands_count = config.commands.as_ref().map_or(0, Vec::len);
899        let has_hooks = hooks.is_some();
900        let command_handlers = build_command_handler_map(config.commands.as_deref());
901        let session_fs_provider = config.session_fs_provider.take();
902        if self.inner.session_fs_configured && session_fs_provider.is_none() {
903            return Err(Error::Session(SessionError::SessionFsProviderRequired));
904        }
905        if self.inner.session_fs_sqlite_declared
906            && let Some(ref provider) = session_fs_provider
907            && provider.sqlite().is_none()
908        {
909            return Err(Error::InvalidConfig(
910                "SessionFs capabilities declare SQLite support but the provider \
911                 does not implement SessionFsSqliteProvider"
912                    .to_string(),
913            ));
914        }
915
916        if hooks.is_some() && config.hooks.is_none() {
917            config.hooks = Some(true);
918        }
919        if let Some(ref transforms) = transforms {
920            inject_transform_sections_resume(&mut config, transforms.as_ref());
921        }
922        let session_id = config.session_id.clone();
923        let mut params = serde_json::to_value(&config)?;
924        let trace_ctx = self.resolve_trace_context().await;
925        inject_trace_context(&mut params, &trace_ctx);
926
927        let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
928        let setup_start = Instant::now();
929        let channels = self.register_session(&session_id);
930        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
931        let shutdown = CancellationToken::new();
932        let (event_tx, _) = tokio::sync::broadcast::channel(512);
933        let event_loop = spawn_event_loop(
934            session_id.clone(),
935            self.clone(),
936            handler,
937            hooks,
938            transforms,
939            command_handlers,
940            session_fs_provider,
941            channels,
942            idle_waiter.clone(),
943            capabilities.clone(),
944            event_tx.clone(),
945            shutdown.clone(),
946        );
947        let mut registration =
948            PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
949        tracing::debug!(
950            elapsed_ms = setup_start.elapsed().as_millis(),
951            session_id = %session_id,
952            tools_count,
953            commands_count,
954            has_hooks,
955            "Client::resume_session local setup complete"
956        );
957
958        let rpc_start = Instant::now();
959        let result = match self.call("session.resume", Some(params)).await {
960            Ok(result) => result,
961            Err(error) => {
962                registration.cleanup(event_loop).await;
963                return Err(error);
964            }
965        };
966        tracing::debug!(
967            elapsed_ms = rpc_start.elapsed().as_millis(),
968            session_id = %session_id,
969            "Client::resume_session session resume request completed successfully"
970        );
971
972        // The CLI may reassign the session ID on resume.
973        let cli_session_id: SessionId = result
974            .get("sessionId")
975            .and_then(|v| v.as_str())
976            .unwrap_or(&session_id)
977            .into();
978        if cli_session_id != session_id {
979            registration.cleanup(event_loop).await;
980            return Err(Error::Session(SessionError::SessionIdMismatch {
981                requested: session_id,
982                returned: cli_session_id,
983            }));
984        }
985
986        let resume_capabilities: Option<SessionCapabilities> = result
987            .get("capabilities")
988            .and_then(|v| {
989                serde_json::from_value(v.clone())
990                    .map_err(|e| warn!(error = %e, "failed to deserialize capabilities from resume response"))
991                    .ok()
992            });
993        let remote_url = result
994            .get("remoteUrl")
995            .or_else(|| result.get("remote_url"))
996            .and_then(|value| value.as_str())
997            .map(ToString::to_string);
998
999        // Reload skills after resume (best-effort).
1000        let skills_reload_start = Instant::now();
1001        if let Err(e) = self
1002            .call(
1003                "session.skills.reload",
1004                Some(serde_json::json!({ "sessionId": session_id })),
1005            )
1006            .await
1007        {
1008            warn!(
1009                elapsed_ms = skills_reload_start.elapsed().as_millis(),
1010                session_id = %session_id,
1011                error = %e,
1012                "Client::resume_session skills reload request failed"
1013            );
1014        } else {
1015            tracing::debug!(
1016                elapsed_ms = skills_reload_start.elapsed().as_millis(),
1017                session_id = %session_id,
1018                "Client::resume_session skills reload request completed successfully"
1019            );
1020        }
1021
1022        *capabilities.write() = resume_capabilities.unwrap_or_default();
1023
1024        tracing::debug!(
1025            elapsed_ms = total_start.elapsed().as_millis(),
1026            session_id = %session_id,
1027            "Client::resume_session complete"
1028        );
1029        registration.disarm();
1030        Ok(Session {
1031            id: session_id,
1032            cwd: self.cwd().clone(),
1033            workspace_path: None,
1034            remote_url,
1035            client: self.clone(),
1036            event_loop: ParkingLotMutex::new(Some(event_loop)),
1037            shutdown,
1038            idle_waiter,
1039            capabilities,
1040            event_tx,
1041        })
1042    }
1043}
1044
1045type CommandHandlerMap = HashMap<String, Arc<dyn CommandHandler>>;
1046
1047fn build_command_handler_map(commands: Option<&[CommandDefinition]>) -> Arc<CommandHandlerMap> {
1048    let map = match commands {
1049        Some(commands) => commands
1050            .iter()
1051            .filter(|cmd| !cmd.name.is_empty())
1052            .map(|cmd| (cmd.name.clone(), cmd.handler.clone()))
1053            .collect(),
1054        None => HashMap::new(),
1055    };
1056    Arc::new(map)
1057}
1058
1059#[allow(clippy::too_many_arguments)]
1060fn spawn_event_loop(
1061    session_id: SessionId,
1062    client: Client,
1063    handler: Arc<dyn SessionHandler>,
1064    hooks: Option<Arc<dyn SessionHooks>>,
1065    transforms: Option<Arc<dyn SystemMessageTransform>>,
1066    command_handlers: Arc<CommandHandlerMap>,
1067    session_fs_provider: Option<Arc<dyn SessionFsProvider>>,
1068    channels: crate::router::SessionChannels,
1069    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1070    capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
1071    event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
1072    shutdown: CancellationToken,
1073) -> JoinHandle<()> {
1074    let crate::router::SessionChannels {
1075        mut notifications,
1076        mut requests,
1077    } = channels;
1078
1079    let span = tracing::error_span!("session_event_loop", session_id = %session_id);
1080    tokio::spawn(
1081        async move {
1082            loop {
1083                // `mpsc::UnboundedReceiver::recv` and
1084                // `CancellationToken::cancelled` are both cancel-safe per
1085                // RFD 400. The selected branch's `await`'d handler is
1086                // *not* mid-cancelled by the select — once a branch fires
1087                // it runs to completion within the loop's iteration.
1088                // Spawned child tasks inside `handle_notification`
1089                // (permission/tool/elicitation callbacks) intentionally
1090                // outlive the parent loop and own their own cleanup;
1091                // this is RFD 400's "spawn background tasks to perform
1092                // cancel-unsafe operations" pattern and is correct as-is.
1093                tokio::select! {
1094                    _ = shutdown.cancelled() => break,
1095                    Some(notification) = notifications.recv() => {
1096                        handle_notification(
1097                            &session_id, &client, &handler, &command_handlers, notification, &idle_waiter, &capabilities, &event_tx,
1098                        ).await;
1099                    }
1100                    Some(request) = requests.recv() => {
1101                        handle_request(
1102                            &session_id, &client, &handler, hooks.as_deref(), transforms.as_deref(), session_fs_provider.as_ref(), request,
1103                        ).await;
1104                    }
1105                    else => break,
1106                }
1107            }
1108            // Channels closed or shutdown signaled — fail any pending
1109            // send_and_wait so the caller observes a clean error.
1110            if let Some(waiter) = idle_waiter.lock().take() {
1111                let _ = waiter
1112                    .tx
1113                    .send(Err(Error::Session(SessionError::EventLoopClosed)));
1114            }
1115        }
1116        .instrument(span),
1117    )
1118}
1119
1120fn extract_request_id(data: &Value) -> Option<RequestId> {
1121    data.get("requestId")
1122        .and_then(|v| v.as_str())
1123        .filter(|s| !s.is_empty())
1124        .map(RequestId::new)
1125}
1126
1127fn pending_permission_result_kind(response: &HandlerResponse) -> &'static str {
1128    match response {
1129        HandlerResponse::Permission(PermissionResult::Approved) => "approve-once",
1130        HandlerResponse::Permission(PermissionResult::Denied) => "reject",
1131        // Fallback to "user-not-available" for UserNotAvailable, Deferred (when
1132        // forced through this path), Custom (handled separately upstream), and
1133        // any non-permission/no-result HandlerResponse that gets here defensively.
1134        _ => "user-not-available",
1135    }
1136}
1137
1138fn permission_request_response(response: &HandlerResponse) -> PermissionDecision {
1139    match response {
1140        HandlerResponse::Permission(PermissionResult::Approved) => {
1141            PermissionDecision::ApproveOnce(PermissionDecisionApproveOnce {
1142                kind: PermissionDecisionApproveOnceKind::ApproveOnce,
1143            })
1144        }
1145        _ => PermissionDecision::Reject(PermissionDecisionReject {
1146            kind: PermissionDecisionRejectKind::Reject,
1147            feedback: None,
1148        }),
1149    }
1150}
1151
1152/// Map a handler response into the `result` payload for the notification
1153/// path (`session.permissions.handlePendingPermissionRequest`).
1154///
1155/// Returns `None` when the SDK must not respond.
1156fn notification_permission_payload(response: &HandlerResponse) -> Option<Value> {
1157    match response {
1158        HandlerResponse::Permission(PermissionResult::Deferred | PermissionResult::NoResult) => {
1159            None
1160        }
1161        HandlerResponse::Permission(PermissionResult::Custom(value)) => Some(value.clone()),
1162        _ => Some(serde_json::json!({
1163            "kind": pending_permission_result_kind(response),
1164        })),
1165    }
1166}
1167
1168/// Map a handler response into the JSON-RPC `result` payload for the
1169/// direct-RPC path (`permission.request`).
1170///
1171/// Always returns a value. [`PermissionResult::Deferred`] is treated as
1172/// [`PermissionResult::Approved`] here because the JSON-RPC contract
1173/// requires a reply — see the variant's doc comment.
1174fn direct_permission_payload(response: &HandlerResponse) -> Value {
1175    match response {
1176        HandlerResponse::Permission(PermissionResult::Custom(value)) => value.clone(),
1177        HandlerResponse::Permission(PermissionResult::Deferred) => serde_json::to_value(
1178            permission_request_response(&HandlerResponse::Permission(PermissionResult::Approved)),
1179        )
1180        .expect("serializing direct permission response should succeed"),
1181        HandlerResponse::Permission(
1182            PermissionResult::NoResult | PermissionResult::UserNotAvailable,
1183        ) => serde_json::json!({
1184            "kind": pending_permission_result_kind(response),
1185        }),
1186        _ => serde_json::to_value(permission_request_response(response))
1187            .expect("serializing direct permission response should succeed"),
1188    }
1189}
1190
1191fn tool_failure_result(message: impl Into<String>) -> ToolResult {
1192    let message = message.into();
1193    ToolResult::Expanded(ToolResultExpanded {
1194        text_result_for_llm: message.clone(),
1195        result_type: "failure".to_string(),
1196        binary_results_for_llm: None,
1197        session_log: None,
1198        error: Some(message),
1199        tool_telemetry: None,
1200    })
1201}
1202
1203fn notification_tool_payload(response: HandlerResponse) -> Option<Value> {
1204    match response {
1205        HandlerResponse::ToolResult(result) => {
1206            Some(serde_json::to_value(result).unwrap_or(Value::Null))
1207        }
1208        HandlerResponse::NoResult => None,
1209        _ => Some(
1210            serde_json::to_value(tool_failure_result("Unexpected handler response"))
1211                .unwrap_or(Value::Null),
1212        ),
1213    }
1214}
1215
1216fn direct_tool_result(response: HandlerResponse) -> ToolResult {
1217    match response {
1218        HandlerResponse::ToolResult(result) => result,
1219        HandlerResponse::NoResult => tool_failure_result("No tool handler available"),
1220        _ => tool_failure_result("Unexpected handler response"),
1221    }
1222}
1223
1224/// Process a notification from the CLI's broadcast channel.
1225#[allow(clippy::too_many_arguments)]
1226async fn handle_notification(
1227    session_id: &SessionId,
1228    client: &Client,
1229    handler: &Arc<dyn SessionHandler>,
1230    command_handlers: &Arc<CommandHandlerMap>,
1231    notification: SessionEventNotification,
1232    idle_waiter: &Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1233    capabilities: &Arc<parking_lot::RwLock<SessionCapabilities>>,
1234    event_tx: &tokio::sync::broadcast::Sender<SessionEvent>,
1235) {
1236    let dispatch_start = Instant::now();
1237    let event = notification.event.clone();
1238    let event_type = event.parsed_type();
1239    if event_type == SessionEventType::PermissionRequested {
1240        tracing::debug!(
1241            session_id = %session_id,
1242            event_type = %event.event_type,
1243            "Session::handle_notification permission request received"
1244        );
1245    }
1246
1247    // Signal send_and_wait if active. The lock is only contended when
1248    // a send_and_wait call is in flight (idle_waiter is Some).
1249    match event_type {
1250        SessionEventType::AssistantMessage
1251        | SessionEventType::SessionIdle
1252        | SessionEventType::SessionError => {
1253            let mut guard = idle_waiter.lock();
1254            if let Some(waiter) = guard.as_mut() {
1255                match event_type {
1256                    SessionEventType::AssistantMessage => {
1257                        if !waiter.first_assistant_message_seen {
1258                            waiter.first_assistant_message_seen = true;
1259                            tracing::debug!(
1260                                elapsed_ms = waiter.started_at.elapsed().as_millis(),
1261                                session_id = %session_id,
1262                                "Session::send_and_wait first assistant message"
1263                            );
1264                        }
1265                        waiter.last_assistant_message = Some(event.clone());
1266                    }
1267                    SessionEventType::SessionIdle | SessionEventType::SessionError => {
1268                        if let Some(waiter) = guard.take() {
1269                            if event_type == SessionEventType::SessionIdle {
1270                                tracing::debug!(
1271                                    elapsed_ms = waiter.started_at.elapsed().as_millis(),
1272                                    session_id = %session_id,
1273                                    "Session::send_and_wait idle received"
1274                                );
1275                                let _ = waiter.tx.send(Ok(waiter.last_assistant_message));
1276                            } else {
1277                                let error_msg = event
1278                                    .typed_data::<SessionErrorData>()
1279                                    .map(|d| d.message)
1280                                    .or_else(|| {
1281                                        event
1282                                            .data
1283                                            .get("message")
1284                                            .and_then(|v| v.as_str())
1285                                            .map(|s| s.to_string())
1286                                    })
1287                                    .unwrap_or_else(|| "session error".to_string());
1288                                let _ = waiter
1289                                    .tx
1290                                    .send(Err(Error::Session(SessionError::AgentError(error_msg))));
1291                            }
1292                        }
1293                    }
1294                    _ => {}
1295                }
1296            }
1297        }
1298        _ => {}
1299    }
1300
1301    // Fan out the event to runtime subscribers (`Session::subscribe`). `send`
1302    // only errors when there are no receivers, which is the normal case
1303    // before any consumer subscribes.
1304    let _ = event_tx.send(event.clone());
1305
1306    // Fire-and-forget dispatch for the general event.
1307    handler
1308        .on_event(HandlerEvent::SessionEvent {
1309            session_id: session_id.clone(),
1310            event,
1311        })
1312        .await;
1313
1314    // Update capabilities when the CLI reports changes. The CLI sends
1315    // the full updated capabilities object — replace wholesale so removals
1316    // and new subfields are handled correctly.
1317    if event_type == SessionEventType::CapabilitiesChanged {
1318        match serde_json::from_value::<SessionCapabilities>(notification.event.data.clone()) {
1319            Ok(changed) => *capabilities.write() = changed,
1320            Err(e) => warn!(error = %e, "failed to deserialize capabilities.changed payload"),
1321        }
1322    }
1323
1324    tracing::debug!(
1325        elapsed_ms = dispatch_start.elapsed().as_millis(),
1326        session_id = %session_id,
1327        event_type = %notification.event.event_type,
1328        "Session::handle_notification dispatch"
1329    );
1330
1331    // Notification-based permission/tool/elicitation requests require a
1332    // separate RPC callback. Spawn concurrently since the CLI doesn't block.
1333    match event_type {
1334        SessionEventType::PermissionRequested => {
1335            let Some(request_id) = extract_request_id(&notification.event.data) else {
1336                return;
1337            };
1338            let client = client.clone();
1339            let handler = handler.clone();
1340            let sid = session_id.clone();
1341            let data: PermissionRequestData =
1342                serde_json::from_value(notification.event.data.clone()).unwrap_or_else(|_| {
1343                    PermissionRequestData {
1344                        kind: None,
1345                        tool_call_id: None,
1346                        extra: notification.event.data.clone(),
1347                    }
1348                });
1349            let span = tracing::error_span!(
1350                "permission_request_handler",
1351                session_id = %sid,
1352                request_id = %request_id
1353            );
1354            tokio::spawn(
1355                async move {
1356                    let handler_start = Instant::now();
1357                    let response = handler
1358                        .on_event(HandlerEvent::PermissionRequest {
1359                            session_id: sid.clone(),
1360                            request_id: request_id.clone(),
1361                            data,
1362                        })
1363                        .await;
1364                    tracing::debug!(
1365                        elapsed_ms = handler_start.elapsed().as_millis(),
1366                        session_id = %sid,
1367                        request_id = %request_id,
1368                        "SessionHandler::on_permission_request dispatch"
1369                    );
1370                    let Some(result_value) = notification_permission_payload(&response) else {
1371                        // Handler returned Deferred — it will call
1372                        // handlePendingPermissionRequest itself.
1373                        return;
1374                    };
1375                    let rpc_start = Instant::now();
1376                    let _ = client
1377                        .call(
1378                            "session.permissions.handlePendingPermissionRequest",
1379                            Some(serde_json::json!({
1380                                "sessionId": sid,
1381                                "requestId": request_id,
1382                                "result": result_value,
1383                            })),
1384                        )
1385                        .await;
1386                    tracing::debug!(
1387                        elapsed_ms = rpc_start.elapsed().as_millis(),
1388                        session_id = %sid,
1389                        request_id = %request_id,
1390                        "Session::handle_notification response sent successfully"
1391                    );
1392                }
1393                .instrument(span),
1394            );
1395        }
1396        SessionEventType::ExternalToolRequested => {
1397            let Some(request_id) = extract_request_id(&notification.event.data) else {
1398                return;
1399            };
1400            let data: ExternalToolRequestedData =
1401                match serde_json::from_value(notification.event.data.clone()) {
1402                    Ok(d) => d,
1403                    Err(e) => {
1404                        warn!(error = %e, "failed to deserialize external_tool.requested");
1405                        let client = client.clone();
1406                        let sid = session_id.clone();
1407                        let span = tracing::error_span!(
1408                            "external_tool_deserialize_error",
1409                            session_id = %sid,
1410                            request_id = %request_id
1411                        );
1412                        tokio::spawn(
1413                            async move {
1414                                let rpc_start = Instant::now();
1415                                let _ = client
1416                                .call(
1417                                    "session.tools.handlePendingToolCall",
1418                                    Some(serde_json::json!({
1419                                        "sessionId": sid,
1420                                        "requestId": request_id,
1421                                        "error": format!("Failed to deserialize tool request: {e}"),
1422                                    })),
1423                                )
1424                                .await;
1425                                tracing::debug!(
1426                                    elapsed_ms = rpc_start.elapsed().as_millis(),
1427                                    session_id = %sid,
1428                                    request_id = %request_id,
1429                                    "Session::handle_notification response sent successfully"
1430                                );
1431                            }
1432                            .instrument(span),
1433                        );
1434                        return;
1435                    }
1436                };
1437            let client = client.clone();
1438            let handler = handler.clone();
1439            let sid = session_id.clone();
1440            let span = tracing::error_span!(
1441                "external_tool_handler",
1442                session_id = %sid,
1443                request_id = %request_id
1444            );
1445            tokio::spawn(
1446                async move {
1447                    if data.tool_call_id.is_empty() || data.tool_name.is_empty() {
1448                        let error_msg = if data.tool_call_id.is_empty() {
1449                            "Missing toolCallId"
1450                        } else {
1451                            "Missing toolName"
1452                        };
1453                        let rpc_start = Instant::now();
1454                        let _ = client
1455                            .call(
1456                                "session.tools.handlePendingToolCall",
1457                                Some(serde_json::json!({
1458                                    "sessionId": sid,
1459                                    "requestId": request_id,
1460                                    "error": error_msg,
1461                                })),
1462                            )
1463                            .await;
1464                        tracing::debug!(
1465                            elapsed_ms = rpc_start.elapsed().as_millis(),
1466                            session_id = %sid,
1467                            request_id = %request_id,
1468                            "Session::handle_notification response sent successfully"
1469                        );
1470                        return;
1471                    }
1472                    let tool_call_id = data.tool_call_id.clone();
1473                    let tool_name = data.tool_name.clone();
1474                    let invocation = ToolInvocation {
1475                        session_id: sid.clone(),
1476                        tool_call_id: data.tool_call_id,
1477                        tool_name: data.tool_name,
1478                        arguments: data
1479                            .arguments
1480                            .unwrap_or(Value::Object(serde_json::Map::new())),
1481                        traceparent: data.traceparent,
1482                        tracestate: data.tracestate,
1483                    };
1484                    let handler_start = Instant::now();
1485                    let response = handler
1486                        .on_event(HandlerEvent::ExternalTool { invocation })
1487                        .await;
1488                    tracing::debug!(
1489                        elapsed_ms = handler_start.elapsed().as_millis(),
1490                        session_id = %sid,
1491                        request_id = %request_id,
1492                        tool_call_id = %tool_call_id,
1493                        tool_name = %tool_name,
1494                        "ToolHandler::call dispatch"
1495                    );
1496                    let Some(result_value) = notification_tool_payload(response) else {
1497                        return;
1498                    };
1499                    let rpc_start = Instant::now();
1500                    let _ = client
1501                        .call(
1502                            "session.tools.handlePendingToolCall",
1503                            Some(serde_json::json!({
1504                                "sessionId": sid,
1505                                "requestId": request_id,
1506                                "result": result_value,
1507                            })),
1508                        )
1509                        .await;
1510                    tracing::debug!(
1511                        elapsed_ms = rpc_start.elapsed().as_millis(),
1512                        session_id = %sid,
1513                        request_id = %request_id,
1514                        tool_call_id = %tool_call_id,
1515                        tool_name = %tool_name,
1516                        "Session::handle_notification response sent successfully"
1517                    );
1518                }
1519                .instrument(span),
1520            );
1521        }
1522        SessionEventType::UserInputRequested => {
1523            // Notification-only signal for observers (UI, telemetry).
1524            // The CLI follows up with a `userInput.request` JSON-RPC call
1525            // that drives `HandlerEvent::UserInput` dispatch — handling
1526            // the notification here too would double-fire the handler
1527            // and produce duplicate prompts on the consumer side. See
1528            // github/github-app#4249.
1529        }
1530        SessionEventType::ElicitationRequested => {
1531            let Some(request_id) = extract_request_id(&notification.event.data) else {
1532                return;
1533            };
1534            let elicitation_data: ElicitationRequestedData =
1535                match serde_json::from_value(notification.event.data.clone()) {
1536                    Ok(d) => d,
1537                    Err(e) => {
1538                        warn!(error = %e, "failed to deserialize elicitation request");
1539                        return;
1540                    }
1541                };
1542            let request = ElicitationRequest {
1543                message: elicitation_data.message,
1544                requested_schema: elicitation_data
1545                    .requested_schema
1546                    .map(|s| serde_json::to_value(s).unwrap_or(Value::Null)),
1547                mode: elicitation_data.mode.map(|m| match m {
1548                    crate::generated::session_events::ElicitationRequestedMode::Form => {
1549                        crate::types::ElicitationMode::Form
1550                    }
1551                    crate::generated::session_events::ElicitationRequestedMode::Url => {
1552                        crate::types::ElicitationMode::Url
1553                    }
1554                    _ => crate::types::ElicitationMode::Unknown,
1555                }),
1556                elicitation_source: elicitation_data.elicitation_source,
1557                url: elicitation_data.url,
1558            };
1559            let client = client.clone();
1560            let handler = handler.clone();
1561            let sid = session_id.clone();
1562            let span = tracing::error_span!(
1563                "elicitation_request_handler",
1564                session_id = %sid,
1565                request_id = %request_id
1566            );
1567            tokio::spawn(
1568                async move {
1569                    let cancel = ElicitationResult {
1570                        action: "cancel".to_string(),
1571                        content: None,
1572                    };
1573                    // Dispatch to a nested task so panics are caught as JoinErrors.
1574                    let handler_task = tokio::spawn({
1575                        let sid = sid.clone();
1576                        let request_id = request_id.clone();
1577                        let span = tracing::error_span!(
1578                            "elicitation_callback",
1579                            session_id = %sid,
1580                            request_id = %request_id
1581                        );
1582                        async move {
1583                            let handler_start = Instant::now();
1584                            let response = handler
1585                                .on_event(HandlerEvent::ElicitationRequest {
1586                                    session_id: sid.clone(),
1587                                    request_id: request_id.clone(),
1588                                    request,
1589                                })
1590                                .await;
1591                            tracing::debug!(
1592                                elapsed_ms = handler_start.elapsed().as_millis(),
1593                                session_id = %sid,
1594                                request_id = %request_id,
1595                                "SessionHandler::on_elicitation dispatch"
1596                            );
1597                            response
1598                        }
1599                        .instrument(span)
1600                    });
1601                    let result = match handler_task.await {
1602                        Ok(HandlerResponse::Elicitation(r)) => r,
1603                        _ => cancel.clone(),
1604                    };
1605                    let rpc_start = Instant::now();
1606                    if let Err(e) = client
1607                        .call(
1608                            "session.ui.handlePendingElicitation",
1609                            Some(serde_json::json!({
1610                                "sessionId": sid,
1611                                "requestId": request_id,
1612                                "result": result,
1613                            })),
1614                        )
1615                        .await
1616                    {
1617                        // RPC failed — attempt cancel as last resort
1618                        warn!(error = %e, "handlePendingElicitation failed, sending cancel");
1619                        let _ = client
1620                            .call(
1621                                "session.ui.handlePendingElicitation",
1622                                Some(serde_json::json!({
1623                                    "sessionId": sid,
1624                                    "requestId": request_id,
1625                                    "result": cancel,
1626                                })),
1627                            )
1628                            .await;
1629                    } else {
1630                        tracing::debug!(
1631                            elapsed_ms = rpc_start.elapsed().as_millis(),
1632                            session_id = %sid,
1633                            request_id = %request_id,
1634                            "Session::handle_notification response sent successfully"
1635                        );
1636                    }
1637                }
1638                .instrument(span),
1639            );
1640        }
1641        SessionEventType::CommandExecute => {
1642            let data: CommandExecuteData =
1643                match serde_json::from_value(notification.event.data.clone()) {
1644                    Ok(d) => d,
1645                    Err(e) => {
1646                        warn!(error = %e, "failed to deserialize command.execute");
1647                        return;
1648                    }
1649                };
1650            let client = client.clone();
1651            let command_handlers = command_handlers.clone();
1652            let sid = session_id.clone();
1653            let span = tracing::error_span!("command_handler", session_id = %sid);
1654            tokio::spawn(
1655                async move {
1656                    let request_id = data.request_id;
1657                    let ack_error = match command_handlers.get(&data.command_name).cloned() {
1658                        None => Some(format!("Unknown command: {}", data.command_name)),
1659                        Some(handler) => {
1660                            let command_name = data.command_name.clone();
1661                            let ctx = CommandContext {
1662                                session_id: sid.clone(),
1663                                command: data.command,
1664                                command_name: data.command_name,
1665                                args: data.args,
1666                            };
1667                            let handler_start = Instant::now();
1668                            let result = handler.on_command(ctx).await;
1669                            tracing::debug!(
1670                                elapsed_ms = handler_start.elapsed().as_millis(),
1671                                session_id = %sid,
1672                                request_id = %request_id,
1673                                command_name = %command_name,
1674                                "CommandHandler::call dispatch"
1675                            );
1676                            match result {
1677                                Ok(()) => None,
1678                                Err(e) => Some(e.to_string()),
1679                            }
1680                        }
1681                    };
1682                    let mut params = serde_json::json!({
1683                        "sessionId": sid,
1684                        "requestId": request_id,
1685                    });
1686                    if let Some(error_msg) = ack_error {
1687                        params["error"] = serde_json::Value::String(error_msg);
1688                    }
1689                    let rpc_start = Instant::now();
1690                    let _ = client
1691                        .call("session.commands.handlePendingCommand", Some(params))
1692                        .await;
1693                    tracing::debug!(
1694                        elapsed_ms = rpc_start.elapsed().as_millis(),
1695                        session_id = %sid,
1696                        request_id = %request_id,
1697                        "Session::handle_notification response sent successfully"
1698                    );
1699                }
1700                .instrument(span),
1701            );
1702        }
1703        _ => {}
1704    }
1705}
1706
1707/// Process a JSON-RPC request from the CLI.
1708async fn handle_request(
1709    session_id: &SessionId,
1710    client: &Client,
1711    handler: &Arc<dyn SessionHandler>,
1712    hooks: Option<&dyn SessionHooks>,
1713    transforms: Option<&dyn SystemMessageTransform>,
1714    session_fs_provider: Option<&Arc<dyn SessionFsProvider>>,
1715    request: crate::JsonRpcRequest,
1716) {
1717    let sid = session_id.clone();
1718
1719    if request.method.starts_with("sessionFs.") {
1720        crate::session_fs_dispatch::dispatch(client, session_fs_provider, request).await;
1721        return;
1722    }
1723
1724    match request.method.as_str() {
1725        "hooks.invoke" => {
1726            let params = request.params.as_ref();
1727            let hook_type = params
1728                .and_then(|p| p.get("hookType"))
1729                .and_then(|v| v.as_str())
1730                .unwrap_or("");
1731            let input = params
1732                .and_then(|p| p.get("input"))
1733                .cloned()
1734                .unwrap_or(Value::Object(Default::default()));
1735
1736            let rpc_result = if let Some(hooks) = hooks {
1737                match crate::hooks::dispatch_hook(hooks, &sid, hook_type, input).await {
1738                    Ok(output) => output,
1739                    Err(e) => {
1740                        warn!(error = %e, hook_type = hook_type, "hook dispatch failed");
1741                        serde_json::json!({ "output": {} })
1742                    }
1743                }
1744            } else {
1745                serde_json::json!({ "output": {} })
1746            };
1747
1748            let rpc_response = JsonRpcResponse {
1749                jsonrpc: "2.0".to_string(),
1750                id: request.id,
1751                result: Some(rpc_result),
1752                error: None,
1753            };
1754            let _ = client.send_response(&rpc_response).await;
1755        }
1756
1757        "tool.call" => {
1758            let invocation: ToolInvocation = match request
1759                .params
1760                .as_ref()
1761                .and_then(|p| serde_json::from_value::<ToolInvocation>(p.clone()).ok())
1762            {
1763                Some(inv) => inv,
1764                None => {
1765                    let _ = send_error_response(
1766                        client,
1767                        request.id,
1768                        error_codes::INVALID_PARAMS,
1769                        "invalid tool.call params",
1770                    )
1771                    .await;
1772                    return;
1773                }
1774            };
1775            let tool_call_id = invocation.tool_call_id.clone();
1776            let tool_name = invocation.tool_name.clone();
1777            let handler_start = Instant::now();
1778            let response = handler
1779                .on_event(HandlerEvent::ExternalTool { invocation })
1780                .await;
1781            tracing::debug!(
1782                elapsed_ms = handler_start.elapsed().as_millis(),
1783                session_id = %sid,
1784                tool_call_id = %tool_call_id,
1785                tool_name = %tool_name,
1786                "ToolHandler::call dispatch"
1787            );
1788            let tool_result = direct_tool_result(response);
1789            let rpc_response = JsonRpcResponse {
1790                jsonrpc: "2.0".to_string(),
1791                id: request.id,
1792                result: Some(serde_json::json!(ToolResultResponse {
1793                    result: tool_result
1794                })),
1795                error: None,
1796            };
1797            let _ = client.send_response(&rpc_response).await;
1798        }
1799
1800        "userInput.request" => {
1801            let params = request.params.as_ref();
1802            let Some(question) = params
1803                .and_then(|p| p.get("question"))
1804                .and_then(|v| v.as_str())
1805            else {
1806                warn!("userInput.request missing 'question' field");
1807                let rpc_response = JsonRpcResponse {
1808                    jsonrpc: "2.0".to_string(),
1809                    id: request.id,
1810                    result: None,
1811                    error: Some(crate::JsonRpcError {
1812                        code: error_codes::INVALID_PARAMS,
1813                        message: "missing required field: question".to_string(),
1814                        data: None,
1815                    }),
1816                };
1817                let _ = client.send_response(&rpc_response).await;
1818                return;
1819            };
1820            let question = question.to_string();
1821            let choices = params
1822                .and_then(|p| p.get("choices"))
1823                .and_then(|v| v.as_array())
1824                .map(|arr| {
1825                    arr.iter()
1826                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1827                        .collect()
1828                });
1829            let allow_freeform = params
1830                .and_then(|p| p.get("allowFreeform"))
1831                .and_then(|v| v.as_bool());
1832
1833            let handler_start = Instant::now();
1834            let response = handler
1835                .on_event(HandlerEvent::UserInput {
1836                    session_id: sid.clone(),
1837                    question,
1838                    choices,
1839                    allow_freeform,
1840                })
1841                .await;
1842            tracing::debug!(
1843                elapsed_ms = handler_start.elapsed().as_millis(),
1844                session_id = %sid,
1845                "SessionHandler::on_user_input dispatch"
1846            );
1847
1848            let rpc_result = match response {
1849                HandlerResponse::UserInput(Some(UserInputResponse {
1850                    answer,
1851                    was_freeform,
1852                })) => serde_json::json!({
1853                    "answer": answer,
1854                    "wasFreeform": was_freeform,
1855                }),
1856                _ => serde_json::json!({ "noResponse": true }),
1857            };
1858            let rpc_response = JsonRpcResponse {
1859                jsonrpc: "2.0".to_string(),
1860                id: request.id,
1861                result: Some(rpc_result),
1862                error: None,
1863            };
1864            let _ = client.send_response(&rpc_response).await;
1865        }
1866
1867        "exitPlanMode.request" => {
1868            let params = request
1869                .params
1870                .as_ref()
1871                .cloned()
1872                .unwrap_or(Value::Object(serde_json::Map::new()));
1873            let data: ExitPlanModeData = match serde_json::from_value(params) {
1874                Ok(d) => d,
1875                Err(e) => {
1876                    warn!(error = %e, "failed to deserialize exitPlanMode.request params, using defaults");
1877                    ExitPlanModeData::default()
1878                }
1879            };
1880
1881            let response = handler
1882                .on_event(HandlerEvent::ExitPlanMode {
1883                    session_id: sid,
1884                    data,
1885                })
1886                .await;
1887
1888            let rpc_result = match response {
1889                HandlerResponse::ExitPlanMode(result) => serde_json::to_value(result)
1890                    .expect("ExitPlanModeResult serialization cannot fail"),
1891                _ => serde_json::json!({ "approved": true }),
1892            };
1893            let rpc_response = JsonRpcResponse {
1894                jsonrpc: "2.0".to_string(),
1895                id: request.id,
1896                result: Some(rpc_result),
1897                error: None,
1898            };
1899            let _ = client.send_response(&rpc_response).await;
1900        }
1901
1902        "autoModeSwitch.request" => {
1903            let error_code = request
1904                .params
1905                .as_ref()
1906                .and_then(|p| p.get("errorCode"))
1907                .and_then(|v| v.as_str())
1908                .map(|s| s.to_string());
1909            let retry_after_seconds = request
1910                .params
1911                .as_ref()
1912                .and_then(|p| p.get("retryAfterSeconds"))
1913                .and_then(|v| v.as_f64());
1914
1915            let response = handler
1916                .on_event(HandlerEvent::AutoModeSwitch {
1917                    session_id: sid,
1918                    error_code,
1919                    retry_after_seconds,
1920                })
1921                .await;
1922
1923            let answer = match response {
1924                HandlerResponse::AutoModeSwitch(answer) => answer,
1925                _ => AutoModeSwitchResponse::No,
1926            };
1927            let rpc_response = JsonRpcResponse {
1928                jsonrpc: "2.0".to_string(),
1929                id: request.id,
1930                result: Some(serde_json::json!({ "response": answer })),
1931                error: None,
1932            };
1933            let _ = client.send_response(&rpc_response).await;
1934        }
1935
1936        "permission.request" => {
1937            let Some(request_id) = request
1938                .params
1939                .as_ref()
1940                .and_then(|p| p.get("requestId"))
1941                .and_then(|v| v.as_str())
1942                .filter(|s| !s.is_empty())
1943            else {
1944                warn!("permission.request missing 'requestId' field");
1945                let rpc_response = JsonRpcResponse {
1946                    jsonrpc: "2.0".to_string(),
1947                    id: request.id,
1948                    result: None,
1949                    error: Some(crate::JsonRpcError {
1950                        code: error_codes::INVALID_PARAMS,
1951                        message: "missing required field: requestId".to_string(),
1952                        data: None,
1953                    }),
1954                };
1955                let _ = client.send_response(&rpc_response).await;
1956                return;
1957            };
1958            let request_id = RequestId::new(request_id);
1959            let raw_params = request
1960                .params
1961                .as_ref()
1962                .cloned()
1963                .unwrap_or(Value::Object(serde_json::Map::new()));
1964            let data: PermissionRequestData =
1965                serde_json::from_value(raw_params.clone()).unwrap_or(PermissionRequestData {
1966                    kind: None,
1967                    tool_call_id: None,
1968                    extra: raw_params,
1969                });
1970
1971            let handler_start = Instant::now();
1972            let response = handler
1973                .on_event(HandlerEvent::PermissionRequest {
1974                    session_id: sid.clone(),
1975                    request_id: request_id.clone(),
1976                    data,
1977                })
1978                .await;
1979            tracing::debug!(
1980                elapsed_ms = handler_start.elapsed().as_millis(),
1981                session_id = %sid,
1982                request_id = %request_id,
1983                "SessionHandler::on_permission_request dispatch"
1984            );
1985            let rpc_response = JsonRpcResponse {
1986                jsonrpc: "2.0".to_string(),
1987                id: request.id,
1988                result: Some(direct_permission_payload(&response)),
1989                error: None,
1990            };
1991            let _ = client.send_response(&rpc_response).await;
1992        }
1993
1994        "systemMessage.transform" => {
1995            let params = request.params.as_ref();
1996            let sections: HashMap<String, crate::transforms::TransformSection> =
1997                match params.and_then(|p| p.get("sections")) {
1998                    Some(v) => match serde_json::from_value(v.clone()) {
1999                        Ok(s) => s,
2000                        Err(e) => {
2001                            let _ = send_error_response(
2002                                client,
2003                                request.id,
2004                                error_codes::INVALID_PARAMS,
2005                                &format!("invalid sections: {e}"),
2006                            )
2007                            .await;
2008                            return;
2009                        }
2010                    },
2011                    None => {
2012                        let _ = send_error_response(
2013                            client,
2014                            request.id,
2015                            error_codes::INVALID_PARAMS,
2016                            "missing sections parameter",
2017                        )
2018                        .await;
2019                        return;
2020                    }
2021                };
2022
2023            let rpc_result = if let Some(transforms) = transforms {
2024                let transform_start = Instant::now();
2025                let response =
2026                    crate::transforms::dispatch_transform(transforms, &sid, sections).await;
2027                tracing::debug!(
2028                    elapsed_ms = transform_start.elapsed().as_millis(),
2029                    session_id = %sid,
2030                    "SystemMessageTransform::transform_section dispatch"
2031                );
2032                match serde_json::to_value(response) {
2033                    Ok(v) => v,
2034                    Err(e) => {
2035                        warn!(error = %e, "failed to serialize transform response");
2036                        serde_json::json!({ "sections": {} })
2037                    }
2038                }
2039            } else {
2040                // No transforms registered — pass through all sections unchanged.
2041                let passthrough: HashMap<String, crate::transforms::TransformSection> = sections;
2042                serde_json::json!({ "sections": passthrough })
2043            };
2044
2045            let rpc_response = JsonRpcResponse {
2046                jsonrpc: "2.0".to_string(),
2047                id: request.id,
2048                result: Some(rpc_result),
2049                error: None,
2050            };
2051            let _ = client.send_response(&rpc_response).await;
2052        }
2053
2054        method => {
2055            warn!(
2056                method = method,
2057                "unhandled request method in session event loop"
2058            );
2059            let _ = send_error_response(
2060                client,
2061                request.id,
2062                error_codes::METHOD_NOT_FOUND,
2063                &format!("unknown method: {method}"),
2064            )
2065            .await;
2066        }
2067    }
2068}
2069
2070async fn send_error_response(
2071    client: &Client,
2072    id: u64,
2073    code: i32,
2074    message: &str,
2075) -> Result<(), Error> {
2076    let response = JsonRpcResponse {
2077        jsonrpc: "2.0".to_string(),
2078        id,
2079        result: None,
2080        error: Some(crate::JsonRpcError {
2081            code,
2082            message: message.to_string(),
2083            data: None,
2084        }),
2085    };
2086    client.send_response(&response).await
2087}
2088
2089/// Inject `action: "transform"` sections into a `SystemMessageConfig`,
2090/// forcing `mode: "customize"` (required by the CLI for transforms to fire).
2091/// Preserves any existing caller-provided section overrides.
2092fn apply_transform_sections(
2093    sys_msg: &mut SystemMessageConfig,
2094    transforms: &dyn SystemMessageTransform,
2095) {
2096    sys_msg.mode = Some("customize".to_string());
2097    let sections = sys_msg.sections.get_or_insert_with(HashMap::new);
2098    for id in transforms.section_ids() {
2099        sections.entry(id).or_insert_with(|| SectionOverride {
2100            action: Some("transform".to_string()),
2101            content: None,
2102        });
2103    }
2104}
2105
2106fn inject_transform_sections(config: &mut SessionConfig, transforms: &dyn SystemMessageTransform) {
2107    let sys_msg = config.system_message.get_or_insert_with(Default::default);
2108    apply_transform_sections(sys_msg, transforms);
2109}
2110
2111fn inject_transform_sections_resume(
2112    config: &mut ResumeSessionConfig,
2113    transforms: &dyn SystemMessageTransform,
2114) {
2115    let sys_msg = config.system_message.get_or_insert_with(Default::default);
2116    apply_transform_sections(sys_msg, transforms);
2117}
2118
2119#[cfg(test)]
2120mod tests {
2121    use serde_json::json;
2122
2123    use super::{
2124        direct_permission_payload, notification_permission_payload, pending_permission_result_kind,
2125        permission_request_response,
2126    };
2127    use crate::handler::{HandlerResponse, PermissionResult};
2128
2129    #[test]
2130    fn pending_permission_requests_use_decision_kinds() {
2131        assert_eq!(
2132            pending_permission_result_kind(&HandlerResponse::Permission(
2133                PermissionResult::Approved,
2134            )),
2135            "approve-once"
2136        );
2137        assert_eq!(
2138            pending_permission_result_kind(&HandlerResponse::Permission(PermissionResult::Denied)),
2139            "reject"
2140        );
2141        assert_eq!(
2142            pending_permission_result_kind(&HandlerResponse::Ok),
2143            "user-not-available"
2144        );
2145    }
2146
2147    #[test]
2148    fn direct_permission_requests_use_decision_response_kinds() {
2149        assert_eq!(
2150            serde_json::to_value(permission_request_response(&HandlerResponse::Permission(
2151                PermissionResult::Approved
2152            ),))
2153            .expect("serializing approved permission response should succeed"),
2154            json!({ "kind": "approve-once" })
2155        );
2156        assert_eq!(
2157            serde_json::to_value(permission_request_response(&HandlerResponse::Permission(
2158                PermissionResult::Denied
2159            ),))
2160            .expect("serializing denied permission response should succeed"),
2161            json!({ "kind": "reject" })
2162        );
2163        assert_eq!(
2164            serde_json::to_value(permission_request_response(&HandlerResponse::Ok))
2165                .expect("serializing fallback permission response should succeed"),
2166            json!({ "kind": "reject" })
2167        );
2168    }
2169
2170    #[test]
2171    fn notification_payload_handles_non_responses_and_custom() {
2172        // Deferred/NoResult -> no payload, SDK must not respond.
2173        assert!(
2174            notification_permission_payload(&HandlerResponse::Permission(
2175                PermissionResult::Deferred,
2176            ))
2177            .is_none()
2178        );
2179        assert!(
2180            notification_permission_payload(&HandlerResponse::Permission(
2181                PermissionResult::NoResult,
2182            ))
2183            .is_none()
2184        );
2185
2186        // Custom → handler-supplied value passed through verbatim.
2187        let custom = json!({
2188            "kind": "approve-and-remember",
2189            "allowlist": ["ls", "grep"],
2190        });
2191        assert_eq!(
2192            notification_permission_payload(&HandlerResponse::Permission(
2193                PermissionResult::Custom(custom.clone()),
2194            )),
2195            Some(custom)
2196        );
2197
2198        // Approved/Denied → existing kind-only shape.
2199        assert_eq!(
2200            notification_permission_payload(&HandlerResponse::Permission(
2201                PermissionResult::Approved,
2202            )),
2203            Some(json!({ "kind": "approve-once" }))
2204        );
2205        assert_eq!(
2206            notification_permission_payload(
2207                &HandlerResponse::Permission(PermissionResult::Denied,)
2208            ),
2209            Some(json!({ "kind": "reject" }))
2210        );
2211    }
2212
2213    #[test]
2214    fn direct_payload_handles_deferred_and_custom() {
2215        // Custom → handler-supplied value passed through verbatim.
2216        let custom = json!({
2217            "kind": "approve-and-remember",
2218            "allowlist": ["ls", "grep"],
2219        });
2220        assert_eq!(
2221            direct_permission_payload(&HandlerResponse::Permission(PermissionResult::Custom(
2222                custom.clone(),
2223            ))),
2224            custom
2225        );
2226
2227        // Deferred → falls back to Approved because the direct RPC must reply.
2228        assert_eq!(
2229            direct_permission_payload(&HandlerResponse::Permission(PermissionResult::Deferred)),
2230            json!({ "kind": "approve-once" })
2231        );
2232
2233        // NoResult -> direct RPC cannot be left pending, so report no user.
2234        assert_eq!(
2235            direct_permission_payload(&HandlerResponse::Permission(PermissionResult::NoResult)),
2236            json!({ "kind": "user-not-available" })
2237        );
2238
2239        // Approved/Denied → existing kind-only shape.
2240        assert_eq!(
2241            direct_permission_payload(&HandlerResponse::Permission(PermissionResult::Approved)),
2242            json!({ "kind": "approve-once" })
2243        );
2244        assert_eq!(
2245            direct_permission_payload(&HandlerResponse::Permission(PermissionResult::Denied)),
2246            json!({ "kind": "reject" })
2247        );
2248    }
2249}