Skip to main content

agent_client_protocol/
session.rs

1use std::{future::Future, marker::PhantomData, path::Path};
2
3use agent_client_protocol_schema::{
4    ContentBlock, ContentChunk, NewSessionRequest, NewSessionResponse, PromptRequest,
5    PromptResponse, SessionModeState, SessionNotification, SessionUpdate, StopReason,
6};
7use futures::channel::{mpsc, oneshot};
8
9use crate::{
10    Agent, Client, ConnectionTo, Dispatch, HandleDispatchFrom, Handled, Responder, Role,
11    jsonrpc::{
12        DynamicHandlerRegistration,
13        run::{ChainRun, NullRun, RunWithConnectionTo},
14    },
15    mcp_server::McpServer,
16    role::{HasPeer, acp::ProxySessionMessages},
17    schema::SessionId,
18    util::{MatchDispatch, MatchDispatchFrom, run_until},
19};
20
/// Trait for marker types that indicate the blocking vs non-blocking API.
///
/// [`SessionBuilder`] carries one of these markers as a type parameter so that
/// task-blocking methods are only callable after opting in.
/// See [`SessionBuilder::block_task`].
pub trait SessionBlockState: std::fmt::Debug + Send + Sync + 'static {}

/// Marker type indicating the session builder will block the current task.
#[derive(Debug)]
pub struct Blocking;

/// Marker type indicating the session builder will not block the current task.
#[derive(Debug)]
pub struct NonBlocking;

impl SessionBlockState for Blocking {}
impl SessionBlockState for NonBlocking {}
34
35impl<Counterpart: Role> ConnectionTo<Counterpart>
36where
37    Counterpart: HasPeer<Agent>,
38{
39    /// Session builder for a new session request.
40    pub fn build_session(&self, cwd: impl AsRef<Path>) -> SessionBuilder<Counterpart, NullRun> {
41        SessionBuilder::new(self, NewSessionRequest::new(cwd.as_ref()))
42    }
43
44    /// Session builder using the current working directory.
45    ///
46    /// This is a convenience wrapper around [`build_session`](Self::build_session)
47    /// that uses [`std::env::current_dir`] to get the working directory.
48    ///
49    /// Returns an error if the current directory cannot be determined.
50    pub fn build_session_cwd(&self) -> Result<SessionBuilder<Counterpart, NullRun>, crate::Error> {
51        let cwd = std::env::current_dir().map_err(|e| {
52            crate::Error::internal_error().data(format!("cannot get current directory: {e}"))
53        })?;
54        Ok(self.build_session(cwd))
55    }
56
57    /// Session builder starting from an existing request.
58    ///
59    /// Use this when you've intercepted a `session.new` request and want to
60    /// modify it (e.g., inject MCP servers) before forwarding.
61    pub fn build_session_from(
62        &self,
63        request: NewSessionRequest,
64    ) -> SessionBuilder<Counterpart, NullRun> {
65        SessionBuilder::new(self, request)
66    }
67
68    /// Given a session response received from the agent,
69    /// attach a handler to process messages related to this session
70    /// and let you access them.
71    ///
72    /// Normally you would not use this method directly but would
73    /// instead use [`Self::build_session`] and then [`SessionBuilder::start_session`].
74    ///
75    /// The vector `dynamic_handler_registrations` contains any dynamic
76    /// handle registrations associated with this session (e.g., from MCP servers).
77    /// You can simply pass `Default::default()` if not applicable.
78    pub fn attach_session<'responder>(
79        &self,
80        response: NewSessionResponse,
81        mcp_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
82    ) -> Result<ActiveSession<'responder, Counterpart>, crate::Error> {
83        let NewSessionResponse {
84            session_id,
85            modes,
86            meta,
87            ..
88        } = response;
89
90        let (update_tx, update_rx) = mpsc::unbounded();
91        let handler = ActiveSessionHandler::new(session_id.clone(), update_tx.clone());
92        let session_handler_registration = self.add_dynamic_handler(handler)?;
93
94        Ok(ActiveSession {
95            session_id,
96            modes,
97            meta,
98            update_rx,
99            update_tx,
100            connection: self.clone(),
101            session_handler_registration,
102            mcp_handler_registrations,
103            _responder: PhantomData,
104        })
105    }
106}
107
108/// Session builder for a new session request.
109/// Allows you to add MCP servers or set other details for this session.
110///
111/// The `BlockState` type parameter tracks whether blocking methods are available:
112/// - `NonBlocking` (default): Only [`on_session_start`](Self::on_session_start) is available
113/// - `Blocking` (after calling [`block_task`](Self::block_task)):
114///   [`run_until`](Self::run_until) and [`start_session`](Self::start_session) become available
115#[must_use = "use `start_session`, `run_until`, or `on_session_start` to start the session"]
116#[derive(Debug)]
117pub struct SessionBuilder<
118    Counterpart,
119    Run: RunWithConnectionTo<Counterpart> = NullRun,
120    BlockState: SessionBlockState = NonBlocking,
121> where
122    Counterpart: HasPeer<Agent>,
123{
124    connection: ConnectionTo<Counterpart>,
125    request: NewSessionRequest,
126    dynamic_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
127    run: Run,
128    block_state: PhantomData<BlockState>,
129}
130
131impl<Counterpart> SessionBuilder<Counterpart, NullRun, NonBlocking>
132where
133    Counterpart: HasPeer<Agent>,
134{
135    fn new(connection: &ConnectionTo<Counterpart>, request: NewSessionRequest) -> Self {
136        SessionBuilder {
137            connection: connection.clone(),
138            request,
139            dynamic_handler_registrations: Vec::default(),
140            run: NullRun,
141            block_state: PhantomData,
142        }
143    }
144}
145
146impl<Counterpart, R, BlockState> SessionBuilder<Counterpart, R, BlockState>
147where
148    Counterpart: HasPeer<Agent>,
149    R: RunWithConnectionTo<Counterpart>,
150    BlockState: SessionBlockState,
151{
152    /// Add the MCP servers from the given registry to this session.
153    pub fn with_mcp_server<McpRun>(
154        mut self,
155        mcp_server: McpServer<Counterpart, McpRun>,
156    ) -> Result<SessionBuilder<Counterpart, ChainRun<R, McpRun>, BlockState>, crate::Error>
157    where
158        McpRun: RunWithConnectionTo<Counterpart>,
159    {
160        let (handler, mcp_run) = mcp_server.into_handler_and_responder();
161        self.dynamic_handler_registrations
162            .push(handler.into_dynamic_handler(&mut self.request, &self.connection)?);
163        Ok(SessionBuilder {
164            connection: self.connection,
165            request: self.request,
166            dynamic_handler_registrations: self.dynamic_handler_registrations,
167            run: ChainRun::new(self.run, mcp_run),
168            block_state: self.block_state,
169        })
170    }
171
172    /// Spawn a task that runs the provided closure once the session starts.
173    ///
174    /// Unlike [`start_session`](Self::start_session), this method returns immediately
175    /// without blocking the current task. The session handshake and closure execution
176    /// happen in a spawned background task.
177    ///
178    /// The closure receives an `ActiveSession<'static, _>` and should return
179    /// `Result<(), Error>`. If the closure returns an error, it will propagate
180    /// to the connection's error handling.
181    ///
182    /// # Example
183    ///
184    /// ```
185    /// # use agent_client_protocol::{Client, Agent, ConnectTo};
186    /// # use agent_client_protocol::mcp_server::McpServer;
187    /// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
188    /// # Client.builder().connect_with(transport, async |cx| {
189    /// # let mcp = McpServer::<Agent, _>::builder("tools").build();
190    /// cx.build_session_cwd()?
191    ///     .with_mcp_server(mcp)?
192    ///     .on_session_start(async |mut session| {
193    ///         // Do something with the session
194    ///         session.send_prompt("Hello")?;
195    ///         let response = session.read_to_string().await?;
196    ///         Ok(())
197    ///     })?;
198    /// // Returns immediately, session runs in background
199    /// # Ok(())
200    /// # }).await?;
201    /// # Ok(())
202    /// # }
203    /// ```
204    ///
205    /// # Ordering
206    ///
207    /// This callback blocks the dispatch loop until the session starts and your
208    /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
209    pub fn on_session_start<F, Fut>(self, op: F) -> Result<(), crate::Error>
210    where
211        R: 'static,
212        F: FnOnce(ActiveSession<'static, Counterpart>) -> Fut + Send + 'static,
213        Fut: Future<Output = Result<(), crate::Error>> + Send,
214    {
215        let Self {
216            connection,
217            request,
218            dynamic_handler_registrations,
219            run,
220            block_state: _,
221        } = self;
222
223        connection
224            .send_request_to(Agent, request)
225            .on_receiving_result({
226                let connection = connection.clone();
227                async move |result| {
228                    let response = result?;
229
230                    connection.spawn(run.run_with_connection_to(connection.clone()))?;
231
232                    let active_session =
233                        connection.attach_session(response, dynamic_handler_registrations)?;
234
235                    op(active_session).await
236                }
237            })
238    }
239
240    /// Spawn a proxy session and run a closure with the session ID.
241    ///
242    /// A **proxy session** starts the session with the agent and then automatically
243    /// proxies all session updates (prompts, tool calls, etc.) from the agent back
244    /// to the client. You don't need to handle any messages yourself - the proxy
245    /// takes care of forwarding everything. This is useful when you want to inject
246    /// and/or filter prompts coming from the client but otherwise not be involved
247    /// in the session.
248    ///
249    /// Unlike [`start_session_proxy`](Self::start_session_proxy), this method returns
250    /// immediately without blocking the current task. The session handshake, client
251    /// response, and proxy setup all happen in a spawned background task.
252    ///
253    /// The closure receives the `SessionId` once the session is established, allowing
254    /// you to perform any custom work with that ID (e.g., tracking, logging).
255    ///
256    /// # Example
257    ///
258    /// ```
259    /// # use agent_client_protocol::{Proxy, Client, Conductor, ConnectTo};
260    /// # use agent_client_protocol::schema::NewSessionRequest;
261    /// # use agent_client_protocol::mcp_server::McpServer;
262    /// # async fn example(transport: impl ConnectTo<Proxy>) -> Result<(), agent_client_protocol::Error> {
263    /// Proxy.builder()
264    ///     .on_receive_request_from(Client, async |request: NewSessionRequest, responder, cx| {
265    ///         let mcp = McpServer::<Conductor, _>::builder("tools").build();
266    ///         cx.build_session_from(request)
267    ///             .with_mcp_server(mcp)?
268    ///             .on_proxy_session_start(responder, async |session_id| {
269    ///                 // Session started
270    ///                 Ok(())
271    ///             })
272    ///     }, agent_client_protocol::on_receive_request!())
273    ///     .connect_to(transport)
274    ///     .await?;
275    /// # Ok(())
276    /// # }
277    /// ```
278    ///
279    /// # Ordering
280    ///
281    /// This callback blocks the dispatch loop until the session starts and your
282    /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
283    pub fn on_proxy_session_start<F, Fut>(
284        self,
285        responder: Responder<NewSessionResponse>,
286        op: F,
287    ) -> Result<(), crate::Error>
288    where
289        F: FnOnce(SessionId) -> Fut + Send + 'static,
290        Fut: Future<Output = Result<(), crate::Error>> + Send,
291        Counterpart: HasPeer<Client>,
292        R: 'static,
293    {
294        let Self {
295            connection,
296            request,
297            dynamic_handler_registrations,
298            run,
299            block_state: _,
300        } = self;
301
302        // Spawn off the run and dynamic handlers to run indefinitely
303        connection.spawn(run.run_with_connection_to(connection.clone()))?;
304        dynamic_handler_registrations
305            .into_iter()
306            .for_each(super::jsonrpc::DynamicHandlerRegistration::run_indefinitely);
307
308        // Send the "new session" request to the agent
309        connection
310            .send_request_to(Agent, request)
311            .on_receiving_result({
312                let connection = connection.clone();
313                async move |result| {
314                    let response = result?;
315
316                    // Extract the session-id from the response and forward
317                    // the response back to the client
318                    let session_id = response.session_id.clone();
319                    responder.respond(response)?;
320
321                    // Install a dynamic handler to proxy messages from this session
322                    connection
323                        .add_dynamic_handler(ProxySessionMessages::new(session_id.clone()))?
324                        .run_indefinitely();
325
326                    op(session_id).await
327                }
328            })
329    }
330}
331
332impl<Counterpart, R> SessionBuilder<Counterpart, R, NonBlocking>
333where
334    Counterpart: HasPeer<Agent>,
335    R: RunWithConnectionTo<Counterpart>,
336{
337    /// Mark this session builder as being able to block the current task.
338    ///
339    /// After calling this, you can use [`run_until`](Self::run_until) or
340    /// [`start_session`](Self::start_session) which block the current task.
341    ///
342    /// This should not be used from inside a message handler like
343    /// [`Builder::on_receive_request`](`crate::Builder::on_receive_request`) or [`HandleDispatchFrom`]
344    /// implementations.
345    pub fn block_task(self) -> SessionBuilder<Counterpart, R, Blocking> {
346        SessionBuilder {
347            connection: self.connection,
348            request: self.request,
349            dynamic_handler_registrations: self.dynamic_handler_registrations,
350            run: self.run,
351            block_state: PhantomData,
352        }
353    }
354}
355
356impl<Counterpart, R> SessionBuilder<Counterpart, R, Blocking>
357where
358    Counterpart: HasPeer<Agent>,
359    R: RunWithConnectionTo<Counterpart>,
360{
361    /// Run this session synchronously. The current task will be blocked
362    /// and `op` will be executed with the active session information.
363    /// This is useful when you have MCP servers that are borrowed from your local
364    /// stack frame.
365    ///
366    /// The `ActiveSession` passed to `op` has a non-`'static` lifetime, which
367    /// prevents calling [`ActiveSession::proxy_remaining_messages`] (since the
368    /// responders would terminate when `op` returns).
369    ///
370    /// Requires calling [`block_task`](Self::block_task) first.
371    pub async fn run_until<T>(
372        self,
373        op: impl for<'responder> AsyncFnOnce(
374            ActiveSession<'responder, Counterpart>,
375        ) -> Result<T, crate::Error>,
376    ) -> Result<T, crate::Error> {
377        let Self {
378            connection,
379            request,
380            dynamic_handler_registrations,
381            run,
382            block_state: _,
383        } = self;
384
385        let response = connection
386            .send_request_to(Agent, request)
387            .block_task()
388            .await?;
389
390        let active_session = connection.attach_session(response, dynamic_handler_registrations)?;
391
392        run_until(
393            run.run_with_connection_to(connection.clone()),
394            op(active_session),
395        )
396        .await
397    }
398
399    /// Send the request to create the session and return a handle.
400    /// This is an alternative to [`Self::run_until`] that avoids rightward
401    /// drift but at the cost of requiring MCP servers that are `Send` and
402    /// don't access data from the surrounding scope.
403    ///
404    /// Returns an `ActiveSession<'static, _>` because responders are spawned
405    /// into background tasks that live for the connection lifetime.
406    ///
407    /// Requires calling [`block_task`](Self::block_task) first.
408    pub async fn start_session(self) -> Result<ActiveSession<'static, Counterpart>, crate::Error>
409    where
410        R: 'static,
411    {
412        let Self {
413            connection,
414            request,
415            dynamic_handler_registrations,
416            run,
417            block_state: _,
418        } = self;
419
420        let (active_session_tx, active_session_rx) = oneshot::channel();
421
422        connection.clone().spawn(async move {
423            let response = connection
424                .send_request_to(Agent, request)
425                .block_task()
426                .await?;
427
428            connection.spawn(run.run_with_connection_to(connection.clone()))?;
429
430            let active_session =
431                connection.attach_session(response, dynamic_handler_registrations)?;
432
433            active_session_tx
434                .send(active_session)
435                .map_err(|_| crate::Error::internal_error())?;
436
437            Ok(())
438        })?;
439
440        active_session_rx
441            .await
442            .map_err(|_| crate::Error::internal_error())
443    }
444
445    /// Start a proxy session that forwards all messages between client and agent.
446    ///
447    /// A **proxy session** starts the session with the agent and then automatically
448    /// proxies all session updates (prompts, tool calls, etc.) from the agent back
449    /// to the client. You don't need to handle any messages yourself - the proxy
450    /// takes care of forwarding everything. This is useful when you want to inject
451    /// and/or filter prompts coming from the client but otherwise not be involved
452    /// in the session.
453    ///
454    /// This is a convenience method that combines [`start_session`](Self::start_session),
455    /// responding to the client, and [`ActiveSession::proxy_remaining_messages`].
456    ///
457    /// For more control (e.g., to send some messages before proxying), use
458    /// [`start_session`](Self::start_session) instead and call
459    /// [`proxy_remaining_messages`](ActiveSession::proxy_remaining_messages) manually.
460    ///
461    /// Requires calling [`block_task`](Self::block_task) first.
462    pub async fn start_session_proxy(
463        self,
464        responder: Responder<NewSessionResponse>,
465    ) -> Result<SessionId, crate::Error>
466    where
467        Counterpart: HasPeer<Client>,
468        R: 'static,
469    {
470        let active_session = self.start_session().await?;
471        let session_id = active_session.session_id().clone();
472        responder.respond(active_session.response())?;
473        active_session.proxy_remaining_messages()?;
474        Ok(session_id)
475    }
476}
477
478/// Active session struct that lets you send prompts and receive updates.
479///
480/// The `'responder` lifetime represents the span during which responders
481/// (e.g., MCP server handlers) are active. When created via [`SessionBuilder::start_session`],
482/// this is `'static` because responders are spawned into background tasks.
483/// When created via [`SessionBuilder::run_until`], this is tied to the
484/// closure scope, preventing [`Self::proxy_remaining_messages`] from being called
485/// (since the responders would die when the closure returns).
486#[derive(Debug)]
487pub struct ActiveSession<'responder, Link>
488where
489    Link: HasPeer<Agent>,
490{
491    session_id: SessionId,
492    update_rx: mpsc::UnboundedReceiver<SessionMessage>,
493    update_tx: mpsc::UnboundedSender<SessionMessage>,
494    modes: Option<SessionModeState>,
495    meta: Option<serde_json::Map<String, serde_json::Value>>,
496    connection: ConnectionTo<Link>,
497
498    /// Registration for the handler that routes session messages to `update_rx`.
499    /// This is separate from MCP handlers so it can be dropped independently
500    /// when switching to proxy mode.
501    session_handler_registration: DynamicHandlerRegistration<Link>,
502
503    /// Registrations for MCP server handlers.
504    /// These will be dropped once the active-session struct is dropped
505    /// which will cause them to be deregistered.
506    mcp_handler_registrations: Vec<DynamicHandlerRegistration<Link>>,
507
508    /// Phantom lifetime representing the responder lifetime.
509    _responder: PhantomData<&'responder ()>,
510}
511
512/// Incoming message from the agent
513#[non_exhaustive]
514#[derive(Debug)]
515pub enum SessionMessage {
516    /// Periodic updates with new content, tool requests, etc.
517    /// Use [`MatchDispatch`] to match on the message type.
518    SessionMessage(Dispatch),
519
520    /// When a prompt completes, the stop reason.
521    StopReason(StopReason),
522}
523
524impl<Link> ActiveSession<'_, Link>
525where
526    Link: HasPeer<Agent>,
527{
528    /// Access the session ID.
529    pub fn session_id(&self) -> &SessionId {
530        &self.session_id
531    }
532
533    /// Access modes available in this session.
534    pub fn modes(&self) -> &Option<SessionModeState> {
535        &self.modes
536    }
537
538    /// Access meta data from session response.
539    pub fn meta(&self) -> &Option<serde_json::Map<String, serde_json::Value>> {
540        &self.meta
541    }
542
543    /// Build a `NewSessionResponse` from the session information.
544    ///
545    /// Useful when you need to forward the session response to a client
546    /// after doing some processing.
547    pub fn response(&self) -> NewSessionResponse {
548        NewSessionResponse::new(self.session_id.clone())
549            .modes(self.modes.clone())
550            .meta(self.meta.clone())
551    }
552
553    /// Access the underlying connection context used to communicate with the agent.
554    pub fn connection(&self) -> ConnectionTo<Link> {
555        self.connection.clone()
556    }
557
558    /// Send a prompt to the agent. You can then read messages sent in response.
559    pub fn send_prompt(&mut self, prompt: impl ToString) -> Result<(), crate::Error> {
560        let update_tx = self.update_tx.clone();
561        self.connection
562            .send_request_to(
563                Agent,
564                PromptRequest::new(self.session_id.clone(), vec![prompt.to_string().into()]),
565            )
566            .on_receiving_result(async move |result| {
567                let PromptResponse { stop_reason, .. } = result?;
568
569                update_tx
570                    .unbounded_send(SessionMessage::StopReason(stop_reason))
571                    .map_err(crate::util::internal_error)?;
572
573                Ok(())
574            })
575    }
576
577    /// Read an update from the agent in response to the prompt.
578    pub async fn read_update(&mut self) -> Result<SessionMessage, crate::Error> {
579        use futures::StreamExt;
580        let message =
581            self.update_rx.next().await.ok_or_else(|| {
582                crate::util::internal_error("session channel closed unexpectedly")
583            })?;
584
585        Ok(message)
586    }
587
588    /// Read all updates until the end of the turn and create a string.
589    /// Ignores non-text updates.
590    pub async fn read_to_string(&mut self) -> Result<String, crate::Error> {
591        let mut output = String::new();
592        loop {
593            let update = self.read_update().await?;
594            tracing::trace!(?update, "read_to_string update");
595            match update {
596                SessionMessage::SessionMessage(dispatch) => MatchDispatch::new(dispatch)
597                    .if_notification(async |notif: SessionNotification| match notif.update {
598                        SessionUpdate::AgentMessageChunk(ContentChunk {
599                            content: ContentBlock::Text(text),
600                            ..
601                        }) => {
602                            output.push_str(&text.text);
603                            Ok(())
604                        }
605                        _ => Ok(()),
606                    })
607                    .await
608                    .otherwise_ignore()?,
609                SessionMessage::StopReason(_stop_reason) => break,
610            }
611        }
612        Ok(output)
613    }
614}
615
616impl<Link> ActiveSession<'static, Link>
617where
618    Link: HasPeer<Agent>,
619{
620    /// Proxy all remaining messages for this session between client and agent.
621    ///
622    /// Use this when you want to inject MCP servers into a session but don't need
623    /// to actively interact with it after setup. The session messages will be proxied
624    /// between client and agent automatically.
625    ///
626    /// This consumes the `ActiveSession` since you're giving up active control.
627    ///
628    /// This method is only available on `ActiveSession<'static, _>` (from
629    /// [`SessionBuilder::start_session`]) because it requires responders to
630    /// outlive the method call.
631    ///
632    /// # Message Ordering Guarantees
633    ///
634    /// This method ensures proper handoff from active session mode to proxy mode
635    /// without losing or reordering messages:
636    ///
637    /// 1. **Stop the session handler** - Drop the registration that routes messages
638    ///    to `update_rx`. After this, no new messages will be queued.
639    /// 2. **Close the channel** - Drop `update_tx` so we can detect when the channel
640    ///    is fully drained.
641    /// 3. **Drain queued messages** - Forward any messages that were already queued
642    ///    in `update_rx` to the client, preserving order.
643    /// 4. **Install proxy handler** - Now that all queued messages are forwarded,
644    ///    install the proxy handler to handle future messages.
645    ///
646    /// This sequence prevents the race condition where messages could be delivered
647    /// out of order or lost during the transition.
648    pub fn proxy_remaining_messages(self) -> Result<(), crate::Error>
649    where
650        Link: HasPeer<Client>,
651    {
652        // Destructure self to get ownership of all fields
653        let ActiveSession {
654            session_id,
655            mut update_rx,
656            update_tx,
657            connection,
658            session_handler_registration,
659            mcp_handler_registrations,
660            // These fields are not needed for proxying
661            modes: _,
662            meta: _,
663            _responder,
664        } = self;
665
666        // Step 1: Drop the session handler registration.
667        // This unregisters the handler that was routing messages to update_rx.
668        // After this point, no new messages will be added to the channel.
669        drop(session_handler_registration);
670
671        // Step 2: Drop the sender side of the channel.
672        // This allows us to detect when the channel is fully drained
673        // (recv will return None when empty and sender is dropped).
674        drop(update_tx);
675
676        // Step 3: Drain any messages that were already queued and forward to client.
677        // These messages arrived before we dropped the handler but haven't been
678        // consumed yet. We must forward them to maintain message ordering.
679        while let Ok(message) = update_rx.try_recv() {
680            match message {
681                SessionMessage::SessionMessage(dispatch) => {
682                    // Forward the message to the client
683                    connection.send_proxied_message_to(Client, dispatch)?;
684                }
685                SessionMessage::StopReason(_) => {
686                    // StopReason is internal bookkeeping, not forwarded
687                }
688            }
689        }
690
691        // Step 4: Install the proxy handler for future messages.
692        // Now that all queued messages have been forwarded, the proxy handler
693        // can take over. Any new messages will go directly through the proxy.
694        connection
695            .add_dynamic_handler(ProxySessionMessages::new(session_id))?
696            .run_indefinitely();
697
698        // Keep MCP server handlers alive for the lifetime of the proxy
699        for registration in mcp_handler_registrations {
700            registration.run_indefinitely();
701        }
702
703        Ok(())
704    }
705}
706
707struct ActiveSessionHandler {
708    session_id: SessionId,
709    update_tx: mpsc::UnboundedSender<SessionMessage>,
710}
711
712impl ActiveSessionHandler {
713    pub fn new(session_id: SessionId, update_tx: mpsc::UnboundedSender<SessionMessage>) -> Self {
714        Self {
715            session_id,
716            update_tx,
717        }
718    }
719}
720
721impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for ActiveSessionHandler
722where
723    Counterpart: HasPeer<Agent>,
724{
725    async fn handle_dispatch_from(
726        &mut self,
727        message: Dispatch,
728        cx: ConnectionTo<Counterpart>,
729    ) -> Result<Handled<Dispatch>, crate::Error> {
730        // If this is a message for our session, grab it.
731        tracing::trace!(
732            ?message,
733            handler_session_id = ?self.session_id,
734            "ActiveSessionHandler::handle_dispatch"
735        );
736        MatchDispatchFrom::new(message, &cx)
737            .if_message_from(Agent, async |message| {
738                if let Some(session_id) = message.get_session_id()? {
739                    tracing::trace!(
740                        message_session_id = ?session_id,
741                        handler_session_id = ?self.session_id,
742                        "ActiveSessionHandler::handle_dispatch"
743                    );
744                    if session_id == self.session_id {
745                        self.update_tx
746                            .unbounded_send(SessionMessage::SessionMessage(message))
747                            .map_err(crate::util::internal_error)?;
748                        return Ok(Handled::Yes);
749                    }
750                }
751
752                // Otherwise, pass it through.
753                Ok(Handled::No {
754                    message,
755                    retry: false,
756                })
757            })
758            .await
759            .done()
760    }
761
762    fn describe_chain(&self) -> impl std::fmt::Debug {
763        format!("ActiveSessionHandler({})", self.session_id)
764    }
765}