Skip to main content

gestalt/
agent_manager.rs

1use hyper_util::rt::TokioIo;
2use tokio::net::UnixStream;
3use tonic::Request;
4use tonic::metadata::MetadataValue;
5use tonic::service::Interceptor;
6use tonic::service::interceptor::InterceptedService;
7use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
8use tower::service_fn;
9
10use crate::generated::v1::{
11    self as pb, agent_manager_host_client::AgentManagerHostClient as ProtoAgentManagerHostClient,
12};
13
14type AgentManagerTransport = InterceptedService<Channel, RelayTokenInterceptor>;
15
16/// Environment variable containing the agent-manager host-service target.
17pub const ENV_AGENT_MANAGER_SOCKET: &str = "GESTALT_AGENT_MANAGER_SOCKET";
18/// Environment variable containing the optional agent-manager relay token.
19pub const ENV_AGENT_MANAGER_SOCKET_TOKEN: &str = "GESTALT_AGENT_MANAGER_SOCKET_TOKEN";
20const AGENT_MANAGER_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
21
22#[derive(Debug, thiserror::Error)]
23/// Errors returned by [`AgentManager`].
24pub enum AgentManagerError {
25    /// The invocation token was empty.
26    #[error("agent manager: invocation token is not available")]
27    MissingInvocationToken,
28    /// The host-service transport could not be created.
29    #[error("{0}")]
30    Transport(#[from] tonic::transport::Error),
31    /// The host-service RPC returned a gRPC status.
32    #[error("{0}")]
33    Status(#[from] tonic::Status),
34    /// Required environment or target configuration was invalid.
35    #[error("{0}")]
36    Env(String),
37}
38
39/// Client for managing agent sessions, turns, events, and interactions.
40pub struct AgentManager {
41    client: ProtoAgentManagerHostClient<AgentManagerTransport>,
42    invocation_token: String,
43}
44
45impl AgentManager {
46    /// Connects to the agent manager with an invocation token from the host.
47    pub async fn connect(
48        invocation_token: impl AsRef<str>,
49    ) -> std::result::Result<Self, AgentManagerError> {
50        let invocation_token = invocation_token.as_ref().trim().to_owned();
51        if invocation_token.is_empty() {
52            return Err(AgentManagerError::MissingInvocationToken);
53        }
54
55        let socket_path = std::env::var(ENV_AGENT_MANAGER_SOCKET).map_err(|_| {
56            AgentManagerError::Env(format!("{ENV_AGENT_MANAGER_SOCKET} is not set"))
57        })?;
58        let relay_token = std::env::var(ENV_AGENT_MANAGER_SOCKET_TOKEN).unwrap_or_default();
59        let channel = match parse_agent_manager_target(&socket_path)? {
60            AgentManagerTarget::Unix(path) => {
61                Endpoint::try_from("http://[::]:50051")?
62                    .connect_with_connector(service_fn(move |_: Uri| {
63                        let path = path.clone();
64                        async move { UnixStream::connect(path).await.map(TokioIo::new) }
65                    }))
66                    .await?
67            }
68            AgentManagerTarget::Tcp(address) => {
69                Endpoint::from_shared(format!("http://{address}"))?
70                    .connect()
71                    .await?
72            }
73            AgentManagerTarget::Tls(address) => {
74                Endpoint::from_shared(format!("https://{address}"))?
75                    .tls_config(ClientTlsConfig::new().with_native_roots())?
76                    .connect()
77                    .await?
78            }
79        };
80
81        Ok(Self {
82            client: ProtoAgentManagerHostClient::with_interceptor(
83                channel,
84                relay_token_interceptor(relay_token.trim())?,
85            ),
86            invocation_token,
87        })
88    }
89
90    /// Creates an agent session.
91    pub async fn create_session(
92        &mut self,
93        mut request: pb::AgentManagerCreateSessionRequest,
94    ) -> std::result::Result<pb::AgentSession, AgentManagerError> {
95        request.invocation_token = self.invocation_token.clone();
96        Ok(self.client.create_session(request).await?.into_inner())
97    }
98
99    /// Fetches one agent session.
100    pub async fn get_session(
101        &mut self,
102        mut request: pb::AgentManagerGetSessionRequest,
103    ) -> std::result::Result<pb::AgentSession, AgentManagerError> {
104        request.invocation_token = self.invocation_token.clone();
105        Ok(self.client.get_session(request).await?.into_inner())
106    }
107
108    /// Lists agent sessions visible to the invocation token.
109    pub async fn list_sessions(
110        &mut self,
111        mut request: pb::AgentManagerListSessionsRequest,
112    ) -> std::result::Result<pb::AgentManagerListSessionsResponse, AgentManagerError> {
113        request.invocation_token = self.invocation_token.clone();
114        Ok(self.client.list_sessions(request).await?.into_inner())
115    }
116
117    /// Updates mutable fields on an agent session.
118    pub async fn update_session(
119        &mut self,
120        mut request: pb::AgentManagerUpdateSessionRequest,
121    ) -> std::result::Result<pb::AgentSession, AgentManagerError> {
122        request.invocation_token = self.invocation_token.clone();
123        Ok(self.client.update_session(request).await?.into_inner())
124    }
125
126    /// Creates an agent turn.
127    pub async fn create_turn(
128        &mut self,
129        mut request: pb::AgentManagerCreateTurnRequest,
130    ) -> std::result::Result<pb::AgentTurn, AgentManagerError> {
131        request.invocation_token = self.invocation_token.clone();
132        Ok(self.client.create_turn(request).await?.into_inner())
133    }
134
135    /// Fetches one agent turn.
136    pub async fn get_turn(
137        &mut self,
138        mut request: pb::AgentManagerGetTurnRequest,
139    ) -> std::result::Result<pb::AgentTurn, AgentManagerError> {
140        request.invocation_token = self.invocation_token.clone();
141        Ok(self.client.get_turn(request).await?.into_inner())
142    }
143
144    /// Lists turns for an agent session.
145    pub async fn list_turns(
146        &mut self,
147        mut request: pb::AgentManagerListTurnsRequest,
148    ) -> std::result::Result<pb::AgentManagerListTurnsResponse, AgentManagerError> {
149        request.invocation_token = self.invocation_token.clone();
150        Ok(self.client.list_turns(request).await?.into_inner())
151    }
152
153    /// Cancels an in-progress agent turn.
154    pub async fn cancel_turn(
155        &mut self,
156        mut request: pb::AgentManagerCancelTurnRequest,
157    ) -> std::result::Result<pb::AgentTurn, AgentManagerError> {
158        request.invocation_token = self.invocation_token.clone();
159        Ok(self.client.cancel_turn(request).await?.into_inner())
160    }
161
162    /// Lists events emitted for an agent turn.
163    pub async fn list_turn_events(
164        &mut self,
165        mut request: pb::AgentManagerListTurnEventsRequest,
166    ) -> std::result::Result<pb::AgentManagerListTurnEventsResponse, AgentManagerError> {
167        request.invocation_token = self.invocation_token.clone();
168        Ok(self.client.list_turn_events(request).await?.into_inner())
169    }
170
171    /// Lists pending or completed agent interactions.
172    pub async fn list_interactions(
173        &mut self,
174        mut request: pb::AgentManagerListInteractionsRequest,
175    ) -> std::result::Result<pb::AgentManagerListInteractionsResponse, AgentManagerError> {
176        request.invocation_token = self.invocation_token.clone();
177        Ok(self.client.list_interactions(request).await?.into_inner())
178    }
179
180    /// Resolves an agent interaction with a host response.
181    pub async fn resolve_interaction(
182        &mut self,
183        mut request: pb::AgentManagerResolveInteractionRequest,
184    ) -> std::result::Result<pb::AgentInteraction, AgentManagerError> {
185        request.invocation_token = self.invocation_token.clone();
186        Ok(self.client.resolve_interaction(request).await?.into_inner())
187    }
188}
189
190#[derive(Clone)]
191struct RelayTokenInterceptor {
192    token: Option<MetadataValue<tonic::metadata::Ascii>>,
193}
194
195impl Interceptor for RelayTokenInterceptor {
196    fn call(
197        &mut self,
198        mut request: Request<()>,
199    ) -> std::result::Result<Request<()>, tonic::Status> {
200        if let Some(token) = self.token.clone() {
201            request
202                .metadata_mut()
203                .insert(AGENT_MANAGER_RELAY_TOKEN_HEADER, token);
204        }
205        Ok(request)
206    }
207}
208
209fn relay_token_interceptor(
210    token: &str,
211) -> std::result::Result<RelayTokenInterceptor, AgentManagerError> {
212    let trimmed = token.trim();
213    let token = if trimmed.is_empty() {
214        None
215    } else {
216        Some(MetadataValue::try_from(trimmed).map_err(|err| {
217            AgentManagerError::Env(format!(
218                "agent manager: invalid relay token metadata: {err}"
219            ))
220        })?)
221    };
222    Ok(RelayTokenInterceptor { token })
223}
224
225enum AgentManagerTarget {
226    Unix(String),
227    Tcp(String),
228    Tls(String),
229}
230
231fn parse_agent_manager_target(
232    raw: &str,
233) -> std::result::Result<AgentManagerTarget, AgentManagerError> {
234    let target = raw.trim();
235    if target.is_empty() {
236        return Err(AgentManagerError::Env(
237            "agent manager: transport target is required".to_string(),
238        ));
239    }
240    if let Some(address) = target.strip_prefix("tcp://") {
241        let address = address.trim();
242        if address.is_empty() {
243            return Err(AgentManagerError::Env(format!(
244                "agent manager: tcp target {raw:?} is missing host:port"
245            )));
246        }
247        return Ok(AgentManagerTarget::Tcp(address.to_string()));
248    }
249    if let Some(address) = target.strip_prefix("tls://") {
250        let address = address.trim();
251        if address.is_empty() {
252            return Err(AgentManagerError::Env(format!(
253                "agent manager: tls target {raw:?} is missing host:port"
254            )));
255        }
256        return Ok(AgentManagerTarget::Tls(address.to_string()));
257    }
258    if let Some(path) = target.strip_prefix("unix://") {
259        let path = path.trim();
260        if path.is_empty() {
261            return Err(AgentManagerError::Env(format!(
262                "agent manager: unix target {raw:?} is missing a socket path"
263            )));
264        }
265        return Ok(AgentManagerTarget::Unix(path.to_string()));
266    }
267    if target.contains("://") {
268        return Err(AgentManagerError::Env(format!(
269            "agent manager: unsupported target scheme in {raw:?}"
270        )));
271    }
272    Ok(AgentManagerTarget::Unix(target.to_string()))
273}