1use hyper_util::rt::TokioIo;
2use tokio::net::UnixStream;
3use tonic::Request;
4use tonic::metadata::MetadataValue;
5use tonic::service::Interceptor;
6use tonic::service::interceptor::InterceptedService;
7use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
8use tower::service_fn;
9
10use crate::generated::v1::{
11 self as pb, agent_manager_host_client::AgentManagerHostClient as ProtoAgentManagerHostClient,
12};
13
14type AgentManagerTransport = InterceptedService<Channel, RelayTokenInterceptor>;
15
16pub const ENV_AGENT_MANAGER_SOCKET: &str = "GESTALT_AGENT_MANAGER_SOCKET";
18pub const ENV_AGENT_MANAGER_SOCKET_TOKEN: &str = "GESTALT_AGENT_MANAGER_SOCKET_TOKEN";
20const AGENT_MANAGER_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
21
22#[derive(Debug, thiserror::Error)]
23pub enum AgentManagerError {
25 #[error("agent manager: invocation token is not available")]
27 MissingInvocationToken,
28 #[error("{0}")]
30 Transport(#[from] tonic::transport::Error),
31 #[error("{0}")]
33 Status(#[from] tonic::Status),
34 #[error("{0}")]
36 Env(String),
37}
38
39pub struct AgentManager {
41 client: ProtoAgentManagerHostClient<AgentManagerTransport>,
42 invocation_token: String,
43}
44
45impl AgentManager {
46 pub async fn connect(
48 invocation_token: impl AsRef<str>,
49 ) -> std::result::Result<Self, AgentManagerError> {
50 let invocation_token = invocation_token.as_ref().trim().to_owned();
51 if invocation_token.is_empty() {
52 return Err(AgentManagerError::MissingInvocationToken);
53 }
54
55 let socket_path = std::env::var(ENV_AGENT_MANAGER_SOCKET).map_err(|_| {
56 AgentManagerError::Env(format!("{ENV_AGENT_MANAGER_SOCKET} is not set"))
57 })?;
58 let relay_token = std::env::var(ENV_AGENT_MANAGER_SOCKET_TOKEN).unwrap_or_default();
59 let channel = match parse_agent_manager_target(&socket_path)? {
60 AgentManagerTarget::Unix(path) => {
61 Endpoint::try_from("http://[::]:50051")?
62 .connect_with_connector(service_fn(move |_: Uri| {
63 let path = path.clone();
64 async move { UnixStream::connect(path).await.map(TokioIo::new) }
65 }))
66 .await?
67 }
68 AgentManagerTarget::Tcp(address) => {
69 Endpoint::from_shared(format!("http://{address}"))?
70 .connect()
71 .await?
72 }
73 AgentManagerTarget::Tls(address) => {
74 Endpoint::from_shared(format!("https://{address}"))?
75 .tls_config(ClientTlsConfig::new().with_native_roots())?
76 .connect()
77 .await?
78 }
79 };
80
81 Ok(Self {
82 client: ProtoAgentManagerHostClient::with_interceptor(
83 channel,
84 relay_token_interceptor(relay_token.trim())?,
85 ),
86 invocation_token,
87 })
88 }
89
90 pub async fn create_session(
92 &mut self,
93 mut request: pb::AgentManagerCreateSessionRequest,
94 ) -> std::result::Result<pb::AgentSession, AgentManagerError> {
95 request.invocation_token = self.invocation_token.clone();
96 Ok(self.client.create_session(request).await?.into_inner())
97 }
98
99 pub async fn get_session(
101 &mut self,
102 mut request: pb::AgentManagerGetSessionRequest,
103 ) -> std::result::Result<pb::AgentSession, AgentManagerError> {
104 request.invocation_token = self.invocation_token.clone();
105 Ok(self.client.get_session(request).await?.into_inner())
106 }
107
108 pub async fn list_sessions(
110 &mut self,
111 mut request: pb::AgentManagerListSessionsRequest,
112 ) -> std::result::Result<pb::AgentManagerListSessionsResponse, AgentManagerError> {
113 request.invocation_token = self.invocation_token.clone();
114 Ok(self.client.list_sessions(request).await?.into_inner())
115 }
116
117 pub async fn update_session(
119 &mut self,
120 mut request: pb::AgentManagerUpdateSessionRequest,
121 ) -> std::result::Result<pb::AgentSession, AgentManagerError> {
122 request.invocation_token = self.invocation_token.clone();
123 Ok(self.client.update_session(request).await?.into_inner())
124 }
125
126 pub async fn create_turn(
128 &mut self,
129 mut request: pb::AgentManagerCreateTurnRequest,
130 ) -> std::result::Result<pb::AgentTurn, AgentManagerError> {
131 request.invocation_token = self.invocation_token.clone();
132 Ok(self.client.create_turn(request).await?.into_inner())
133 }
134
135 pub async fn get_turn(
137 &mut self,
138 mut request: pb::AgentManagerGetTurnRequest,
139 ) -> std::result::Result<pb::AgentTurn, AgentManagerError> {
140 request.invocation_token = self.invocation_token.clone();
141 Ok(self.client.get_turn(request).await?.into_inner())
142 }
143
144 pub async fn list_turns(
146 &mut self,
147 mut request: pb::AgentManagerListTurnsRequest,
148 ) -> std::result::Result<pb::AgentManagerListTurnsResponse, AgentManagerError> {
149 request.invocation_token = self.invocation_token.clone();
150 Ok(self.client.list_turns(request).await?.into_inner())
151 }
152
153 pub async fn cancel_turn(
155 &mut self,
156 mut request: pb::AgentManagerCancelTurnRequest,
157 ) -> std::result::Result<pb::AgentTurn, AgentManagerError> {
158 request.invocation_token = self.invocation_token.clone();
159 Ok(self.client.cancel_turn(request).await?.into_inner())
160 }
161
162 pub async fn list_turn_events(
164 &mut self,
165 mut request: pb::AgentManagerListTurnEventsRequest,
166 ) -> std::result::Result<pb::AgentManagerListTurnEventsResponse, AgentManagerError> {
167 request.invocation_token = self.invocation_token.clone();
168 Ok(self.client.list_turn_events(request).await?.into_inner())
169 }
170
171 pub async fn list_interactions(
173 &mut self,
174 mut request: pb::AgentManagerListInteractionsRequest,
175 ) -> std::result::Result<pb::AgentManagerListInteractionsResponse, AgentManagerError> {
176 request.invocation_token = self.invocation_token.clone();
177 Ok(self.client.list_interactions(request).await?.into_inner())
178 }
179
180 pub async fn resolve_interaction(
182 &mut self,
183 mut request: pb::AgentManagerResolveInteractionRequest,
184 ) -> std::result::Result<pb::AgentInteraction, AgentManagerError> {
185 request.invocation_token = self.invocation_token.clone();
186 Ok(self.client.resolve_interaction(request).await?.into_inner())
187 }
188}
189
190#[derive(Clone)]
191struct RelayTokenInterceptor {
192 token: Option<MetadataValue<tonic::metadata::Ascii>>,
193}
194
195impl Interceptor for RelayTokenInterceptor {
196 fn call(
197 &mut self,
198 mut request: Request<()>,
199 ) -> std::result::Result<Request<()>, tonic::Status> {
200 if let Some(token) = self.token.clone() {
201 request
202 .metadata_mut()
203 .insert(AGENT_MANAGER_RELAY_TOKEN_HEADER, token);
204 }
205 Ok(request)
206 }
207}
208
209fn relay_token_interceptor(
210 token: &str,
211) -> std::result::Result<RelayTokenInterceptor, AgentManagerError> {
212 let trimmed = token.trim();
213 let token = if trimmed.is_empty() {
214 None
215 } else {
216 Some(MetadataValue::try_from(trimmed).map_err(|err| {
217 AgentManagerError::Env(format!(
218 "agent manager: invalid relay token metadata: {err}"
219 ))
220 })?)
221 };
222 Ok(RelayTokenInterceptor { token })
223}
224
225enum AgentManagerTarget {
226 Unix(String),
227 Tcp(String),
228 Tls(String),
229}
230
231fn parse_agent_manager_target(
232 raw: &str,
233) -> std::result::Result<AgentManagerTarget, AgentManagerError> {
234 let target = raw.trim();
235 if target.is_empty() {
236 return Err(AgentManagerError::Env(
237 "agent manager: transport target is required".to_string(),
238 ));
239 }
240 if let Some(address) = target.strip_prefix("tcp://") {
241 let address = address.trim();
242 if address.is_empty() {
243 return Err(AgentManagerError::Env(format!(
244 "agent manager: tcp target {raw:?} is missing host:port"
245 )));
246 }
247 return Ok(AgentManagerTarget::Tcp(address.to_string()));
248 }
249 if let Some(address) = target.strip_prefix("tls://") {
250 let address = address.trim();
251 if address.is_empty() {
252 return Err(AgentManagerError::Env(format!(
253 "agent manager: tls target {raw:?} is missing host:port"
254 )));
255 }
256 return Ok(AgentManagerTarget::Tls(address.to_string()));
257 }
258 if let Some(path) = target.strip_prefix("unix://") {
259 let path = path.trim();
260 if path.is_empty() {
261 return Err(AgentManagerError::Env(format!(
262 "agent manager: unix target {raw:?} is missing a socket path"
263 )));
264 }
265 return Ok(AgentManagerTarget::Unix(path.to_string()));
266 }
267 if target.contains("://") {
268 return Err(AgentManagerError::Env(format!(
269 "agent manager: unsupported target scheme in {raw:?}"
270 )));
271 }
272 Ok(AgentManagerTarget::Unix(target.to_string()))
273}