Skip to main content

agent_client_protocol/
session.rs

1use std::{future::Future, marker::PhantomData, path::Path};
2
3use agent_client_protocol_schema::{
4    ContentBlock, ContentChunk, NewSessionRequest, NewSessionResponse, PromptRequest,
5    PromptResponse, SessionModeState, SessionNotification, SessionUpdate, StopReason,
6};
7use futures::channel::mpsc;
8use tokio::sync::oneshot;
9
10use crate::{
11    Agent, Client, ConnectionTo, Dispatch, HandleDispatchFrom, Handled, Responder, Role,
12    jsonrpc::{
13        DynamicHandlerRegistration,
14        run::{ChainRun, NullRun, RunWithConnectionTo},
15    },
16    mcp_server::McpServer,
17    role::{HasPeer, acp::ProxySessionMessages},
18    schema::SessionId,
19    util::{MatchDispatch, MatchDispatchFrom, run_until},
20};
21
22/// Marker type indicating the session builder will block the current task.
23#[derive(Debug)]
24pub struct Blocking;
25impl SessionBlockState for Blocking {}
26
27/// Marker type indicating the session builder will not block the current task.
28#[derive(Debug)]
29pub struct NonBlocking;
30impl SessionBlockState for NonBlocking {}
31
32/// Trait for marker types that indicate blocking vs blocking API.
33/// See [`SessionBuilder::block_task`].
34pub trait SessionBlockState: Send + 'static + Sync + std::fmt::Debug {}
35
36impl<Counterpart: Role> ConnectionTo<Counterpart>
37where
38    Counterpart: HasPeer<Agent>,
39{
40    /// Session builder for a new session request.
41    pub fn build_session(&self, cwd: impl AsRef<Path>) -> SessionBuilder<Counterpart, NullRun> {
42        SessionBuilder::new(self, NewSessionRequest::new(cwd.as_ref()))
43    }
44
45    /// Session builder using the current working directory.
46    ///
47    /// This is a convenience wrapper around [`build_session`](Self::build_session)
48    /// that uses [`std::env::current_dir`] to get the working directory.
49    ///
50    /// Returns an error if the current directory cannot be determined.
51    pub fn build_session_cwd(&self) -> Result<SessionBuilder<Counterpart, NullRun>, crate::Error> {
52        let cwd = std::env::current_dir().map_err(|e| {
53            crate::Error::internal_error().data(format!("cannot get current directory: {e}"))
54        })?;
55        Ok(self.build_session(cwd))
56    }
57
58    /// Session builder starting from an existing request.
59    ///
60    /// Use this when you've intercepted a `session.new` request and want to
61    /// modify it (e.g., inject MCP servers) before forwarding.
62    pub fn build_session_from(
63        &self,
64        request: NewSessionRequest,
65    ) -> SessionBuilder<Counterpart, NullRun> {
66        SessionBuilder::new(self, request)
67    }
68
69    /// Given a session response received from the agent,
70    /// attach a handler to process messages related to this session
71    /// and let you access them.
72    ///
73    /// Normally you would not use this method directly but would
74    /// instead use [`Self::build_session`] and then [`SessionBuilder::start_session`].
75    ///
76    /// The vector `dynamic_handler_registrations` contains any dynamic
77    /// handle registrations associated with this session (e.g., from MCP servers).
78    /// You can simply pass `Default::default()` if not applicable.
79    pub fn attach_session<'responder>(
80        &self,
81        response: NewSessionResponse,
82        mcp_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
83    ) -> Result<ActiveSession<'responder, Counterpart>, crate::Error> {
84        let NewSessionResponse {
85            session_id,
86            modes,
87            meta,
88            ..
89        } = response;
90
91        let (update_tx, update_rx) = mpsc::unbounded();
92        let handler = ActiveSessionHandler::new(session_id.clone(), update_tx.clone());
93        let session_handler_registration = self.add_dynamic_handler(handler)?;
94
95        Ok(ActiveSession {
96            session_id,
97            modes,
98            meta,
99            update_rx,
100            update_tx,
101            connection: self.clone(),
102            session_handler_registration,
103            mcp_handler_registrations,
104            _responder: PhantomData,
105        })
106    }
107}
108
109/// Session builder for a new session request.
110/// Allows you to add MCP servers or set other details for this session.
111///
112/// The `BlockState` type parameter tracks whether blocking methods are available:
113/// - `NonBlocking` (default): Only [`on_session_start`](Self::on_session_start) is available
114/// - `Blocking` (after calling [`block_task`](Self::block_task)):
115///   [`run_until`](Self::run_until) and [`start_session`](Self::start_session) become available
116#[must_use = "use `start_session`, `run_until`, or `on_session_start` to start the session"]
117#[derive(Debug)]
118pub struct SessionBuilder<
119    Counterpart,
120    Run: RunWithConnectionTo<Counterpart> = NullRun,
121    BlockState: SessionBlockState = NonBlocking,
122> where
123    Counterpart: HasPeer<Agent>,
124{
125    connection: ConnectionTo<Counterpart>,
126    request: NewSessionRequest,
127    dynamic_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
128    run: Run,
129    block_state: PhantomData<BlockState>,
130}
131
132impl<Counterpart> SessionBuilder<Counterpart, NullRun, NonBlocking>
133where
134    Counterpart: HasPeer<Agent>,
135{
136    fn new(connection: &ConnectionTo<Counterpart>, request: NewSessionRequest) -> Self {
137        SessionBuilder {
138            connection: connection.clone(),
139            request,
140            dynamic_handler_registrations: Vec::default(),
141            run: NullRun,
142            block_state: PhantomData,
143        }
144    }
145}
146
147impl<Counterpart, R, BlockState> SessionBuilder<Counterpart, R, BlockState>
148where
149    Counterpart: HasPeer<Agent>,
150    R: RunWithConnectionTo<Counterpart>,
151    BlockState: SessionBlockState,
152{
153    /// Add the MCP servers from the given registry to this session.
154    pub fn with_mcp_server<McpRun>(
155        mut self,
156        mcp_server: McpServer<Counterpart, McpRun>,
157    ) -> Result<SessionBuilder<Counterpart, ChainRun<R, McpRun>, BlockState>, crate::Error>
158    where
159        McpRun: RunWithConnectionTo<Counterpart>,
160    {
161        let (handler, mcp_run) = mcp_server.into_handler_and_responder();
162        self.dynamic_handler_registrations
163            .push(handler.into_dynamic_handler(&mut self.request, &self.connection)?);
164        Ok(SessionBuilder {
165            connection: self.connection,
166            request: self.request,
167            dynamic_handler_registrations: self.dynamic_handler_registrations,
168            run: ChainRun::new(self.run, mcp_run),
169            block_state: self.block_state,
170        })
171    }
172
173    /// Spawn a task that runs the provided closure once the session starts.
174    ///
175    /// Unlike [`start_session`](Self::start_session), this method returns immediately
176    /// without blocking the current task. The session handshake and closure execution
177    /// happen in a spawned background task.
178    ///
179    /// The closure receives an `ActiveSession<'static, _>` and should return
180    /// `Result<(), Error>`. If the closure returns an error, it will propagate
181    /// to the connection's error handling.
182    ///
183    /// # Example
184    ///
185    /// ```
186    /// # use agent_client_protocol::{Client, Agent, ConnectTo};
187    /// # use agent_client_protocol::mcp_server::McpServer;
188    /// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
189    /// # Client.builder().connect_with(transport, async |cx| {
190    /// # let mcp = McpServer::<Agent, _>::builder("tools").build();
191    /// cx.build_session_cwd()?
192    ///     .with_mcp_server(mcp)?
193    ///     .on_session_start(async |mut session| {
194    ///         // Do something with the session
195    ///         session.send_prompt("Hello")?;
196    ///         let response = session.read_to_string().await?;
197    ///         Ok(())
198    ///     })?;
199    /// // Returns immediately, session runs in background
200    /// # Ok(())
201    /// # }).await?;
202    /// # Ok(())
203    /// # }
204    /// ```
205    ///
206    /// # Ordering
207    ///
208    /// This callback blocks the dispatch loop until the session starts and your
209    /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
210    pub fn on_session_start<F, Fut>(self, op: F) -> Result<(), crate::Error>
211    where
212        R: 'static,
213        F: FnOnce(ActiveSession<'static, Counterpart>) -> Fut + Send + 'static,
214        Fut: Future<Output = Result<(), crate::Error>> + Send,
215    {
216        let Self {
217            connection,
218            request,
219            dynamic_handler_registrations,
220            run,
221            block_state: _,
222        } = self;
223
224        connection
225            .send_request_to(Agent, request)
226            .on_receiving_result({
227                let connection = connection.clone();
228                async move |result| {
229                    let response = result?;
230
231                    connection.spawn(run.run_with_connection_to(connection.clone()))?;
232
233                    let active_session =
234                        connection.attach_session(response, dynamic_handler_registrations)?;
235
236                    op(active_session).await
237                }
238            })
239    }
240
241    /// Spawn a proxy session and run a closure with the session ID.
242    ///
243    /// A **proxy session** starts the session with the agent and then automatically
244    /// proxies all session updates (prompts, tool calls, etc.) from the agent back
245    /// to the client. You don't need to handle any messages yourself - the proxy
246    /// takes care of forwarding everything. This is useful when you want to inject
247    /// and/or filter prompts coming from the client but otherwise not be involved
248    /// in the session.
249    ///
250    /// Unlike [`start_session_proxy`](Self::start_session_proxy), this method returns
251    /// immediately without blocking the current task. The session handshake, client
252    /// response, and proxy setup all happen in a spawned background task.
253    ///
254    /// The closure receives the `SessionId` once the session is established, allowing
255    /// you to perform any custom work with that ID (e.g., tracking, logging).
256    ///
257    /// # Example
258    ///
259    /// ```
260    /// # use agent_client_protocol::{Proxy, Client, Conductor, ConnectTo};
261    /// # use agent_client_protocol::schema::NewSessionRequest;
262    /// # use agent_client_protocol::mcp_server::McpServer;
263    /// # async fn example(transport: impl ConnectTo<Proxy>) -> Result<(), agent_client_protocol::Error> {
264    /// Proxy.builder()
265    ///     .on_receive_request_from(Client, async |request: NewSessionRequest, responder, cx| {
266    ///         let mcp = McpServer::<Conductor, _>::builder("tools").build();
267    ///         cx.build_session_from(request)
268    ///             .with_mcp_server(mcp)?
269    ///             .on_proxy_session_start(responder, async |session_id| {
270    ///                 // Session started
271    ///                 Ok(())
272    ///             })
273    ///     }, agent_client_protocol::on_receive_request!())
274    ///     .connect_to(transport)
275    ///     .await?;
276    /// # Ok(())
277    /// # }
278    /// ```
279    ///
280    /// # Ordering
281    ///
282    /// This callback blocks the dispatch loop until the session starts and your
283    /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
284    pub fn on_proxy_session_start<F, Fut>(
285        self,
286        responder: Responder<NewSessionResponse>,
287        op: F,
288    ) -> Result<(), crate::Error>
289    where
290        F: FnOnce(SessionId) -> Fut + Send + 'static,
291        Fut: Future<Output = Result<(), crate::Error>> + Send,
292        Counterpart: HasPeer<Client>,
293        R: 'static,
294    {
295        let Self {
296            connection,
297            request,
298            dynamic_handler_registrations,
299            run,
300            block_state: _,
301        } = self;
302
303        // Spawn off the run and dynamic handlers to run indefinitely
304        connection.spawn(run.run_with_connection_to(connection.clone()))?;
305        dynamic_handler_registrations
306            .into_iter()
307            .for_each(super::jsonrpc::DynamicHandlerRegistration::run_indefinitely);
308
309        // Send the "new session" request to the agent
310        connection
311            .send_request_to(Agent, request)
312            .on_receiving_result({
313                let connection = connection.clone();
314                async move |result| {
315                    let response = result?;
316
317                    // Extract the session-id from the response and forward
318                    // the response back to the client
319                    let session_id = response.session_id.clone();
320                    responder.respond(response)?;
321
322                    // Install a dynamic handler to proxy messages from this session
323                    connection
324                        .add_dynamic_handler(ProxySessionMessages::new(session_id.clone()))?
325                        .run_indefinitely();
326
327                    op(session_id).await
328                }
329            })
330    }
331}
332
333impl<Counterpart, R> SessionBuilder<Counterpart, R, NonBlocking>
334where
335    Counterpart: HasPeer<Agent>,
336    R: RunWithConnectionTo<Counterpart>,
337{
338    /// Mark this session builder as being able to block the current task.
339    ///
340    /// After calling this, you can use [`run_until`](Self::run_until) or
341    /// [`start_session`](Self::start_session) which block the current task.
342    ///
343    /// This should not be used from inside a message handler like
344    /// [`Builder::on_receive_request`](`crate::Builder::on_receive_request`) or [`HandleDispatchFrom`]
345    /// implementations.
346    pub fn block_task(self) -> SessionBuilder<Counterpart, R, Blocking> {
347        SessionBuilder {
348            connection: self.connection,
349            request: self.request,
350            dynamic_handler_registrations: self.dynamic_handler_registrations,
351            run: self.run,
352            block_state: PhantomData,
353        }
354    }
355}
356
357impl<Counterpart, R> SessionBuilder<Counterpart, R, Blocking>
358where
359    Counterpart: HasPeer<Agent>,
360    R: RunWithConnectionTo<Counterpart>,
361{
362    /// Run this session synchronously. The current task will be blocked
363    /// and `op` will be executed with the active session information.
364    /// This is useful when you have MCP servers that are borrowed from your local
365    /// stack frame.
366    ///
367    /// The `ActiveSession` passed to `op` has a non-`'static` lifetime, which
368    /// prevents calling [`ActiveSession::proxy_remaining_messages`] (since the
369    /// responders would terminate when `op` returns).
370    ///
371    /// Requires calling [`block_task`](Self::block_task) first.
372    pub async fn run_until<T>(
373        self,
374        op: impl for<'responder> AsyncFnOnce(
375            ActiveSession<'responder, Counterpart>,
376        ) -> Result<T, crate::Error>,
377    ) -> Result<T, crate::Error> {
378        let Self {
379            connection,
380            request,
381            dynamic_handler_registrations,
382            run,
383            block_state: _,
384        } = self;
385
386        let response = connection
387            .send_request_to(Agent, request)
388            .block_task()
389            .await?;
390
391        let active_session = connection.attach_session(response, dynamic_handler_registrations)?;
392
393        run_until(
394            run.run_with_connection_to(connection.clone()),
395            op(active_session),
396        )
397        .await
398    }
399
400    /// Send the request to create the session and return a handle.
401    /// This is an alternative to [`Self::run_until`] that avoids rightward
402    /// drift but at the cost of requiring MCP servers that are `Send` and
403    /// don't access data from the surrounding scope.
404    ///
405    /// Returns an `ActiveSession<'static, _>` because responders are spawned
406    /// into background tasks that live for the connection lifetime.
407    ///
408    /// Requires calling [`block_task`](Self::block_task) first.
409    pub async fn start_session(self) -> Result<ActiveSession<'static, Counterpart>, crate::Error>
410    where
411        R: 'static,
412    {
413        let Self {
414            connection,
415            request,
416            dynamic_handler_registrations,
417            run,
418            block_state: _,
419        } = self;
420
421        let (active_session_tx, active_session_rx) = oneshot::channel();
422
423        connection.clone().spawn(async move {
424            let response = connection
425                .send_request_to(Agent, request)
426                .block_task()
427                .await?;
428
429            connection.spawn(run.run_with_connection_to(connection.clone()))?;
430
431            let active_session =
432                connection.attach_session(response, dynamic_handler_registrations)?;
433
434            active_session_tx
435                .send(active_session)
436                .map_err(|_| crate::Error::internal_error())?;
437
438            Ok(())
439        })?;
440
441        active_session_rx
442            .await
443            .map_err(|_| crate::Error::internal_error())
444    }
445
446    /// Start a proxy session that forwards all messages between client and agent.
447    ///
448    /// A **proxy session** starts the session with the agent and then automatically
449    /// proxies all session updates (prompts, tool calls, etc.) from the agent back
450    /// to the client. You don't need to handle any messages yourself - the proxy
451    /// takes care of forwarding everything. This is useful when you want to inject
452    /// and/or filter prompts coming from the client but otherwise not be involved
453    /// in the session.
454    ///
455    /// This is a convenience method that combines [`start_session`](Self::start_session),
456    /// responding to the client, and [`ActiveSession::proxy_remaining_messages`].
457    ///
458    /// For more control (e.g., to send some messages before proxying), use
459    /// [`start_session`](Self::start_session) instead and call
460    /// [`proxy_remaining_messages`](ActiveSession::proxy_remaining_messages) manually.
461    ///
462    /// Requires calling [`block_task`](Self::block_task) first.
463    pub async fn start_session_proxy(
464        self,
465        responder: Responder<NewSessionResponse>,
466    ) -> Result<SessionId, crate::Error>
467    where
468        Counterpart: HasPeer<Client>,
469        R: 'static,
470    {
471        let active_session = self.start_session().await?;
472        let session_id = active_session.session_id().clone();
473        responder.respond(active_session.response())?;
474        active_session.proxy_remaining_messages()?;
475        Ok(session_id)
476    }
477}
478
479/// Active session struct that lets you send prompts and receive updates.
480///
481/// The `'responder` lifetime represents the span during which responders
482/// (e.g., MCP server handlers) are active. When created via [`SessionBuilder::start_session`],
483/// this is `'static` because responders are spawned into background tasks.
484/// When created via [`SessionBuilder::run_until`], this is tied to the
485/// closure scope, preventing [`Self::proxy_remaining_messages`] from being called
486/// (since the responders would die when the closure returns).
487#[derive(Debug)]
488pub struct ActiveSession<'responder, Link>
489where
490    Link: HasPeer<Agent>,
491{
492    session_id: SessionId,
493    update_rx: mpsc::UnboundedReceiver<SessionMessage>,
494    update_tx: mpsc::UnboundedSender<SessionMessage>,
495    modes: Option<SessionModeState>,
496    meta: Option<serde_json::Map<String, serde_json::Value>>,
497    connection: ConnectionTo<Link>,
498
499    /// Registration for the handler that routes session messages to `update_rx`.
500    /// This is separate from MCP handlers so it can be dropped independently
501    /// when switching to proxy mode.
502    session_handler_registration: DynamicHandlerRegistration<Link>,
503
504    /// Registrations for MCP server handlers.
505    /// These will be dropped once the active-session struct is dropped
506    /// which will cause them to be deregistered.
507    mcp_handler_registrations: Vec<DynamicHandlerRegistration<Link>>,
508
509    /// Phantom lifetime representing the responder lifetime.
510    _responder: PhantomData<&'responder ()>,
511}
512
513/// Incoming message from the agent
514#[non_exhaustive]
515#[derive(Debug)]
516pub enum SessionMessage {
517    /// Periodic updates with new content, tool requests, etc.
518    /// Use [`MatchDispatch`] to match on the message type.
519    SessionMessage(Dispatch),
520
521    /// When a prompt completes, the stop reason.
522    StopReason(StopReason),
523}
524
525impl<Link> ActiveSession<'_, Link>
526where
527    Link: HasPeer<Agent>,
528{
529    /// Access the session ID.
530    pub fn session_id(&self) -> &SessionId {
531        &self.session_id
532    }
533
534    /// Access modes available in this session.
535    pub fn modes(&self) -> &Option<SessionModeState> {
536        &self.modes
537    }
538
539    /// Access meta data from session response.
540    pub fn meta(&self) -> &Option<serde_json::Map<String, serde_json::Value>> {
541        &self.meta
542    }
543
544    /// Build a `NewSessionResponse` from the session information.
545    ///
546    /// Useful when you need to forward the session response to a client
547    /// after doing some processing.
548    pub fn response(&self) -> NewSessionResponse {
549        NewSessionResponse::new(self.session_id.clone())
550            .modes(self.modes.clone())
551            .meta(self.meta.clone())
552    }
553
554    /// Access the underlying connection context used to communicate with the agent.
555    pub fn connection(&self) -> ConnectionTo<Link> {
556        self.connection.clone()
557    }
558
559    /// Send a prompt to the agent. You can then read messages sent in response.
560    pub fn send_prompt(&mut self, prompt: impl ToString) -> Result<(), crate::Error> {
561        let update_tx = self.update_tx.clone();
562        self.connection
563            .send_request_to(
564                Agent,
565                PromptRequest::new(self.session_id.clone(), vec![prompt.to_string().into()]),
566            )
567            .on_receiving_result(async move |result| {
568                let PromptResponse {
569                    stop_reason,
570                    meta: _,
571                    ..
572                } = result?;
573
574                update_tx
575                    .unbounded_send(SessionMessage::StopReason(stop_reason))
576                    .map_err(crate::util::internal_error)?;
577
578                Ok(())
579            })
580    }
581
582    /// Read an update from the agent in response to the prompt.
583    pub async fn read_update(&mut self) -> Result<SessionMessage, crate::Error> {
584        use futures::StreamExt;
585        let message =
586            self.update_rx.next().await.ok_or_else(|| {
587                crate::util::internal_error("session channel closed unexpectedly")
588            })?;
589
590        Ok(message)
591    }
592
593    /// Read all updates until the end of the turn and create a string.
594    /// Ignores non-text updates.
595    pub async fn read_to_string(&mut self) -> Result<String, crate::Error> {
596        let mut output = String::new();
597        loop {
598            let update = self.read_update().await?;
599            tracing::trace!(?update, "read_to_string update");
600            match update {
601                SessionMessage::SessionMessage(dispatch) => MatchDispatch::new(dispatch)
602                    .if_notification(async |notif: SessionNotification| match notif.update {
603                        SessionUpdate::AgentMessageChunk(ContentChunk {
604                            content: ContentBlock::Text(text),
605                            meta: _,
606                            ..
607                        }) => {
608                            output.push_str(&text.text);
609                            Ok(())
610                        }
611                        _ => Ok(()),
612                    })
613                    .await
614                    .otherwise_ignore()?,
615                SessionMessage::StopReason(_stop_reason) => break,
616            }
617        }
618        Ok(output)
619    }
620}
621
622impl<Link> ActiveSession<'static, Link>
623where
624    Link: HasPeer<Agent>,
625{
626    /// Proxy all remaining messages for this session between client and agent.
627    ///
628    /// Use this when you want to inject MCP servers into a session but don't need
629    /// to actively interact with it after setup. The session messages will be proxied
630    /// between client and agent automatically.
631    ///
632    /// This consumes the `ActiveSession` since you're giving up active control.
633    ///
634    /// This method is only available on `ActiveSession<'static, _>` (from
635    /// [`SessionBuilder::start_session`]) because it requires responders to
636    /// outlive the method call.
637    ///
638    /// # Message Ordering Guarantees
639    ///
640    /// This method ensures proper handoff from active session mode to proxy mode
641    /// without losing or reordering messages:
642    ///
643    /// 1. **Stop the session handler** - Drop the registration that routes messages
644    ///    to `update_rx`. After this, no new messages will be queued.
645    /// 2. **Close the channel** - Drop `update_tx` so we can detect when the channel
646    ///    is fully drained.
647    /// 3. **Drain queued messages** - Forward any messages that were already queued
648    ///    in `update_rx` to the client, preserving order.
649    /// 4. **Install proxy handler** - Now that all queued messages are forwarded,
650    ///    install the proxy handler to handle future messages.
651    ///
652    /// This sequence prevents the race condition where messages could be delivered
653    /// out of order or lost during the transition.
654    pub fn proxy_remaining_messages(self) -> Result<(), crate::Error>
655    where
656        Link: HasPeer<Client>,
657    {
658        // Destructure self to get ownership of all fields
659        let ActiveSession {
660            session_id,
661            mut update_rx,
662            update_tx,
663            connection,
664            session_handler_registration,
665            mcp_handler_registrations,
666            // These fields are not needed for proxying
667            modes: _,
668            meta: _,
669            _responder,
670        } = self;
671
672        // Step 1: Drop the session handler registration.
673        // This unregisters the handler that was routing messages to update_rx.
674        // After this point, no new messages will be added to the channel.
675        drop(session_handler_registration);
676
677        // Step 2: Drop the sender side of the channel.
678        // This allows us to detect when the channel is fully drained
679        // (recv will return None when empty and sender is dropped).
680        drop(update_tx);
681
682        // Step 3: Drain any messages that were already queued and forward to client.
683        // These messages arrived before we dropped the handler but haven't been
684        // consumed yet. We must forward them to maintain message ordering.
685        while let Ok(message) = update_rx.try_recv() {
686            match message {
687                SessionMessage::SessionMessage(dispatch) => {
688                    // Forward the message to the client
689                    connection.send_proxied_message_to(Client, dispatch)?;
690                }
691                SessionMessage::StopReason(_) => {
692                    // StopReason is internal bookkeeping, not forwarded
693                }
694            }
695        }
696
697        // Step 4: Install the proxy handler for future messages.
698        // Now that all queued messages have been forwarded, the proxy handler
699        // can take over. Any new messages will go directly through the proxy.
700        connection
701            .add_dynamic_handler(ProxySessionMessages::new(session_id))?
702            .run_indefinitely();
703
704        // Keep MCP server handlers alive for the lifetime of the proxy
705        for registration in mcp_handler_registrations {
706            registration.run_indefinitely();
707        }
708
709        Ok(())
710    }
711}
712
713struct ActiveSessionHandler {
714    session_id: SessionId,
715    update_tx: mpsc::UnboundedSender<SessionMessage>,
716}
717
718impl ActiveSessionHandler {
719    pub fn new(session_id: SessionId, update_tx: mpsc::UnboundedSender<SessionMessage>) -> Self {
720        Self {
721            session_id,
722            update_tx,
723        }
724    }
725}
726
727impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for ActiveSessionHandler
728where
729    Counterpart: HasPeer<Agent>,
730{
731    async fn handle_dispatch_from(
732        &mut self,
733        message: Dispatch,
734        cx: ConnectionTo<Counterpart>,
735    ) -> Result<Handled<Dispatch>, crate::Error> {
736        // If this is a message for our session, grab it.
737        tracing::trace!(
738            ?message,
739            handler_session_id = ?self.session_id,
740            "ActiveSessionHandler::handle_dispatch"
741        );
742        MatchDispatchFrom::new(message, &cx)
743            .if_message_from(Agent, async |message| {
744                if let Some(session_id) = message.get_session_id()? {
745                    tracing::trace!(
746                        message_session_id = ?session_id,
747                        handler_session_id = ?self.session_id,
748                        "ActiveSessionHandler::handle_dispatch"
749                    );
750                    if session_id == self.session_id {
751                        self.update_tx
752                            .unbounded_send(SessionMessage::SessionMessage(message))
753                            .map_err(crate::util::internal_error)?;
754                        return Ok(Handled::Yes);
755                    }
756                }
757
758                // Otherwise, pass it through.
759                Ok(Handled::No {
760                    message,
761                    retry: false,
762                })
763            })
764            .await
765            .done()
766    }
767
768    fn describe_chain(&self) -> impl std::fmt::Debug {
769        format!("ActiveSessionHandler({})", self.session_id)
770    }
771}