1use hyper_util::rt::TokioIo;
2use tokio::net::UnixStream;
3use tonic::Request;
4use tonic::metadata::MetadataValue;
5use tonic::service::Interceptor;
6use tonic::service::interceptor::InterceptedService;
7use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
8use tower::service_fn;
9
10use crate::generated::v1::{
11 self as pb,
12 workflow_manager_host_client::WorkflowManagerHostClient as ProtoWorkflowManagerHostClient,
13};
14
/// Transport used by the generated workflow manager client: a tonic [`Channel`]
/// wrapped so that every outgoing request passes through [`RelayTokenInterceptor`].
type WorkflowManagerTransport = InterceptedService<Channel, RelayTokenInterceptor>;
16
/// Name of the environment variable holding the workflow manager transport target.
///
/// The value is interpreted by `parse_workflow_manager_target`: a bare path or a
/// `unix://` target selects a Unix socket, `tcp://` a plaintext connection, and
/// `tls://` a TLS connection.
pub const ENV_WORKFLOW_MANAGER_SOCKET: &str = "GESTALT_WORKFLOW_MANAGER_SOCKET";
/// Name of the environment variable holding the optional relay token.
///
/// When the variable is set to a non-blank value, that value is attached to every
/// request under [`WORKFLOW_MANAGER_RELAY_TOKEN_HEADER`].
pub const ENV_WORKFLOW_MANAGER_SOCKET_TOKEN: &str = "GESTALT_WORKFLOW_MANAGER_SOCKET_TOKEN";
/// Request metadata key under which the relay token is sent.
const WORKFLOW_MANAGER_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
22
/// Errors produced while connecting to, or calling, the workflow manager host.
#[derive(Debug, thiserror::Error)]
pub enum WorkflowManagerError {
    /// The invocation token supplied when connecting was empty after trimming.
    #[error("workflow manager: invocation token is not available")]
    MissingInvocationToken,
    /// Building the endpoint or establishing the connection failed.
    #[error("{0}")]
    Transport(#[from] tonic::transport::Error),
    /// An RPC returned an error status.
    #[error("{0}")]
    Status(#[from] tonic::Status),
    /// A configuration problem: a missing environment variable, an unusable
    /// transport target, or a relay token that cannot be used as request metadata.
    /// The payload is the human-readable message.
    #[error("{0}")]
    Env(String),
}
39
/// Client for the workflow manager host service.
///
/// Every request sent through this client is stamped with the invocation token
/// the client was created with.
pub struct WorkflowManager {
    /// Generated gRPC client running over the relay-token-intercepted transport.
    client: ProtoWorkflowManagerHostClient<WorkflowManagerTransport>,
    /// Trimmed, non-empty token written into every outgoing request.
    invocation_token: String,
    /// Trimmed default idempotency key; used by the create calls when the request
    /// carries a blank key. May be empty.
    idempotency_key: String,
}
46
47impl WorkflowManager {
48 pub async fn connect(
50 invocation_token: impl AsRef<str>,
51 ) -> std::result::Result<Self, WorkflowManagerError> {
52 Self::connect_with_idempotency_key(invocation_token, "").await
53 }
54
55 pub async fn connect_with_idempotency_key(
57 invocation_token: impl AsRef<str>,
58 idempotency_key: impl AsRef<str>,
59 ) -> std::result::Result<Self, WorkflowManagerError> {
60 let invocation_token = invocation_token.as_ref().trim().to_owned();
61 if invocation_token.is_empty() {
62 return Err(WorkflowManagerError::MissingInvocationToken);
63 }
64
65 let socket_path = std::env::var(ENV_WORKFLOW_MANAGER_SOCKET).map_err(|_| {
66 WorkflowManagerError::Env(format!("{ENV_WORKFLOW_MANAGER_SOCKET} is not set"))
67 })?;
68 let relay_token = std::env::var(ENV_WORKFLOW_MANAGER_SOCKET_TOKEN).unwrap_or_default();
69 let channel = match parse_workflow_manager_target(&socket_path)? {
70 WorkflowManagerTarget::Unix(path) => {
71 Endpoint::try_from("http://[::]:50051")?
72 .connect_with_connector(service_fn(move |_: Uri| {
73 let path = path.clone();
74 async move { UnixStream::connect(path).await.map(TokioIo::new) }
75 }))
76 .await?
77 }
78 WorkflowManagerTarget::Tcp(address) => {
79 Endpoint::from_shared(format!("http://{address}"))?
80 .connect()
81 .await?
82 }
83 WorkflowManagerTarget::Tls(address) => {
84 Endpoint::from_shared(format!("https://{address}"))?
85 .tls_config(ClientTlsConfig::new().with_native_roots())?
86 .connect()
87 .await?
88 }
89 };
90
91 Ok(Self {
92 client: ProtoWorkflowManagerHostClient::with_interceptor(
93 channel,
94 relay_token_interceptor(relay_token.trim())?,
95 ),
96 invocation_token,
97 idempotency_key: idempotency_key.as_ref().trim().to_owned(),
98 })
99 }
100
101 pub async fn create_schedule(
103 &mut self,
104 mut request: pb::WorkflowManagerCreateScheduleRequest,
105 ) -> std::result::Result<pb::ManagedWorkflowSchedule, WorkflowManagerError> {
106 request.invocation_token = self.invocation_token.clone();
107 if request.idempotency_key.trim().is_empty() {
108 request.idempotency_key = self.idempotency_key.clone();
109 }
110 Ok(self.client.create_schedule(request).await?.into_inner())
111 }
112
113 pub async fn start_run(
115 &mut self,
116 mut request: pb::WorkflowManagerStartRunRequest,
117 ) -> std::result::Result<pb::ManagedWorkflowRun, WorkflowManagerError> {
118 request.invocation_token = self.invocation_token.clone();
119 Ok(self.client.start_run(request).await?.into_inner())
120 }
121
122 pub async fn signal_run(
124 &mut self,
125 mut request: pb::WorkflowManagerSignalRunRequest,
126 ) -> std::result::Result<pb::ManagedWorkflowRunSignal, WorkflowManagerError> {
127 request.invocation_token = self.invocation_token.clone();
128 Ok(self.client.signal_run(request).await?.into_inner())
129 }
130
131 pub async fn signal_or_start_run(
133 &mut self,
134 mut request: pb::WorkflowManagerSignalOrStartRunRequest,
135 ) -> std::result::Result<pb::ManagedWorkflowRunSignal, WorkflowManagerError> {
136 request.invocation_token = self.invocation_token.clone();
137 Ok(self.client.signal_or_start_run(request).await?.into_inner())
138 }
139
140 pub async fn get_schedule(
142 &mut self,
143 mut request: pb::WorkflowManagerGetScheduleRequest,
144 ) -> std::result::Result<pb::ManagedWorkflowSchedule, WorkflowManagerError> {
145 request.invocation_token = self.invocation_token.clone();
146 Ok(self.client.get_schedule(request).await?.into_inner())
147 }
148
149 pub async fn update_schedule(
151 &mut self,
152 mut request: pb::WorkflowManagerUpdateScheduleRequest,
153 ) -> std::result::Result<pb::ManagedWorkflowSchedule, WorkflowManagerError> {
154 request.invocation_token = self.invocation_token.clone();
155 Ok(self.client.update_schedule(request).await?.into_inner())
156 }
157
158 pub async fn delete_schedule(
160 &mut self,
161 mut request: pb::WorkflowManagerDeleteScheduleRequest,
162 ) -> std::result::Result<(), WorkflowManagerError> {
163 request.invocation_token = self.invocation_token.clone();
164 self.client.delete_schedule(request).await?;
165 Ok(())
166 }
167
168 pub async fn pause_schedule(
170 &mut self,
171 mut request: pb::WorkflowManagerPauseScheduleRequest,
172 ) -> std::result::Result<pb::ManagedWorkflowSchedule, WorkflowManagerError> {
173 request.invocation_token = self.invocation_token.clone();
174 Ok(self.client.pause_schedule(request).await?.into_inner())
175 }
176
177 pub async fn resume_schedule(
179 &mut self,
180 mut request: pb::WorkflowManagerResumeScheduleRequest,
181 ) -> std::result::Result<pb::ManagedWorkflowSchedule, WorkflowManagerError> {
182 request.invocation_token = self.invocation_token.clone();
183 Ok(self.client.resume_schedule(request).await?.into_inner())
184 }
185
186 pub async fn create_trigger(
188 &mut self,
189 mut request: pb::WorkflowManagerCreateEventTriggerRequest,
190 ) -> std::result::Result<pb::ManagedWorkflowEventTrigger, WorkflowManagerError> {
191 request.invocation_token = self.invocation_token.clone();
192 if request.idempotency_key.trim().is_empty() {
193 request.idempotency_key = self.idempotency_key.clone();
194 }
195 Ok(self
196 .client
197 .create_event_trigger(request)
198 .await?
199 .into_inner())
200 }
201
202 pub async fn get_trigger(
204 &mut self,
205 mut request: pb::WorkflowManagerGetEventTriggerRequest,
206 ) -> std::result::Result<pb::ManagedWorkflowEventTrigger, WorkflowManagerError> {
207 request.invocation_token = self.invocation_token.clone();
208 Ok(self.client.get_event_trigger(request).await?.into_inner())
209 }
210
211 pub async fn update_trigger(
213 &mut self,
214 mut request: pb::WorkflowManagerUpdateEventTriggerRequest,
215 ) -> std::result::Result<pb::ManagedWorkflowEventTrigger, WorkflowManagerError> {
216 request.invocation_token = self.invocation_token.clone();
217 Ok(self
218 .client
219 .update_event_trigger(request)
220 .await?
221 .into_inner())
222 }
223
224 pub async fn delete_trigger(
226 &mut self,
227 mut request: pb::WorkflowManagerDeleteEventTriggerRequest,
228 ) -> std::result::Result<(), WorkflowManagerError> {
229 request.invocation_token = self.invocation_token.clone();
230 self.client.delete_event_trigger(request).await?;
231 Ok(())
232 }
233
234 pub async fn pause_trigger(
236 &mut self,
237 mut request: pb::WorkflowManagerPauseEventTriggerRequest,
238 ) -> std::result::Result<pb::ManagedWorkflowEventTrigger, WorkflowManagerError> {
239 request.invocation_token = self.invocation_token.clone();
240 Ok(self.client.pause_event_trigger(request).await?.into_inner())
241 }
242
243 pub async fn resume_trigger(
245 &mut self,
246 mut request: pb::WorkflowManagerResumeEventTriggerRequest,
247 ) -> std::result::Result<pb::ManagedWorkflowEventTrigger, WorkflowManagerError> {
248 request.invocation_token = self.invocation_token.clone();
249 Ok(self
250 .client
251 .resume_event_trigger(request)
252 .await?
253 .into_inner())
254 }
255
256 pub async fn publish_event(
258 &mut self,
259 mut request: pb::WorkflowManagerPublishEventRequest,
260 ) -> std::result::Result<pb::WorkflowEvent, WorkflowManagerError> {
261 request.invocation_token = self.invocation_token.clone();
262 Ok(self.client.publish_event(request).await?.into_inner())
263 }
264}
265
/// Request interceptor that attaches the relay token, when one is configured, to
/// every outgoing request.
#[derive(Clone)]
struct RelayTokenInterceptor {
    /// Pre-validated metadata value for the relay token; `None` means no header
    /// is added.
    token: Option<MetadataValue<tonic::metadata::Ascii>>,
}
270
271impl Interceptor for RelayTokenInterceptor {
272 fn call(
273 &mut self,
274 mut request: Request<()>,
275 ) -> std::result::Result<Request<()>, tonic::Status> {
276 if let Some(token) = self.token.clone() {
277 request
278 .metadata_mut()
279 .insert(WORKFLOW_MANAGER_RELAY_TOKEN_HEADER, token);
280 }
281 Ok(request)
282 }
283}
284
285fn relay_token_interceptor(
286 token: &str,
287) -> std::result::Result<RelayTokenInterceptor, WorkflowManagerError> {
288 let trimmed = token.trim();
289 let token = if trimmed.is_empty() {
290 None
291 } else {
292 Some(MetadataValue::try_from(trimmed).map_err(|err| {
293 WorkflowManagerError::Env(format!(
294 "workflow manager: invalid relay token metadata: {err}"
295 ))
296 })?)
297 };
298 Ok(RelayTokenInterceptor { token })
299}
300
/// Parsed form of the workflow manager transport target.
enum WorkflowManagerTarget {
    /// Filesystem path of a Unix domain socket.
    Unix(String),
    /// Address (the text following `tcp://`) reached over plaintext HTTP/2.
    Tcp(String),
    /// Address (the text following `tls://`) reached over TLS.
    Tls(String),
}
306
307fn parse_workflow_manager_target(
308 raw: &str,
309) -> std::result::Result<WorkflowManagerTarget, WorkflowManagerError> {
310 let target = raw.trim();
311 if target.is_empty() {
312 return Err(WorkflowManagerError::Env(
313 "workflow manager: transport target is required".to_string(),
314 ));
315 }
316 if let Some(address) = target.strip_prefix("tcp://") {
317 let address = address.trim();
318 if address.is_empty() {
319 return Err(WorkflowManagerError::Env(format!(
320 "workflow manager: tcp target {raw:?} is missing host:port"
321 )));
322 }
323 return Ok(WorkflowManagerTarget::Tcp(address.to_string()));
324 }
325 if let Some(address) = target.strip_prefix("tls://") {
326 let address = address.trim();
327 if address.is_empty() {
328 return Err(WorkflowManagerError::Env(format!(
329 "workflow manager: tls target {raw:?} is missing host:port"
330 )));
331 }
332 return Ok(WorkflowManagerTarget::Tls(address.to_string()));
333 }
334 if let Some(path) = target.strip_prefix("unix://") {
335 let path = path.trim();
336 if path.is_empty() {
337 return Err(WorkflowManagerError::Env(format!(
338 "workflow manager: unix target {raw:?} is missing a socket path"
339 )));
340 }
341 return Ok(WorkflowManagerTarget::Unix(path.to_string()));
342 }
343 if target.contains("://") {
344 return Err(WorkflowManagerError::Env(format!(
345 "workflow manager: unsupported target scheme in {raw:?}"
346 )));
347 }
348 Ok(WorkflowManagerTarget::Unix(target.to_string()))
349}