// agent_client_protocol/jsonrpc/run.rs
//! Background tasks that run alongside a connection.
//!
//! [`RunWithConnectionTo`] implementations are composable background tasks
//! that run while a connection is active. They're used for things like MCP
//! tool handlers that need to receive calls through channels and invoke
//! user-provided closures.

use std::future::Future;

use crate::{ConnectionTo, role::Role};

/// A background task that runs alongside a connection.
///
/// `RunWithConnectionTo<Counterpart>` means "run while connected to the role
/// `Counterpart`". The task receives a [`ConnectionTo<Counterpart>`] for
/// communicating with the other side.
///
/// Implementations are composed using [`ChainRun`] and run in parallel
/// when the connection is active.
pub trait RunWithConnectionTo<Counterpart: Role>: Send {
    /// Run this task to completion.
    ///
    /// Resolves with `Ok(())` on clean completion, or with the first
    /// [`crate::Error`] the task encounters.
    fn run_with_connection_to(
        self,
        cx: ConnectionTo<Counterpart>,
    ) -> impl Future<Output = Result<(), crate::Error>> + Send;
}
25
26/// A no-op RunIn that completes immediately.
27#[derive(Debug, Default)]
28pub struct NullRun;
29
30impl<Counterpart: Role> RunWithConnectionTo<Counterpart> for NullRun {
31    async fn run_with_connection_to(
32        self,
33        _cx: ConnectionTo<Counterpart>,
34    ) -> Result<(), crate::Error> {
35        Ok(())
36    }
37}
38
39/// Chains two RunIn implementations to run in parallel.
40#[derive(Debug)]
41pub struct ChainRun<A, B> {
42    a: A,
43    b: B,
44}
45
46impl<A, B> ChainRun<A, B> {
47    /// Create a new chained RunIn from two RunIn implementations.
48    pub fn new(a: A, b: B) -> Self {
49        Self { a, b }
50    }
51}
52
53impl<Counterpart: Role, A, B> RunWithConnectionTo<Counterpart> for ChainRun<A, B>
54where
55    A: RunWithConnectionTo<Counterpart>,
56    B: RunWithConnectionTo<Counterpart>,
57{
58    async fn run_with_connection_to(
59        self,
60        cx: ConnectionTo<Counterpart>,
61    ) -> Result<(), crate::Error> {
62        // Box the futures to avoid stack overflow with deeply nested RunIn chains
63        let a_fut = Box::pin(self.a.run_with_connection_to(cx.clone()));
64        let b_fut = Box::pin(self.b.run_with_connection_to(cx.clone()));
65        let ((), ()) = futures::future::try_join(a_fut, b_fut).await?;
66        Ok(())
67    }
68}
69
70/// A RunIn created from a closure via [`with_spawned`](crate::Builder::with_spawned).
71pub struct SpawnedRun<F> {
72    task_fn: F,
73    location: &'static std::panic::Location<'static>,
74}
75
76impl<F> SpawnedRun<F> {
77    /// Create a new spawned RunIn from a closure.
78    pub fn new(location: &'static std::panic::Location<'static>, task_fn: F) -> Self {
79        Self { task_fn, location }
80    }
81}
82
83impl<Counterpart, F, Fut> RunWithConnectionTo<Counterpart> for SpawnedRun<F>
84where
85    Counterpart: Role,
86    F: FnOnce(ConnectionTo<Counterpart>) -> Fut + Send,
87    Fut: Future<Output = Result<(), crate::Error>> + Send,
88{
89    async fn run_with_connection_to(
90        self,
91        connection: ConnectionTo<Counterpart>,
92    ) -> Result<(), crate::Error> {
93        let location = self.location;
94        (self.task_fn)(connection).await.map_err(|err| {
95            let data = err.data.clone();
96            err.data(serde_json::json! {
97                {
98                    "spawned_at": format!("{}:{}:{}", location.file(), location.line(), location.column()),
99                    "data": data,
100                }
101            })
102        })
103    }
104}