Skip to main content

agent_client_protocol/
session.rs

1use std::{future::Future, marker::PhantomData, path::Path};
2
3use futures::channel::{mpsc, oneshot};
4
5use crate::{
6    Agent, Client, ConnectionTo, Dispatch, HandleDispatchFrom, Handled, Responder, Role,
7    jsonrpc::{
8        DynamicHandlerRegistration,
9        run::{ChainRun, NullRun, RunWithConnectionTo},
10    },
11    mcp_server::McpServer,
12    role::{HasPeer, acp::ProxySessionMessages},
13    schema::v1::{
14        ContentBlock, ContentChunk, NewSessionRequest, NewSessionResponse, PromptRequest,
15        PromptResponse, SessionId, SessionModeState, SessionNotification, SessionUpdate,
16        StopReason,
17    },
18    util::{MatchDispatch, MatchDispatchFrom, run_until},
19};
20
21/// Marker type indicating the session builder will block the current task.
22#[derive(Debug)]
23pub struct Blocking;
24impl SessionBlockState for Blocking {}
25
26/// Marker type indicating the session builder will not block the current task.
27#[derive(Debug)]
28pub struct NonBlocking;
29impl SessionBlockState for NonBlocking {}
30
31/// Trait for marker types that indicate blocking vs blocking API.
32/// See [`SessionBuilder::block_task`].
33pub trait SessionBlockState: Send + 'static + Sync + std::fmt::Debug {}
34
35impl<Counterpart: Role> ConnectionTo<Counterpart>
36where
37    Counterpart: HasPeer<Agent>,
38{
39    /// Session builder for a new session request.
40    pub fn build_session(&self, cwd: impl AsRef<Path>) -> SessionBuilder<Counterpart, NullRun> {
41        SessionBuilder::new(self, NewSessionRequest::new(cwd.as_ref()))
42    }
43
44    /// Session builder using the current working directory.
45    ///
46    /// This is a convenience wrapper around [`build_session`](Self::build_session)
47    /// that uses [`std::env::current_dir`] to get the working directory.
48    ///
49    /// Returns an error if the current directory cannot be determined.
50    pub fn build_session_cwd(&self) -> Result<SessionBuilder<Counterpart, NullRun>, crate::Error> {
51        let cwd = std::env::current_dir().map_err(|e| {
52            crate::Error::internal_error().data(format!("cannot get current directory: {e}"))
53        })?;
54        Ok(self.build_session(cwd))
55    }
56
57    /// Session builder starting from an existing request.
58    ///
59    /// Use this when you've intercepted a `session.new` request and want to
60    /// modify it (e.g., inject MCP servers) before forwarding.
61    pub fn build_session_from(
62        &self,
63        request: NewSessionRequest,
64    ) -> SessionBuilder<Counterpart, NullRun> {
65        SessionBuilder::new(self, request)
66    }
67
68    /// Given a session response received from the agent,
69    /// attach a handler to process messages related to this session
70    /// and let you access them.
71    ///
72    /// Normally you would not use this method directly but would
73    /// instead use [`Self::build_session`] and then [`SessionBuilder::start_session`].
74    ///
75    /// The vector `dynamic_handler_registrations` contains any dynamic
76    /// handle registrations associated with this session (e.g., from MCP servers).
77    /// You can simply pass `Default::default()` if not applicable.
78    pub fn attach_session<'responder>(
79        &self,
80        response: NewSessionResponse,
81        mcp_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
82    ) -> Result<ActiveSession<'responder, Counterpart>, crate::Error> {
83        let NewSessionResponse {
84            session_id,
85            modes,
86            meta,
87            ..
88        } = response;
89
90        let (update_tx, update_rx) = mpsc::unbounded();
91        let handler = ActiveSessionHandler::new(session_id.clone(), update_tx.clone());
92        let session_handler_registration = self.add_dynamic_handler(handler)?;
93
94        Ok(ActiveSession {
95            session_id,
96            modes,
97            meta,
98            update_rx,
99            update_tx,
100            connection: self.clone(),
101            session_handler_registration,
102            mcp_handler_registrations,
103            _responder: PhantomData,
104        })
105    }
106}
107
108/// Session builder for a new session request.
109/// Allows you to add MCP servers or set other details for this session.
110///
111/// The `BlockState` type parameter tracks whether blocking methods are available:
112/// - `NonBlocking` (default): Only [`on_session_start`](Self::on_session_start) is available
113/// - `Blocking` (after calling [`block_task`](Self::block_task)):
114///   [`run_until`](Self::run_until) and [`start_session`](Self::start_session) become available
115#[must_use = "use `start_session`, `run_until`, or `on_session_start` to start the session"]
116#[derive(Debug)]
117pub struct SessionBuilder<
118    Counterpart,
119    Run: RunWithConnectionTo<Counterpart> = NullRun,
120    BlockState: SessionBlockState = NonBlocking,
121> where
122    Counterpart: HasPeer<Agent>,
123{
124    connection: ConnectionTo<Counterpart>,
125    request: NewSessionRequest,
126    dynamic_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
127    run: Run,
128    block_state: PhantomData<BlockState>,
129}
130
131impl<Counterpart> SessionBuilder<Counterpart, NullRun, NonBlocking>
132where
133    Counterpart: HasPeer<Agent>,
134{
135    fn new(connection: &ConnectionTo<Counterpart>, request: NewSessionRequest) -> Self {
136        SessionBuilder {
137            connection: connection.clone(),
138            request,
139            dynamic_handler_registrations: Vec::default(),
140            run: NullRun,
141            block_state: PhantomData,
142        }
143    }
144}
145
146impl<Counterpart, R, BlockState> SessionBuilder<Counterpart, R, BlockState>
147where
148    Counterpart: HasPeer<Agent>,
149    R: RunWithConnectionTo<Counterpart>,
150    BlockState: SessionBlockState,
151{
152    /// Add the MCP servers from the given registry to this session.
153    pub fn with_mcp_server<McpRun>(
154        mut self,
155        mcp_server: McpServer<Counterpart, McpRun>,
156    ) -> Result<SessionBuilder<Counterpart, ChainRun<R, McpRun>, BlockState>, crate::Error>
157    where
158        McpRun: RunWithConnectionTo<Counterpart>,
159    {
160        let (handler, mcp_run) = mcp_server.into_handler_and_responder();
161        self.dynamic_handler_registrations
162            .push(handler.into_dynamic_handler(&mut self.request, &self.connection)?);
163        Ok(SessionBuilder {
164            connection: self.connection,
165            request: self.request,
166            dynamic_handler_registrations: self.dynamic_handler_registrations,
167            run: ChainRun::new(self.run, mcp_run),
168            block_state: self.block_state,
169        })
170    }
171
172    /// Spawn a task that runs the provided closure once the session starts.
173    ///
174    /// Unlike [`start_session`](Self::start_session), this method returns immediately
175    /// without blocking the current task. The session handshake and closure execution
176    /// happen in a spawned background task.
177    ///
178    /// The closure receives an `ActiveSession<'static, _>` and should return
179    /// `Result<(), Error>`. If the closure returns an error, it will propagate
180    /// to the connection's error handling.
181    ///
182    /// # Example
183    ///
184    /// ```ignore
185    /// # use agent_client_protocol::{Client, Agent, ConnectTo};
186    /// # use agent_client_protocol::mcp_server::McpServer;
187    /// # use agent_client_protocol_rmcp::McpServerExt;
188    /// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
189    /// # Client.builder().connect_with(transport, async |cx| {
190    /// # let mcp = McpServer::<Agent, _>::builder("tools").build();
191    /// cx.build_session_cwd()?
192    ///     .with_mcp_server(mcp)?
193    ///     .on_session_start(async |mut session| {
194    ///         // Do something with the session
195    ///         session.send_prompt("Hello")?;
196    ///         let response = session.read_to_string().await?;
197    ///         Ok(())
198    ///     })?;
199    /// // Returns immediately, session runs in background
200    /// # Ok(())
201    /// # }).await?;
202    /// # Ok(())
203    /// # }
204    /// ```
205    ///
206    /// # Ordering
207    ///
208    /// This callback blocks the dispatch loop until the session starts and your
209    /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
210    pub fn on_session_start<F, Fut>(self, op: F) -> Result<(), crate::Error>
211    where
212        R: 'static,
213        F: FnOnce(ActiveSession<'static, Counterpart>) -> Fut + Send + 'static,
214        Fut: Future<Output = Result<(), crate::Error>> + Send,
215    {
216        let Self {
217            connection,
218            request,
219            dynamic_handler_registrations,
220            run,
221            block_state: _,
222        } = self;
223
224        connection
225            .send_request_to(Agent, request)
226            .on_receiving_result({
227                let connection = connection.clone();
228                async move |result| {
229                    let response = result?;
230
231                    connection.spawn(run.run_with_connection_to(connection.clone()))?;
232
233                    let active_session =
234                        connection.attach_session(response, dynamic_handler_registrations)?;
235
236                    op(active_session).await
237                }
238            })
239    }
240
241    /// Spawn a proxy session and run a closure with the session ID.
242    ///
243    /// A **proxy session** starts the session with the agent and then automatically
244    /// proxies all session updates (prompts, tool calls, etc.) from the agent back
245    /// to the client. You don't need to handle any messages yourself - the proxy
246    /// takes care of forwarding everything. This is useful when you want to inject
247    /// and/or filter prompts coming from the client but otherwise not be involved
248    /// in the session.
249    ///
250    /// Unlike [`start_session_proxy`](Self::start_session_proxy), this method returns
251    /// immediately without blocking the current task. The session handshake, client
252    /// response, and proxy setup all happen in a spawned background task.
253    ///
254    /// The closure receives the `SessionId` once the session is established, allowing
255    /// you to perform any custom work with that ID (e.g., tracking, logging).
256    ///
257    /// # Example
258    ///
259    /// ```ignore
260    /// # use agent_client_protocol::{Proxy, Client, Conductor, ConnectTo};
261    /// # use agent_client_protocol::schema::v1::NewSessionRequest;
262    /// # use agent_client_protocol::mcp_server::McpServer;
263    /// # use agent_client_protocol_rmcp::McpServerExt;
264    /// # async fn example(transport: impl ConnectTo<Proxy>) -> Result<(), agent_client_protocol::Error> {
265    /// Proxy.builder()
266    ///     .on_receive_request_from(Client, async |request: NewSessionRequest, responder, cx| {
267    ///         let mcp = McpServer::<Conductor, _>::builder("tools").build();
268    ///         cx.build_session_from(request)
269    ///             .with_mcp_server(mcp)?
270    ///             .on_proxy_session_start(responder, async |session_id| {
271    ///                 // Session started
272    ///                 Ok(())
273    ///             })
274    ///     }, agent_client_protocol::on_receive_request!())
275    ///     .connect_to(transport)
276    ///     .await?;
277    /// # Ok(())
278    /// # }
279    /// ```
280    ///
281    /// # Ordering
282    ///
283    /// This callback blocks the dispatch loop until the session starts and your
284    /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
285    pub fn on_proxy_session_start<F, Fut>(
286        self,
287        responder: Responder<NewSessionResponse>,
288        op: F,
289    ) -> Result<(), crate::Error>
290    where
291        F: FnOnce(SessionId) -> Fut + Send + 'static,
292        Fut: Future<Output = Result<(), crate::Error>> + Send,
293        Counterpart: HasPeer<Client>,
294        R: 'static,
295    {
296        let Self {
297            connection,
298            request,
299            dynamic_handler_registrations,
300            run,
301            block_state: _,
302        } = self;
303
304        // Send the "new session" request to the agent.
305        let sent = connection.send_request_to(Agent, request);
306        #[cfg(feature = "unstable_cancel_request")]
307        let sent = sent.forward_cancellation_from(responder.cancellation());
308
309        sent.on_receiving_ok_result(responder, {
310            let connection = connection.clone();
311            async move |response, responder| {
312                // Extract the session-id from the response and forward
313                // the response back to the client
314                let session_id = response.session_id.clone();
315                responder.respond(response)?;
316
317                // Install a dynamic handler to proxy messages from this session
318                connection
319                    .add_dynamic_handler(ProxySessionMessages::new(session_id.clone()))?
320                    .run_indefinitely();
321
322                // Spawn off the run and dynamic handlers to run indefinitely
323                connection.spawn(run.run_with_connection_to(connection.clone()))?;
324                dynamic_handler_registrations
325                    .into_iter()
326                    .for_each(super::jsonrpc::DynamicHandlerRegistration::run_indefinitely);
327
328                op(session_id).await
329            }
330        })
331    }
332}
333
334impl<Counterpart, R> SessionBuilder<Counterpart, R, NonBlocking>
335where
336    Counterpart: HasPeer<Agent>,
337    R: RunWithConnectionTo<Counterpart>,
338{
339    /// Mark this session builder as being able to block the current task.
340    ///
341    /// After calling this, you can use [`run_until`](Self::run_until) or
342    /// [`start_session`](Self::start_session) which block the current task.
343    ///
344    /// This should not be used from inside a message handler like
345    /// [`Builder::on_receive_request`](`crate::Builder::on_receive_request`) or [`HandleDispatchFrom`]
346    /// implementations.
347    pub fn block_task(self) -> SessionBuilder<Counterpart, R, Blocking> {
348        SessionBuilder {
349            connection: self.connection,
350            request: self.request,
351            dynamic_handler_registrations: self.dynamic_handler_registrations,
352            run: self.run,
353            block_state: PhantomData,
354        }
355    }
356}
357
358impl<Counterpart, R> SessionBuilder<Counterpart, R, Blocking>
359where
360    Counterpart: HasPeer<Agent>,
361    R: RunWithConnectionTo<Counterpart>,
362{
363    /// Run this session synchronously. The current task will be blocked
364    /// and `op` will be executed with the active session information.
365    /// This is useful when you have MCP servers that are borrowed from your local
366    /// stack frame.
367    ///
368    /// The `ActiveSession` passed to `op` has a non-`'static` lifetime, which
369    /// prevents calling [`ActiveSession::proxy_remaining_messages`] (since the
370    /// responders would terminate when `op` returns).
371    ///
372    /// Requires calling [`block_task`](Self::block_task) first.
373    pub async fn run_until<T>(
374        self,
375        op: impl for<'responder> AsyncFnOnce(
376            ActiveSession<'responder, Counterpart>,
377        ) -> Result<T, crate::Error>,
378    ) -> Result<T, crate::Error> {
379        let Self {
380            connection,
381            request,
382            dynamic_handler_registrations,
383            run,
384            block_state: _,
385        } = self;
386
387        let response = connection
388            .send_request_to(Agent, request)
389            .block_task()
390            .await?;
391
392        let active_session = connection.attach_session(response, dynamic_handler_registrations)?;
393
394        run_until(
395            run.run_with_connection_to(connection.clone()),
396            op(active_session),
397        )
398        .await
399    }
400
401    /// Send the request to create the session and return a handle.
402    /// This is an alternative to [`Self::run_until`] that avoids rightward
403    /// drift but at the cost of requiring MCP servers that are `Send` and
404    /// don't access data from the surrounding scope.
405    ///
406    /// Returns an `ActiveSession<'static, _>` because responders are spawned
407    /// into background tasks that live for the connection lifetime.
408    ///
409    /// Requires calling [`block_task`](Self::block_task) first.
410    pub async fn start_session(self) -> Result<ActiveSession<'static, Counterpart>, crate::Error>
411    where
412        R: 'static,
413    {
414        let Self {
415            connection,
416            request,
417            dynamic_handler_registrations,
418            run,
419            block_state: _,
420        } = self;
421
422        let (active_session_tx, active_session_rx) = oneshot::channel();
423
424        connection.clone().spawn(async move {
425            let response = connection
426                .send_request_to(Agent, request)
427                .block_task()
428                .await?;
429
430            connection.spawn(run.run_with_connection_to(connection.clone()))?;
431
432            let active_session =
433                connection.attach_session(response, dynamic_handler_registrations)?;
434
435            active_session_tx
436                .send(active_session)
437                .map_err(|_| crate::Error::internal_error())?;
438
439            Ok(())
440        })?;
441
442        active_session_rx
443            .await
444            .map_err(|_| crate::Error::internal_error())
445    }
446
447    /// Start a proxy session that forwards all messages between client and agent.
448    ///
449    /// A **proxy session** starts the session with the agent and then automatically
450    /// proxies all session updates (prompts, tool calls, etc.) from the agent back
451    /// to the client. You don't need to handle any messages yourself - the proxy
452    /// takes care of forwarding everything. This is useful when you want to inject
453    /// and/or filter prompts coming from the client but otherwise not be involved
454    /// in the session.
455    ///
456    /// This is a convenience method that combines [`start_session`](Self::start_session),
457    /// responding to the client, and [`ActiveSession::proxy_remaining_messages`].
458    ///
459    /// For more control (e.g., to send some messages before proxying), use
460    /// [`start_session`](Self::start_session) instead and call
461    /// [`proxy_remaining_messages`](ActiveSession::proxy_remaining_messages) manually.
462    ///
463    /// Requires calling [`block_task`](Self::block_task) first.
464    pub async fn start_session_proxy(
465        self,
466        responder: Responder<NewSessionResponse>,
467    ) -> Result<SessionId, crate::Error>
468    where
469        Counterpart: HasPeer<Client>,
470        R: 'static,
471    {
472        let active_session = self.start_session().await?;
473        let session_id = active_session.session_id().clone();
474        responder.respond(active_session.response())?;
475        active_session.proxy_remaining_messages()?;
476        Ok(session_id)
477    }
478}
479
480/// Active session struct that lets you send prompts and receive updates.
481///
482/// The `'responder` lifetime represents the span during which responders
483/// (e.g., MCP server handlers) are active. When created via [`SessionBuilder::start_session`],
484/// this is `'static` because responders are spawned into background tasks.
485/// When created via [`SessionBuilder::run_until`], this is tied to the
486/// closure scope, preventing [`Self::proxy_remaining_messages`] from being called
487/// (since the responders would die when the closure returns).
488#[derive(Debug)]
489pub struct ActiveSession<'responder, Link>
490where
491    Link: HasPeer<Agent>,
492{
493    session_id: SessionId,
494    update_rx: mpsc::UnboundedReceiver<SessionMessage>,
495    update_tx: mpsc::UnboundedSender<SessionMessage>,
496    modes: Option<SessionModeState>,
497    meta: Option<serde_json::Map<String, serde_json::Value>>,
498    connection: ConnectionTo<Link>,
499
500    /// Registration for the handler that routes session messages to `update_rx`.
501    /// This is separate from MCP handlers so it can be dropped independently
502    /// when switching to proxy mode.
503    session_handler_registration: DynamicHandlerRegistration<Link>,
504
505    /// Registrations for MCP server handlers.
506    /// These will be dropped once the active-session struct is dropped
507    /// which will cause them to be deregistered.
508    mcp_handler_registrations: Vec<DynamicHandlerRegistration<Link>>,
509
510    /// Phantom lifetime representing the responder lifetime.
511    _responder: PhantomData<&'responder ()>,
512}
513
514/// Incoming message from the agent
515#[non_exhaustive]
516#[derive(Debug)]
517#[cfg_attr(
518    feature = "unstable_cancel_request",
519    allow(
520        clippy::large_enum_variant,
521        reason = "Dispatch messages vastly outnumber StopReason; boxing would add a heap allocation"
522    )
523)]
524pub enum SessionMessage {
525    /// Periodic updates with new content, tool requests, etc.
526    /// Use [`MatchDispatch`] to match on the message type.
527    SessionMessage(Dispatch),
528
529    /// When a prompt completes, the stop reason.
530    StopReason(StopReason),
531}
532
533impl<Link> ActiveSession<'_, Link>
534where
535    Link: HasPeer<Agent>,
536{
537    /// Access the session ID.
538    pub fn session_id(&self) -> &SessionId {
539        &self.session_id
540    }
541
542    /// Access modes available in this session.
543    pub fn modes(&self) -> &Option<SessionModeState> {
544        &self.modes
545    }
546
547    /// Access meta data from session response.
548    pub fn meta(&self) -> &Option<serde_json::Map<String, serde_json::Value>> {
549        &self.meta
550    }
551
552    /// Build a `NewSessionResponse` from the session information.
553    ///
554    /// Useful when you need to forward the session response to a client
555    /// after doing some processing.
556    pub fn response(&self) -> NewSessionResponse {
557        NewSessionResponse::new(self.session_id.clone())
558            .modes(self.modes.clone())
559            .meta(self.meta.clone())
560    }
561
562    /// Access the underlying connection context used to communicate with the agent.
563    pub fn connection(&self) -> ConnectionTo<Link> {
564        self.connection.clone()
565    }
566
567    /// Send a prompt to the agent. You can then read messages sent in response.
568    pub fn send_prompt(&mut self, prompt: impl ToString) -> Result<(), crate::Error> {
569        let update_tx = self.update_tx.clone();
570        self.connection
571            .send_request_to(
572                Agent,
573                PromptRequest::new(self.session_id.clone(), vec![prompt.to_string().into()]),
574            )
575            .on_receiving_result(async move |result| {
576                let PromptResponse { stop_reason, .. } = result?;
577
578                update_tx
579                    .unbounded_send(SessionMessage::StopReason(stop_reason))
580                    .map_err(crate::util::internal_error)?;
581
582                Ok(())
583            })
584    }
585
586    /// Read an update from the agent in response to the prompt.
587    pub async fn read_update(&mut self) -> Result<SessionMessage, crate::Error> {
588        use futures::StreamExt;
589        let message =
590            self.update_rx.next().await.ok_or_else(|| {
591                crate::util::internal_error("session channel closed unexpectedly")
592            })?;
593
594        Ok(message)
595    }
596
597    /// Read all updates until the end of the turn and create a string.
598    /// Ignores non-text updates.
599    pub async fn read_to_string(&mut self) -> Result<String, crate::Error> {
600        let mut output = String::new();
601        loop {
602            let update = self.read_update().await?;
603            tracing::trace!(?update, "read_to_string update");
604            match update {
605                SessionMessage::SessionMessage(dispatch) => MatchDispatch::new(dispatch)
606                    .if_notification(async |notif: SessionNotification| match notif.update {
607                        SessionUpdate::AgentMessageChunk(ContentChunk {
608                            content: ContentBlock::Text(text),
609                            ..
610                        }) => {
611                            output.push_str(&text.text);
612                            Ok(())
613                        }
614                        _ => Ok(()),
615                    })
616                    .await
617                    .otherwise_ignore()?,
618                SessionMessage::StopReason(_stop_reason) => break,
619            }
620        }
621        Ok(output)
622    }
623}
624
625impl<Link> ActiveSession<'static, Link>
626where
627    Link: HasPeer<Agent>,
628{
629    /// Proxy all remaining messages for this session between client and agent.
630    ///
631    /// Use this when you want to inject MCP servers into a session but don't need
632    /// to actively interact with it after setup. The session messages will be proxied
633    /// between client and agent automatically.
634    ///
635    /// This consumes the `ActiveSession` since you're giving up active control.
636    ///
637    /// This method is only available on `ActiveSession<'static, _>` (from
638    /// [`SessionBuilder::start_session`]) because it requires responders to
639    /// outlive the method call.
640    ///
641    /// # Message Ordering Guarantees
642    ///
643    /// This method ensures proper handoff from active session mode to proxy mode
644    /// without losing or reordering messages:
645    ///
646    /// 1. **Stop the session handler** - Drop the registration that routes messages
647    ///    to `update_rx`. After this, no new messages will be queued.
648    /// 2. **Close the channel** - Drop `update_tx` so we can detect when the channel
649    ///    is fully drained.
650    /// 3. **Drain queued messages** - Forward any messages that were already queued
651    ///    in `update_rx` to the client, preserving order.
652    /// 4. **Install proxy handler** - Now that all queued messages are forwarded,
653    ///    install the proxy handler to handle future messages.
654    ///
655    /// This sequence prevents the race condition where messages could be delivered
656    /// out of order or lost during the transition.
657    pub fn proxy_remaining_messages(self) -> Result<(), crate::Error>
658    where
659        Link: HasPeer<Client>,
660    {
661        // Destructure self to get ownership of all fields
662        let ActiveSession {
663            session_id,
664            mut update_rx,
665            update_tx,
666            connection,
667            session_handler_registration,
668            mcp_handler_registrations,
669            // These fields are not needed for proxying
670            modes: _,
671            meta: _,
672            _responder,
673        } = self;
674
675        // Step 1: Drop the session handler registration.
676        // This unregisters the handler that was routing messages to update_rx.
677        // After this point, no new messages will be added to the channel.
678        drop(session_handler_registration);
679
680        // Step 2: Drop the sender side of the channel.
681        // This allows us to detect when the channel is fully drained
682        // (recv will return None when empty and sender is dropped).
683        drop(update_tx);
684
685        // Step 3: Drain any messages that were already queued and forward to client.
686        // These messages arrived before we dropped the handler but haven't been
687        // consumed yet. We must forward them to maintain message ordering.
688        while let Ok(message) = update_rx.try_recv() {
689            match message {
690                SessionMessage::SessionMessage(dispatch) => {
691                    // Forward the message to the client
692                    connection.send_proxied_message_to(Client, dispatch)?;
693                }
694                SessionMessage::StopReason(_) => {
695                    // StopReason is internal bookkeeping, not forwarded
696                }
697            }
698        }
699
700        // Step 4: Install the proxy handler for future messages.
701        // Now that all queued messages have been forwarded, the proxy handler
702        // can take over. Any new messages will go directly through the proxy.
703        connection
704            .add_dynamic_handler(ProxySessionMessages::new(session_id))?
705            .run_indefinitely();
706
707        // Keep MCP server handlers alive for the lifetime of the proxy
708        for registration in mcp_handler_registrations {
709            registration.run_indefinitely();
710        }
711
712        Ok(())
713    }
714}
715
716struct ActiveSessionHandler {
717    session_id: SessionId,
718    update_tx: mpsc::UnboundedSender<SessionMessage>,
719}
720
721impl ActiveSessionHandler {
722    pub fn new(session_id: SessionId, update_tx: mpsc::UnboundedSender<SessionMessage>) -> Self {
723        Self {
724            session_id,
725            update_tx,
726        }
727    }
728}
729
730impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for ActiveSessionHandler
731where
732    Counterpart: HasPeer<Agent>,
733{
734    async fn handle_dispatch_from(
735        &mut self,
736        message: Dispatch,
737        cx: ConnectionTo<Counterpart>,
738    ) -> Result<Handled<Dispatch>, crate::Error> {
739        // If this is a message for our session, grab it.
740        tracing::trace!(
741            ?message,
742            handler_session_id = ?self.session_id,
743            "ActiveSessionHandler::handle_dispatch"
744        );
745        MatchDispatchFrom::new(message, &cx)
746            .if_message_from(Agent, async |message| {
747                if let Some(session_id) = message.get_session_id()? {
748                    tracing::trace!(
749                        message_session_id = ?session_id,
750                        handler_session_id = ?self.session_id,
751                        "ActiveSessionHandler::handle_dispatch"
752                    );
753                    if session_id == self.session_id {
754                        self.update_tx
755                            .unbounded_send(SessionMessage::SessionMessage(message))
756                            .map_err(crate::util::internal_error)?;
757                        return Ok(Handled::Yes);
758                    }
759                }
760
761                // Otherwise, pass it through.
762                Ok(Handled::No {
763                    message,
764                    retry: false,
765                })
766            })
767            .await
768            .done()
769    }
770
771    fn describe_chain(&self) -> impl std::fmt::Debug {
772        format!("ActiveSessionHandler({})", self.session_id)
773    }
774}