Skip to main content

agent_client_protocol/
session.rs

1use std::{future::Future, marker::PhantomData, path::Path};
2
3use agent_client_protocol_schema::{
4    ContentBlock, ContentChunk, NewSessionRequest, NewSessionResponse, PromptRequest,
5    PromptResponse, SessionModeState, SessionNotification, SessionUpdate, StopReason,
6};
7use futures::channel::{mpsc, oneshot};
8
9use crate::{
10    Agent, Client, ConnectionTo, Dispatch, HandleDispatchFrom, Handled, Responder, Role,
11    jsonrpc::{
12        DynamicHandlerRegistration,
13        run::{ChainRun, NullRun, RunWithConnectionTo},
14    },
15    mcp_server::McpServer,
16    role::{HasPeer, acp::ProxySessionMessages},
17    schema::SessionId,
18    util::{MatchDispatch, MatchDispatchFrom, run_until},
19};
20
/// Marker type indicating the session builder will block the current task.
///
/// A [`SessionBuilder`] enters this state via [`SessionBuilder::block_task`],
/// which makes [`SessionBuilder::run_until`], [`SessionBuilder::start_session`],
/// and [`SessionBuilder::start_session_proxy`] available.
#[derive(Debug)]
pub struct Blocking;
impl SessionBlockState for Blocking {}
25
/// Marker type indicating the session builder will not block the current task.
///
/// This is the default block state of a [`SessionBuilder`]; call
/// [`SessionBuilder::block_task`] to switch to [`Blocking`].
#[derive(Debug)]
pub struct NonBlocking;
impl SessionBlockState for NonBlocking {}
30
/// Trait for marker types that indicate a blocking vs non-blocking API.
/// Implemented by [`Blocking`] and [`NonBlocking`].
/// See [`SessionBuilder::block_task`].
pub trait SessionBlockState: Send + 'static + Sync + std::fmt::Debug {}
34
35impl<Counterpart: Role> ConnectionTo<Counterpart>
36where
37    Counterpart: HasPeer<Agent>,
38{
39    /// Session builder for a new session request.
40    pub fn build_session(&self, cwd: impl AsRef<Path>) -> SessionBuilder<Counterpart, NullRun> {
41        SessionBuilder::new(self, NewSessionRequest::new(cwd.as_ref()))
42    }
43
44    /// Session builder using the current working directory.
45    ///
46    /// This is a convenience wrapper around [`build_session`](Self::build_session)
47    /// that uses [`std::env::current_dir`] to get the working directory.
48    ///
49    /// Returns an error if the current directory cannot be determined.
50    pub fn build_session_cwd(&self) -> Result<SessionBuilder<Counterpart, NullRun>, crate::Error> {
51        let cwd = std::env::current_dir().map_err(|e| {
52            crate::Error::internal_error().data(format!("cannot get current directory: {e}"))
53        })?;
54        Ok(self.build_session(cwd))
55    }
56
    /// Session builder starting from an existing request.
    ///
    /// Use this when you've intercepted a new-session request (a
    /// [`NewSessionRequest`]) and want to modify it (e.g., inject MCP servers)
    /// before forwarding.
    ///
    /// The request is taken by value and becomes the request the returned
    /// [`SessionBuilder`] will send.
    pub fn build_session_from(
        &self,
        request: NewSessionRequest,
    ) -> SessionBuilder<Counterpart, NullRun> {
        SessionBuilder::new(self, request)
    }
67
68    /// Given a session response received from the agent,
69    /// attach a handler to process messages related to this session
70    /// and let you access them.
71    ///
72    /// Normally you would not use this method directly but would
73    /// instead use [`Self::build_session`] and then [`SessionBuilder::start_session`].
74    ///
75    /// The vector `dynamic_handler_registrations` contains any dynamic
76    /// handle registrations associated with this session (e.g., from MCP servers).
77    /// You can simply pass `Default::default()` if not applicable.
78    pub fn attach_session<'responder>(
79        &self,
80        response: NewSessionResponse,
81        mcp_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
82    ) -> Result<ActiveSession<'responder, Counterpart>, crate::Error> {
83        let NewSessionResponse {
84            session_id,
85            modes,
86            meta,
87            ..
88        } = response;
89
90        let (update_tx, update_rx) = mpsc::unbounded();
91        let handler = ActiveSessionHandler::new(session_id.clone(), update_tx.clone());
92        let session_handler_registration = self.add_dynamic_handler(handler)?;
93
94        Ok(ActiveSession {
95            session_id,
96            modes,
97            meta,
98            update_rx,
99            update_tx,
100            connection: self.clone(),
101            session_handler_registration,
102            mcp_handler_registrations,
103            _responder: PhantomData,
104        })
105    }
106}
107
/// Session builder for a new session request.
/// Allows you to add MCP servers or set other details for this session.
///
/// In every block state, [`with_mcp_server`](Self::with_mcp_server),
/// [`on_session_start`](Self::on_session_start), and
/// [`on_proxy_session_start`](Self::on_proxy_session_start) are available.
///
/// The `BlockState` type parameter tracks whether blocking methods are available:
/// - `NonBlocking` (default): only the methods listed above (plus
///   [`block_task`](Self::block_task)) are available
/// - `Blocking` (after calling [`block_task`](Self::block_task)):
///   [`run_until`](Self::run_until), [`start_session`](Self::start_session), and
///   [`start_session_proxy`](Self::start_session_proxy) become available
#[must_use = "use `start_session`, `run_until`, or `on_session_start` to start the session"]
#[derive(Debug)]
pub struct SessionBuilder<
    Counterpart,
    Run: RunWithConnectionTo<Counterpart> = NullRun,
    BlockState: SessionBlockState = NonBlocking,
> where
    Counterpart: HasPeer<Agent>,
{
    /// Connection on which the new-session request will be sent.
    connection: ConnectionTo<Counterpart>,
    /// The new-session request that will be sent to the agent.
    request: NewSessionRequest,
    /// Handler registrations accumulated by `with_mcp_server`.
    dynamic_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
    /// Background work accumulated by `with_mcp_server` (starts as `NullRun`).
    run: Run,
    /// Zero-sized marker carrying the `BlockState` type parameter.
    block_state: PhantomData<BlockState>,
}
130
131impl<Counterpart> SessionBuilder<Counterpart, NullRun, NonBlocking>
132where
133    Counterpart: HasPeer<Agent>,
134{
135    fn new(connection: &ConnectionTo<Counterpart>, request: NewSessionRequest) -> Self {
136        SessionBuilder {
137            connection: connection.clone(),
138            request,
139            dynamic_handler_registrations: Vec::default(),
140            run: NullRun,
141            block_state: PhantomData,
142        }
143    }
144}
145
146impl<Counterpart, R, BlockState> SessionBuilder<Counterpart, R, BlockState>
147where
148    Counterpart: HasPeer<Agent>,
149    R: RunWithConnectionTo<Counterpart>,
150    BlockState: SessionBlockState,
151{
152    /// Add the MCP servers from the given registry to this session.
153    pub fn with_mcp_server<McpRun>(
154        mut self,
155        mcp_server: McpServer<Counterpart, McpRun>,
156    ) -> Result<SessionBuilder<Counterpart, ChainRun<R, McpRun>, BlockState>, crate::Error>
157    where
158        McpRun: RunWithConnectionTo<Counterpart>,
159    {
160        let (handler, mcp_run) = mcp_server.into_handler_and_responder();
161        self.dynamic_handler_registrations
162            .push(handler.into_dynamic_handler(&mut self.request, &self.connection)?);
163        Ok(SessionBuilder {
164            connection: self.connection,
165            request: self.request,
166            dynamic_handler_registrations: self.dynamic_handler_registrations,
167            run: ChainRun::new(self.run, mcp_run),
168            block_state: self.block_state,
169        })
170    }
171
172    /// Spawn a task that runs the provided closure once the session starts.
173    ///
174    /// Unlike [`start_session`](Self::start_session), this method returns immediately
175    /// without blocking the current task. The session handshake and closure execution
176    /// happen in a spawned background task.
177    ///
178    /// The closure receives an `ActiveSession<'static, _>` and should return
179    /// `Result<(), Error>`. If the closure returns an error, it will propagate
180    /// to the connection's error handling.
181    ///
182    /// # Example
183    ///
184    /// ```ignore
185    /// # use agent_client_protocol::{Client, Agent, ConnectTo};
186    /// # use agent_client_protocol::mcp_server::McpServer;
187    /// # use agent_client_protocol_rmcp::McpServerExt;
188    /// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
189    /// # Client.builder().connect_with(transport, async |cx| {
190    /// # let mcp = McpServer::<Agent, _>::builder("tools").build();
191    /// cx.build_session_cwd()?
192    ///     .with_mcp_server(mcp)?
193    ///     .on_session_start(async |mut session| {
194    ///         // Do something with the session
195    ///         session.send_prompt("Hello")?;
196    ///         let response = session.read_to_string().await?;
197    ///         Ok(())
198    ///     })?;
199    /// // Returns immediately, session runs in background
200    /// # Ok(())
201    /// # }).await?;
202    /// # Ok(())
203    /// # }
204    /// ```
205    ///
206    /// # Ordering
207    ///
208    /// This callback blocks the dispatch loop until the session starts and your
209    /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
210    pub fn on_session_start<F, Fut>(self, op: F) -> Result<(), crate::Error>
211    where
212        R: 'static,
213        F: FnOnce(ActiveSession<'static, Counterpart>) -> Fut + Send + 'static,
214        Fut: Future<Output = Result<(), crate::Error>> + Send,
215    {
216        let Self {
217            connection,
218            request,
219            dynamic_handler_registrations,
220            run,
221            block_state: _,
222        } = self;
223
224        connection
225            .send_request_to(Agent, request)
226            .on_receiving_result({
227                let connection = connection.clone();
228                async move |result| {
229                    let response = result?;
230
231                    connection.spawn(run.run_with_connection_to(connection.clone()))?;
232
233                    let active_session =
234                        connection.attach_session(response, dynamic_handler_registrations)?;
235
236                    op(active_session).await
237                }
238            })
239    }
240
241    /// Spawn a proxy session and run a closure with the session ID.
242    ///
243    /// A **proxy session** starts the session with the agent and then automatically
244    /// proxies all session updates (prompts, tool calls, etc.) from the agent back
245    /// to the client. You don't need to handle any messages yourself - the proxy
246    /// takes care of forwarding everything. This is useful when you want to inject
247    /// and/or filter prompts coming from the client but otherwise not be involved
248    /// in the session.
249    ///
250    /// Unlike [`start_session_proxy`](Self::start_session_proxy), this method returns
251    /// immediately without blocking the current task. The session handshake, client
252    /// response, and proxy setup all happen in a spawned background task.
253    ///
254    /// The closure receives the `SessionId` once the session is established, allowing
255    /// you to perform any custom work with that ID (e.g., tracking, logging).
256    ///
257    /// # Example
258    ///
259    /// ```ignore
260    /// # use agent_client_protocol::{Proxy, Client, Conductor, ConnectTo};
261    /// # use agent_client_protocol::schema::NewSessionRequest;
262    /// # use agent_client_protocol::mcp_server::McpServer;
263    /// # use agent_client_protocol_rmcp::McpServerExt;
264    /// # async fn example(transport: impl ConnectTo<Proxy>) -> Result<(), agent_client_protocol::Error> {
265    /// Proxy.builder()
266    ///     .on_receive_request_from(Client, async |request: NewSessionRequest, responder, cx| {
267    ///         let mcp = McpServer::<Conductor, _>::builder("tools").build();
268    ///         cx.build_session_from(request)
269    ///             .with_mcp_server(mcp)?
270    ///             .on_proxy_session_start(responder, async |session_id| {
271    ///                 // Session started
272    ///                 Ok(())
273    ///             })
274    ///     }, agent_client_protocol::on_receive_request!())
275    ///     .connect_to(transport)
276    ///     .await?;
277    /// # Ok(())
278    /// # }
279    /// ```
280    ///
281    /// # Ordering
282    ///
283    /// This callback blocks the dispatch loop until the session starts and your
284    /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
285    pub fn on_proxy_session_start<F, Fut>(
286        self,
287        responder: Responder<NewSessionResponse>,
288        op: F,
289    ) -> Result<(), crate::Error>
290    where
291        F: FnOnce(SessionId) -> Fut + Send + 'static,
292        Fut: Future<Output = Result<(), crate::Error>> + Send,
293        Counterpart: HasPeer<Client>,
294        R: 'static,
295    {
296        let Self {
297            connection,
298            request,
299            dynamic_handler_registrations,
300            run,
301            block_state: _,
302        } = self;
303
304        // Spawn off the run and dynamic handlers to run indefinitely
305        connection.spawn(run.run_with_connection_to(connection.clone()))?;
306        dynamic_handler_registrations
307            .into_iter()
308            .for_each(super::jsonrpc::DynamicHandlerRegistration::run_indefinitely);
309
310        // Send the "new session" request to the agent
311        connection
312            .send_request_to(Agent, request)
313            .on_receiving_result({
314                let connection = connection.clone();
315                async move |result| {
316                    let response = result?;
317
318                    // Extract the session-id from the response and forward
319                    // the response back to the client
320                    let session_id = response.session_id.clone();
321                    responder.respond(response)?;
322
323                    // Install a dynamic handler to proxy messages from this session
324                    connection
325                        .add_dynamic_handler(ProxySessionMessages::new(session_id.clone()))?
326                        .run_indefinitely();
327
328                    op(session_id).await
329                }
330            })
331    }
332}
333
334impl<Counterpart, R> SessionBuilder<Counterpart, R, NonBlocking>
335where
336    Counterpart: HasPeer<Agent>,
337    R: RunWithConnectionTo<Counterpart>,
338{
339    /// Mark this session builder as being able to block the current task.
340    ///
341    /// After calling this, you can use [`run_until`](Self::run_until) or
342    /// [`start_session`](Self::start_session) which block the current task.
343    ///
344    /// This should not be used from inside a message handler like
345    /// [`Builder::on_receive_request`](`crate::Builder::on_receive_request`) or [`HandleDispatchFrom`]
346    /// implementations.
347    pub fn block_task(self) -> SessionBuilder<Counterpart, R, Blocking> {
348        SessionBuilder {
349            connection: self.connection,
350            request: self.request,
351            dynamic_handler_registrations: self.dynamic_handler_registrations,
352            run: self.run,
353            block_state: PhantomData,
354        }
355    }
356}
357
358impl<Counterpart, R> SessionBuilder<Counterpart, R, Blocking>
359where
360    Counterpart: HasPeer<Agent>,
361    R: RunWithConnectionTo<Counterpart>,
362{
363    /// Run this session synchronously. The current task will be blocked
364    /// and `op` will be executed with the active session information.
365    /// This is useful when you have MCP servers that are borrowed from your local
366    /// stack frame.
367    ///
368    /// The `ActiveSession` passed to `op` has a non-`'static` lifetime, which
369    /// prevents calling [`ActiveSession::proxy_remaining_messages`] (since the
370    /// responders would terminate when `op` returns).
371    ///
372    /// Requires calling [`block_task`](Self::block_task) first.
373    pub async fn run_until<T>(
374        self,
375        op: impl for<'responder> AsyncFnOnce(
376            ActiveSession<'responder, Counterpart>,
377        ) -> Result<T, crate::Error>,
378    ) -> Result<T, crate::Error> {
379        let Self {
380            connection,
381            request,
382            dynamic_handler_registrations,
383            run,
384            block_state: _,
385        } = self;
386
387        let response = connection
388            .send_request_to(Agent, request)
389            .block_task()
390            .await?;
391
392        let active_session = connection.attach_session(response, dynamic_handler_registrations)?;
393
394        run_until(
395            run.run_with_connection_to(connection.clone()),
396            op(active_session),
397        )
398        .await
399    }
400
401    /// Send the request to create the session and return a handle.
402    /// This is an alternative to [`Self::run_until`] that avoids rightward
403    /// drift but at the cost of requiring MCP servers that are `Send` and
404    /// don't access data from the surrounding scope.
405    ///
406    /// Returns an `ActiveSession<'static, _>` because responders are spawned
407    /// into background tasks that live for the connection lifetime.
408    ///
409    /// Requires calling [`block_task`](Self::block_task) first.
410    pub async fn start_session(self) -> Result<ActiveSession<'static, Counterpart>, crate::Error>
411    where
412        R: 'static,
413    {
414        let Self {
415            connection,
416            request,
417            dynamic_handler_registrations,
418            run,
419            block_state: _,
420        } = self;
421
422        let (active_session_tx, active_session_rx) = oneshot::channel();
423
424        connection.clone().spawn(async move {
425            let response = connection
426                .send_request_to(Agent, request)
427                .block_task()
428                .await?;
429
430            connection.spawn(run.run_with_connection_to(connection.clone()))?;
431
432            let active_session =
433                connection.attach_session(response, dynamic_handler_registrations)?;
434
435            active_session_tx
436                .send(active_session)
437                .map_err(|_| crate::Error::internal_error())?;
438
439            Ok(())
440        })?;
441
442        active_session_rx
443            .await
444            .map_err(|_| crate::Error::internal_error())
445    }
446
447    /// Start a proxy session that forwards all messages between client and agent.
448    ///
449    /// A **proxy session** starts the session with the agent and then automatically
450    /// proxies all session updates (prompts, tool calls, etc.) from the agent back
451    /// to the client. You don't need to handle any messages yourself - the proxy
452    /// takes care of forwarding everything. This is useful when you want to inject
453    /// and/or filter prompts coming from the client but otherwise not be involved
454    /// in the session.
455    ///
456    /// This is a convenience method that combines [`start_session`](Self::start_session),
457    /// responding to the client, and [`ActiveSession::proxy_remaining_messages`].
458    ///
459    /// For more control (e.g., to send some messages before proxying), use
460    /// [`start_session`](Self::start_session) instead and call
461    /// [`proxy_remaining_messages`](ActiveSession::proxy_remaining_messages) manually.
462    ///
463    /// Requires calling [`block_task`](Self::block_task) first.
464    pub async fn start_session_proxy(
465        self,
466        responder: Responder<NewSessionResponse>,
467    ) -> Result<SessionId, crate::Error>
468    where
469        Counterpart: HasPeer<Client>,
470        R: 'static,
471    {
472        let active_session = self.start_session().await?;
473        let session_id = active_session.session_id().clone();
474        responder.respond(active_session.response())?;
475        active_session.proxy_remaining_messages()?;
476        Ok(session_id)
477    }
478}
479
/// Active session struct that lets you send prompts and receive updates.
///
/// The `'responder` lifetime represents the span during which responders
/// (e.g., MCP server handlers) are active. When created via [`SessionBuilder::start_session`],
/// this is `'static` because responders are spawned into background tasks.
/// When created via [`SessionBuilder::run_until`], this is tied to the
/// closure scope, preventing [`Self::proxy_remaining_messages`] from being called
/// (since the responders would die when the closure returns).
#[derive(Debug)]
pub struct ActiveSession<'responder, Link>
where
    Link: HasPeer<Agent>,
{
    /// Id of the session, taken from the agent's new-session response.
    session_id: SessionId,
    /// Receiving end of the queue of incoming [`SessionMessage`]s.
    update_rx: mpsc::UnboundedSender<SessionMessage>,
    /// Sending end of the same queue. A clone is given to the session handler,
    /// and each `send_prompt` call clones it to report the stop reason.
    update_tx: mpsc::UnboundedSender<SessionMessage>,
    /// Mode state from the agent's new-session response, if any.
    modes: Option<SessionModeState>,
    /// Meta data from the agent's new-session response, if any.
    meta: Option<serde_json::Map<String, serde_json::Value>>,
    /// Connection used to send requests to the agent.
    connection: ConnectionTo<Link>,

    /// Registration for the handler that routes session messages to `update_rx`.
    /// This is separate from MCP handlers so it can be dropped independently
    /// when switching to proxy mode.
    session_handler_registration: DynamicHandlerRegistration<Link>,

    /// Registrations for MCP server handlers.
    /// These will be dropped once the active-session struct is dropped
    /// which will cause them to be deregistered.
    mcp_handler_registrations: Vec<DynamicHandlerRegistration<Link>>,

    /// Phantom lifetime representing the responder lifetime.
    _responder: PhantomData<&'responder ()>,
}
513
/// Incoming message from the agent, as queued on an [`ActiveSession`]
/// and returned by [`ActiveSession::read_update`].
#[non_exhaustive]
#[derive(Debug)]
pub enum SessionMessage {
    /// Periodic updates with new content, tool requests, etc.
    /// Use [`MatchDispatch`] to match on the message type.
    SessionMessage(Dispatch),

    /// When a prompt completes, the stop reason.
    /// Queued by [`ActiveSession::send_prompt`] once the prompt's response arrives.
    StopReason(StopReason),
}
525
526impl<Link> ActiveSession<'_, Link>
527where
528    Link: HasPeer<Agent>,
529{
    /// Access the session ID, as reported in the agent's new-session response.
    pub fn session_id(&self) -> &SessionId {
        &self.session_id
    }
534
    /// Access modes available in this session.
    ///
    /// This is the `modes` value from the agent's new-session response;
    /// `None` if the response carried none.
    pub fn modes(&self) -> &Option<SessionModeState> {
        &self.modes
    }
539
    /// Access meta data from session response.
    ///
    /// `None` if the agent's new-session response carried no meta data.
    pub fn meta(&self) -> &Option<serde_json::Map<String, serde_json::Value>> {
        &self.meta
    }
544
    /// Build a `NewSessionResponse` from the session information.
    ///
    /// Useful when you need to forward the session response to a client
    /// after doing some processing.
    ///
    /// The response is rebuilt from the session id, modes, and meta stored in
    /// this session; those are the only parts of the agent's original response
    /// that the session keeps, so any other fields the original carried are
    /// not reproduced here.
    pub fn response(&self) -> NewSessionResponse {
        NewSessionResponse::new(self.session_id.clone())
            .modes(self.modes.clone())
            .meta(self.meta.clone())
    }
554
    /// Access the underlying connection context used to communicate with the agent.
    ///
    /// Returns a clone of the session's connection handle.
    pub fn connection(&self) -> ConnectionTo<Link> {
        self.connection.clone()
    }
559
560    /// Send a prompt to the agent. You can then read messages sent in response.
561    pub fn send_prompt(&mut self, prompt: impl ToString) -> Result<(), crate::Error> {
562        let update_tx = self.update_tx.clone();
563        self.connection
564            .send_request_to(
565                Agent,
566                PromptRequest::new(self.session_id.clone(), vec![prompt.to_string().into()]),
567            )
568            .on_receiving_result(async move |result| {
569                let PromptResponse { stop_reason, .. } = result?;
570
571                update_tx
572                    .unbounded_send(SessionMessage::StopReason(stop_reason))
573                    .map_err(crate::util::internal_error)?;
574
575                Ok(())
576            })
577    }
578
579    /// Read an update from the agent in response to the prompt.
580    pub async fn read_update(&mut self) -> Result<SessionMessage, crate::Error> {
581        use futures::StreamExt;
582        let message =
583            self.update_rx.next().await.ok_or_else(|| {
584                crate::util::internal_error("session channel closed unexpectedly")
585            })?;
586
587        Ok(message)
588    }
589
    /// Read all updates until the end of the turn and create a string.
    /// Ignores non-text updates.
    ///
    /// Updates are read one at a time with [`read_update`](Self::read_update).
    /// The text of every agent message chunk whose content is a text block is
    /// appended to the result; the first [`SessionMessage::StopReason`] ends
    /// the turn and the accumulated text is returned.
    ///
    /// NOTE(review): a stop reason is only queued when a prompt's response
    /// arrives successfully, so this presumably keeps waiting if no prompt is
    /// in flight or the prompt request fails - confirm against callers.
    ///
    /// # Errors
    ///
    /// Propagates errors from reading an update or from dispatching it.
    pub async fn read_to_string(&mut self) -> Result<String, crate::Error> {
        let mut output = String::new();
        loop {
            let update = self.read_update().await?;
            tracing::trace!(?update, "read_to_string update");
            match update {
                // Only session notifications are inspected; anything else that
                // arrives as a dispatch is ignored via `otherwise_ignore`.
                SessionMessage::SessionMessage(dispatch) => MatchDispatch::new(dispatch)
                    .if_notification(async |notif: SessionNotification| match notif.update {
                        SessionUpdate::AgentMessageChunk(ContentChunk {
                            content: ContentBlock::Text(text),
                            ..
                        }) => {
                            output.push_str(&text.text);
                            Ok(())
                        }
                        // Non-text chunks and every other kind of update are skipped.
                        _ => Ok(()),
                    })
                    .await
                    .otherwise_ignore()?,
                // The stop reason itself is not reported; it only ends the turn.
                SessionMessage::StopReason(_stop_reason) => break,
            }
        }
        Ok(output)
    }
616}
617
618impl<Link> ActiveSession<'static, Link>
619where
620    Link: HasPeer<Agent>,
621{
    /// Proxy all remaining messages for this session between client and agent.
    ///
    /// Use this when you want to inject MCP servers into a session but don't need
    /// to actively interact with it after setup. The session messages will be proxied
    /// between client and agent automatically.
    ///
    /// This consumes the `ActiveSession` since you're giving up active control.
    ///
    /// This method is only available on `ActiveSession<'static, _>` (from
    /// [`SessionBuilder::start_session`]) because it requires responders to
    /// outlive the method call.
    ///
    /// # Message Ordering Guarantees
    ///
    /// This method ensures proper handoff from active session mode to proxy mode
    /// without losing or reordering messages:
    ///
    /// 1. **Stop the session handler** - Drop the registration that routes messages
    ///    to `update_rx`. After this, no new messages will be queued.
    /// 2. **Close the channel** - Drop the session's own `update_tx`.
    /// 3. **Drain queued messages** - Forward any messages that were already queued
    ///    in `update_rx` to the client, preserving order. Queued stop reasons are
    ///    discarded rather than forwarded.
    /// 4. **Install proxy handler** - Now that all queued messages are forwarded,
    ///    install the proxy handler to handle future messages.
    ///
    /// This sequence prevents the race condition where messages could be delivered
    /// out of order or lost during the transition.
    ///
    /// # Errors
    ///
    /// Fails if a queued message cannot be forwarded to the client or if the
    /// proxy handler cannot be registered.
    pub fn proxy_remaining_messages(self) -> Result<(), crate::Error>
    where
        Link: HasPeer<Client>,
    {
        // Destructure self to get ownership of all fields
        let ActiveSession {
            session_id,
            mut update_rx,
            update_tx,
            connection,
            session_handler_registration,
            mcp_handler_registrations,
            // These fields are not needed for proxying
            modes: _,
            meta: _,
            _responder,
        } = self;

        // Step 1: Drop the session handler registration.
        // This unregisters the handler that was routing messages to update_rx.
        // After this point, no new messages will be added to the channel.
        drop(session_handler_registration);

        // Step 2: Drop the session's sender side of the channel.
        // NOTE(review): callbacks of prompts still in flight hold their own
        // clones of the sender (see `send_prompt`), so the channel may not be
        // closed yet. The drain loop below stops at the first failed
        // `try_recv` either way.
        drop(update_tx);

        // Step 3: Drain any messages that were already queued and forward to client.
        // These messages arrived before we dropped the handler but haven't been
        // consumed yet. We must forward them to maintain message ordering.
        while let Ok(message) = update_rx.try_recv() {
            match message {
                SessionMessage::SessionMessage(dispatch) => {
                    // Forward the message to the client
                    connection.send_proxied_message_to(Client, dispatch)?;
                }
                SessionMessage::StopReason(_) => {
                    // StopReason is internal bookkeeping, not forwarded
                }
            }
        }

        // Step 4: Install the proxy handler for future messages.
        // Now that all queued messages have been forwarded, the proxy handler
        // can take over. Any new messages will go directly through the proxy.
        connection
            .add_dynamic_handler(ProxySessionMessages::new(session_id))?
            .run_indefinitely();

        // Keep MCP server handlers alive for the lifetime of the proxy
        for registration in mcp_handler_registrations {
            registration.run_indefinitely();
        }

        Ok(())
    }
707}
708
/// Dynamic handler that captures agent messages belonging to one session
/// and queues them for the owning [`ActiveSession`].
struct ActiveSessionHandler {
    /// Session whose messages this handler captures.
    session_id: SessionId,
    /// Queue on which captured messages are delivered.
    update_tx: mpsc::UnboundedSender<SessionMessage>,
}
713
impl ActiveSessionHandler {
    /// Create a handler that captures messages for `session_id` and sends
    /// them on `update_tx`.
    pub fn new(session_id: SessionId, update_tx: mpsc::UnboundedSender<SessionMessage>) -> Self {
        Self {
            session_id,
            update_tx,
        }
    }
}
722
impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for ActiveSessionHandler
where
    Counterpart: HasPeer<Agent>,
{
    /// Claim agent messages that carry this handler's session id.
    ///
    /// A message from the agent whose session id equals `self.session_id` is
    /// queued on `update_tx` as [`SessionMessage::SessionMessage`] and reported
    /// as `Handled::Yes`. Agent messages with no session id, or with a
    /// different one, are returned as `Handled::No` with `retry: false`.
    ///
    /// # Errors
    ///
    /// Fails if the session id cannot be read from the message or if the
    /// message cannot be queued (the send on `update_tx` fails).
    async fn handle_dispatch_from(
        &mut self,
        message: Dispatch,
        cx: ConnectionTo<Counterpart>,
    ) -> Result<Handled<Dispatch>, crate::Error> {
        // If this is a message for our session, grab it.
        tracing::trace!(
            ?message,
            handler_session_id = ?self.session_id,
            "ActiveSessionHandler::handle_dispatch"
        );
        // Only messages coming from the agent are inspected here; what
        // `done()` yields for other messages is decided by `MatchDispatchFrom`.
        MatchDispatchFrom::new(message, &cx)
            .if_message_from(Agent, async |message| {
                if let Some(session_id) = message.get_session_id()? {
                    tracing::trace!(
                        message_session_id = ?session_id,
                        handler_session_id = ?self.session_id,
                        "ActiveSessionHandler::handle_dispatch"
                    );
                    if session_id == self.session_id {
                        self.update_tx
                            .unbounded_send(SessionMessage::SessionMessage(message))
                            .map_err(crate::util::internal_error)?;
                        return Ok(Handled::Yes);
                    }
                }

                // Otherwise, pass it through.
                Ok(Handled::No {
                    message,
                    retry: false,
                })
            })
            .await
            .done()
    }

    /// Describe this handler for diagnostics, including its session id.
    fn describe_chain(&self) -> impl std::fmt::Debug {
        format!("ActiveSessionHandler({})", self.session_id)
    }
}