Skip to main content

gestalt/
workflow_manager.rs

1use hyper_util::rt::TokioIo;
2use tokio::net::UnixStream;
3use tonic::Request;
4use tonic::metadata::MetadataValue;
5use tonic::service::Interceptor;
6use tonic::service::interceptor::InterceptedService;
7use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
8use tower::service_fn;
9
10use crate::generated::v1::{
11    self as pb,
12    workflow_manager_host_client::WorkflowManagerHostClient as ProtoWorkflowManagerHostClient,
13};
14
15type WorkflowManagerTransport = InterceptedService<Channel, RelayTokenInterceptor>;
16
17/// Environment variable containing the workflow-manager host-service target.
18pub const ENV_WORKFLOW_MANAGER_SOCKET: &str = "GESTALT_WORKFLOW_MANAGER_SOCKET";
19/// Environment variable containing the optional workflow-manager relay token.
20pub const ENV_WORKFLOW_MANAGER_SOCKET_TOKEN: &str = "GESTALT_WORKFLOW_MANAGER_SOCKET_TOKEN";
21const WORKFLOW_MANAGER_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
22
23#[derive(Debug, thiserror::Error)]
24/// Errors returned by [`WorkflowManager`].
25pub enum WorkflowManagerError {
26    /// The invocation token was empty.
27    #[error("workflow manager: invocation token is not available")]
28    MissingInvocationToken,
29    /// The host-service transport could not be created.
30    #[error("{0}")]
31    Transport(#[from] tonic::transport::Error),
32    /// The host-service RPC returned a gRPC status.
33    #[error("{0}")]
34    Status(#[from] tonic::Status),
35    /// Required environment or target configuration was invalid.
36    #[error("{0}")]
37    Env(String),
38}
39
40/// Client for starting workflow runs and managing schedules or triggers.
41pub struct WorkflowManager {
42    client: ProtoWorkflowManagerHostClient<WorkflowManagerTransport>,
43    invocation_token: String,
44    idempotency_key: String,
45}
46
47impl WorkflowManager {
48    /// Connects to the workflow manager with an invocation token from the host.
49    pub async fn connect(
50        invocation_token: impl AsRef<str>,
51    ) -> std::result::Result<Self, WorkflowManagerError> {
52        Self::connect_with_idempotency_key(invocation_token, "").await
53    }
54
55    /// Connects with a default idempotency key for create requests.
56    pub async fn connect_with_idempotency_key(
57        invocation_token: impl AsRef<str>,
58        idempotency_key: impl AsRef<str>,
59    ) -> std::result::Result<Self, WorkflowManagerError> {
60        let invocation_token = invocation_token.as_ref().trim().to_owned();
61        if invocation_token.is_empty() {
62            return Err(WorkflowManagerError::MissingInvocationToken);
63        }
64
65        let socket_path = std::env::var(ENV_WORKFLOW_MANAGER_SOCKET).map_err(|_| {
66            WorkflowManagerError::Env(format!("{ENV_WORKFLOW_MANAGER_SOCKET} is not set"))
67        })?;
68        let relay_token = std::env::var(ENV_WORKFLOW_MANAGER_SOCKET_TOKEN).unwrap_or_default();
69        let channel = match parse_workflow_manager_target(&socket_path)? {
70            WorkflowManagerTarget::Unix(path) => {
71                Endpoint::try_from("http://[::]:50051")?
72                    .connect_with_connector(service_fn(move |_: Uri| {
73                        let path = path.clone();
74                        async move { UnixStream::connect(path).await.map(TokioIo::new) }
75                    }))
76                    .await?
77            }
78            WorkflowManagerTarget::Tcp(address) => {
79                Endpoint::from_shared(format!("http://{address}"))?
80                    .connect()
81                    .await?
82            }
83            WorkflowManagerTarget::Tls(address) => {
84                Endpoint::from_shared(format!("https://{address}"))?
85                    .tls_config(ClientTlsConfig::new().with_native_roots())?
86                    .connect()
87                    .await?
88            }
89        };
90
91        Ok(Self {
92            client: ProtoWorkflowManagerHostClient::with_interceptor(
93                channel,
94                relay_token_interceptor(relay_token.trim())?,
95            ),
96            invocation_token,
97            idempotency_key: idempotency_key.as_ref().trim().to_owned(),
98        })
99    }
100
101    /// Creates a workflow schedule.
102    pub async fn create_schedule(
103        &mut self,
104        mut request: pb::WorkflowManagerCreateScheduleRequest,
105    ) -> std::result::Result<pb::ManagedWorkflowSchedule, WorkflowManagerError> {
106        request.invocation_token = self.invocation_token.clone();
107        if request.idempotency_key.trim().is_empty() {
108            request.idempotency_key = self.idempotency_key.clone();
109        }
110        Ok(self.client.create_schedule(request).await?.into_inner())
111    }
112
113    /// Starts a workflow run.
114    pub async fn start_run(
115        &mut self,
116        mut request: pb::WorkflowManagerStartRunRequest,
117    ) -> std::result::Result<pb::ManagedWorkflowRun, WorkflowManagerError> {
118        request.invocation_token = self.invocation_token.clone();
119        Ok(self.client.start_run(request).await?.into_inner())
120    }
121
122    /// Signals an existing workflow run.
123    pub async fn signal_run(
124        &mut self,
125        mut request: pb::WorkflowManagerSignalRunRequest,
126    ) -> std::result::Result<pb::ManagedWorkflowRunSignal, WorkflowManagerError> {
127        request.invocation_token = self.invocation_token.clone();
128        Ok(self.client.signal_run(request).await?.into_inner())
129    }
130
131    /// Signals a run or starts it when no matching run exists.
132    pub async fn signal_or_start_run(
133        &mut self,
134        mut request: pb::WorkflowManagerSignalOrStartRunRequest,
135    ) -> std::result::Result<pb::ManagedWorkflowRunSignal, WorkflowManagerError> {
136        request.invocation_token = self.invocation_token.clone();
137        Ok(self.client.signal_or_start_run(request).await?.into_inner())
138    }
139
140    /// Fetches one workflow schedule.
141    pub async fn get_schedule(
142        &mut self,
143        mut request: pb::WorkflowManagerGetScheduleRequest,
144    ) -> std::result::Result<pb::ManagedWorkflowSchedule, WorkflowManagerError> {
145        request.invocation_token = self.invocation_token.clone();
146        Ok(self.client.get_schedule(request).await?.into_inner())
147    }
148
149    /// Updates a workflow schedule.
150    pub async fn update_schedule(
151        &mut self,
152        mut request: pb::WorkflowManagerUpdateScheduleRequest,
153    ) -> std::result::Result<pb::ManagedWorkflowSchedule, WorkflowManagerError> {
154        request.invocation_token = self.invocation_token.clone();
155        Ok(self.client.update_schedule(request).await?.into_inner())
156    }
157
158    /// Deletes a workflow schedule.
159    pub async fn delete_schedule(
160        &mut self,
161        mut request: pb::WorkflowManagerDeleteScheduleRequest,
162    ) -> std::result::Result<(), WorkflowManagerError> {
163        request.invocation_token = self.invocation_token.clone();
164        self.client.delete_schedule(request).await?;
165        Ok(())
166    }
167
168    /// Pauses a workflow schedule.
169    pub async fn pause_schedule(
170        &mut self,
171        mut request: pb::WorkflowManagerPauseScheduleRequest,
172    ) -> std::result::Result<pb::ManagedWorkflowSchedule, WorkflowManagerError> {
173        request.invocation_token = self.invocation_token.clone();
174        Ok(self.client.pause_schedule(request).await?.into_inner())
175    }
176
177    /// Resumes a workflow schedule.
178    pub async fn resume_schedule(
179        &mut self,
180        mut request: pb::WorkflowManagerResumeScheduleRequest,
181    ) -> std::result::Result<pb::ManagedWorkflowSchedule, WorkflowManagerError> {
182        request.invocation_token = self.invocation_token.clone();
183        Ok(self.client.resume_schedule(request).await?.into_inner())
184    }
185
186    /// Creates an event trigger.
187    pub async fn create_trigger(
188        &mut self,
189        mut request: pb::WorkflowManagerCreateEventTriggerRequest,
190    ) -> std::result::Result<pb::ManagedWorkflowEventTrigger, WorkflowManagerError> {
191        request.invocation_token = self.invocation_token.clone();
192        if request.idempotency_key.trim().is_empty() {
193            request.idempotency_key = self.idempotency_key.clone();
194        }
195        Ok(self
196            .client
197            .create_event_trigger(request)
198            .await?
199            .into_inner())
200    }
201
202    /// Fetches one event trigger.
203    pub async fn get_trigger(
204        &mut self,
205        mut request: pb::WorkflowManagerGetEventTriggerRequest,
206    ) -> std::result::Result<pb::ManagedWorkflowEventTrigger, WorkflowManagerError> {
207        request.invocation_token = self.invocation_token.clone();
208        Ok(self.client.get_event_trigger(request).await?.into_inner())
209    }
210
211    /// Updates an event trigger.
212    pub async fn update_trigger(
213        &mut self,
214        mut request: pb::WorkflowManagerUpdateEventTriggerRequest,
215    ) -> std::result::Result<pb::ManagedWorkflowEventTrigger, WorkflowManagerError> {
216        request.invocation_token = self.invocation_token.clone();
217        Ok(self
218            .client
219            .update_event_trigger(request)
220            .await?
221            .into_inner())
222    }
223
224    /// Deletes an event trigger.
225    pub async fn delete_trigger(
226        &mut self,
227        mut request: pb::WorkflowManagerDeleteEventTriggerRequest,
228    ) -> std::result::Result<(), WorkflowManagerError> {
229        request.invocation_token = self.invocation_token.clone();
230        self.client.delete_event_trigger(request).await?;
231        Ok(())
232    }
233
234    /// Pauses an event trigger.
235    pub async fn pause_trigger(
236        &mut self,
237        mut request: pb::WorkflowManagerPauseEventTriggerRequest,
238    ) -> std::result::Result<pb::ManagedWorkflowEventTrigger, WorkflowManagerError> {
239        request.invocation_token = self.invocation_token.clone();
240        Ok(self.client.pause_event_trigger(request).await?.into_inner())
241    }
242
243    /// Resumes an event trigger.
244    pub async fn resume_trigger(
245        &mut self,
246        mut request: pb::WorkflowManagerResumeEventTriggerRequest,
247    ) -> std::result::Result<pb::ManagedWorkflowEventTrigger, WorkflowManagerError> {
248        request.invocation_token = self.invocation_token.clone();
249        Ok(self
250            .client
251            .resume_event_trigger(request)
252            .await?
253            .into_inner())
254    }
255
256    /// Publishes an event into the workflow manager.
257    pub async fn publish_event(
258        &mut self,
259        mut request: pb::WorkflowManagerPublishEventRequest,
260    ) -> std::result::Result<pb::WorkflowEvent, WorkflowManagerError> {
261        request.invocation_token = self.invocation_token.clone();
262        Ok(self.client.publish_event(request).await?.into_inner())
263    }
264}
265
266#[derive(Clone)]
267struct RelayTokenInterceptor {
268    token: Option<MetadataValue<tonic::metadata::Ascii>>,
269}
270
271impl Interceptor for RelayTokenInterceptor {
272    fn call(
273        &mut self,
274        mut request: Request<()>,
275    ) -> std::result::Result<Request<()>, tonic::Status> {
276        if let Some(token) = self.token.clone() {
277            request
278                .metadata_mut()
279                .insert(WORKFLOW_MANAGER_RELAY_TOKEN_HEADER, token);
280        }
281        Ok(request)
282    }
283}
284
285fn relay_token_interceptor(
286    token: &str,
287) -> std::result::Result<RelayTokenInterceptor, WorkflowManagerError> {
288    let trimmed = token.trim();
289    let token = if trimmed.is_empty() {
290        None
291    } else {
292        Some(MetadataValue::try_from(trimmed).map_err(|err| {
293            WorkflowManagerError::Env(format!(
294                "workflow manager: invalid relay token metadata: {err}"
295            ))
296        })?)
297    };
298    Ok(RelayTokenInterceptor { token })
299}
300
301enum WorkflowManagerTarget {
302    Unix(String),
303    Tcp(String),
304    Tls(String),
305}
306
307fn parse_workflow_manager_target(
308    raw: &str,
309) -> std::result::Result<WorkflowManagerTarget, WorkflowManagerError> {
310    let target = raw.trim();
311    if target.is_empty() {
312        return Err(WorkflowManagerError::Env(
313            "workflow manager: transport target is required".to_string(),
314        ));
315    }
316    if let Some(address) = target.strip_prefix("tcp://") {
317        let address = address.trim();
318        if address.is_empty() {
319            return Err(WorkflowManagerError::Env(format!(
320                "workflow manager: tcp target {raw:?} is missing host:port"
321            )));
322        }
323        return Ok(WorkflowManagerTarget::Tcp(address.to_string()));
324    }
325    if let Some(address) = target.strip_prefix("tls://") {
326        let address = address.trim();
327        if address.is_empty() {
328            return Err(WorkflowManagerError::Env(format!(
329                "workflow manager: tls target {raw:?} is missing host:port"
330            )));
331        }
332        return Ok(WorkflowManagerTarget::Tls(address.to_string()));
333    }
334    if let Some(path) = target.strip_prefix("unix://") {
335        let path = path.trim();
336        if path.is_empty() {
337            return Err(WorkflowManagerError::Env(format!(
338                "workflow manager: unix target {raw:?} is missing a socket path"
339            )));
340        }
341        return Ok(WorkflowManagerTarget::Unix(path.to_string()));
342    }
343    if target.contains("://") {
344        return Err(WorkflowManagerError::Env(format!(
345            "workflow manager: unsupported target scheme in {raw:?}"
346        )));
347    }
348    Ok(WorkflowManagerTarget::Unix(target.to_string()))
349}