agent-client-protocol 0.11.0

Core protocol types and traits for the Agent Client Protocol
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
use std::{future::Future, marker::PhantomData, path::Path};

use agent_client_protocol_schema::{
    ContentBlock, ContentChunk, NewSessionRequest, NewSessionResponse, PromptRequest,
    PromptResponse, SessionModeState, SessionNotification, SessionUpdate, StopReason,
};
use futures::channel::mpsc;
use tokio::sync::oneshot;

use crate::{
    Agent, Client, ConnectionTo, Dispatch, HandleDispatchFrom, Handled, Responder, Role,
    jsonrpc::{
        DynamicHandlerRegistration,
        run::{ChainRun, NullRun, RunWithConnectionTo},
    },
    mcp_server::McpServer,
    role::{HasPeer, acp::ProxySessionMessages},
    schema::SessionId,
    util::{MatchDispatch, MatchDispatchFrom, run_until},
};

/// Marker type indicating the session builder will block the current task.
#[derive(Debug)]
pub struct Blocking;
impl SessionBlockState for Blocking {}

/// Marker type indicating the session builder will not block the current task.
#[derive(Debug)]
pub struct NonBlocking;
impl SessionBlockState for NonBlocking {}

/// Trait for marker types that indicate the blocking vs non-blocking API.
/// See [`SessionBuilder::block_task`].
pub trait SessionBlockState: Send + 'static + Sync + std::fmt::Debug {}

impl<Counterpart: Role> ConnectionTo<Counterpart>
where
    Counterpart: HasPeer<Agent>,
{
    /// Session builder for a new session request.
    pub fn build_session(&self, cwd: impl AsRef<Path>) -> SessionBuilder<Counterpart, NullRun> {
        SessionBuilder::new(self, NewSessionRequest::new(cwd.as_ref()))
    }

    /// Session builder using the current working directory.
    ///
    /// This is a convenience wrapper around [`build_session`](Self::build_session)
    /// that uses [`std::env::current_dir`] to get the working directory.
    ///
    /// # Errors
    ///
    /// Returns an error if the current directory cannot be determined.
    pub fn build_session_cwd(&self) -> Result<SessionBuilder<Counterpart, NullRun>, crate::Error> {
        let cwd = std::env::current_dir().map_err(|e| {
            crate::Error::internal_error().data(format!("cannot get current directory: {e}"))
        })?;
        Ok(self.build_session(cwd))
    }

    /// Session builder starting from an existing request.
    ///
    /// Use this when you've intercepted a `session.new` request and want to
    /// modify it (e.g., inject MCP servers) before forwarding.
    pub fn build_session_from(
        &self,
        request: NewSessionRequest,
    ) -> SessionBuilder<Counterpart, NullRun> {
        SessionBuilder::new(self, request)
    }

    /// Given a session response received from the agent,
    /// attach a handler to process messages related to this session
    /// and let you access them.
    ///
    /// Normally you would not use this method directly but would
    /// instead use [`Self::build_session`] and then [`SessionBuilder::start_session`].
    ///
    /// The vector `mcp_handler_registrations` contains any dynamic
    /// handler registrations associated with this session (e.g., from MCP servers).
    /// You can simply pass `Default::default()` if not applicable.
    pub fn attach_session<'responder>(
        &self,
        response: NewSessionResponse,
        mcp_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
    ) -> Result<ActiveSession<'responder, Counterpart>, crate::Error> {
        let NewSessionResponse {
            session_id,
            modes,
            meta,
            ..
        } = response;

        // Channel over which the session handler delivers updates to the
        // returned `ActiveSession`. The sender is also stored on the session
        // so prompt completions can feed stop reasons into the same stream.
        let (update_tx, update_rx) = mpsc::unbounded();
        let handler = ActiveSessionHandler::new(session_id.clone(), update_tx.clone());
        let session_handler_registration = self.add_dynamic_handler(handler)?;

        Ok(ActiveSession {
            session_id,
            modes,
            meta,
            update_rx,
            update_tx,
            connection: self.clone(),
            session_handler_registration,
            mcp_handler_registrations,
            _responder: PhantomData,
        })
    }
}

/// Session builder for a new session request.
/// Allows you to add MCP servers or set other details for this session.
///
/// The `BlockState` type parameter tracks whether blocking methods are available:
/// - `NonBlocking` (default): Only [`on_session_start`](Self::on_session_start) is available
/// - `Blocking` (after calling [`block_task`](Self::block_task)):
///   [`run_until`](Self::run_until) and [`start_session`](Self::start_session) become available
#[must_use = "use `start_session`, `run_until`, or `on_session_start` to start the session"]
#[derive(Debug)]
pub struct SessionBuilder<
    Counterpart,
    Run: RunWithConnectionTo<Counterpart> = NullRun,
    BlockState: SessionBlockState = NonBlocking,
> where
    Counterpart: HasPeer<Agent>,
{
    /// Connection used to send the `session.new` request to the agent.
    connection: ConnectionTo<Counterpart>,
    /// The request being built; mutated when MCP servers are added
    /// (see [`Self::with_mcp_server`]).
    request: NewSessionRequest,
    /// Dynamic handler registrations (e.g., MCP server handlers) accumulated
    /// so far; attached to the session once it starts.
    dynamic_handler_registrations: Vec<DynamicHandlerRegistration<Counterpart>>,
    /// Background work (e.g., MCP server runs) driven alongside the session.
    run: Run,
    /// Zero-sized marker carrying the `BlockState` type parameter.
    block_state: PhantomData<BlockState>,
}

impl<Counterpart> SessionBuilder<Counterpart, NullRun, NonBlocking>
where
    Counterpart: HasPeer<Agent>,
{
    /// Create a builder for `request` with no MCP servers, a null run,
    /// and the non-blocking API surface.
    fn new(connection: &ConnectionTo<Counterpart>, request: NewSessionRequest) -> Self {
        Self {
            connection: connection.clone(),
            request,
            dynamic_handler_registrations: Vec::new(),
            run: NullRun,
            block_state: PhantomData,
        }
    }
}

impl<Counterpart, R, BlockState> SessionBuilder<Counterpart, R, BlockState>
where
    Counterpart: HasPeer<Agent>,
    R: RunWithConnectionTo<Counterpart>,
    BlockState: SessionBlockState,
{
    /// Add the MCP servers from the given registry to this session.
    ///
    /// Returns a new builder whose `Run` type chains the server's background
    /// run onto any previously added runs.
    pub fn with_mcp_server<McpRun>(
        mut self,
        mcp_server: McpServer<Counterpart, McpRun>,
    ) -> Result<SessionBuilder<Counterpart, ChainRun<R, McpRun>, BlockState>, crate::Error>
    where
        McpRun: RunWithConnectionTo<Counterpart>,
    {
        // Split the server into its message handler and its background run.
        // Converting the handler to a dynamic handler takes `&mut` access to
        // the request, so it may update the request for this server.
        let (handler, mcp_run) = mcp_server.into_handler_and_responder();
        self.dynamic_handler_registrations
            .push(handler.into_dynamic_handler(&mut self.request, &self.connection)?);
        Ok(SessionBuilder {
            connection: self.connection,
            request: self.request,
            dynamic_handler_registrations: self.dynamic_handler_registrations,
            run: ChainRun::new(self.run, mcp_run),
            block_state: self.block_state,
        })
    }

    /// Spawn a task that runs the provided closure once the session starts.
    ///
    /// Unlike [`start_session`](Self::start_session), this method returns immediately
    /// without blocking the current task. The session handshake and closure execution
    /// happen in a spawned background task.
    ///
    /// The closure receives an `ActiveSession<'static, _>` and should return
    /// `Result<(), Error>`. If the closure returns an error, it will propagate
    /// to the connection's error handling.
    ///
    /// # Example
    ///
    /// ```
    /// # use agent_client_protocol::{Client, Agent, ConnectTo};
    /// # use agent_client_protocol::mcp_server::McpServer;
    /// # async fn example(transport: impl ConnectTo<Client>) -> Result<(), agent_client_protocol::Error> {
    /// # Client.builder().connect_with(transport, async |cx| {
    /// # let mcp = McpServer::<Agent, _>::builder("tools").build();
    /// cx.build_session_cwd()?
    ///     .with_mcp_server(mcp)?
    ///     .on_session_start(async |mut session| {
    ///         // Do something with the session
    ///         session.send_prompt("Hello")?;
    ///         let response = session.read_to_string().await?;
    ///         Ok(())
    ///     })?;
    /// // Returns immediately, session runs in background
    /// # Ok(())
    /// # }).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Ordering
    ///
    /// This callback blocks the dispatch loop until the session starts and your
    /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
    pub fn on_session_start<F, Fut>(self, op: F) -> Result<(), crate::Error>
    where
        R: 'static,
        F: FnOnce(ActiveSession<'static, Counterpart>) -> Fut + Send + 'static,
        Fut: Future<Output = Result<(), crate::Error>> + Send,
    {
        let Self {
            connection,
            request,
            dynamic_handler_registrations,
            run,
            block_state: _,
        } = self;

        connection
            .send_request_to(Agent, request)
            .on_receiving_result({
                let connection = connection.clone();
                async move |result| {
                    let response = result?;

                    // Spawn the accumulated background runs (e.g., MCP servers)
                    // now that the agent has accepted the session.
                    connection.spawn(run.run_with_connection_to(connection.clone()))?;

                    // Route this session's messages into an ActiveSession and
                    // hand it to the caller's closure.
                    let active_session =
                        connection.attach_session(response, dynamic_handler_registrations)?;

                    op(active_session).await
                }
            })
    }

    /// Spawn a proxy session and run a closure with the session ID.
    ///
    /// A **proxy session** starts the session with the agent and then automatically
    /// proxies all session updates (prompts, tool calls, etc.) from the agent back
    /// to the client. You don't need to handle any messages yourself - the proxy
    /// takes care of forwarding everything. This is useful when you want to inject
    /// and/or filter prompts coming from the client but otherwise not be involved
    /// in the session.
    ///
    /// Unlike [`start_session_proxy`](Self::start_session_proxy), this method returns
    /// immediately without blocking the current task. The session handshake, client
    /// response, and proxy setup all happen in a spawned background task.
    ///
    /// The closure receives the `SessionId` once the session is established, allowing
    /// you to perform any custom work with that ID (e.g., tracking, logging).
    ///
    /// # Example
    ///
    /// ```
    /// # use agent_client_protocol::{Proxy, Client, Conductor, ConnectTo};
    /// # use agent_client_protocol::schema::NewSessionRequest;
    /// # use agent_client_protocol::mcp_server::McpServer;
    /// # async fn example(transport: impl ConnectTo<Proxy>) -> Result<(), agent_client_protocol::Error> {
    /// Proxy.builder()
    ///     .on_receive_request_from(Client, async |request: NewSessionRequest, responder, cx| {
    ///         let mcp = McpServer::<Conductor, _>::builder("tools").build();
    ///         cx.build_session_from(request)
    ///             .with_mcp_server(mcp)?
    ///             .on_proxy_session_start(responder, async |session_id| {
    ///                 // Session started
    ///                 Ok(())
    ///             })
    ///     }, agent_client_protocol::on_receive_request!())
    ///     .connect_to(transport)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Ordering
    ///
    /// This callback blocks the dispatch loop until the session starts and your
    /// callback completes. See the [`ordering`](crate::concepts::ordering) module for details.
    pub fn on_proxy_session_start<F, Fut>(
        self,
        responder: Responder<NewSessionResponse>,
        op: F,
    ) -> Result<(), crate::Error>
    where
        F: FnOnce(SessionId) -> Fut + Send + 'static,
        Fut: Future<Output = Result<(), crate::Error>> + Send,
        Counterpart: HasPeer<Client>,
        R: 'static,
    {
        let Self {
            connection,
            request,
            dynamic_handler_registrations,
            run,
            block_state: _,
        } = self;

        // Spawn off the run and dynamic handlers to run indefinitely
        connection.spawn(run.run_with_connection_to(connection.clone()))?;
        dynamic_handler_registrations
            .into_iter()
            .for_each(super::jsonrpc::DynamicHandlerRegistration::run_indefinitely);

        // Send the "new session" request to the agent
        connection
            .send_request_to(Agent, request)
            .on_receiving_result({
                let connection = connection.clone();
                async move |result| {
                    let response = result?;

                    // Extract the session-id from the response and forward
                    // the response back to the client
                    let session_id = response.session_id.clone();
                    responder.respond(response)?;

                    // Install a dynamic handler to proxy messages from this session
                    connection
                        .add_dynamic_handler(ProxySessionMessages::new(session_id.clone()))?
                        .run_indefinitely();

                    op(session_id).await
                }
            })
    }
}

impl<Counterpart, R> SessionBuilder<Counterpart, R, NonBlocking>
where
    Counterpart: HasPeer<Agent>,
    R: RunWithConnectionTo<Counterpart>,
{
    /// Opt in to the blocking API for this session builder.
    ///
    /// After calling this, [`run_until`](Self::run_until) and
    /// [`start_session`](Self::start_session) become available; both block
    /// the current task.
    ///
    /// Do not call this from inside a message handler such as
    /// [`Builder::on_receive_request`](`crate::Builder::on_receive_request`)
    /// or a [`HandleDispatchFrom`] implementation.
    pub fn block_task(self) -> SessionBuilder<Counterpart, R, Blocking> {
        // Only the zero-sized `block_state` marker changes type; every other
        // field is carried over untouched.
        let Self {
            connection,
            request,
            dynamic_handler_registrations,
            run,
            block_state: _,
        } = self;
        SessionBuilder {
            connection,
            request,
            dynamic_handler_registrations,
            run,
            block_state: PhantomData,
        }
    }
}

impl<Counterpart, R> SessionBuilder<Counterpart, R, Blocking>
where
    Counterpart: HasPeer<Agent>,
    R: RunWithConnectionTo<Counterpart>,
{
    /// Run this session synchronously. The current task will be blocked
    /// and `op` will be executed with the active session information.
    /// This is useful when you have MCP servers that are borrowed from your local
    /// stack frame.
    ///
    /// The `ActiveSession` passed to `op` has a non-`'static` lifetime, which
    /// prevents calling [`ActiveSession::proxy_remaining_messages`] (since the
    /// responders would terminate when `op` returns).
    ///
    /// Requires calling [`block_task`](Self::block_task) first.
    pub async fn run_until<T>(
        self,
        op: impl for<'responder> AsyncFnOnce(
            ActiveSession<'responder, Counterpart>,
        ) -> Result<T, crate::Error>,
    ) -> Result<T, crate::Error> {
        let Self {
            connection,
            request,
            dynamic_handler_registrations,
            run,
            block_state: _,
        } = self;

        // Send the "new session" request and block until the agent responds.
        let response = connection
            .send_request_to(Agent, request)
            .block_task()
            .await?;

        let active_session = connection.attach_session(response, dynamic_handler_registrations)?;

        // Drive the accumulated background runs concurrently with `op`;
        // everything winds down when `op` completes.
        run_until(
            run.run_with_connection_to(connection.clone()),
            op(active_session),
        )
        .await
    }

    /// Send the request to create the session and return a handle.
    /// This is an alternative to [`Self::run_until`] that avoids rightward
    /// drift but at the cost of requiring MCP servers that are `Send` and
    /// don't access data from the surrounding scope.
    ///
    /// Returns an `ActiveSession<'static, _>` because responders are spawned
    /// into background tasks that live for the connection lifetime.
    ///
    /// Requires calling [`block_task`](Self::block_task) first.
    pub async fn start_session(self) -> Result<ActiveSession<'static, Counterpart>, crate::Error>
    where
        R: 'static,
    {
        let Self {
            connection,
            request,
            dynamic_handler_registrations,
            run,
            block_state: _,
        } = self;

        // The handshake happens in a spawned task so the background runs can
        // outlive this call; the resulting session is handed back over a
        // oneshot channel.
        let (active_session_tx, active_session_rx) = oneshot::channel();

        connection.clone().spawn(async move {
            let response = connection
                .send_request_to(Agent, request)
                .block_task()
                .await?;

            connection.spawn(run.run_with_connection_to(connection.clone()))?;

            let active_session =
                connection.attach_session(response, dynamic_handler_registrations)?;

            // A send error here means the caller dropped the receiver.
            active_session_tx
                .send(active_session)
                .map_err(|_| crate::Error::internal_error())?;

            Ok(())
        })?;

        active_session_rx
            .await
            .map_err(|_| crate::Error::internal_error())
    }

    /// Start a proxy session that forwards all messages between client and agent.
    ///
    /// A **proxy session** starts the session with the agent and then automatically
    /// proxies all session updates (prompts, tool calls, etc.) from the agent back
    /// to the client. You don't need to handle any messages yourself - the proxy
    /// takes care of forwarding everything. This is useful when you want to inject
    /// and/or filter prompts coming from the client but otherwise not be involved
    /// in the session.
    ///
    /// This is a convenience method that combines [`start_session`](Self::start_session),
    /// responding to the client, and [`ActiveSession::proxy_remaining_messages`].
    ///
    /// For more control (e.g., to send some messages before proxying), use
    /// [`start_session`](Self::start_session) instead and call
    /// [`proxy_remaining_messages`](ActiveSession::proxy_remaining_messages) manually.
    ///
    /// Requires calling [`block_task`](Self::block_task) first.
    pub async fn start_session_proxy(
        self,
        responder: Responder<NewSessionResponse>,
    ) -> Result<SessionId, crate::Error>
    where
        Counterpart: HasPeer<Client>,
        R: 'static,
    {
        let active_session = self.start_session().await?;
        let session_id = active_session.session_id().clone();
        // Tell the client the session exists before handing the message
        // stream over to the proxy.
        responder.respond(active_session.response())?;
        active_session.proxy_remaining_messages()?;
        Ok(session_id)
    }
}

/// Active session struct that lets you send prompts and receive updates.
///
/// The `'responder` lifetime represents the span during which responders
/// (e.g., MCP server handlers) are active. When created via [`SessionBuilder::start_session`],
/// this is `'static` because responders are spawned into background tasks.
/// When created via [`SessionBuilder::run_until`], this is tied to the
/// closure scope, preventing [`Self::proxy_remaining_messages`] from being called
/// (since the responders would die when the closure returns).
#[derive(Debug)]
pub struct ActiveSession<'responder, Link>
where
    Link: HasPeer<Agent>,
{
    /// Identifier of this session, as returned by the agent.
    session_id: SessionId,
    /// Receiver for session updates routed here by the session handler.
    update_rx: mpsc::UnboundedReceiver<SessionMessage>,
    /// Sender side of the update channel; cloned by [`Self::send_prompt`]
    /// so stop reasons flow into the same stream as updates.
    update_tx: mpsc::UnboundedSender<SessionMessage>,
    /// Session modes reported in the `session.new` response, if any.
    modes: Option<SessionModeState>,
    /// Metadata from the `session.new` response, if any.
    meta: Option<serde_json::Map<String, serde_json::Value>>,
    /// Connection used to talk to the agent.
    connection: ConnectionTo<Link>,

    /// Registration for the handler that routes session messages to `update_rx`.
    /// This is separate from MCP handlers so it can be dropped independently
    /// when switching to proxy mode.
    session_handler_registration: DynamicHandlerRegistration<Link>,

    /// Registrations for MCP server handlers.
    /// These will be dropped once the active-session struct is dropped
    /// which will cause them to be deregistered.
    mcp_handler_registrations: Vec<DynamicHandlerRegistration<Link>>,

    /// Phantom lifetime representing the responder lifetime.
    _responder: PhantomData<&'responder ()>,
}

/// Incoming message from the agent.
///
/// Yielded by [`ActiveSession::read_update`]. Marked `#[non_exhaustive]`
/// so new message kinds can be added without breaking downstream matches.
#[non_exhaustive]
#[derive(Debug)]
pub enum SessionMessage {
    /// Periodic updates with new content, tool requests, etc.
    /// Use [`MatchDispatch`] to match on the message type.
    SessionMessage(Dispatch),

    /// When a prompt completes, the stop reason.
    StopReason(StopReason),
}

impl<Link> ActiveSession<'_, Link>
where
    Link: HasPeer<Agent>,
{
    /// Access the session ID.
    pub fn session_id(&self) -> &SessionId {
        &self.session_id
    }

    /// Access modes available in this session.
    pub fn modes(&self) -> &Option<SessionModeState> {
        &self.modes
    }

    /// Access meta data from session response.
    pub fn meta(&self) -> &Option<serde_json::Map<String, serde_json::Value>> {
        &self.meta
    }

    /// Build a `NewSessionResponse` from the session information.
    ///
    /// Useful when you need to forward the session response to a client
    /// after doing some processing.
    pub fn response(&self) -> NewSessionResponse {
        let response = NewSessionResponse::new(self.session_id.clone());
        let response = response.modes(self.modes.clone());
        response.meta(self.meta.clone())
    }

    /// Access the underlying connection context used to communicate with the agent.
    pub fn connection(&self) -> ConnectionTo<Link> {
        self.connection.clone()
    }

    /// Send a prompt to the agent. You can then read messages sent in response.
    ///
    /// The prompt's stop reason is delivered through the same channel as
    /// session updates, so [`Self::read_update`] observes end of turn.
    pub fn send_prompt(&mut self, prompt: impl ToString) -> Result<(), crate::Error> {
        let request = PromptRequest::new(self.session_id.clone(), vec![prompt.to_string().into()]);
        let tx = self.update_tx.clone();
        self.connection
            .send_request_to(Agent, request)
            .on_receiving_result(async move |result| {
                let PromptResponse {
                    stop_reason,
                    meta: _,
                    ..
                } = result?;

                // Feed the stop reason into the update stream so readers
                // can detect the end of the turn.
                tx.unbounded_send(SessionMessage::StopReason(stop_reason))
                    .map_err(crate::util::internal_error)?;

                Ok(())
            })
    }

    /// Read an update from the agent in response to the prompt.
    pub async fn read_update(&mut self) -> Result<SessionMessage, crate::Error> {
        use futures::StreamExt;
        match self.update_rx.next().await {
            Some(message) => Ok(message),
            None => Err(crate::util::internal_error(
                "session channel closed unexpectedly",
            )),
        }
    }

    /// Read all updates until the end of the turn and create a string.
    /// Ignores non-text updates.
    pub async fn read_to_string(&mut self) -> Result<String, crate::Error> {
        let mut text = String::new();
        loop {
            let update = self.read_update().await?;
            tracing::trace!(?update, "read_to_string update");
            // A stop reason ends the turn; anything else is a dispatch to
            // inspect for text content.
            let dispatch = match update {
                SessionMessage::StopReason(_stop_reason) => return Ok(text),
                SessionMessage::SessionMessage(dispatch) => dispatch,
            };
            MatchDispatch::new(dispatch)
                .if_notification(async |notif: SessionNotification| {
                    if let SessionUpdate::AgentMessageChunk(ContentChunk {
                        content: ContentBlock::Text(chunk),
                        meta: _,
                        ..
                    }) = notif.update
                    {
                        text.push_str(&chunk.text);
                    }
                    Ok(())
                })
                .await
                .otherwise_ignore()?;
        }
    }
}

impl<Link> ActiveSession<'static, Link>
where
    Link: HasPeer<Agent>,
{
    /// Proxy all remaining messages for this session between client and agent.
    ///
    /// Use this when you want to inject MCP servers into a session but don't need
    /// to actively interact with it after setup. The session messages will be proxied
    /// between client and agent automatically.
    ///
    /// This consumes the `ActiveSession` since you're giving up active control.
    ///
    /// This method is only available on `ActiveSession<'static, _>` (from
    /// [`SessionBuilder::start_session`]) because it requires responders to
    /// outlive the method call.
    ///
    /// # Message Ordering Guarantees
    ///
    /// This method ensures proper handoff from active session mode to proxy mode
    /// without losing or reordering messages:
    ///
    /// 1. **Stop the session handler** - Drop the registration that routes messages
    ///    to `update_rx`. After this, no new messages will be queued.
    /// 2. **Close the channel** - Drop `update_tx` so we can detect when the channel
    ///    is fully drained.
    /// 3. **Drain queued messages** - Forward any messages that were already queued
    ///    in `update_rx` to the client, preserving order.
    /// 4. **Install proxy handler** - Now that all queued messages are forwarded,
    ///    install the proxy handler to handle future messages.
    ///
    /// This sequence prevents the race condition where messages could be delivered
    /// out of order or lost during the transition.
    pub fn proxy_remaining_messages(self) -> Result<(), crate::Error>
    where
        Link: HasPeer<Client>,
    {
        // Destructure self to get ownership of all fields.
        // NOTE: the step ordering below is load-bearing; see the doc comment.
        let ActiveSession {
            session_id,
            mut update_rx,
            update_tx,
            connection,
            session_handler_registration,
            mcp_handler_registrations,
            // These fields are not needed for proxying
            modes: _,
            meta: _,
            _responder,
        } = self;

        // Step 1: Drop the session handler registration.
        // This unregisters the handler that was routing messages to update_rx.
        // After this point, no new messages will be added to the channel.
        drop(session_handler_registration);

        // Step 2: Drop the sender side of the channel.
        // This allows us to detect when the channel is fully drained
        // (recv will return None when empty and sender is dropped).
        drop(update_tx);

        // Step 3: Drain any messages that were already queued and forward to client.
        // These messages arrived before we dropped the handler but haven't been
        // consumed yet. We must forward them to maintain message ordering.
        while let Ok(message) = update_rx.try_recv() {
            match message {
                SessionMessage::SessionMessage(dispatch) => {
                    // Forward the message to the client
                    connection.send_proxied_message_to(Client, dispatch)?;
                }
                SessionMessage::StopReason(_) => {
                    // StopReason is internal bookkeeping, not forwarded
                }
            }
        }

        // Step 4: Install the proxy handler for future messages.
        // Now that all queued messages have been forwarded, the proxy handler
        // can take over. Any new messages will go directly through the proxy.
        connection
            .add_dynamic_handler(ProxySessionMessages::new(session_id))?
            .run_indefinitely();

        // Keep MCP server handlers alive for the lifetime of the proxy.
        // Dropping them instead would deregister the MCP servers mid-session.
        for registration in mcp_handler_registrations {
            registration.run_indefinitely();
        }

        Ok(())
    }
}

/// Dynamic handler that captures messages belonging to a single session
/// and routes them into the [`ActiveSession`] update channel.
struct ActiveSessionHandler {
    /// The session whose messages this handler intercepts.
    session_id: SessionId,
    /// Sender side of the `ActiveSession` update channel.
    update_tx: mpsc::UnboundedSender<SessionMessage>,
}

impl ActiveSessionHandler {
    /// Create a handler forwarding messages for `session_id` over `update_tx`.
    pub fn new(session_id: SessionId, update_tx: mpsc::UnboundedSender<SessionMessage>) -> Self {
        ActiveSessionHandler {
            session_id,
            update_tx,
        }
    }
}

impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for ActiveSessionHandler
where
    Counterpart: HasPeer<Agent>,
{
    /// Intercept agent messages that carry this handler's session ID and
    /// forward them over `update_tx`; all other messages pass through to
    /// the next handler in the chain.
    async fn handle_dispatch_from(
        &mut self,
        message: Dispatch,
        cx: ConnectionTo<Counterpart>,
    ) -> Result<Handled<Dispatch>, crate::Error> {
        // If this is a message for our session, grab it.
        tracing::trace!(
            ?message,
            handler_session_id = ?self.session_id,
            "ActiveSessionHandler::handle_dispatch"
        );
        MatchDispatchFrom::new(message, &cx)
            .if_message_from(Agent, async |message| {
                if let Some(session_id) = message.get_session_id()? {
                    tracing::trace!(
                        message_session_id = ?session_id,
                        handler_session_id = ?self.session_id,
                        "ActiveSessionHandler::handle_dispatch"
                    );
                    if session_id == self.session_id {
                        // Ours: queue it for the ActiveSession to read.
                        self.update_tx
                            .unbounded_send(SessionMessage::SessionMessage(message))
                            .map_err(crate::util::internal_error)?;
                        return Ok(Handled::Yes);
                    }
                }

                // Otherwise, pass it through.
                Ok(Handled::No {
                    message,
                    retry: false,
                })
            })
            .await
            .done()
    }

    /// Debug label identifying this handler (and its session) in chain traces.
    fn describe_chain(&self) -> impl std::fmt::Debug {
        format!("ActiveSessionHandler({})", self.session_id)
    }
}