Skip to main content

github_copilot_sdk/
session.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use parking_lot::Mutex as ParkingLotMutex;
7use serde_json::Value;
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tracing::{Instrument, warn};
12
13use crate::generated::api_types::{
14    LogRequest, ModelSwitchToRequest, PermissionDecision, PermissionDecisionApproveOnce,
15    PermissionDecisionApproveOnceKind, PermissionDecisionReject, PermissionDecisionRejectKind,
16};
17use crate::generated::session_events::{
18    CommandExecuteData, ElicitationRequestedData, ExternalToolRequestedData, SessionErrorData,
19    SessionEventType,
20};
21use crate::handler::{
22    AutoModeSwitchResponse, HandlerEvent, HandlerResponse, PermissionResult, SessionHandler,
23    UserInputResponse,
24};
25use crate::hooks::SessionHooks;
26use crate::session_fs::SessionFsProvider;
27use crate::trace_context::inject_trace_context;
28use crate::transforms::SystemMessageTransform;
29use crate::types::{
30    CommandContext, CommandDefinition, CommandHandler, CreateSessionResult, ElicitationRequest,
31    ElicitationResult, ExitPlanModeData, GetMessagesResponse, InputOptions, MessageOptions,
32    PermissionRequestData, RequestId, ResumeSessionConfig, SectionOverride, SessionCapabilities,
33    SessionConfig, SessionEvent, SessionId, SetModelOptions, SystemMessageConfig, ToolInvocation,
34    ToolResult, ToolResultResponse, TraceContext, ensure_attachment_display_names,
35};
36use crate::{Client, Error, JsonRpcResponse, SessionError, SessionEventNotification, error_codes};
37
/// Shared state between a [`Session`] and its event loop, used by [`Session::send_and_wait`].
///
/// One value is parked in the session's `idle_waiter` slot for the
/// duration of a single `send_and_wait` call.
struct IdleWaiter {
    /// One-shot channel on which the waiting `send_and_wait` call receives
    /// its final result, or an error such as `EventLoopClosed`.
    tx: oneshot::Sender<Result<Option<SessionEvent>, Error>>,
    /// Starts as `None` when the waiter is installed. Presumably filled in
    /// by the event loop as assistant messages stream in — confirm there.
    last_assistant_message: Option<SessionEvent>,
    /// Instant at which the owning `send_and_wait` call started.
    started_at: Instant,
    /// Starts as `false` when the waiter is installed. Presumably flipped by
    /// the event loop on the first assistant message — confirm there.
    first_assistant_message_seen: bool,
}
45
/// RAII guard that clears the [`Session::idle_waiter`] slot on drop. Used
/// by [`Session::send_and_wait`] to ensure the slot doesn't leak if the
/// caller's future is cancelled (outer `tokio::time::timeout` / `select!`
/// / dropped JoinHandle). The clear is synchronous via `parking_lot::Mutex`,
/// so no async drop is needed.
///
/// Without this, an outer cancellation between "install waiter" and
/// "drain channel" would leave the slot occupied, causing all subsequent
/// `send` and `send_and_wait` calls on the session to return
/// [`SendWhileWaiting`](SessionError::SendWhileWaiting). Closes RFD-400
/// review finding #2.
struct WaiterGuard {
    /// Handle to the same slot the session holds; emptied when the guard
    /// is dropped.
    slot: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
}
60
61impl Drop for WaiterGuard {
62    fn drop(&mut self) {
63        self.slot.lock().take();
64    }
65}
66
/// Drop guard for a session that has been registered with the client but
/// may still need to be torn down: unless disarmed, dropping it cancels
/// `shutdown` and unregisters `session_id` from `client`.
struct PendingSessionRegistration {
    /// Client the session was registered with; used to unregister it.
    client: Client,
    /// Identifier of the registered session.
    session_id: SessionId,
    /// Token cancelled during cleanup to ask the event loop to stop.
    shutdown: CancellationToken,
    /// When `true`, `Drop` does nothing.
    disarmed: bool,
}
73
74impl PendingSessionRegistration {
75    fn new(client: Client, session_id: SessionId, shutdown: CancellationToken) -> Self {
76        Self {
77            client,
78            session_id,
79            shutdown,
80            disarmed: false,
81        }
82    }
83
84    async fn cleanup(mut self, event_loop: JoinHandle<()>) {
85        self.shutdown.cancel();
86        let _ = event_loop.await;
87        self.client.unregister_session(&self.session_id);
88        self.disarmed = true;
89    }
90
91    fn disarm(&mut self) {
92        self.disarmed = true;
93    }
94}
95
96impl Drop for PendingSessionRegistration {
97    fn drop(&mut self) {
98        if !self.disarmed {
99            self.shutdown.cancel();
100            self.client.unregister_session(&self.session_id);
101        }
102    }
103}
104
/// A session on a GitHub Copilot CLI server.
///
/// Created via [`Client::create_session`] or [`Client::resume_session`].
/// Owns an internal event loop that dispatches events to the [`SessionHandler`].
///
/// Protocol methods (`send`, `get_messages`, `abort`, etc.) automatically
/// inject the session ID into RPC params.
///
/// Call [`destroy`](Self::destroy) for graceful cleanup (RPC + local). If dropped
/// without calling `destroy`, the `Drop` impl cancels the shutdown token
/// (it does not abort or await the event-loop task) and unregisters from
/// the router as a best-effort safety net.
pub struct Session {
    /// Session ID; injected as `sessionId` into RPC params.
    id: SessionId,
    /// Working directory of the CLI process.
    cwd: PathBuf,
    /// Workspace directory for the session, if any.
    workspace_path: Option<PathBuf>,
    /// Remote session URL, if the session is running remotely.
    remote_url: Option<String>,
    /// Client used for RPC calls and for (un)registering the session.
    client: Client,
    /// Handle to the spawned event-loop task. Sync `parking_lot::Mutex`
    /// because the lock is never held across an `.await` and the handle
    /// can be taken synchronously without `try_lock` fallibility.
    event_loop: ParkingLotMutex<Option<JoinHandle<()>>>,
    /// Cooperative shutdown signal for the event loop. The loop selects
    /// on [`shutdown.cancelled()`](CancellationToken::cancelled) alongside
    /// its inbound channels; [`Session::stop_event_loop`] and [`Drop`]
    /// both call [`cancel()`](CancellationToken::cancel) to ask the loop
    /// to exit between iterations rather than aborting the task (which
    /// can land at any await point and leave the session mid-protocol).
    /// See RFD-400 review finding #3.
    ///
    /// `CancellationToken` is the canonical signalling primitive in
    /// `tokio_util`; it is what `tonic` uses for the equivalent task-
    /// coordination case. Advanced consumers can obtain a child token
    /// via [`Session::cancellation_token`] to bind their own work to
    /// the session lifetime.
    shutdown: CancellationToken,
    /// Only populated while a `send_and_wait` call is in flight.
    ///
    /// Sync `parking_lot::Mutex` because the lock is never held across an
    /// `.await`, and synchronous access lets the `WaiterGuard` RAII helper
    /// in `send_and_wait` clear the slot from a `Drop` impl on caller-side
    /// cancellation. See RFD-400 review (cancel-safety hardening).
    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
    /// Capabilities negotiated with the CLI, updated on `capabilities.changed` events.
    capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
    /// Broadcast channel for runtime event subscribers — see [`Session::subscribe`].
    event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
}
153
154impl Session {
    /// Session ID assigned by the CLI.
    ///
    /// Borrowed from the session; clone it if an owned value is needed.
    pub fn id(&self) -> &SessionId {
        &self.id
    }
159
    /// Working directory of the CLI process.
    ///
    /// Returns a borrow of the stored path.
    pub fn cwd(&self) -> &PathBuf {
        &self.cwd
    }
164
    /// Workspace directory for the session (if using infinite sessions).
    ///
    /// Returns `None` when the session has no workspace directory.
    pub fn workspace_path(&self) -> Option<&Path> {
        self.workspace_path.as_deref()
    }
169
    /// Remote session URL, if the session is running remotely.
    ///
    /// Returns `None` when no remote URL is recorded for this session.
    pub fn remote_url(&self) -> Option<&str> {
        self.remote_url.as_deref()
    }
174
    /// Session capabilities negotiated with the CLI.
    ///
    /// Capabilities are set during session creation and updated at runtime
    /// via `capabilities.changed` events.
    ///
    /// Returns a cloned snapshot taken under the read lock; later updates
    /// are not reflected in a value already returned.
    pub fn capabilities(&self) -> SessionCapabilities {
        self.capabilities.read().clone()
    }
182
    /// Returns a [`CancellationToken`] that fires when this session shuts
    /// down (via [`Session::stop_event_loop`], [`Session::destroy`], or
    /// [`Drop`]).
    ///
    /// Use this to bind an external task's lifetime to the session: when
    /// the session shuts down, awaiting [`cancelled()`](CancellationToken::cancelled)
    /// resolves so cooperative consumers can stop cleanly.
    ///
    /// The returned handle is a *child* of the session's shutdown token:
    /// calling [`cancel()`](CancellationToken::cancel) on it cancels only
    /// the caller's child, not the session itself. To cancel the session,
    /// call [`Session::stop_event_loop`].
    ///
    /// # Example
    ///
    /// ```no_run
    /// # async fn example(session: github_copilot_sdk::session::Session) {
    /// let token = session.cancellation_token();
    /// tokio::select! {
    ///     _ = token.cancelled() => println!("session shut down"),
    ///     _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
    ///         println!("60s elapsed, session still alive");
    ///     }
    /// }
    /// # }
    /// ```
    pub fn cancellation_token(&self) -> CancellationToken {
        self.shutdown.child_token()
    }
212
    /// Subscribe to events for this session.
    ///
    /// Returns an [`EventSubscription`](crate::subscription::EventSubscription)
    /// that yields every [`SessionEvent`] dispatched on this session's
    /// event loop. Drop the value to unsubscribe; there is no separate
    /// cancel handle.
    ///
    /// **Observe-only.** Subscribers receive a clone of every
    /// [`SessionEvent`] but cannot influence permission decisions, tool
    /// results, or anything else that requires returning a
    /// [`HandlerResponse`]. Those remain
    /// the responsibility of the [`SessionHandler`] passed via
    /// [`SessionConfig::handler`](crate::types::SessionConfig::handler).
    ///
    /// The returned handle implements both an inherent
    /// [`recv`](crate::subscription::EventSubscription::recv) method and
    /// [`Stream`](tokio_stream::Stream), so callers can use a `while let`
    /// loop or any combinator from `tokio_stream::StreamExt` /
    /// `futures::StreamExt`.
    ///
    /// Each subscriber maintains its own queue. If a consumer cannot keep
    /// up, the oldest events are dropped and `recv` returns
    /// [`RecvError::Lagged`](crate::subscription::RecvError::Lagged)
    /// reporting the count of skipped events. Slow consumers do not block
    /// the session's event loop.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # async fn example(session: github_copilot_sdk::session::Session) {
    /// let mut events = session.subscribe();
    /// tokio::spawn(async move {
    ///     while let Ok(event) = events.recv().await {
    ///         println!("[{}] event {}", event.id, event.event_type);
    ///     }
    /// });
    /// # }
    /// ```
    pub fn subscribe(&self) -> crate::subscription::EventSubscription {
        // Each call creates a fresh receiver on the session's broadcast channel.
        crate::subscription::EventSubscription::new(self.event_tx.subscribe())
    }
254
    /// The underlying [`Client`] this session issues its RPCs through
    /// (for advanced use cases).
    pub fn client(&self) -> &Client {
        &self.client
    }
259
    /// Typed RPC namespace for this session.
    ///
    /// Every protocol method lives here under its schema-aligned path,
    /// e.g. `session.rpc().workspaces().list_files()`. Wire method names
    /// and request/response types are generated from the protocol schema,
    /// so the typed namespace can't drift from the wire contract.
    ///
    /// The hand-authored helpers on [`Session`] delegate to this namespace
    /// and remain the recommended entry point for everyday use; reach for
    /// `rpc()` when you want a method without a hand-written wrapper.
    ///
    /// The returned value borrows this session for its lifetime.
    pub fn rpc(&self) -> crate::generated::rpc::SessionRpc<'_> {
        crate::generated::rpc::SessionRpc { session: self }
    }
273
274    /// Stop the internal event loop. Called automatically on [`destroy`](Self::destroy).
275    ///
276    /// Cooperative: signals shutdown via the session's [`CancellationToken`]
277    /// and awaits the loop's natural exit rather than aborting the task.
278    /// Any in-flight handler (permission callback, tool call, elicitation
279    /// response) completes before the loop exits, so the CLI never sees a
280    /// half-handled request. See RFD-400 review finding #3.
281    pub async fn stop_event_loop(&self) {
282        self.shutdown.cancel();
283        let handle = self.event_loop.lock().take();
284        if let Some(handle) = handle {
285            let _ = handle.await;
286        }
287        // Fail any pending send_and_wait so it returns immediately.
288        if let Some(waiter) = self.idle_waiter.lock().take() {
289            let _ = waiter
290                .tx
291                .send(Err(Error::Session(SessionError::EventLoopClosed)));
292        }
293    }
294
295    /// Send a user message to the agent.
296    ///
297    /// Accepts anything convertible to [`MessageOptions`] — pass a `&str` for the
298    /// trivial case, or build a `MessageOptions` for mode/attachments. The
299    /// `wait_timeout` field on `MessageOptions` is ignored here (use
300    /// [`send_and_wait`](Self::send_and_wait) if you need to wait).
301    ///
302    /// Returns the assigned message ID, which can be used to correlate the
303    /// send with later [`SessionEvent`]s emitted in
304    /// response (assistant messages, tool requests, etc.).
305    ///
306    /// Returns an error if a [`send_and_wait`](Self::send_and_wait) call is
307    /// currently in flight, since the plain send would race with the waiter.
308    ///
309    /// # Cancel safety
310    ///
311    /// **Cancel-safe.** The underlying `session.send` RPC is dispatched
312    /// through the writer-actor (see [`Client::call`](crate::Client::call)),
313    /// so dropping this future after the actor has committed to writing
314    /// will not produce a partial frame on the wire. If the caller's
315    /// future is dropped between "frame enqueued" and "response received",
316    /// the message has already landed on the wire — the agent will process
317    /// it and emit events normally; the caller just won't see the returned
318    /// message ID.
319    pub async fn send(&self, opts: impl Into<MessageOptions>) -> Result<String, Error> {
320        if self.idle_waiter.lock().is_some() {
321            return Err(Error::Session(SessionError::SendWhileWaiting));
322        }
323        self.send_inner(opts.into()).await
324    }
325
326    async fn send_inner(&self, opts: MessageOptions) -> Result<String, Error> {
327        let mut params = serde_json::json!({
328            "sessionId": self.id,
329            "prompt": opts.prompt,
330        });
331        if let Some(m) = opts.mode {
332            params["mode"] = serde_json::to_value(m)?;
333        }
334        if let Some(mut a) = opts.attachments {
335            ensure_attachment_display_names(&mut a);
336            params["attachments"] = serde_json::to_value(a)?;
337        }
338        if let Some(headers) = opts.request_headers
339            && !headers.is_empty()
340        {
341            params["requestHeaders"] = serde_json::to_value(headers)?;
342        }
343        let trace_ctx = if opts.traceparent.is_some() || opts.tracestate.is_some() {
344            TraceContext {
345                traceparent: opts.traceparent,
346                tracestate: opts.tracestate,
347            }
348        } else {
349            self.client.resolve_trace_context().await
350        };
351        inject_trace_context(&mut params, &trace_ctx);
352        let rpc_start = Instant::now();
353        let result = self.client.call("session.send", Some(params)).await?;
354        let message_id = result
355            .get("messageId")
356            .and_then(|v| v.as_str())
357            .map(|s| s.to_string())
358            .unwrap_or_default();
359        tracing::debug!(
360            elapsed_ms = rpc_start.elapsed().as_millis(),
361            session_id = %self.id,
362            message_id = %message_id,
363            "Session::send completed successfully"
364        );
365        Ok(message_id)
366    }
367
368    /// Send a user message and wait for the agent to finish processing.
369    ///
370    /// Accepts anything convertible to [`MessageOptions`] — pass a `&str` for the
371    /// trivial case, or build a `MessageOptions` for mode/attachments/timeout.
372    /// Blocks until `session.idle` (success) or `session.error` (failure),
373    /// returning the last `assistant.message` event captured during streaming.
374    /// Times out after `MessageOptions::wait_timeout` (default 60 seconds).
375    ///
376    /// Only one `send_and_wait` call may be active per session at a time.
377    /// Calling [`send`](Self::send) while a `send_and_wait`
378    /// is in flight will also return an error.
379    ///
380    /// # Cancel safety
381    ///
382    /// **Cancel-safe.** A `WaiterGuard` clears the in-flight slot on every
383    /// exit path (success, internal failure, internal timeout, *and*
384    /// external cancellation via `tokio::time::timeout` / `select!` /
385    /// dropped JoinHandle). Subsequent `send` and `send_and_wait` calls on
386    /// this session will succeed normally — the slot is never leaked.
387    pub async fn send_and_wait(
388        &self,
389        opts: impl Into<MessageOptions>,
390    ) -> Result<Option<SessionEvent>, Error> {
391        let total_start = Instant::now();
392        let opts = opts.into();
393        let timeout_duration = opts.wait_timeout.unwrap_or(Duration::from_secs(60));
394        let (tx, rx) = oneshot::channel();
395
396        {
397            let mut guard = self.idle_waiter.lock();
398            if guard.is_some() {
399                return Err(Error::Session(SessionError::SendWhileWaiting));
400            }
401            *guard = Some(IdleWaiter {
402                tx,
403                last_assistant_message: None,
404                started_at: total_start,
405                first_assistant_message_seen: false,
406            });
407        }
408
409        // RAII: clears the idle_waiter slot on every exit path, including
410        // external cancellation (caller's outer `select!` / `timeout` /
411        // dropped future). Without this, an outer cancellation would leak
412        // the slot and brick subsequent `send`/`send_and_wait` calls.
413        let _waiter_guard = WaiterGuard {
414            slot: self.idle_waiter.clone(),
415        };
416
417        let result = tokio::time::timeout(timeout_duration, async {
418            self.send_inner(opts).await?;
419            match rx.await {
420                Ok(result) => result,
421                Err(_) => Err(Error::Session(SessionError::EventLoopClosed)),
422            }
423        })
424        .await;
425
426        match result {
427            Ok(inner) => {
428                tracing::debug!(
429                    elapsed_ms = total_start.elapsed().as_millis(),
430                    session_id = %self.id,
431                    completed_by = if inner.is_ok() { "idle" } else { "error" },
432                    "Session::send_and_wait complete"
433                );
434                inner
435            }
436            Err(_) => {
437                tracing::warn!(
438                    elapsed_ms = total_start.elapsed().as_millis(),
439                    session_id = %self.id,
440                    completed_by = "timeout",
441                    "Session::send_and_wait failed"
442                );
443                Err(Error::Session(SessionError::Timeout(timeout_duration)))
444            }
445        }
446    }
447
448    /// Retrieve the session's message history.
449    pub async fn get_messages(&self) -> Result<Vec<SessionEvent>, Error> {
450        let result = self
451            .client
452            .call(
453                "session.getMessages",
454                Some(serde_json::json!({ "sessionId": self.id })),
455            )
456            .await?;
457        let response: GetMessagesResponse = serde_json::from_value(result)?;
458        Ok(response.events)
459    }
460
461    /// Abort the current agent turn.
462    ///
463    /// # Cancel safety
464    ///
465    /// **Cancel-safe.** Single `session.abort` RPC; the underlying
466    /// [`Client::call`](crate::Client::call) is cancel-safe via the
467    /// writer-actor.
468    pub async fn abort(&self) -> Result<(), Error> {
469        self.client
470            .call(
471                "session.abort",
472                Some(serde_json::json!({ "sessionId": self.id })),
473            )
474            .await?;
475        Ok(())
476    }
477
478    /// Switch to a different model.
479    ///
480    /// Pass `None` for `opts` if no extra configuration is needed.
481    pub async fn set_model(&self, model: &str, opts: Option<SetModelOptions>) -> Result<(), Error> {
482        let opts = opts.unwrap_or_default();
483        let request = ModelSwitchToRequest {
484            model_id: model.to_string(),
485            reasoning_effort: opts.reasoning_effort,
486            model_capabilities: opts.model_capabilities,
487        };
488        self.rpc().model().switch_to(request).await?;
489        Ok(())
490    }
491
492    /// Disconnect this session from the CLI.
493    ///
494    /// Sends the `session.destroy` RPC, stops the event loop, and unregisters
495    /// the session from the client. **Session state on disk** (conversation
496    /// history, planning state, artifacts) is **preserved**, so the
497    /// conversation can be resumed later via [`Client::resume_session`]
498    /// using this session's ID. To permanently remove all on-disk session
499    /// data, use [`Client::delete_session`] instead.
500    ///
501    /// The caller should ensure the session is idle (e.g. [`send_and_wait`]
502    /// has returned) before disconnecting; in-flight tool or event handlers
503    /// may otherwise observe failures.
504    ///
505    /// [`Client::resume_session`]: crate::Client::resume_session
506    /// [`Client::delete_session`]: crate::Client::delete_session
507    /// [`send_and_wait`]: Self::send_and_wait
508    pub async fn disconnect(&self) -> Result<(), Error> {
509        self.client
510            .call(
511                "session.destroy",
512                Some(serde_json::json!({ "sessionId": self.id })),
513            )
514            .await?;
515        self.stop_event_loop().await;
516        self.client.unregister_session(&self.id);
517        Ok(())
518    }
519
    /// Alias for [`disconnect`](Self::disconnect).
    ///
    /// Named after the `session.destroy` wire RPC. Prefer `disconnect` in
    /// new code: the wire-level "destroy" is misleading because on-disk
    /// state is preserved.
    ///
    /// Behaviour and errors are exactly those of `disconnect`.
    pub async fn destroy(&self) -> Result<(), Error> {
        self.disconnect().await
    }
528
529    /// Write a log message to the session.
530    ///
531    /// Pass `None` for `opts` to use defaults (info level, persisted).
532    pub async fn log(
533        &self,
534        message: &str,
535        opts: Option<crate::types::LogOptions>,
536    ) -> Result<(), Error> {
537        let opts = opts.unwrap_or_default();
538        let level = match opts.level {
539            Some(level) => Some(serde_json::from_value(serde_json::to_value(level)?)?),
540            None => None,
541        };
542        let request = LogRequest {
543            message: message.to_string(),
544            level,
545            ephemeral: opts.ephemeral,
546            url: None,
547        };
548        self.rpc().log(request).await?;
549        Ok(())
550    }
551
    /// Returns the UI sub-API for elicitation, confirmation, selection, and
    /// free-form input.
    ///
    /// All UI methods route through `session.ui.*` RPCs and require host
    /// support; they fail with `ElicitationNotSupported` unless the
    /// session's capabilities report UI elicitation as enabled.
    pub fn ui(&self) -> SessionUi<'_> {
        SessionUi { session: self }
    }
560
561    /// Returns an error if the host doesn't support elicitation.
562    fn assert_elicitation(&self) -> Result<(), Error> {
563        if self
564            .capabilities
565            .read()
566            .ui
567            .as_ref()
568            .and_then(|u| u.elicitation)
569            != Some(true)
570        {
571            return Err(Error::Session(SessionError::ElicitationNotSupported));
572        }
573        Ok(())
574    }
575}
576
impl Drop for Session {
    /// Best-effort cleanup when the session is dropped without an explicit
    /// `destroy`/`disconnect`: signals shutdown and unregisters the session
    /// from the client. No RPC is sent from here.
    fn drop(&mut self) {
        // Cooperative shutdown: cancel the event loop's token to signal
        // exit between iterations. The loop will see the cancellation on
        // its next select poll and break cleanly without interrupting an
        // in-flight handler. We do NOT abort the JoinHandle — that would
        // land at any await point in the loop body, potentially leaving
        // the CLI with an unanswered request id. RFD-400 review finding
        // #3.
        //
        // The handle itself is left in `event_loop` to be reaped by the
        // tokio runtime when it next polls; we intentionally don't await
        // it here because Drop is sync.
        self.shutdown.cancel();
        self.client.unregister_session(&self.id);
    }
}
594
/// UI sub-API for a [`Session`] — elicitation, confirmation, selection,
/// and free-form input.
///
/// Acquired via [`Session::ui`]. Methods route to `session.ui.*` RPCs and
/// require host elicitation support; each method checks the session's
/// capabilities first and fails if elicitation is not reported as enabled.
pub struct SessionUi<'a> {
    /// The session whose ID, client and capabilities the UI calls use.
    session: &'a Session,
}
604
605impl<'a> SessionUi<'a> {
606    /// Request user input via an interactive UI form (elicitation).
607    ///
608    /// Sends a JSON Schema describing form fields to the CLI host. The host
609    /// renders a form dialog and returns the user's response.
610    ///
611    /// Prefer the typed convenience methods [`confirm`](Self::confirm),
612    /// [`select`](Self::select), and [`input`](Self::input) for common cases.
613    pub async fn elicitation(
614        &self,
615        message: &str,
616        schema: Value,
617    ) -> Result<ElicitationResult, Error> {
618        self.session.assert_elicitation()?;
619        let result = self
620            .session
621            .client
622            .call(
623                "session.ui.elicitation",
624                Some(serde_json::json!({
625                    "sessionId": self.session.id,
626                    "message": message,
627                    "requestedSchema": schema,
628                })),
629            )
630            .await?;
631        let elicitation: ElicitationResult = serde_json::from_value(result)?;
632        Ok(elicitation)
633    }
634
635    /// Ask the user a yes/no confirmation question.
636    ///
637    /// Returns `true` if the user accepted and confirmed, `false` otherwise.
638    pub async fn confirm(&self, message: &str) -> Result<bool, Error> {
639        self.session.assert_elicitation()?;
640        let schema = serde_json::json!({
641            "type": "object",
642            "properties": {
643                "confirmed": {
644                    "type": "boolean",
645                    "default": true,
646                }
647            },
648            "required": ["confirmed"]
649        });
650        let result = self.elicitation(message, schema).await?;
651        Ok(result.action == "accept"
652            && result
653                .content
654                .and_then(|c| c.get("confirmed").and_then(|v| v.as_bool()))
655                == Some(true))
656    }
657
658    /// Ask the user to select from a list of options.
659    ///
660    /// Returns the selected option string on accept, or `None` on decline/cancel.
661    pub async fn select(&self, message: &str, options: &[&str]) -> Result<Option<String>, Error> {
662        self.session.assert_elicitation()?;
663        let schema = serde_json::json!({
664            "type": "object",
665            "properties": {
666                "selection": {
667                    "type": "string",
668                    "enum": options,
669                }
670            },
671            "required": ["selection"]
672        });
673        let result = self.elicitation(message, schema).await?;
674        if result.action != "accept" {
675            return Ok(None);
676        }
677        let selection = result.content.and_then(|c| {
678            c.get("selection")
679                .and_then(|v| v.as_str())
680                .map(String::from)
681        });
682        Ok(selection)
683    }
684
685    /// Ask the user for free-form text input.
686    ///
687    /// Returns the input string on accept, or `None` on decline/cancel.
688    /// Use [`InputOptions`] to set validation constraints and field metadata.
689    pub async fn input(
690        &self,
691        message: &str,
692        options: Option<&InputOptions<'_>>,
693    ) -> Result<Option<String>, Error> {
694        self.session.assert_elicitation()?;
695        let mut field = serde_json::json!({ "type": "string" });
696        if let Some(opts) = options {
697            if let Some(title) = opts.title {
698                field["title"] = Value::String(title.to_string());
699            }
700            if let Some(desc) = opts.description {
701                field["description"] = Value::String(desc.to_string());
702            }
703            if let Some(min) = opts.min_length {
704                field["minLength"] = Value::Number(min.into());
705            }
706            if let Some(max) = opts.max_length {
707                field["maxLength"] = Value::Number(max.into());
708            }
709            if let Some(fmt) = &opts.format {
710                field["format"] = Value::String(fmt.as_str().to_string());
711            }
712            if let Some(default) = opts.default {
713                field["default"] = Value::String(default.to_string());
714            }
715        }
716        let schema = serde_json::json!({
717            "type": "object",
718            "properties": { "value": field },
719            "required": ["value"]
720        });
721        let result = self.elicitation(message, schema).await?;
722        if result.action != "accept" {
723            return Ok(None);
724        }
725        let value = result
726            .content
727            .and_then(|c| c.get("value").and_then(|v| v.as_str()).map(String::from));
728        Ok(value)
729    }
730}
731
732impl Client {
733    /// Create a new session on the CLI.
734    ///
735    /// Sends `session.create`, registers the session on the router,
736    /// and spawns an internal event loop that dispatches to the handler.
737    ///
738    /// All callbacks (event handler, hooks, transform) are configured
739    /// via [`SessionConfig`] using [`with_handler`](SessionConfig::with_handler),
740    /// [`with_hooks`](SessionConfig::with_hooks), and
741    /// [`with_transform`](SessionConfig::with_transform).
742    ///
743    /// If [`hooks_handler`](SessionConfig::hooks_handler) is set, the
744    /// wire-level `hooks` flag is automatically enabled.
745    ///
746    /// If [`transform`](SessionConfig::transform) is set, the SDK injects
747    /// `action: "transform"` sections into the [`SystemMessageConfig`] wire
748    /// format and handles `systemMessage.transform` RPC callbacks during
749    /// the session.
750    ///
751    /// If [`handler`](SessionConfig::handler) is `None`, the session uses
752    /// [`DenyAllHandler`](crate::handler::DenyAllHandler) — permission
753    /// requests are denied; other events are no-ops.
754    pub async fn create_session(&self, mut config: SessionConfig) -> Result<Session, Error> {
755        let total_start = Instant::now();
756        let handler = config
757            .handler
758            .take()
759            .unwrap_or_else(|| Arc::new(crate::handler::DenyAllHandler));
760        let hooks = config.hooks_handler.take();
761        let transforms = config.transform.take();
762        let tools_count = config.tools.as_ref().map_or(0, Vec::len);
763        let commands_count = config.commands.as_ref().map_or(0, Vec::len);
764        let has_hooks = hooks.is_some();
765        let command_handlers = build_command_handler_map(config.commands.as_deref());
766        let session_fs_provider = config.session_fs_provider.take();
767        if self.inner.session_fs_configured && session_fs_provider.is_none() {
768            return Err(Error::Session(SessionError::SessionFsProviderRequired));
769        }
770
771        if hooks.is_some() && config.hooks.is_none() {
772            config.hooks = Some(true);
773        }
774        if let Some(ref transforms) = transforms {
775            inject_transform_sections(&mut config, transforms.as_ref());
776        }
777        let session_id = config
778            .session_id
779            .clone()
780            .unwrap_or_else(|| SessionId::from(uuid::Uuid::new_v4().to_string()));
781        config.session_id = Some(session_id.clone());
782        let mut params = serde_json::to_value(&config)?;
783        let trace_ctx = self.resolve_trace_context().await;
784        inject_trace_context(&mut params, &trace_ctx);
785
786        let setup_start = Instant::now();
787        let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
788        let channels = self.register_session(&session_id);
789        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
790        let shutdown = CancellationToken::new();
791        let (event_tx, _) = tokio::sync::broadcast::channel(512);
792        let event_loop = spawn_event_loop(
793            session_id.clone(),
794            self.clone(),
795            handler,
796            hooks,
797            transforms,
798            command_handlers,
799            session_fs_provider,
800            channels,
801            idle_waiter.clone(),
802            capabilities.clone(),
803            event_tx.clone(),
804            shutdown.clone(),
805        );
806        let mut registration =
807            PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
808        tracing::debug!(
809            elapsed_ms = setup_start.elapsed().as_millis(),
810            session_id = %session_id,
811            tools_count,
812            commands_count,
813            has_hooks,
814            "Client::create_session local setup complete"
815        );
816
817        let rpc_start = Instant::now();
818        let result = match self.call("session.create", Some(params)).await {
819            Ok(result) => result,
820            Err(error) => {
821                registration.cleanup(event_loop).await;
822                return Err(error);
823            }
824        };
825        tracing::debug!(
826            elapsed_ms = rpc_start.elapsed().as_millis(),
827            "Client::create_session session creation request completed successfully"
828        );
829        let create_result: CreateSessionResult = match serde_json::from_value(result) {
830            Ok(result) => result,
831            Err(error) => {
832                registration.cleanup(event_loop).await;
833                return Err(error.into());
834            }
835        };
836        if create_result.session_id != session_id {
837            registration.cleanup(event_loop).await;
838            return Err(Error::Session(SessionError::SessionIdMismatch {
839                requested: session_id,
840                returned: create_result.session_id,
841            }));
842        }
843        *capabilities.write() = create_result.capabilities.unwrap_or_default();
844
845        tracing::debug!(
846            elapsed_ms = total_start.elapsed().as_millis(),
847            session_id = %session_id,
848            "Client::create_session complete"
849        );
850        registration.disarm();
851        Ok(Session {
852            id: session_id,
853            cwd: self.cwd().clone(),
854            workspace_path: create_result.workspace_path,
855            remote_url: create_result.remote_url,
856            client: self.clone(),
857            event_loop: ParkingLotMutex::new(Some(event_loop)),
858            shutdown,
859            idle_waiter,
860            capabilities,
861            event_tx,
862        })
863    }
864
865    /// Resume an existing session on the CLI.
866    ///
867    /// Sends `session.resume` and `session.skills.reload`, registers the
868    /// session on the router, and spawns the event loop.
869    ///
870    /// All callbacks (event handler, hooks, transform) are configured
871    /// via [`ResumeSessionConfig`] using its `with_*` builder methods.
872    ///
873    /// See [`Self::create_session`] for the defaults applied when callback
874    /// fields are unset.
875    pub async fn resume_session(&self, mut config: ResumeSessionConfig) -> Result<Session, Error> {
876        let total_start = Instant::now();
877        let handler = config
878            .handler
879            .take()
880            .unwrap_or_else(|| Arc::new(crate::handler::DenyAllHandler));
881        let hooks = config.hooks_handler.take();
882        let transforms = config.transform.take();
883        let tools_count = config.tools.as_ref().map_or(0, Vec::len);
884        let commands_count = config.commands.as_ref().map_or(0, Vec::len);
885        let has_hooks = hooks.is_some();
886        let command_handlers = build_command_handler_map(config.commands.as_deref());
887        let session_fs_provider = config.session_fs_provider.take();
888        if self.inner.session_fs_configured && session_fs_provider.is_none() {
889            return Err(Error::Session(SessionError::SessionFsProviderRequired));
890        }
891
892        if hooks.is_some() && config.hooks.is_none() {
893            config.hooks = Some(true);
894        }
895        if let Some(ref transforms) = transforms {
896            inject_transform_sections_resume(&mut config, transforms.as_ref());
897        }
898        let session_id = config.session_id.clone();
899        let mut params = serde_json::to_value(&config)?;
900        let trace_ctx = self.resolve_trace_context().await;
901        inject_trace_context(&mut params, &trace_ctx);
902
903        let capabilities = Arc::new(parking_lot::RwLock::new(SessionCapabilities::default()));
904        let setup_start = Instant::now();
905        let channels = self.register_session(&session_id);
906        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
907        let shutdown = CancellationToken::new();
908        let (event_tx, _) = tokio::sync::broadcast::channel(512);
909        let event_loop = spawn_event_loop(
910            session_id.clone(),
911            self.clone(),
912            handler,
913            hooks,
914            transforms,
915            command_handlers,
916            session_fs_provider,
917            channels,
918            idle_waiter.clone(),
919            capabilities.clone(),
920            event_tx.clone(),
921            shutdown.clone(),
922        );
923        let mut registration =
924            PendingSessionRegistration::new(self.clone(), session_id.clone(), shutdown.clone());
925        tracing::debug!(
926            elapsed_ms = setup_start.elapsed().as_millis(),
927            session_id = %session_id,
928            tools_count,
929            commands_count,
930            has_hooks,
931            "Client::resume_session local setup complete"
932        );
933
934        let rpc_start = Instant::now();
935        let result = match self.call("session.resume", Some(params)).await {
936            Ok(result) => result,
937            Err(error) => {
938                registration.cleanup(event_loop).await;
939                return Err(error);
940            }
941        };
942        tracing::debug!(
943            elapsed_ms = rpc_start.elapsed().as_millis(),
944            session_id = %session_id,
945            "Client::resume_session session resume request completed successfully"
946        );
947
948        // The CLI may reassign the session ID on resume.
949        let cli_session_id: SessionId = result
950            .get("sessionId")
951            .and_then(|v| v.as_str())
952            .unwrap_or(&session_id)
953            .into();
954        if cli_session_id != session_id {
955            registration.cleanup(event_loop).await;
956            return Err(Error::Session(SessionError::SessionIdMismatch {
957                requested: session_id,
958                returned: cli_session_id,
959            }));
960        }
961
962        let resume_capabilities: Option<SessionCapabilities> = result
963            .get("capabilities")
964            .and_then(|v| {
965                serde_json::from_value(v.clone())
966                    .map_err(|e| warn!(error = %e, "failed to deserialize capabilities from resume response"))
967                    .ok()
968            });
969        let remote_url = result
970            .get("remoteUrl")
971            .or_else(|| result.get("remote_url"))
972            .and_then(|value| value.as_str())
973            .map(ToString::to_string);
974
975        // Reload skills after resume (best-effort).
976        let skills_reload_start = Instant::now();
977        if let Err(e) = self
978            .call(
979                "session.skills.reload",
980                Some(serde_json::json!({ "sessionId": session_id })),
981            )
982            .await
983        {
984            warn!(
985                elapsed_ms = skills_reload_start.elapsed().as_millis(),
986                session_id = %session_id,
987                error = %e,
988                "Client::resume_session skills reload request failed"
989            );
990        } else {
991            tracing::debug!(
992                elapsed_ms = skills_reload_start.elapsed().as_millis(),
993                session_id = %session_id,
994                "Client::resume_session skills reload request completed successfully"
995            );
996        }
997
998        *capabilities.write() = resume_capabilities.unwrap_or_default();
999
1000        tracing::debug!(
1001            elapsed_ms = total_start.elapsed().as_millis(),
1002            session_id = %session_id,
1003            "Client::resume_session complete"
1004        );
1005        registration.disarm();
1006        Ok(Session {
1007            id: session_id,
1008            cwd: self.cwd().clone(),
1009            workspace_path: None,
1010            remote_url,
1011            client: self.clone(),
1012            event_loop: ParkingLotMutex::new(Some(event_loop)),
1013            shutdown,
1014            idle_waiter,
1015            capabilities,
1016            event_tx,
1017        })
1018    }
1019}
1020
1021type CommandHandlerMap = HashMap<String, Arc<dyn CommandHandler>>;
1022
1023fn build_command_handler_map(commands: Option<&[CommandDefinition]>) -> Arc<CommandHandlerMap> {
1024    let map = match commands {
1025        Some(commands) => commands
1026            .iter()
1027            .filter(|cmd| !cmd.name.is_empty())
1028            .map(|cmd| (cmd.name.clone(), cmd.handler.clone()))
1029            .collect(),
1030        None => HashMap::new(),
1031    };
1032    Arc::new(map)
1033}
1034
1035#[allow(clippy::too_many_arguments)]
1036fn spawn_event_loop(
1037    session_id: SessionId,
1038    client: Client,
1039    handler: Arc<dyn SessionHandler>,
1040    hooks: Option<Arc<dyn SessionHooks>>,
1041    transforms: Option<Arc<dyn SystemMessageTransform>>,
1042    command_handlers: Arc<CommandHandlerMap>,
1043    session_fs_provider: Option<Arc<dyn SessionFsProvider>>,
1044    channels: crate::router::SessionChannels,
1045    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1046    capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
1047    event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
1048    shutdown: CancellationToken,
1049) -> JoinHandle<()> {
1050    let crate::router::SessionChannels {
1051        mut notifications,
1052        mut requests,
1053    } = channels;
1054
1055    let span = tracing::error_span!("session_event_loop", session_id = %session_id);
1056    tokio::spawn(
1057        async move {
1058            loop {
1059                // `mpsc::UnboundedReceiver::recv` and
1060                // `CancellationToken::cancelled` are both cancel-safe per
1061                // RFD 400. The selected branch's `await`'d handler is
1062                // *not* mid-cancelled by the select — once a branch fires
1063                // it runs to completion within the loop's iteration.
1064                // Spawned child tasks inside `handle_notification`
1065                // (permission/tool/elicitation callbacks) intentionally
1066                // outlive the parent loop and own their own cleanup;
1067                // this is RFD 400's "spawn background tasks to perform
1068                // cancel-unsafe operations" pattern and is correct as-is.
1069                tokio::select! {
1070                    _ = shutdown.cancelled() => break,
1071                    Some(notification) = notifications.recv() => {
1072                        handle_notification(
1073                            &session_id, &client, &handler, &command_handlers, notification, &idle_waiter, &capabilities, &event_tx,
1074                        ).await;
1075                    }
1076                    Some(request) = requests.recv() => {
1077                        handle_request(
1078                            &session_id, &client, &handler, hooks.as_deref(), transforms.as_deref(), session_fs_provider.as_ref(), request,
1079                        ).await;
1080                    }
1081                    else => break,
1082                }
1083            }
1084            // Channels closed or shutdown signaled — fail any pending
1085            // send_and_wait so the caller observes a clean error.
1086            if let Some(waiter) = idle_waiter.lock().take() {
1087                let _ = waiter
1088                    .tx
1089                    .send(Err(Error::Session(SessionError::EventLoopClosed)));
1090            }
1091        }
1092        .instrument(span),
1093    )
1094}
1095
1096fn extract_request_id(data: &Value) -> Option<RequestId> {
1097    data.get("requestId")
1098        .and_then(|v| v.as_str())
1099        .filter(|s| !s.is_empty())
1100        .map(RequestId::new)
1101}
1102
1103fn pending_permission_result_kind(response: &HandlerResponse) -> &'static str {
1104    match response {
1105        HandlerResponse::Permission(PermissionResult::Approved) => "approve-once",
1106        HandlerResponse::Permission(PermissionResult::Denied) => "reject",
1107        HandlerResponse::Permission(PermissionResult::NoResult) => "no-result",
1108        // Fallback to "user-not-available" for UserNotAvailable, Deferred (when
1109        // forced through this path), Custom (handled separately upstream), and
1110        // any non-permission HandlerResponse that gets here defensively.
1111        _ => "user-not-available",
1112    }
1113}
1114
1115fn permission_request_response(response: &HandlerResponse) -> PermissionDecision {
1116    match response {
1117        HandlerResponse::Permission(PermissionResult::Approved) => {
1118            PermissionDecision::ApproveOnce(PermissionDecisionApproveOnce {
1119                kind: PermissionDecisionApproveOnceKind::ApproveOnce,
1120            })
1121        }
1122        _ => PermissionDecision::Reject(PermissionDecisionReject {
1123            kind: PermissionDecisionRejectKind::Reject,
1124            feedback: None,
1125        }),
1126    }
1127}
1128
1129/// Map a handler response into the `result` payload for the notification
1130/// path (`session.permissions.handlePendingPermissionRequest`).
1131///
1132/// Returns `None` when the SDK must not respond — currently only the
1133/// [`PermissionResult::Deferred`] case, where the handler takes over
1134/// responsibility for the round-trip itself.
1135fn notification_permission_payload(response: &HandlerResponse) -> Option<Value> {
1136    match response {
1137        HandlerResponse::Permission(PermissionResult::Deferred) => None,
1138        HandlerResponse::Permission(PermissionResult::Custom(value)) => Some(value.clone()),
1139        _ => Some(serde_json::json!({
1140            "kind": pending_permission_result_kind(response),
1141        })),
1142    }
1143}
1144
1145/// Map a handler response into the JSON-RPC `result` payload for the
1146/// direct-RPC path (`permission.request`).
1147///
1148/// Always returns a value. [`PermissionResult::Deferred`] is treated as
1149/// [`PermissionResult::Approved`] here because the JSON-RPC contract
1150/// requires a reply — see the variant's doc comment.
1151fn direct_permission_payload(response: &HandlerResponse) -> Value {
1152    match response {
1153        HandlerResponse::Permission(PermissionResult::Custom(value)) => value.clone(),
1154        HandlerResponse::Permission(PermissionResult::Deferred) => serde_json::to_value(
1155            permission_request_response(&HandlerResponse::Permission(PermissionResult::Approved)),
1156        )
1157        .expect("serializing direct permission response should succeed"),
1158        HandlerResponse::Permission(PermissionResult::NoResult)
1159        | HandlerResponse::Permission(PermissionResult::UserNotAvailable) => serde_json::json!({
1160            "kind": pending_permission_result_kind(response),
1161        }),
1162        _ => serde_json::to_value(permission_request_response(response))
1163            .expect("serializing direct permission response should succeed"),
1164    }
1165}
1166
1167/// Process a notification from the CLI's broadcast channel.
1168#[allow(clippy::too_many_arguments)]
1169async fn handle_notification(
1170    session_id: &SessionId,
1171    client: &Client,
1172    handler: &Arc<dyn SessionHandler>,
1173    command_handlers: &Arc<CommandHandlerMap>,
1174    notification: SessionEventNotification,
1175    idle_waiter: &Arc<ParkingLotMutex<Option<IdleWaiter>>>,
1176    capabilities: &Arc<parking_lot::RwLock<SessionCapabilities>>,
1177    event_tx: &tokio::sync::broadcast::Sender<SessionEvent>,
1178) {
1179    let dispatch_start = Instant::now();
1180    let event = notification.event.clone();
1181    let event_type = event.parsed_type();
1182    if event_type == SessionEventType::PermissionRequested {
1183        tracing::debug!(
1184            session_id = %session_id,
1185            event_type = %event.event_type,
1186            "Session::handle_notification permission request received"
1187        );
1188    }
1189
1190    // Signal send_and_wait if active. The lock is only contended when
1191    // a send_and_wait call is in flight (idle_waiter is Some).
1192    match event_type {
1193        SessionEventType::AssistantMessage
1194        | SessionEventType::SessionIdle
1195        | SessionEventType::SessionError => {
1196            let mut guard = idle_waiter.lock();
1197            if let Some(waiter) = guard.as_mut() {
1198                match event_type {
1199                    SessionEventType::AssistantMessage => {
1200                        if !waiter.first_assistant_message_seen {
1201                            waiter.first_assistant_message_seen = true;
1202                            tracing::debug!(
1203                                elapsed_ms = waiter.started_at.elapsed().as_millis(),
1204                                session_id = %session_id,
1205                                "Session::send_and_wait first assistant message"
1206                            );
1207                        }
1208                        waiter.last_assistant_message = Some(event.clone());
1209                    }
1210                    SessionEventType::SessionIdle | SessionEventType::SessionError => {
1211                        if let Some(waiter) = guard.take() {
1212                            if event_type == SessionEventType::SessionIdle {
1213                                tracing::debug!(
1214                                    elapsed_ms = waiter.started_at.elapsed().as_millis(),
1215                                    session_id = %session_id,
1216                                    "Session::send_and_wait idle received"
1217                                );
1218                                let _ = waiter.tx.send(Ok(waiter.last_assistant_message));
1219                            } else {
1220                                let error_msg = event
1221                                    .typed_data::<SessionErrorData>()
1222                                    .map(|d| d.message)
1223                                    .or_else(|| {
1224                                        event
1225                                            .data
1226                                            .get("message")
1227                                            .and_then(|v| v.as_str())
1228                                            .map(|s| s.to_string())
1229                                    })
1230                                    .unwrap_or_else(|| "session error".to_string());
1231                                let _ = waiter
1232                                    .tx
1233                                    .send(Err(Error::Session(SessionError::AgentError(error_msg))));
1234                            }
1235                        }
1236                    }
1237                    _ => {}
1238                }
1239            }
1240        }
1241        _ => {}
1242    }
1243
1244    // Fan out the event to runtime subscribers (`Session::subscribe`). `send`
1245    // only errors when there are no receivers, which is the normal case
1246    // before any consumer subscribes.
1247    let _ = event_tx.send(event.clone());
1248
1249    // Fire-and-forget dispatch for the general event.
1250    handler
1251        .on_event(HandlerEvent::SessionEvent {
1252            session_id: session_id.clone(),
1253            event,
1254        })
1255        .await;
1256
1257    // Update capabilities when the CLI reports changes. The CLI sends
1258    // the full updated capabilities object — replace wholesale so removals
1259    // and new subfields are handled correctly.
1260    if event_type == SessionEventType::CapabilitiesChanged {
1261        match serde_json::from_value::<SessionCapabilities>(notification.event.data.clone()) {
1262            Ok(changed) => *capabilities.write() = changed,
1263            Err(e) => warn!(error = %e, "failed to deserialize capabilities.changed payload"),
1264        }
1265    }
1266
1267    tracing::debug!(
1268        elapsed_ms = dispatch_start.elapsed().as_millis(),
1269        session_id = %session_id,
1270        event_type = %notification.event.event_type,
1271        "Session::handle_notification dispatch"
1272    );
1273
1274    // Notification-based permission/tool/elicitation requests require a
1275    // separate RPC callback. Spawn concurrently since the CLI doesn't block.
1276    match event_type {
1277        SessionEventType::PermissionRequested => {
1278            let Some(request_id) = extract_request_id(&notification.event.data) else {
1279                return;
1280            };
1281            let client = client.clone();
1282            let handler = handler.clone();
1283            let sid = session_id.clone();
1284            let data: PermissionRequestData =
1285                serde_json::from_value(notification.event.data.clone()).unwrap_or_else(|_| {
1286                    PermissionRequestData {
1287                        kind: None,
1288                        tool_call_id: None,
1289                        extra: notification.event.data.clone(),
1290                    }
1291                });
1292            let span = tracing::error_span!(
1293                "permission_request_handler",
1294                session_id = %sid,
1295                request_id = %request_id
1296            );
1297            tokio::spawn(
1298                async move {
1299                    let handler_start = Instant::now();
1300                    let response = handler
1301                        .on_event(HandlerEvent::PermissionRequest {
1302                            session_id: sid.clone(),
1303                            request_id: request_id.clone(),
1304                            data,
1305                        })
1306                        .await;
1307                    tracing::debug!(
1308                        elapsed_ms = handler_start.elapsed().as_millis(),
1309                        session_id = %sid,
1310                        request_id = %request_id,
1311                        "SessionHandler::on_permission_request dispatch"
1312                    );
1313                    let Some(result_value) = notification_permission_payload(&response) else {
1314                        // Handler returned Deferred — it will call
1315                        // handlePendingPermissionRequest itself.
1316                        return;
1317                    };
1318                    let rpc_start = Instant::now();
1319                    let _ = client
1320                        .call(
1321                            "session.permissions.handlePendingPermissionRequest",
1322                            Some(serde_json::json!({
1323                                "sessionId": sid,
1324                                "requestId": request_id,
1325                                "result": result_value,
1326                            })),
1327                        )
1328                        .await;
1329                    tracing::debug!(
1330                        elapsed_ms = rpc_start.elapsed().as_millis(),
1331                        session_id = %sid,
1332                        request_id = %request_id,
1333                        "Session::handle_notification response sent successfully"
1334                    );
1335                }
1336                .instrument(span),
1337            );
1338        }
1339        SessionEventType::ExternalToolRequested => {
1340            let Some(request_id) = extract_request_id(&notification.event.data) else {
1341                return;
1342            };
1343            let data: ExternalToolRequestedData =
1344                match serde_json::from_value(notification.event.data.clone()) {
1345                    Ok(d) => d,
1346                    Err(e) => {
1347                        warn!(error = %e, "failed to deserialize external_tool.requested");
1348                        let client = client.clone();
1349                        let sid = session_id.clone();
1350                        let span = tracing::error_span!(
1351                            "external_tool_deserialize_error",
1352                            session_id = %sid,
1353                            request_id = %request_id
1354                        );
1355                        tokio::spawn(
1356                            async move {
1357                                let rpc_start = Instant::now();
1358                                let _ = client
1359                                .call(
1360                                    "session.tools.handlePendingToolCall",
1361                                    Some(serde_json::json!({
1362                                        "sessionId": sid,
1363                                        "requestId": request_id,
1364                                        "error": format!("Failed to deserialize tool request: {e}"),
1365                                    })),
1366                                )
1367                                .await;
1368                                tracing::debug!(
1369                                    elapsed_ms = rpc_start.elapsed().as_millis(),
1370                                    session_id = %sid,
1371                                    request_id = %request_id,
1372                                    "Session::handle_notification response sent successfully"
1373                                );
1374                            }
1375                            .instrument(span),
1376                        );
1377                        return;
1378                    }
1379                };
1380            let client = client.clone();
1381            let handler = handler.clone();
1382            let sid = session_id.clone();
1383            let span = tracing::error_span!(
1384                "external_tool_handler",
1385                session_id = %sid,
1386                request_id = %request_id
1387            );
1388            tokio::spawn(
1389                async move {
1390                    if data.tool_call_id.is_empty() || data.tool_name.is_empty() {
1391                        let error_msg = if data.tool_call_id.is_empty() {
1392                            "Missing toolCallId"
1393                        } else {
1394                            "Missing toolName"
1395                        };
1396                        let rpc_start = Instant::now();
1397                        let _ = client
1398                            .call(
1399                                "session.tools.handlePendingToolCall",
1400                                Some(serde_json::json!({
1401                                    "sessionId": sid,
1402                                    "requestId": request_id,
1403                                    "error": error_msg,
1404                                })),
1405                            )
1406                            .await;
1407                        tracing::debug!(
1408                            elapsed_ms = rpc_start.elapsed().as_millis(),
1409                            session_id = %sid,
1410                            request_id = %request_id,
1411                            "Session::handle_notification response sent successfully"
1412                        );
1413                        return;
1414                    }
1415                    let tool_call_id = data.tool_call_id.clone();
1416                    let tool_name = data.tool_name.clone();
1417                    let invocation = ToolInvocation {
1418                        session_id: sid.clone(),
1419                        tool_call_id: data.tool_call_id,
1420                        tool_name: data.tool_name,
1421                        arguments: data
1422                            .arguments
1423                            .unwrap_or(Value::Object(serde_json::Map::new())),
1424                        traceparent: data.traceparent,
1425                        tracestate: data.tracestate,
1426                    };
1427                    let handler_start = Instant::now();
1428                    let response = handler
1429                        .on_event(HandlerEvent::ExternalTool { invocation })
1430                        .await;
1431                    tracing::debug!(
1432                        elapsed_ms = handler_start.elapsed().as_millis(),
1433                        session_id = %sid,
1434                        request_id = %request_id,
1435                        tool_call_id = %tool_call_id,
1436                        tool_name = %tool_name,
1437                        "ToolHandler::call dispatch"
1438                    );
1439                    let tool_result = match response {
1440                        HandlerResponse::ToolResult(r) => r,
1441                        _ => ToolResult::Text("Unexpected handler response".to_string()),
1442                    };
1443                    let result_value = serde_json::to_value(&tool_result).unwrap_or(Value::Null);
1444                    let rpc_start = Instant::now();
1445                    let _ = client
1446                        .call(
1447                            "session.tools.handlePendingToolCall",
1448                            Some(serde_json::json!({
1449                                "sessionId": sid,
1450                                "requestId": request_id,
1451                                "result": result_value,
1452                            })),
1453                        )
1454                        .await;
1455                    tracing::debug!(
1456                        elapsed_ms = rpc_start.elapsed().as_millis(),
1457                        session_id = %sid,
1458                        request_id = %request_id,
1459                        tool_call_id = %tool_call_id,
1460                        tool_name = %tool_name,
1461                        "Session::handle_notification response sent successfully"
1462                    );
1463                }
1464                .instrument(span),
1465            );
1466        }
1467        SessionEventType::UserInputRequested => {
1468            // Notification-only signal for observers (UI, telemetry).
1469            // The CLI follows up with a `userInput.request` JSON-RPC call
1470            // that drives `HandlerEvent::UserInput` dispatch — handling
1471            // the notification here too would double-fire the handler
1472            // and produce duplicate prompts on the consumer side. See
1473            // github/github-app#4249.
1474        }
1475        SessionEventType::ElicitationRequested => {
1476            let Some(request_id) = extract_request_id(&notification.event.data) else {
1477                return;
1478            };
1479            let elicitation_data: ElicitationRequestedData =
1480                match serde_json::from_value(notification.event.data.clone()) {
1481                    Ok(d) => d,
1482                    Err(e) => {
1483                        warn!(error = %e, "failed to deserialize elicitation request");
1484                        return;
1485                    }
1486                };
1487            let request = ElicitationRequest {
1488                message: elicitation_data.message,
1489                requested_schema: elicitation_data
1490                    .requested_schema
1491                    .map(|s| serde_json::to_value(s).unwrap_or(Value::Null)),
1492                mode: elicitation_data.mode.map(|m| match m {
1493                    crate::generated::session_events::ElicitationRequestedMode::Form => {
1494                        crate::types::ElicitationMode::Form
1495                    }
1496                    crate::generated::session_events::ElicitationRequestedMode::Url => {
1497                        crate::types::ElicitationMode::Url
1498                    }
1499                    _ => crate::types::ElicitationMode::Unknown,
1500                }),
1501                elicitation_source: elicitation_data.elicitation_source,
1502                url: elicitation_data.url,
1503            };
1504            let client = client.clone();
1505            let handler = handler.clone();
1506            let sid = session_id.clone();
1507            let span = tracing::error_span!(
1508                "elicitation_request_handler",
1509                session_id = %sid,
1510                request_id = %request_id
1511            );
1512            tokio::spawn(
1513                async move {
1514                    let cancel = ElicitationResult {
1515                        action: "cancel".to_string(),
1516                        content: None,
1517                    };
1518                    // Dispatch to a nested task so panics are caught as JoinErrors.
1519                    let handler_task = tokio::spawn({
1520                        let sid = sid.clone();
1521                        let request_id = request_id.clone();
1522                        let span = tracing::error_span!(
1523                            "elicitation_callback",
1524                            session_id = %sid,
1525                            request_id = %request_id
1526                        );
1527                        async move {
1528                            let handler_start = Instant::now();
1529                            let response = handler
1530                                .on_event(HandlerEvent::ElicitationRequest {
1531                                    session_id: sid.clone(),
1532                                    request_id: request_id.clone(),
1533                                    request,
1534                                })
1535                                .await;
1536                            tracing::debug!(
1537                                elapsed_ms = handler_start.elapsed().as_millis(),
1538                                session_id = %sid,
1539                                request_id = %request_id,
1540                                "SessionHandler::on_elicitation dispatch"
1541                            );
1542                            response
1543                        }
1544                        .instrument(span)
1545                    });
1546                    let result = match handler_task.await {
1547                        Ok(HandlerResponse::Elicitation(r)) => r,
1548                        _ => cancel.clone(),
1549                    };
1550                    let rpc_start = Instant::now();
1551                    if let Err(e) = client
1552                        .call(
1553                            "session.ui.handlePendingElicitation",
1554                            Some(serde_json::json!({
1555                                "sessionId": sid,
1556                                "requestId": request_id,
1557                                "result": result,
1558                            })),
1559                        )
1560                        .await
1561                    {
1562                        // RPC failed — attempt cancel as last resort
1563                        warn!(error = %e, "handlePendingElicitation failed, sending cancel");
1564                        let _ = client
1565                            .call(
1566                                "session.ui.handlePendingElicitation",
1567                                Some(serde_json::json!({
1568                                    "sessionId": sid,
1569                                    "requestId": request_id,
1570                                    "result": cancel,
1571                                })),
1572                            )
1573                            .await;
1574                    } else {
1575                        tracing::debug!(
1576                            elapsed_ms = rpc_start.elapsed().as_millis(),
1577                            session_id = %sid,
1578                            request_id = %request_id,
1579                            "Session::handle_notification response sent successfully"
1580                        );
1581                    }
1582                }
1583                .instrument(span),
1584            );
1585        }
1586        SessionEventType::CommandExecute => {
1587            let data: CommandExecuteData =
1588                match serde_json::from_value(notification.event.data.clone()) {
1589                    Ok(d) => d,
1590                    Err(e) => {
1591                        warn!(error = %e, "failed to deserialize command.execute");
1592                        return;
1593                    }
1594                };
1595            let client = client.clone();
1596            let command_handlers = command_handlers.clone();
1597            let sid = session_id.clone();
1598            let span = tracing::error_span!("command_handler", session_id = %sid);
1599            tokio::spawn(
1600                async move {
1601                    let request_id = data.request_id;
1602                    let ack_error = match command_handlers.get(&data.command_name).cloned() {
1603                        None => Some(format!("Unknown command: {}", data.command_name)),
1604                        Some(handler) => {
1605                            let command_name = data.command_name.clone();
1606                            let ctx = CommandContext {
1607                                session_id: sid.clone(),
1608                                command: data.command,
1609                                command_name: data.command_name,
1610                                args: data.args,
1611                            };
1612                            let handler_start = Instant::now();
1613                            let result = handler.on_command(ctx).await;
1614                            tracing::debug!(
1615                                elapsed_ms = handler_start.elapsed().as_millis(),
1616                                session_id = %sid,
1617                                request_id = %request_id,
1618                                command_name = %command_name,
1619                                "CommandHandler::call dispatch"
1620                            );
1621                            match result {
1622                                Ok(()) => None,
1623                                Err(e) => Some(e.to_string()),
1624                            }
1625                        }
1626                    };
1627                    let mut params = serde_json::json!({
1628                        "sessionId": sid,
1629                        "requestId": request_id,
1630                    });
1631                    if let Some(error_msg) = ack_error {
1632                        params["error"] = serde_json::Value::String(error_msg);
1633                    }
1634                    let rpc_start = Instant::now();
1635                    let _ = client
1636                        .call("session.commands.handlePendingCommand", Some(params))
1637                        .await;
1638                    tracing::debug!(
1639                        elapsed_ms = rpc_start.elapsed().as_millis(),
1640                        session_id = %sid,
1641                        request_id = %request_id,
1642                        "Session::handle_notification response sent successfully"
1643                    );
1644                }
1645                .instrument(span),
1646            );
1647        }
1648        _ => {}
1649    }
1650}
1651
1652/// Process a JSON-RPC request from the CLI.
1653async fn handle_request(
1654    session_id: &SessionId,
1655    client: &Client,
1656    handler: &Arc<dyn SessionHandler>,
1657    hooks: Option<&dyn SessionHooks>,
1658    transforms: Option<&dyn SystemMessageTransform>,
1659    session_fs_provider: Option<&Arc<dyn SessionFsProvider>>,
1660    request: crate::JsonRpcRequest,
1661) {
1662    let sid = session_id.clone();
1663
1664    if request.method.starts_with("sessionFs.") {
1665        crate::session_fs_dispatch::dispatch(client, session_fs_provider, request).await;
1666        return;
1667    }
1668
1669    match request.method.as_str() {
1670        "hooks.invoke" => {
1671            let params = request.params.as_ref();
1672            let hook_type = params
1673                .and_then(|p| p.get("hookType"))
1674                .and_then(|v| v.as_str())
1675                .unwrap_or("");
1676            let input = params
1677                .and_then(|p| p.get("input"))
1678                .cloned()
1679                .unwrap_or(Value::Object(Default::default()));
1680
1681            let rpc_result = if let Some(hooks) = hooks {
1682                match crate::hooks::dispatch_hook(hooks, &sid, hook_type, input).await {
1683                    Ok(output) => output,
1684                    Err(e) => {
1685                        warn!(error = %e, hook_type = hook_type, "hook dispatch failed");
1686                        serde_json::json!({ "output": {} })
1687                    }
1688                }
1689            } else {
1690                serde_json::json!({ "output": {} })
1691            };
1692
1693            let rpc_response = JsonRpcResponse {
1694                jsonrpc: "2.0".to_string(),
1695                id: request.id,
1696                result: Some(rpc_result),
1697                error: None,
1698            };
1699            let _ = client.send_response(&rpc_response).await;
1700        }
1701
1702        "tool.call" => {
1703            let invocation: ToolInvocation = match request
1704                .params
1705                .as_ref()
1706                .and_then(|p| serde_json::from_value::<ToolInvocation>(p.clone()).ok())
1707            {
1708                Some(inv) => inv,
1709                None => {
1710                    let _ = send_error_response(
1711                        client,
1712                        request.id,
1713                        error_codes::INVALID_PARAMS,
1714                        "invalid tool.call params",
1715                    )
1716                    .await;
1717                    return;
1718                }
1719            };
1720            let tool_call_id = invocation.tool_call_id.clone();
1721            let tool_name = invocation.tool_name.clone();
1722            let handler_start = Instant::now();
1723            let response = handler
1724                .on_event(HandlerEvent::ExternalTool { invocation })
1725                .await;
1726            tracing::debug!(
1727                elapsed_ms = handler_start.elapsed().as_millis(),
1728                session_id = %sid,
1729                tool_call_id = %tool_call_id,
1730                tool_name = %tool_name,
1731                "ToolHandler::call dispatch"
1732            );
1733            let tool_result = match response {
1734                HandlerResponse::ToolResult(r) => r,
1735                _ => ToolResult::Text("Unexpected handler response".to_string()),
1736            };
1737            let rpc_response = JsonRpcResponse {
1738                jsonrpc: "2.0".to_string(),
1739                id: request.id,
1740                result: Some(serde_json::json!(ToolResultResponse {
1741                    result: tool_result
1742                })),
1743                error: None,
1744            };
1745            let _ = client.send_response(&rpc_response).await;
1746        }
1747
1748        "userInput.request" => {
1749            let params = request.params.as_ref();
1750            let Some(question) = params
1751                .and_then(|p| p.get("question"))
1752                .and_then(|v| v.as_str())
1753            else {
1754                warn!("userInput.request missing 'question' field");
1755                let rpc_response = JsonRpcResponse {
1756                    jsonrpc: "2.0".to_string(),
1757                    id: request.id,
1758                    result: None,
1759                    error: Some(crate::JsonRpcError {
1760                        code: error_codes::INVALID_PARAMS,
1761                        message: "missing required field: question".to_string(),
1762                        data: None,
1763                    }),
1764                };
1765                let _ = client.send_response(&rpc_response).await;
1766                return;
1767            };
1768            let question = question.to_string();
1769            let choices = params
1770                .and_then(|p| p.get("choices"))
1771                .and_then(|v| v.as_array())
1772                .map(|arr| {
1773                    arr.iter()
1774                        .filter_map(|v| v.as_str().map(|s| s.to_string()))
1775                        .collect()
1776                });
1777            let allow_freeform = params
1778                .and_then(|p| p.get("allowFreeform"))
1779                .and_then(|v| v.as_bool());
1780
1781            let handler_start = Instant::now();
1782            let response = handler
1783                .on_event(HandlerEvent::UserInput {
1784                    session_id: sid.clone(),
1785                    question,
1786                    choices,
1787                    allow_freeform,
1788                })
1789                .await;
1790            tracing::debug!(
1791                elapsed_ms = handler_start.elapsed().as_millis(),
1792                session_id = %sid,
1793                "SessionHandler::on_user_input dispatch"
1794            );
1795
1796            let rpc_result = match response {
1797                HandlerResponse::UserInput(Some(UserInputResponse {
1798                    answer,
1799                    was_freeform,
1800                })) => serde_json::json!({
1801                    "answer": answer,
1802                    "wasFreeform": was_freeform,
1803                }),
1804                _ => serde_json::json!({ "noResponse": true }),
1805            };
1806            let rpc_response = JsonRpcResponse {
1807                jsonrpc: "2.0".to_string(),
1808                id: request.id,
1809                result: Some(rpc_result),
1810                error: None,
1811            };
1812            let _ = client.send_response(&rpc_response).await;
1813        }
1814
1815        "exitPlanMode.request" => {
1816            let params = request
1817                .params
1818                .as_ref()
1819                .cloned()
1820                .unwrap_or(Value::Object(serde_json::Map::new()));
1821            let data: ExitPlanModeData = match serde_json::from_value(params) {
1822                Ok(d) => d,
1823                Err(e) => {
1824                    warn!(error = %e, "failed to deserialize exitPlanMode.request params, using defaults");
1825                    ExitPlanModeData::default()
1826                }
1827            };
1828
1829            let response = handler
1830                .on_event(HandlerEvent::ExitPlanMode {
1831                    session_id: sid,
1832                    data,
1833                })
1834                .await;
1835
1836            let rpc_result = match response {
1837                HandlerResponse::ExitPlanMode(result) => serde_json::to_value(result)
1838                    .expect("ExitPlanModeResult serialization cannot fail"),
1839                _ => serde_json::json!({ "approved": true }),
1840            };
1841            let rpc_response = JsonRpcResponse {
1842                jsonrpc: "2.0".to_string(),
1843                id: request.id,
1844                result: Some(rpc_result),
1845                error: None,
1846            };
1847            let _ = client.send_response(&rpc_response).await;
1848        }
1849
1850        "autoModeSwitch.request" => {
1851            let error_code = request
1852                .params
1853                .as_ref()
1854                .and_then(|p| p.get("errorCode"))
1855                .and_then(|v| v.as_str())
1856                .map(|s| s.to_string());
1857            let retry_after_seconds = request
1858                .params
1859                .as_ref()
1860                .and_then(|p| p.get("retryAfterSeconds"))
1861                .and_then(|v| v.as_f64());
1862
1863            let response = handler
1864                .on_event(HandlerEvent::AutoModeSwitch {
1865                    session_id: sid,
1866                    error_code,
1867                    retry_after_seconds,
1868                })
1869                .await;
1870
1871            let answer = match response {
1872                HandlerResponse::AutoModeSwitch(answer) => answer,
1873                _ => AutoModeSwitchResponse::No,
1874            };
1875            let rpc_response = JsonRpcResponse {
1876                jsonrpc: "2.0".to_string(),
1877                id: request.id,
1878                result: Some(serde_json::json!({ "response": answer })),
1879                error: None,
1880            };
1881            let _ = client.send_response(&rpc_response).await;
1882        }
1883
1884        "permission.request" => {
1885            let Some(request_id) = request
1886                .params
1887                .as_ref()
1888                .and_then(|p| p.get("requestId"))
1889                .and_then(|v| v.as_str())
1890                .filter(|s| !s.is_empty())
1891            else {
1892                warn!("permission.request missing 'requestId' field");
1893                let rpc_response = JsonRpcResponse {
1894                    jsonrpc: "2.0".to_string(),
1895                    id: request.id,
1896                    result: None,
1897                    error: Some(crate::JsonRpcError {
1898                        code: error_codes::INVALID_PARAMS,
1899                        message: "missing required field: requestId".to_string(),
1900                        data: None,
1901                    }),
1902                };
1903                let _ = client.send_response(&rpc_response).await;
1904                return;
1905            };
1906            let request_id = RequestId::new(request_id);
1907            let raw_params = request
1908                .params
1909                .as_ref()
1910                .cloned()
1911                .unwrap_or(Value::Object(serde_json::Map::new()));
1912            let data: PermissionRequestData =
1913                serde_json::from_value(raw_params.clone()).unwrap_or(PermissionRequestData {
1914                    kind: None,
1915                    tool_call_id: None,
1916                    extra: raw_params,
1917                });
1918
1919            let handler_start = Instant::now();
1920            let response = handler
1921                .on_event(HandlerEvent::PermissionRequest {
1922                    session_id: sid.clone(),
1923                    request_id: request_id.clone(),
1924                    data,
1925                })
1926                .await;
1927            tracing::debug!(
1928                elapsed_ms = handler_start.elapsed().as_millis(),
1929                session_id = %sid,
1930                request_id = %request_id,
1931                "SessionHandler::on_permission_request dispatch"
1932            );
1933            let rpc_response = JsonRpcResponse {
1934                jsonrpc: "2.0".to_string(),
1935                id: request.id,
1936                result: Some(direct_permission_payload(&response)),
1937                error: None,
1938            };
1939            let _ = client.send_response(&rpc_response).await;
1940        }
1941
1942        "systemMessage.transform" => {
1943            let params = request.params.as_ref();
1944            let sections: HashMap<String, crate::transforms::TransformSection> =
1945                match params.and_then(|p| p.get("sections")) {
1946                    Some(v) => match serde_json::from_value(v.clone()) {
1947                        Ok(s) => s,
1948                        Err(e) => {
1949                            let _ = send_error_response(
1950                                client,
1951                                request.id,
1952                                error_codes::INVALID_PARAMS,
1953                                &format!("invalid sections: {e}"),
1954                            )
1955                            .await;
1956                            return;
1957                        }
1958                    },
1959                    None => {
1960                        let _ = send_error_response(
1961                            client,
1962                            request.id,
1963                            error_codes::INVALID_PARAMS,
1964                            "missing sections parameter",
1965                        )
1966                        .await;
1967                        return;
1968                    }
1969                };
1970
1971            let rpc_result = if let Some(transforms) = transforms {
1972                let transform_start = Instant::now();
1973                let response =
1974                    crate::transforms::dispatch_transform(transforms, &sid, sections).await;
1975                tracing::debug!(
1976                    elapsed_ms = transform_start.elapsed().as_millis(),
1977                    session_id = %sid,
1978                    "SystemMessageTransform::transform_section dispatch"
1979                );
1980                match serde_json::to_value(response) {
1981                    Ok(v) => v,
1982                    Err(e) => {
1983                        warn!(error = %e, "failed to serialize transform response");
1984                        serde_json::json!({ "sections": {} })
1985                    }
1986                }
1987            } else {
1988                // No transforms registered — pass through all sections unchanged.
1989                let passthrough: HashMap<String, crate::transforms::TransformSection> = sections;
1990                serde_json::json!({ "sections": passthrough })
1991            };
1992
1993            let rpc_response = JsonRpcResponse {
1994                jsonrpc: "2.0".to_string(),
1995                id: request.id,
1996                result: Some(rpc_result),
1997                error: None,
1998            };
1999            let _ = client.send_response(&rpc_response).await;
2000        }
2001
2002        method => {
2003            warn!(
2004                method = method,
2005                "unhandled request method in session event loop"
2006            );
2007            let _ = send_error_response(
2008                client,
2009                request.id,
2010                error_codes::METHOD_NOT_FOUND,
2011                &format!("unknown method: {method}"),
2012            )
2013            .await;
2014        }
2015    }
2016}
2017
2018async fn send_error_response(
2019    client: &Client,
2020    id: u64,
2021    code: i32,
2022    message: &str,
2023) -> Result<(), Error> {
2024    let response = JsonRpcResponse {
2025        jsonrpc: "2.0".to_string(),
2026        id,
2027        result: None,
2028        error: Some(crate::JsonRpcError {
2029            code,
2030            message: message.to_string(),
2031            data: None,
2032        }),
2033    };
2034    client.send_response(&response).await
2035}
2036
2037/// Inject `action: "transform"` sections into a `SystemMessageConfig`,
2038/// forcing `mode: "customize"` (required by the CLI for transforms to fire).
2039/// Preserves any existing caller-provided section overrides.
2040fn apply_transform_sections(
2041    sys_msg: &mut SystemMessageConfig,
2042    transforms: &dyn SystemMessageTransform,
2043) {
2044    sys_msg.mode = Some("customize".to_string());
2045    let sections = sys_msg.sections.get_or_insert_with(HashMap::new);
2046    for id in transforms.section_ids() {
2047        sections.entry(id).or_insert_with(|| SectionOverride {
2048            action: Some("transform".to_string()),
2049            content: None,
2050        });
2051    }
2052}
2053
2054fn inject_transform_sections(config: &mut SessionConfig, transforms: &dyn SystemMessageTransform) {
2055    let sys_msg = config.system_message.get_or_insert_with(Default::default);
2056    apply_transform_sections(sys_msg, transforms);
2057}
2058
2059fn inject_transform_sections_resume(
2060    config: &mut ResumeSessionConfig,
2061    transforms: &dyn SystemMessageTransform,
2062) {
2063    let sys_msg = config.system_message.get_or_insert_with(Default::default);
2064    apply_transform_sections(sys_msg, transforms);
2065}
2066
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::{
        direct_permission_payload, notification_permission_payload, pending_permission_result_kind,
        permission_request_response,
    };
    use crate::handler::{HandlerResponse, PermissionResult};

    /// Wrap a permission result in the handler response variant under test.
    fn permission(result: PermissionResult) -> HandlerResponse {
        HandlerResponse::Permission(result)
    }

    #[test]
    fn pending_permission_requests_use_decision_kinds() {
        let approved = permission(PermissionResult::Approved);
        let denied = permission(PermissionResult::Denied);

        assert_eq!(pending_permission_result_kind(&approved), "approve-once");
        assert_eq!(pending_permission_result_kind(&denied), "reject");
        // A non-permission response maps to the "user not available" kind.
        assert_eq!(
            pending_permission_result_kind(&HandlerResponse::Ok),
            "user-not-available"
        );
    }

    #[test]
    fn direct_permission_requests_use_decision_response_kinds() {
        let approved = serde_json::to_value(permission_request_response(&permission(
            PermissionResult::Approved,
        )))
        .expect("serializing approved permission response should succeed");
        assert_eq!(approved, json!({ "kind": "approve-once" }));

        let denied = serde_json::to_value(permission_request_response(&permission(
            PermissionResult::Denied,
        )))
        .expect("serializing denied permission response should succeed");
        assert_eq!(denied, json!({ "kind": "reject" }));

        // A non-permission response falls back to a rejection.
        let fallback = serde_json::to_value(permission_request_response(&HandlerResponse::Ok))
            .expect("serializing fallback permission response should succeed");
        assert_eq!(fallback, json!({ "kind": "reject" }));
    }

    #[test]
    fn notification_payload_handles_deferred_and_custom() {
        // Deferred yields no payload, so the SDK must not respond.
        let deferred = permission(PermissionResult::Deferred);
        assert!(notification_permission_payload(&deferred).is_none());

        // Custom passes the handler-supplied value through verbatim.
        let custom = json!({
            "kind": "approve-and-remember",
            "allowlist": ["ls", "grep"],
        });
        let custom_response = permission(PermissionResult::Custom(custom.clone()));
        assert_eq!(
            notification_permission_payload(&custom_response),
            Some(custom)
        );

        // Approved and Denied keep the existing kind-only shape.
        let approved = permission(PermissionResult::Approved);
        assert_eq!(
            notification_permission_payload(&approved),
            Some(json!({ "kind": "approve-once" }))
        );
        let denied = permission(PermissionResult::Denied);
        assert_eq!(
            notification_permission_payload(&denied),
            Some(json!({ "kind": "reject" }))
        );
    }

    #[test]
    fn direct_payload_handles_deferred_and_custom() {
        // Custom passes the handler-supplied value through verbatim.
        let custom = json!({
            "kind": "approve-and-remember",
            "allowlist": ["ls", "grep"],
        });
        let custom_response = permission(PermissionResult::Custom(custom.clone()));
        assert_eq!(direct_permission_payload(&custom_response), custom);

        // Deferred falls back to Approved because the direct RPC must reply.
        let deferred = permission(PermissionResult::Deferred);
        assert_eq!(
            direct_permission_payload(&deferred),
            json!({ "kind": "approve-once" })
        );

        // Approved and Denied keep the existing kind-only shape.
        let approved = permission(PermissionResult::Approved);
        assert_eq!(
            direct_permission_payload(&approved),
            json!({ "kind": "approve-once" })
        );
        let denied = permission(PermissionResult::Denied);
        assert_eq!(
            direct_permission_payload(&denied),
            json!({ "kind": "reject" })
        );
    }
}