agent_client_protocol/session.rs
1use std::{future::Future, marker::PhantomData, path::Path};
2
3use futures::channel::{mpsc, oneshot};
4
5use crate::{
6 Agent, Client, ConnectionTo, Dispatch, HandleDispatchFrom, Handled, Responder, Role,
7 jsonrpc::{
8 DynamicHandlerRegistration,
9 run::{ChainRun, NullRun, RunWithConnectionTo},
10 },
11 mcp_server::McpServer,
12 role::{HasPeer, acp::ProxySessionMessages},
13 schema::v1::{
14 ContentBlock, ContentChunk, NewSessionRequest, NewSessionResponse, PromptRequest,
15 PromptResponse, SessionId, SessionModeState, SessionNotification, SessionUpdate,
16 StopReason,
17 },
18 util::{MatchDispatch, MatchDispatchFrom, run_until},
19};
20
/// Type-level switch recording whether a session builder uses the
/// blocking or the non-blocking API.
///
/// The two implementors in this module are [`Blocking`] and [`NonBlocking`].
/// See [`SessionBuilder::block_task`] for how a builder moves from one
/// state to the other.
pub trait SessionBlockState: std::fmt::Debug + Send + Sync + 'static {}

/// Marker type indicating the session builder will block the current task.
#[derive(Debug)]
pub struct Blocking;

/// Marker type indicating the session builder will not block the current task.
#[derive(Debug)]
pub struct NonBlocking;

impl SessionBlockState for Blocking {}
impl SessionBlockState for NonBlocking {}
34
35impl<Counterpart: Role> ConnectionTo<Counterpart>
36where
37 Counterpart: HasPeer<Agent>,
38{
39 /// Session builder for a new session request.
40 pub fn build_session(&self, cwd: impl AsRef<Path>) -> SessionBuilder<Counterpart, NullRun> {
41 SessionBuilder::new(self, NewSessionRequest::new(cwd.as_ref()))
42 }
43
44 /// Session builder using the current working directory.
45 ///
46 /// This is a convenience wrapper around [`build_session`](Self::build_session)
47 /// that uses [`std::env::current_dir`] to get the working directory.
48 ///
49 /// Returns an error if the current directory cannot be determined.
50 pub fn build_session_cwd(&self) -> Result<SessionBuilder<Counterpart, NullRun>, crate::Error> {
51 let cwd = std::env::current_dir().map_err(|e| {
52 crate::Error::internal_error().data(format!("cannot get current directory: {e}"))
53 })?;
54 Ok(self.build_session(cwd))
55 }
56
57 /// Session builder starting from an existing request.
58 ///
59 /// Use this when you've intercepted a `session.new` request and want to
60 /// modify it (e.g., inject MCP servers) before forwarding.
61 pub fn build_session_from(
62 &self,
63 request: NewSessionRequest,
64 ) -> SessionBuilder<Counterpart, NullRun> {
65 SessionBuilder::new(self, request)
66 }
67
68 /// Given a session response received from the agent,
69 /// attach a handler to process messages related to this session
70 /// and let you access them.
71 ///
72 /// Normally you would not use this method directly but would
73 /// instead use [`Self::build_session`] and then [`SessionBuilder::start_session`].
74 ///
75 /// The vector `dynamic_handler_registrations` contains any dynamic
76 /// handle registrations associated with this session (e.g., from MCP servers).
77 /// You can simply pass `Default::default()` if not applicable.
78 pub fn attach_session<'responder>(
79 &self,
80 response: NewSessionResponse,
81 mcp_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
82 ) -> Result<ActiveSession<'responder, Counterpart>, crate::Error> {
83 let NewSessionResponse {
84 session_id,
85 modes,
86 meta,
87 ..
88 } = response;
89
90 let (update_tx, update_rx) = mpsc::unbounded();
91 let handler = ActiveSessionHandler::new(session_id.clone(), update_tx.clone());
92 let session_handler_registration = self.add_dynamic_handler(handler)?;
93
94 Ok(ActiveSession {
95 session_id,
96 modes,
97 meta,
98 update_rx,
99 update_tx,
100 connection: self.clone(),
101 session_handler_registration,
102 mcp_handler_registrations,
103 _responder: PhantomData,
104 })
105 }
106}
107
108/// Session builder for a new session request.
109/// Allows you to add MCP servers or set other details for this session.
110///
111/// The `BlockState` type parameter tracks whether blocking methods are available:
112/// - `NonBlocking` (default): Only [`on_session_start`](Self::on_session_start) is available
113/// - `Blocking` (after calling [`block_task`](Self::block_task)):
114/// [`run_until`](Self::run_until) and [`start_session`](Self::start_session) become available
115#[must_use = "use `start_session`, `run_until`, or `on_session_start` to start the session"]
116#[derive(Debug)]
117pub struct SessionBuilder<
118 Counterpart,
119 Run: RunWithConnectionTo<Counterpart> = NullRun,
120 BlockState: SessionBlockState = NonBlocking,
121> where
122 Counterpart: HasPeer<Agent>,
123{
124 connection: ConnectionTo<Counterpart>,
125 request: NewSessionRequest,
126 dynamic_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
127 run: Run,
128 block_state: PhantomData<BlockState>,
129}
130
131impl<Counterpart> SessionBuilder<Counterpart, NullRun, NonBlocking>
132where
133 Counterpart: HasPeer<Agent>,
134{
135 fn new(connection: &ConnectionTo<Counterpart>, request: NewSessionRequest) -> Self {
136 SessionBuilder {
137 connection: connection.clone(),
138 request,
139 dynamic_handler_registrations: Vec::default(),
140 run: NullRun,
141 block_state: PhantomData,
142 }
143 }
144}
145
146impl<Counterpart, R, BlockState> SessionBuilder<Counterpart, R, BlockState>
147where
148 Counterpart: HasPeer<Agent>,
149 R: RunWithConnectionTo<Counterpart>,
150 BlockState: SessionBlockState,
151{
152 /// Add the MCP servers from the given registry to this session.
153 pub fn with_mcp_server<McpRun>(
154 mut self,
155 mcp_server: McpServer<Counterpart, McpRun>,
156 ) -> Result<SessionBuilder<Counterpart, ChainRun<R, McpRun>, BlockState>, crate::Error>
157 where
158 McpRun: RunWithConnectionTo<Counterpart>,
159 {
160 let (handler, mcp_run) = mcp_server.into_handler_and_responder();
161 self.dynamic_handler_registrations
162 .push(handler.into_dynamic_handler(&mut self.request, &self.connection)?);
163 Ok(SessionBuilder {
164 connection: self.connection,
165 request: self.request,
166 dynamic_handler_registrations: self.dynamic_handler_registrations,
167 run: ChainRun::new(self.run, mcp_run),
168 block_state: self.block_state,
169 })
170 }
171
172 /// Spawn a task that runs the provided closure once the session starts.
173 ///
174 /// Unlike [`start_session`](Self::start_session), this method returns immediately
175 /// without blocking the current task. The session handshake and closure execution
176 /// happen in a spawned background task.
177 ///
178 /// The closure receives an `ActiveSession<'static, _>` and should return
179 /// `Result<(), Error>`. If the closure returns an error, it will propagate
180 /// to the connection's error handling.
181 ///
182 /// # Example
183 ///
184 /// ```ignore
185 /// # use agent_client_protocol::{Client, Agent, ConnectTo};
186 /// # use agent_client_protocol::mcp_server::McpServer;
187 /// # use agent_client_protocol_rmcp::McpServerExt;
188 /// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
189 /// # Client.builder().connect_with(transport, async |cx| {
190 /// # let mcp = McpServer::<Agent, _>::builder("tools").build();
191 /// cx.build_session_cwd()?
192 /// .with_mcp_server(mcp)?
193 /// .on_session_start(async |mut session| {
194 /// // Do something with the session
195 /// session.send_prompt("Hello")?;
196 /// let response = session.read_to_string().await?;
197 /// Ok(())
198 /// })?;
199 /// // Returns immediately, session runs in background
200 /// # Ok(())
201 /// # }).await?;
202 /// # Ok(())
203 /// # }
204 /// ```
205 ///
206 /// # Ordering
207 ///
208 /// This callback blocks the dispatch loop until the session starts and your
209 /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
210 pub fn on_session_start<F, Fut>(self, op: F) -> Result<(), crate::Error>
211 where
212 R: 'static,
213 F: FnOnce(ActiveSession<'static, Counterpart>) -> Fut + Send + 'static,
214 Fut: Future<Output = Result<(), crate::Error>> + Send,
215 {
216 let Self {
217 connection,
218 request,
219 dynamic_handler_registrations,
220 run,
221 block_state: _,
222 } = self;
223
224 connection
225 .send_request_to(Agent, request)
226 .on_receiving_result({
227 let connection = connection.clone();
228 async move |result| {
229 let response = result?;
230
231 connection.spawn(run.run_with_connection_to(connection.clone()))?;
232
233 let active_session =
234 connection.attach_session(response, dynamic_handler_registrations)?;
235
236 op(active_session).await
237 }
238 })
239 }
240
241 /// Spawn a proxy session and run a closure with the session ID.
242 ///
243 /// A **proxy session** starts the session with the agent and then automatically
244 /// proxies all session updates (prompts, tool calls, etc.) from the agent back
245 /// to the client. You don't need to handle any messages yourself - the proxy
246 /// takes care of forwarding everything. This is useful when you want to inject
247 /// and/or filter prompts coming from the client but otherwise not be involved
248 /// in the session.
249 ///
250 /// Unlike [`start_session_proxy`](Self::start_session_proxy), this method returns
251 /// immediately without blocking the current task. The session handshake, client
252 /// response, and proxy setup all happen in a spawned background task.
253 ///
254 /// The closure receives the `SessionId` once the session is established, allowing
255 /// you to perform any custom work with that ID (e.g., tracking, logging).
256 ///
257 /// # Example
258 ///
259 /// ```ignore
260 /// # use agent_client_protocol::{Proxy, Client, Conductor, ConnectTo};
261 /// # use agent_client_protocol::schema::v1::NewSessionRequest;
262 /// # use agent_client_protocol::mcp_server::McpServer;
263 /// # use agent_client_protocol_rmcp::McpServerExt;
264 /// # async fn example(transport: impl ConnectTo<Proxy>) -> Result<(), agent_client_protocol::Error> {
265 /// Proxy.builder()
266 /// .on_receive_request_from(Client, async |request: NewSessionRequest, responder, cx| {
267 /// let mcp = McpServer::<Conductor, _>::builder("tools").build();
268 /// cx.build_session_from(request)
269 /// .with_mcp_server(mcp)?
270 /// .on_proxy_session_start(responder, async |session_id| {
271 /// // Session started
272 /// Ok(())
273 /// })
274 /// }, agent_client_protocol::on_receive_request!())
275 /// .connect_to(transport)
276 /// .await?;
277 /// # Ok(())
278 /// # }
279 /// ```
280 ///
281 /// # Ordering
282 ///
283 /// This callback blocks the dispatch loop until the session starts and your
284 /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
285 pub fn on_proxy_session_start<F, Fut>(
286 self,
287 responder: Responder<NewSessionResponse>,
288 op: F,
289 ) -> Result<(), crate::Error>
290 where
291 F: FnOnce(SessionId) -> Fut + Send + 'static,
292 Fut: Future<Output = Result<(), crate::Error>> + Send,
293 Counterpart: HasPeer<Client>,
294 R: 'static,
295 {
296 let Self {
297 connection,
298 request,
299 dynamic_handler_registrations,
300 run,
301 block_state: _,
302 } = self;
303
304 // Send the "new session" request to the agent.
305 let sent = connection.send_request_to(Agent, request);
306 #[cfg(feature = "unstable_cancel_request")]
307 let sent = sent.forward_cancellation_from(responder.cancellation());
308
309 sent.on_receiving_ok_result(responder, {
310 let connection = connection.clone();
311 async move |response, responder| {
312 // Extract the session-id from the response and forward
313 // the response back to the client
314 let session_id = response.session_id.clone();
315 responder.respond(response)?;
316
317 // Install a dynamic handler to proxy messages from this session
318 connection
319 .add_dynamic_handler(ProxySessionMessages::new(session_id.clone()))?
320 .run_indefinitely();
321
322 // Spawn off the run and dynamic handlers to run indefinitely
323 connection.spawn(run.run_with_connection_to(connection.clone()))?;
324 dynamic_handler_registrations
325 .into_iter()
326 .for_each(super::jsonrpc::DynamicHandlerRegistration::run_indefinitely);
327
328 op(session_id).await
329 }
330 })
331 }
332}
333
334impl<Counterpart, R> SessionBuilder<Counterpart, R, NonBlocking>
335where
336 Counterpart: HasPeer<Agent>,
337 R: RunWithConnectionTo<Counterpart>,
338{
339 /// Mark this session builder as being able to block the current task.
340 ///
341 /// After calling this, you can use [`run_until`](Self::run_until) or
342 /// [`start_session`](Self::start_session) which block the current task.
343 ///
344 /// This should not be used from inside a message handler like
345 /// [`Builder::on_receive_request`](`crate::Builder::on_receive_request`) or [`HandleDispatchFrom`]
346 /// implementations.
347 pub fn block_task(self) -> SessionBuilder<Counterpart, R, Blocking> {
348 SessionBuilder {
349 connection: self.connection,
350 request: self.request,
351 dynamic_handler_registrations: self.dynamic_handler_registrations,
352 run: self.run,
353 block_state: PhantomData,
354 }
355 }
356}
357
358impl<Counterpart, R> SessionBuilder<Counterpart, R, Blocking>
359where
360 Counterpart: HasPeer<Agent>,
361 R: RunWithConnectionTo<Counterpart>,
362{
363 /// Run this session synchronously. The current task will be blocked
364 /// and `op` will be executed with the active session information.
365 /// This is useful when you have MCP servers that are borrowed from your local
366 /// stack frame.
367 ///
368 /// The `ActiveSession` passed to `op` has a non-`'static` lifetime, which
369 /// prevents calling [`ActiveSession::proxy_remaining_messages`] (since the
370 /// responders would terminate when `op` returns).
371 ///
372 /// Requires calling [`block_task`](Self::block_task) first.
373 pub async fn run_until<T>(
374 self,
375 op: impl for<'responder> AsyncFnOnce(
376 ActiveSession<'responder, Counterpart>,
377 ) -> Result<T, crate::Error>,
378 ) -> Result<T, crate::Error> {
379 let Self {
380 connection,
381 request,
382 dynamic_handler_registrations,
383 run,
384 block_state: _,
385 } = self;
386
387 let response = connection
388 .send_request_to(Agent, request)
389 .block_task()
390 .await?;
391
392 let active_session = connection.attach_session(response, dynamic_handler_registrations)?;
393
394 run_until(
395 run.run_with_connection_to(connection.clone()),
396 op(active_session),
397 )
398 .await
399 }
400
401 /// Send the request to create the session and return a handle.
402 /// This is an alternative to [`Self::run_until`] that avoids rightward
403 /// drift but at the cost of requiring MCP servers that are `Send` and
404 /// don't access data from the surrounding scope.
405 ///
406 /// Returns an `ActiveSession<'static, _>` because responders are spawned
407 /// into background tasks that live for the connection lifetime.
408 ///
409 /// Requires calling [`block_task`](Self::block_task) first.
410 pub async fn start_session(self) -> Result<ActiveSession<'static, Counterpart>, crate::Error>
411 where
412 R: 'static,
413 {
414 let Self {
415 connection,
416 request,
417 dynamic_handler_registrations,
418 run,
419 block_state: _,
420 } = self;
421
422 let (active_session_tx, active_session_rx) = oneshot::channel();
423
424 connection.clone().spawn(async move {
425 let response = connection
426 .send_request_to(Agent, request)
427 .block_task()
428 .await?;
429
430 connection.spawn(run.run_with_connection_to(connection.clone()))?;
431
432 let active_session =
433 connection.attach_session(response, dynamic_handler_registrations)?;
434
435 active_session_tx
436 .send(active_session)
437 .map_err(|_| crate::Error::internal_error())?;
438
439 Ok(())
440 })?;
441
442 active_session_rx
443 .await
444 .map_err(|_| crate::Error::internal_error())
445 }
446
447 /// Start a proxy session that forwards all messages between client and agent.
448 ///
449 /// A **proxy session** starts the session with the agent and then automatically
450 /// proxies all session updates (prompts, tool calls, etc.) from the agent back
451 /// to the client. You don't need to handle any messages yourself - the proxy
452 /// takes care of forwarding everything. This is useful when you want to inject
453 /// and/or filter prompts coming from the client but otherwise not be involved
454 /// in the session.
455 ///
456 /// This is a convenience method that combines [`start_session`](Self::start_session),
457 /// responding to the client, and [`ActiveSession::proxy_remaining_messages`].
458 ///
459 /// For more control (e.g., to send some messages before proxying), use
460 /// [`start_session`](Self::start_session) instead and call
461 /// [`proxy_remaining_messages`](ActiveSession::proxy_remaining_messages) manually.
462 ///
463 /// Requires calling [`block_task`](Self::block_task) first.
464 pub async fn start_session_proxy(
465 self,
466 responder: Responder<NewSessionResponse>,
467 ) -> Result<SessionId, crate::Error>
468 where
469 Counterpart: HasPeer<Client>,
470 R: 'static,
471 {
472 let active_session = self.start_session().await?;
473 let session_id = active_session.session_id().clone();
474 responder.respond(active_session.response())?;
475 active_session.proxy_remaining_messages()?;
476 Ok(session_id)
477 }
478}
479
480/// Active session struct that lets you send prompts and receive updates.
481///
482/// The `'responder` lifetime represents the span during which responders
483/// (e.g., MCP server handlers) are active. When created via [`SessionBuilder::start_session`],
484/// this is `'static` because responders are spawned into background tasks.
485/// When created via [`SessionBuilder::run_until`], this is tied to the
486/// closure scope, preventing [`Self::proxy_remaining_messages`] from being called
487/// (since the responders would die when the closure returns).
488#[derive(Debug)]
489pub struct ActiveSession<'responder, Link>
490where
491 Link: HasPeer<Agent>,
492{
493 session_id: SessionId,
494 update_rx: mpsc::UnboundedReceiver<SessionMessage>,
495 update_tx: mpsc::UnboundedSender<SessionMessage>,
496 modes: Option<SessionModeState>,
497 meta: Option<serde_json::Map<String, serde_json::Value>>,
498 connection: ConnectionTo<Link>,
499
500 /// Registration for the handler that routes session messages to `update_rx`.
501 /// This is separate from MCP handlers so it can be dropped independently
502 /// when switching to proxy mode.
503 session_handler_registration: DynamicHandlerRegistration<Link>,
504
505 /// Registrations for MCP server handlers.
506 /// These will be dropped once the active-session struct is dropped
507 /// which will cause them to be deregistered.
508 mcp_handler_registrations: Vec<DynamicHandlerRegistration<Link>>,
509
510 /// Phantom lifetime representing the responder lifetime.
511 _responder: PhantomData<&'responder ()>,
512}
513
/// Incoming message for an [`ActiveSession`], as returned by
/// [`ActiveSession::read_update`].
///
/// Both variants travel over the same queue: dispatches are queued by the
/// session's message handler, and a stop reason is queued when the response
/// to a prompt sent with [`ActiveSession::send_prompt`] arrives.
#[non_exhaustive]
#[derive(Debug)]
#[cfg_attr(
    feature = "unstable_cancel_request",
    allow(
        clippy::large_enum_variant,
        reason = "Dispatch messages vastly outnumber StopReason; boxing would add a heap allocation"
    )
)]
pub enum SessionMessage {
    /// Periodic updates with new content, tool requests, etc.
    /// Use [`MatchDispatch`] to match on the message type.
    SessionMessage(Dispatch),

    /// When a prompt completes, the stop reason.
    /// [`ActiveSession::read_to_string`] treats this as the end of the turn.
    StopReason(StopReason),
}
532
533impl<Link> ActiveSession<'_, Link>
534where
535 Link: HasPeer<Agent>,
536{
537 /// Access the session ID.
538 pub fn session_id(&self) -> &SessionId {
539 &self.session_id
540 }
541
542 /// Access modes available in this session.
543 pub fn modes(&self) -> &Option<SessionModeState> {
544 &self.modes
545 }
546
547 /// Access meta data from session response.
548 pub fn meta(&self) -> &Option<serde_json::Map<String, serde_json::Value>> {
549 &self.meta
550 }
551
552 /// Build a `NewSessionResponse` from the session information.
553 ///
554 /// Useful when you need to forward the session response to a client
555 /// after doing some processing.
556 pub fn response(&self) -> NewSessionResponse {
557 NewSessionResponse::new(self.session_id.clone())
558 .modes(self.modes.clone())
559 .meta(self.meta.clone())
560 }
561
562 /// Access the underlying connection context used to communicate with the agent.
563 pub fn connection(&self) -> ConnectionTo<Link> {
564 self.connection.clone()
565 }
566
567 /// Send a prompt to the agent. You can then read messages sent in response.
568 pub fn send_prompt(&mut self, prompt: impl ToString) -> Result<(), crate::Error> {
569 let update_tx = self.update_tx.clone();
570 self.connection
571 .send_request_to(
572 Agent,
573 PromptRequest::new(self.session_id.clone(), vec![prompt.to_string().into()]),
574 )
575 .on_receiving_result(async move |result| {
576 let PromptResponse { stop_reason, .. } = result?;
577
578 update_tx
579 .unbounded_send(SessionMessage::StopReason(stop_reason))
580 .map_err(crate::util::internal_error)?;
581
582 Ok(())
583 })
584 }
585
586 /// Read an update from the agent in response to the prompt.
587 pub async fn read_update(&mut self) -> Result<SessionMessage, crate::Error> {
588 use futures::StreamExt;
589 let message =
590 self.update_rx.next().await.ok_or_else(|| {
591 crate::util::internal_error("session channel closed unexpectedly")
592 })?;
593
594 Ok(message)
595 }
596
597 /// Read all updates until the end of the turn and create a string.
598 /// Ignores non-text updates.
599 pub async fn read_to_string(&mut self) -> Result<String, crate::Error> {
600 let mut output = String::new();
601 loop {
602 let update = self.read_update().await?;
603 tracing::trace!(?update, "read_to_string update");
604 match update {
605 SessionMessage::SessionMessage(dispatch) => MatchDispatch::new(dispatch)
606 .if_notification(async |notif: SessionNotification| match notif.update {
607 SessionUpdate::AgentMessageChunk(ContentChunk {
608 content: ContentBlock::Text(text),
609 ..
610 }) => {
611 output.push_str(&text.text);
612 Ok(())
613 }
614 _ => Ok(()),
615 })
616 .await
617 .otherwise_ignore()?,
618 SessionMessage::StopReason(_stop_reason) => break,
619 }
620 }
621 Ok(output)
622 }
623}
624
625impl<Link> ActiveSession<'static, Link>
626where
627 Link: HasPeer<Agent>,
628{
629 /// Proxy all remaining messages for this session between client and agent.
630 ///
631 /// Use this when you want to inject MCP servers into a session but don't need
632 /// to actively interact with it after setup. The session messages will be proxied
633 /// between client and agent automatically.
634 ///
635 /// This consumes the `ActiveSession` since you're giving up active control.
636 ///
637 /// This method is only available on `ActiveSession<'static, _>` (from
638 /// [`SessionBuilder::start_session`]) because it requires responders to
639 /// outlive the method call.
640 ///
641 /// # Message Ordering Guarantees
642 ///
643 /// This method ensures proper handoff from active session mode to proxy mode
644 /// without losing or reordering messages:
645 ///
646 /// 1. **Stop the session handler** - Drop the registration that routes messages
647 /// to `update_rx`. After this, no new messages will be queued.
648 /// 2. **Close the channel** - Drop `update_tx` so we can detect when the channel
649 /// is fully drained.
650 /// 3. **Drain queued messages** - Forward any messages that were already queued
651 /// in `update_rx` to the client, preserving order.
652 /// 4. **Install proxy handler** - Now that all queued messages are forwarded,
653 /// install the proxy handler to handle future messages.
654 ///
655 /// This sequence prevents the race condition where messages could be delivered
656 /// out of order or lost during the transition.
657 pub fn proxy_remaining_messages(self) -> Result<(), crate::Error>
658 where
659 Link: HasPeer<Client>,
660 {
661 // Destructure self to get ownership of all fields
662 let ActiveSession {
663 session_id,
664 mut update_rx,
665 update_tx,
666 connection,
667 session_handler_registration,
668 mcp_handler_registrations,
669 // These fields are not needed for proxying
670 modes: _,
671 meta: _,
672 _responder,
673 } = self;
674
675 // Step 1: Drop the session handler registration.
676 // This unregisters the handler that was routing messages to update_rx.
677 // After this point, no new messages will be added to the channel.
678 drop(session_handler_registration);
679
680 // Step 2: Drop the sender side of the channel.
681 // This allows us to detect when the channel is fully drained
682 // (recv will return None when empty and sender is dropped).
683 drop(update_tx);
684
685 // Step 3: Drain any messages that were already queued and forward to client.
686 // These messages arrived before we dropped the handler but haven't been
687 // consumed yet. We must forward them to maintain message ordering.
688 while let Ok(message) = update_rx.try_recv() {
689 match message {
690 SessionMessage::SessionMessage(dispatch) => {
691 // Forward the message to the client
692 connection.send_proxied_message_to(Client, dispatch)?;
693 }
694 SessionMessage::StopReason(_) => {
695 // StopReason is internal bookkeeping, not forwarded
696 }
697 }
698 }
699
700 // Step 4: Install the proxy handler for future messages.
701 // Now that all queued messages have been forwarded, the proxy handler
702 // can take over. Any new messages will go directly through the proxy.
703 connection
704 .add_dynamic_handler(ProxySessionMessages::new(session_id))?
705 .run_indefinitely();
706
707 // Keep MCP server handlers alive for the lifetime of the proxy
708 for registration in mcp_handler_registrations {
709 registration.run_indefinitely();
710 }
711
712 Ok(())
713 }
714}
715
716struct ActiveSessionHandler {
717 session_id: SessionId,
718 update_tx: mpsc::UnboundedSender<SessionMessage>,
719}
720
721impl ActiveSessionHandler {
722 pub fn new(session_id: SessionId, update_tx: mpsc::UnboundedSender<SessionMessage>) -> Self {
723 Self {
724 session_id,
725 update_tx,
726 }
727 }
728}
729
730impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for ActiveSessionHandler
731where
732 Counterpart: HasPeer<Agent>,
733{
734 async fn handle_dispatch_from(
735 &mut self,
736 message: Dispatch,
737 cx: ConnectionTo<Counterpart>,
738 ) -> Result<Handled<Dispatch>, crate::Error> {
739 // If this is a message for our session, grab it.
740 tracing::trace!(
741 ?message,
742 handler_session_id = ?self.session_id,
743 "ActiveSessionHandler::handle_dispatch"
744 );
745 MatchDispatchFrom::new(message, &cx)
746 .if_message_from(Agent, async |message| {
747 if let Some(session_id) = message.get_session_id()? {
748 tracing::trace!(
749 message_session_id = ?session_id,
750 handler_session_id = ?self.session_id,
751 "ActiveSessionHandler::handle_dispatch"
752 );
753 if session_id == self.session_id {
754 self.update_tx
755 .unbounded_send(SessionMessage::SessionMessage(message))
756 .map_err(crate::util::internal_error)?;
757 return Ok(Handled::Yes);
758 }
759 }
760
761 // Otherwise, pass it through.
762 Ok(Handled::No {
763 message,
764 retry: false,
765 })
766 })
767 .await
768 .done()
769 }
770
771 fn describe_chain(&self) -> impl std::fmt::Debug {
772 format!("ActiveSessionHandler({})", self.session_id)
773 }
774}