agent_client_protocol/session.rs
1use std::{future::Future, marker::PhantomData, path::Path};
2
3use agent_client_protocol_schema::{
4 ContentBlock, ContentChunk, NewSessionRequest, NewSessionResponse, PromptRequest,
5 PromptResponse, SessionModeState, SessionNotification, SessionUpdate, StopReason,
6};
7use futures::channel::mpsc;
8use tokio::sync::oneshot;
9
10use crate::{
11 Agent, Client, ConnectionTo, Dispatch, HandleDispatchFrom, Handled, Responder, Role,
12 jsonrpc::{
13 DynamicHandlerRegistration,
14 run::{ChainRun, NullRun, RunWithConnectionTo},
15 },
16 mcp_server::McpServer,
17 role::{HasPeer, acp::ProxySessionMessages},
18 schema::SessionId,
19 util::{MatchDispatch, MatchDispatchFrom, run_until},
20};
21
/// Marker type indicating the session builder will block the current task.
///
/// A builder is moved into this state by [`SessionBuilder::block_task`].
#[derive(Debug)]
pub struct Blocking;
impl SessionBlockState for Blocking {}
26
/// Marker type indicating the session builder will not block the current task.
///
/// This is the default block state of a [`SessionBuilder`].
#[derive(Debug)]
pub struct NonBlocking;
impl SessionBlockState for NonBlocking {}
31
/// Trait for marker types that select the blocking vs non-blocking API.
/// See [`SessionBuilder::block_task`].
pub trait SessionBlockState: std::fmt::Debug + Send + Sync + 'static {}
35
36impl<Counterpart: Role> ConnectionTo<Counterpart>
37where
38 Counterpart: HasPeer<Agent>,
39{
40 /// Session builder for a new session request.
41 pub fn build_session(&self, cwd: impl AsRef<Path>) -> SessionBuilder<Counterpart, NullRun> {
42 SessionBuilder::new(self, NewSessionRequest::new(cwd.as_ref()))
43 }
44
45 /// Session builder using the current working directory.
46 ///
47 /// This is a convenience wrapper around [`build_session`](Self::build_session)
48 /// that uses [`std::env::current_dir`] to get the working directory.
49 ///
50 /// Returns an error if the current directory cannot be determined.
51 pub fn build_session_cwd(&self) -> Result<SessionBuilder<Counterpart, NullRun>, crate::Error> {
52 let cwd = std::env::current_dir().map_err(|e| {
53 crate::Error::internal_error().data(format!("cannot get current directory: {e}"))
54 })?;
55 Ok(self.build_session(cwd))
56 }
57
58 /// Session builder starting from an existing request.
59 ///
60 /// Use this when you've intercepted a `session.new` request and want to
61 /// modify it (e.g., inject MCP servers) before forwarding.
62 pub fn build_session_from(
63 &self,
64 request: NewSessionRequest,
65 ) -> SessionBuilder<Counterpart, NullRun> {
66 SessionBuilder::new(self, request)
67 }
68
69 /// Given a session response received from the agent,
70 /// attach a handler to process messages related to this session
71 /// and let you access them.
72 ///
73 /// Normally you would not use this method directly but would
74 /// instead use [`Self::build_session`] and then [`SessionBuilder::start_session`].
75 ///
76 /// The vector `dynamic_handler_registrations` contains any dynamic
77 /// handle registrations associated with this session (e.g., from MCP servers).
78 /// You can simply pass `Default::default()` if not applicable.
79 pub fn attach_session<'responder>(
80 &self,
81 response: NewSessionResponse,
82 mcp_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
83 ) -> Result<ActiveSession<'responder, Counterpart>, crate::Error> {
84 let NewSessionResponse {
85 session_id,
86 modes,
87 meta,
88 ..
89 } = response;
90
91 let (update_tx, update_rx) = mpsc::unbounded();
92 let handler = ActiveSessionHandler::new(session_id.clone(), update_tx.clone());
93 let session_handler_registration = self.add_dynamic_handler(handler)?;
94
95 Ok(ActiveSession {
96 session_id,
97 modes,
98 meta,
99 update_rx,
100 update_tx,
101 connection: self.clone(),
102 session_handler_registration,
103 mcp_handler_registrations,
104 _responder: PhantomData,
105 })
106 }
107}
108
/// Session builder for a new session request.
/// Allows you to add MCP servers or set other details for this session.
///
/// The `BlockState` type parameter tracks whether blocking methods are available:
/// - `NonBlocking` (default): the non-blocking entry points
///   [`on_session_start`](Self::on_session_start) and
///   [`on_proxy_session_start`](Self::on_proxy_session_start) are available
///   (they are available in either state)
/// - `Blocking` (after calling [`block_task`](Self::block_task)):
///   [`run_until`](Self::run_until), [`start_session`](Self::start_session) and
///   [`start_session_proxy`](Self::start_session_proxy) become available
#[must_use = "use `start_session`, `run_until`, or `on_session_start` to start the session"]
#[derive(Debug)]
pub struct SessionBuilder<
    Counterpart,
    Run: RunWithConnectionTo<Counterpart> = NullRun,
    BlockState: SessionBlockState = NonBlocking,
> where
    Counterpart: HasPeer<Agent>,
{
    /// Connection the new-session request will be sent over.
    connection: ConnectionTo<Counterpart>,
    /// The request that will be sent to the agent; `with_mcp_server` gets
    /// mutable access to it when registering a server.
    request: NewSessionRequest,
    /// Handler registrations accumulated by `with_mcp_server`.
    dynamic_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
    /// Work to run alongside the session (chained MCP server runs).
    run: Run,
    /// Type-level marker only; carries no data.
    block_state: PhantomData<BlockState>,
}
131
132impl<Counterpart> SessionBuilder<Counterpart, NullRun, NonBlocking>
133where
134 Counterpart: HasPeer<Agent>,
135{
136 fn new(connection: &ConnectionTo<Counterpart>, request: NewSessionRequest) -> Self {
137 SessionBuilder {
138 connection: connection.clone(),
139 request,
140 dynamic_handler_registrations: Vec::default(),
141 run: NullRun,
142 block_state: PhantomData,
143 }
144 }
145}
146
147impl<Counterpart, R, BlockState> SessionBuilder<Counterpart, R, BlockState>
148where
149 Counterpart: HasPeer<Agent>,
150 R: RunWithConnectionTo<Counterpart>,
151 BlockState: SessionBlockState,
152{
153 /// Add the MCP servers from the given registry to this session.
154 pub fn with_mcp_server<McpRun>(
155 mut self,
156 mcp_server: McpServer<Counterpart, McpRun>,
157 ) -> Result<SessionBuilder<Counterpart, ChainRun<R, McpRun>, BlockState>, crate::Error>
158 where
159 McpRun: RunWithConnectionTo<Counterpart>,
160 {
161 let (handler, mcp_run) = mcp_server.into_handler_and_responder();
162 self.dynamic_handler_registrations
163 .push(handler.into_dynamic_handler(&mut self.request, &self.connection)?);
164 Ok(SessionBuilder {
165 connection: self.connection,
166 request: self.request,
167 dynamic_handler_registrations: self.dynamic_handler_registrations,
168 run: ChainRun::new(self.run, mcp_run),
169 block_state: self.block_state,
170 })
171 }
172
173 /// Spawn a task that runs the provided closure once the session starts.
174 ///
175 /// Unlike [`start_session`](Self::start_session), this method returns immediately
176 /// without blocking the current task. The session handshake and closure execution
177 /// happen in a spawned background task.
178 ///
179 /// The closure receives an `ActiveSession<'static, _>` and should return
180 /// `Result<(), Error>`. If the closure returns an error, it will propagate
181 /// to the connection's error handling.
182 ///
183 /// # Example
184 ///
185 /// ```
186 /// # use agent_client_protocol::{Client, Agent, ConnectTo};
187 /// # use agent_client_protocol::mcp_server::McpServer;
188 /// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
189 /// # Client.builder().connect_with(transport, async |cx| {
190 /// # let mcp = McpServer::<Agent, _>::builder("tools").build();
191 /// cx.build_session_cwd()?
192 /// .with_mcp_server(mcp)?
193 /// .on_session_start(async |mut session| {
194 /// // Do something with the session
195 /// session.send_prompt("Hello")?;
196 /// let response = session.read_to_string().await?;
197 /// Ok(())
198 /// })?;
199 /// // Returns immediately, session runs in background
200 /// # Ok(())
201 /// # }).await?;
202 /// # Ok(())
203 /// # }
204 /// ```
205 ///
206 /// # Ordering
207 ///
208 /// This callback blocks the dispatch loop until the session starts and your
209 /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
210 pub fn on_session_start<F, Fut>(self, op: F) -> Result<(), crate::Error>
211 where
212 R: 'static,
213 F: FnOnce(ActiveSession<'static, Counterpart>) -> Fut + Send + 'static,
214 Fut: Future<Output = Result<(), crate::Error>> + Send,
215 {
216 let Self {
217 connection,
218 request,
219 dynamic_handler_registrations,
220 run,
221 block_state: _,
222 } = self;
223
224 connection
225 .send_request_to(Agent, request)
226 .on_receiving_result({
227 let connection = connection.clone();
228 async move |result| {
229 let response = result?;
230
231 connection.spawn(run.run_with_connection_to(connection.clone()))?;
232
233 let active_session =
234 connection.attach_session(response, dynamic_handler_registrations)?;
235
236 op(active_session).await
237 }
238 })
239 }
240
241 /// Spawn a proxy session and run a closure with the session ID.
242 ///
243 /// A **proxy session** starts the session with the agent and then automatically
244 /// proxies all session updates (prompts, tool calls, etc.) from the agent back
245 /// to the client. You don't need to handle any messages yourself - the proxy
246 /// takes care of forwarding everything. This is useful when you want to inject
247 /// and/or filter prompts coming from the client but otherwise not be involved
248 /// in the session.
249 ///
250 /// Unlike [`start_session_proxy`](Self::start_session_proxy), this method returns
251 /// immediately without blocking the current task. The session handshake, client
252 /// response, and proxy setup all happen in a spawned background task.
253 ///
254 /// The closure receives the `SessionId` once the session is established, allowing
255 /// you to perform any custom work with that ID (e.g., tracking, logging).
256 ///
257 /// # Example
258 ///
259 /// ```
260 /// # use agent_client_protocol::{Proxy, Client, Conductor, ConnectTo};
261 /// # use agent_client_protocol::schema::NewSessionRequest;
262 /// # use agent_client_protocol::mcp_server::McpServer;
263 /// # async fn example(transport: impl ConnectTo<Proxy>) -> Result<(), agent_client_protocol::Error> {
264 /// Proxy.builder()
265 /// .on_receive_request_from(Client, async |request: NewSessionRequest, responder, cx| {
266 /// let mcp = McpServer::<Conductor, _>::builder("tools").build();
267 /// cx.build_session_from(request)
268 /// .with_mcp_server(mcp)?
269 /// .on_proxy_session_start(responder, async |session_id| {
270 /// // Session started
271 /// Ok(())
272 /// })
273 /// }, agent_client_protocol::on_receive_request!())
274 /// .connect_to(transport)
275 /// .await?;
276 /// # Ok(())
277 /// # }
278 /// ```
279 ///
280 /// # Ordering
281 ///
282 /// This callback blocks the dispatch loop until the session starts and your
283 /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
284 pub fn on_proxy_session_start<F, Fut>(
285 self,
286 responder: Responder<NewSessionResponse>,
287 op: F,
288 ) -> Result<(), crate::Error>
289 where
290 F: FnOnce(SessionId) -> Fut + Send + 'static,
291 Fut: Future<Output = Result<(), crate::Error>> + Send,
292 Counterpart: HasPeer<Client>,
293 R: 'static,
294 {
295 let Self {
296 connection,
297 request,
298 dynamic_handler_registrations,
299 run,
300 block_state: _,
301 } = self;
302
303 // Spawn off the run and dynamic handlers to run indefinitely
304 connection.spawn(run.run_with_connection_to(connection.clone()))?;
305 dynamic_handler_registrations
306 .into_iter()
307 .for_each(super::jsonrpc::DynamicHandlerRegistration::run_indefinitely);
308
309 // Send the "new session" request to the agent
310 connection
311 .send_request_to(Agent, request)
312 .on_receiving_result({
313 let connection = connection.clone();
314 async move |result| {
315 let response = result?;
316
317 // Extract the session-id from the response and forward
318 // the response back to the client
319 let session_id = response.session_id.clone();
320 responder.respond(response)?;
321
322 // Install a dynamic handler to proxy messages from this session
323 connection
324 .add_dynamic_handler(ProxySessionMessages::new(session_id.clone()))?
325 .run_indefinitely();
326
327 op(session_id).await
328 }
329 })
330 }
331}
332
333impl<Counterpart, R> SessionBuilder<Counterpart, R, NonBlocking>
334where
335 Counterpart: HasPeer<Agent>,
336 R: RunWithConnectionTo<Counterpart>,
337{
338 /// Mark this session builder as being able to block the current task.
339 ///
340 /// After calling this, you can use [`run_until`](Self::run_until) or
341 /// [`start_session`](Self::start_session) which block the current task.
342 ///
343 /// This should not be used from inside a message handler like
344 /// [`Builder::on_receive_request`](`crate::Builder::on_receive_request`) or [`HandleDispatchFrom`]
345 /// implementations.
346 pub fn block_task(self) -> SessionBuilder<Counterpart, R, Blocking> {
347 SessionBuilder {
348 connection: self.connection,
349 request: self.request,
350 dynamic_handler_registrations: self.dynamic_handler_registrations,
351 run: self.run,
352 block_state: PhantomData,
353 }
354 }
355}
356
357impl<Counterpart, R> SessionBuilder<Counterpart, R, Blocking>
358where
359 Counterpart: HasPeer<Agent>,
360 R: RunWithConnectionTo<Counterpart>,
361{
362 /// Run this session synchronously. The current task will be blocked
363 /// and `op` will be executed with the active session information.
364 /// This is useful when you have MCP servers that are borrowed from your local
365 /// stack frame.
366 ///
367 /// The `ActiveSession` passed to `op` has a non-`'static` lifetime, which
368 /// prevents calling [`ActiveSession::proxy_remaining_messages`] (since the
369 /// responders would terminate when `op` returns).
370 ///
371 /// Requires calling [`block_task`](Self::block_task) first.
372 pub async fn run_until<T>(
373 self,
374 op: impl for<'responder> AsyncFnOnce(
375 ActiveSession<'responder, Counterpart>,
376 ) -> Result<T, crate::Error>,
377 ) -> Result<T, crate::Error> {
378 let Self {
379 connection,
380 request,
381 dynamic_handler_registrations,
382 run,
383 block_state: _,
384 } = self;
385
386 let response = connection
387 .send_request_to(Agent, request)
388 .block_task()
389 .await?;
390
391 let active_session = connection.attach_session(response, dynamic_handler_registrations)?;
392
393 run_until(
394 run.run_with_connection_to(connection.clone()),
395 op(active_session),
396 )
397 .await
398 }
399
400 /// Send the request to create the session and return a handle.
401 /// This is an alternative to [`Self::run_until`] that avoids rightward
402 /// drift but at the cost of requiring MCP servers that are `Send` and
403 /// don't access data from the surrounding scope.
404 ///
405 /// Returns an `ActiveSession<'static, _>` because responders are spawned
406 /// into background tasks that live for the connection lifetime.
407 ///
408 /// Requires calling [`block_task`](Self::block_task) first.
409 pub async fn start_session(self) -> Result<ActiveSession<'static, Counterpart>, crate::Error>
410 where
411 R: 'static,
412 {
413 let Self {
414 connection,
415 request,
416 dynamic_handler_registrations,
417 run,
418 block_state: _,
419 } = self;
420
421 let (active_session_tx, active_session_rx) = oneshot::channel();
422
423 connection.clone().spawn(async move {
424 let response = connection
425 .send_request_to(Agent, request)
426 .block_task()
427 .await?;
428
429 connection.spawn(run.run_with_connection_to(connection.clone()))?;
430
431 let active_session =
432 connection.attach_session(response, dynamic_handler_registrations)?;
433
434 active_session_tx
435 .send(active_session)
436 .map_err(|_| crate::Error::internal_error())?;
437
438 Ok(())
439 })?;
440
441 active_session_rx
442 .await
443 .map_err(|_| crate::Error::internal_error())
444 }
445
446 /// Start a proxy session that forwards all messages between client and agent.
447 ///
448 /// A **proxy session** starts the session with the agent and then automatically
449 /// proxies all session updates (prompts, tool calls, etc.) from the agent back
450 /// to the client. You don't need to handle any messages yourself - the proxy
451 /// takes care of forwarding everything. This is useful when you want to inject
452 /// and/or filter prompts coming from the client but otherwise not be involved
453 /// in the session.
454 ///
455 /// This is a convenience method that combines [`start_session`](Self::start_session),
456 /// responding to the client, and [`ActiveSession::proxy_remaining_messages`].
457 ///
458 /// For more control (e.g., to send some messages before proxying), use
459 /// [`start_session`](Self::start_session) instead and call
460 /// [`proxy_remaining_messages`](ActiveSession::proxy_remaining_messages) manually.
461 ///
462 /// Requires calling [`block_task`](Self::block_task) first.
463 pub async fn start_session_proxy(
464 self,
465 responder: Responder<NewSessionResponse>,
466 ) -> Result<SessionId, crate::Error>
467 where
468 Counterpart: HasPeer<Client>,
469 R: 'static,
470 {
471 let active_session = self.start_session().await?;
472 let session_id = active_session.session_id().clone();
473 responder.respond(active_session.response())?;
474 active_session.proxy_remaining_messages()?;
475 Ok(session_id)
476 }
477}
478
/// Active session struct that lets you send prompts and receive updates.
///
/// The `'responder` lifetime represents the span during which responders
/// (e.g., MCP server handlers) are active. When created via [`SessionBuilder::start_session`],
/// this is `'static` because responders are spawned into background tasks.
/// When created via [`SessionBuilder::run_until`], this is tied to the
/// closure scope, preventing [`Self::proxy_remaining_messages`] from being called
/// (since the responders would die when the closure returns).
#[derive(Debug)]
pub struct ActiveSession<'responder, Link>
where
    Link: HasPeer<Agent>,
{
    /// ID of the session, taken from the agent's new-session response.
    session_id: SessionId,
    /// Receiving end of the per-session channel; read by `read_update`.
    update_rx: mpsc::UnboundedReceiver<SessionMessage>,
    /// Sending end of the per-session channel; cloned by `send_prompt` so the
    /// prompt's stop reason can be delivered on the same channel.
    update_tx: mpsc::UnboundedSender<SessionMessage>,
    /// Modes reported in the new-session response, if any.
    modes: Option<SessionModeState>,
    /// Metadata reported in the new-session response, if any.
    meta: Option<serde_json::Map<String, serde_json::Value>>,
    /// Connection used to send prompts to the agent.
    connection: ConnectionTo<Link>,

    /// Registration for the handler that routes session messages to `update_rx`.
    /// This is separate from MCP handlers so it can be dropped independently
    /// when switching to proxy mode.
    session_handler_registration: DynamicHandlerRegistration<Link>,

    /// Registrations for MCP server handlers.
    /// These will be dropped once the active-session struct is dropped
    /// which will cause them to be deregistered.
    mcp_handler_registrations: Vec<DynamicHandlerRegistration<Link>>,

    /// Phantom lifetime representing the responder lifetime.
    _responder: PhantomData<&'responder ()>,
}
512
/// A message delivered to an [`ActiveSession`] and returned by
/// [`ActiveSession::read_update`].
#[non_exhaustive]
#[derive(Debug)]
pub enum SessionMessage {
    /// A message from the agent that belongs to this session
    /// (periodic updates with new content, tool requests, etc.).
    /// Use [`MatchDispatch`] to match on the message type.
    SessionMessage(Dispatch),

    /// When a prompt completes, the stop reason.
    /// Queued by [`ActiveSession::send_prompt`] once the prompt response arrives.
    StopReason(StopReason),
}
524
525impl<Link> ActiveSession<'_, Link>
526where
527 Link: HasPeer<Agent>,
528{
529 /// Access the session ID.
530 pub fn session_id(&self) -> &SessionId {
531 &self.session_id
532 }
533
534 /// Access modes available in this session.
535 pub fn modes(&self) -> &Option<SessionModeState> {
536 &self.modes
537 }
538
539 /// Access meta data from session response.
540 pub fn meta(&self) -> &Option<serde_json::Map<String, serde_json::Value>> {
541 &self.meta
542 }
543
544 /// Build a `NewSessionResponse` from the session information.
545 ///
546 /// Useful when you need to forward the session response to a client
547 /// after doing some processing.
548 pub fn response(&self) -> NewSessionResponse {
549 NewSessionResponse::new(self.session_id.clone())
550 .modes(self.modes.clone())
551 .meta(self.meta.clone())
552 }
553
554 /// Access the underlying connection context used to communicate with the agent.
555 pub fn connection(&self) -> ConnectionTo<Link> {
556 self.connection.clone()
557 }
558
559 /// Send a prompt to the agent. You can then read messages sent in response.
560 pub fn send_prompt(&mut self, prompt: impl ToString) -> Result<(), crate::Error> {
561 let update_tx = self.update_tx.clone();
562 self.connection
563 .send_request_to(
564 Agent,
565 PromptRequest::new(self.session_id.clone(), vec![prompt.to_string().into()]),
566 )
567 .on_receiving_result(async move |result| {
568 let PromptResponse {
569 stop_reason,
570 meta: _,
571 ..
572 } = result?;
573
574 update_tx
575 .unbounded_send(SessionMessage::StopReason(stop_reason))
576 .map_err(crate::util::internal_error)?;
577
578 Ok(())
579 })
580 }
581
582 /// Read an update from the agent in response to the prompt.
583 pub async fn read_update(&mut self) -> Result<SessionMessage, crate::Error> {
584 use futures::StreamExt;
585 let message =
586 self.update_rx.next().await.ok_or_else(|| {
587 crate::util::internal_error("session channel closed unexpectedly")
588 })?;
589
590 Ok(message)
591 }
592
593 /// Read all updates until the end of the turn and create a string.
594 /// Ignores non-text updates.
595 pub async fn read_to_string(&mut self) -> Result<String, crate::Error> {
596 let mut output = String::new();
597 loop {
598 let update = self.read_update().await?;
599 tracing::trace!(?update, "read_to_string update");
600 match update {
601 SessionMessage::SessionMessage(dispatch) => MatchDispatch::new(dispatch)
602 .if_notification(async |notif: SessionNotification| match notif.update {
603 SessionUpdate::AgentMessageChunk(ContentChunk {
604 content: ContentBlock::Text(text),
605 meta: _,
606 ..
607 }) => {
608 output.push_str(&text.text);
609 Ok(())
610 }
611 _ => Ok(()),
612 })
613 .await
614 .otherwise_ignore()?,
615 SessionMessage::StopReason(_stop_reason) => break,
616 }
617 }
618 Ok(output)
619 }
620}
621
impl<Link> ActiveSession<'static, Link>
where
    Link: HasPeer<Agent>,
{
    /// Proxy all remaining messages for this session between client and agent.
    ///
    /// Use this when you want to inject MCP servers into a session but don't need
    /// to actively interact with it after setup. The session messages will be proxied
    /// between client and agent automatically.
    ///
    /// This consumes the `ActiveSession` since you're giving up active control.
    ///
    /// This method is only available on `ActiveSession<'static, _>` (from
    /// [`SessionBuilder::start_session`]) because it requires responders to
    /// outlive the method call.
    ///
    /// # Message Ordering Guarantees
    ///
    /// This method ensures proper handoff from active session mode to proxy mode
    /// without losing or reordering messages:
    ///
    /// 1. **Stop the session handler** - Drop the registration that routes messages
    ///    to `update_rx`. After this, no new messages will be queued.
    /// 2. **Release our sender** - Drop this session's `update_tx`.
    /// 3. **Drain queued messages** - Forward any messages that were already queued
    ///    in `update_rx` to the client, preserving order. Queued stop reasons are
    ///    discarded rather than forwarded.
    /// 4. **Install proxy handler** - Now that all queued messages are forwarded,
    ///    install the proxy handler to handle future messages.
    ///
    /// This sequence prevents the race condition where messages could be delivered
    /// out of order or lost during the transition.
    ///
    /// # Errors
    ///
    /// Returns an error if forwarding a queued message to the client fails or
    /// if the proxy handler cannot be registered.
    pub fn proxy_remaining_messages(self) -> Result<(), crate::Error>
    where
        Link: HasPeer<Client>,
    {
        // Destructure self to get ownership of all fields
        let ActiveSession {
            session_id,
            mut update_rx,
            update_tx,
            connection,
            session_handler_registration,
            mcp_handler_registrations,
            // These fields are not needed for proxying
            modes: _,
            meta: _,
            _responder,
        } = self;

        // Step 1: Drop the session handler registration.
        // This unregisters the handler that was routing messages to update_rx.
        // After this point, no new messages will be added to the channel.
        drop(session_handler_registration);

        // Step 2: Drop this session's sender side of the channel.
        // Clones captured by still-pending `send_prompt` callbacks may remain;
        // the drain below is non-blocking and does not wait for them.
        drop(update_tx);

        // Step 3: Drain any messages that were already queued and forward to client.
        // These messages arrived before we dropped the handler but haven't been
        // consumed yet. We must forward them to maintain message ordering.
        // The loop ends at the first non-`Ok` result from `try_recv`.
        while let Ok(message) = update_rx.try_recv() {
            match message {
                SessionMessage::SessionMessage(dispatch) => {
                    // Forward the message to the client
                    connection.send_proxied_message_to(Client, dispatch)?;
                }
                SessionMessage::StopReason(_) => {
                    // StopReason is internal bookkeeping, not forwarded
                }
            }
        }

        // Step 4: Install the proxy handler for future messages.
        // Now that all queued messages have been forwarded, the proxy handler
        // can take over. Any new messages will go directly through the proxy.
        connection
            .add_dynamic_handler(ProxySessionMessages::new(session_id))?
            .run_indefinitely();

        // Keep MCP server handlers alive for the lifetime of the proxy
        for registration in mcp_handler_registrations {
            registration.run_indefinitely();
        }

        Ok(())
    }
}
712
/// Dynamic handler that captures agent messages addressed to one session and
/// queues them on that session's update channel.
struct ActiveSessionHandler {
    /// The session whose messages this handler claims.
    session_id: SessionId,
    /// Channel on which claimed messages are queued as
    /// `SessionMessage::SessionMessage`.
    update_tx: mpsc::UnboundedSender<SessionMessage>,
}
717
718impl ActiveSessionHandler {
719 pub fn new(session_id: SessionId, update_tx: mpsc::UnboundedSender<SessionMessage>) -> Self {
720 Self {
721 session_id,
722 update_tx,
723 }
724 }
725}
726
727impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for ActiveSessionHandler
728where
729 Counterpart: HasPeer<Agent>,
730{
731 async fn handle_dispatch_from(
732 &mut self,
733 message: Dispatch,
734 cx: ConnectionTo<Counterpart>,
735 ) -> Result<Handled<Dispatch>, crate::Error> {
736 // If this is a message for our session, grab it.
737 tracing::trace!(
738 ?message,
739 handler_session_id = ?self.session_id,
740 "ActiveSessionHandler::handle_dispatch"
741 );
742 MatchDispatchFrom::new(message, &cx)
743 .if_message_from(Agent, async |message| {
744 if let Some(session_id) = message.get_session_id()? {
745 tracing::trace!(
746 message_session_id = ?session_id,
747 handler_session_id = ?self.session_id,
748 "ActiveSessionHandler::handle_dispatch"
749 );
750 if session_id == self.session_id {
751 self.update_tx
752 .unbounded_send(SessionMessage::SessionMessage(message))
753 .map_err(crate::util::internal_error)?;
754 return Ok(Handled::Yes);
755 }
756 }
757
758 // Otherwise, pass it through.
759 Ok(Handled::No {
760 message,
761 retry: false,
762 })
763 })
764 .await
765 .done()
766 }
767
768 fn describe_chain(&self) -> impl std::fmt::Debug {
769 format!("ActiveSessionHandler({})", self.session_id)
770 }
771}