// gestalt/workflow.rs

1use std::sync::Arc;
2
3use hyper_util::rt::TokioIo;
4use tokio::net::UnixStream;
5use tonic::codegen::async_trait;
6use tonic::transport::{Channel, Endpoint, Uri};
7use tonic::{Request as GrpcRequest, Response as GrpcResponse, Status};
8use tower::service_fn;
9
10use crate::api::RuntimeMetadata;
11use crate::error::Result as ProviderResult;
12use crate::generated::v1::{
13    self as pb, workflow_host_client::WorkflowHostClient as ProtoWorkflowHostClient,
14};
15
/// Name of the environment variable that holds the socket path of the
/// workflow-host service.
pub const ENV_WORKFLOW_HOST_SOCKET: &str = "GESTALT_WORKFLOW_HOST_SOCKET";
18
19#[derive(Debug, thiserror::Error)]
20/// Errors returned by [`WorkflowHost`].
21pub enum WorkflowHostError {
22    /// The host-service transport could not be created.
23    #[error("{0}")]
24    Transport(#[from] tonic::transport::Error),
25    /// The host-service RPC returned a gRPC status.
26    #[error("{0}")]
27    Status(#[from] tonic::Status),
28    /// Required environment or target configuration was invalid.
29    #[error("{0}")]
30    Env(String),
31}
32
33/// Client for invoking operations from workflow provider code.
34pub struct WorkflowHost {
35    client: ProtoWorkflowHostClient<Channel>,
36}
37
38impl WorkflowHost {
39    /// Connects to the workflow host service described by the environment.
40    pub async fn connect() -> std::result::Result<Self, WorkflowHostError> {
41        let socket_path = std::env::var(ENV_WORKFLOW_HOST_SOCKET).map_err(|_| {
42            WorkflowHostError::Env(format!("{ENV_WORKFLOW_HOST_SOCKET} is not set"))
43        })?;
44        let channel = connect_unix(socket_path).await?;
45        Ok(Self {
46            client: ProtoWorkflowHostClient::new(channel),
47        })
48    }
49
50    /// Invokes an operation through the workflow host service.
51    pub async fn invoke_operation(
52        &mut self,
53        request: pb::InvokeWorkflowOperationRequest,
54    ) -> std::result::Result<pb::InvokeWorkflowOperationResponse, WorkflowHostError> {
55        Ok(self.client.invoke_operation(request).await?.into_inner())
56    }
57}
58
59async fn connect_unix(
60    socket_path: String,
61) -> std::result::Result<Channel, tonic::transport::Error> {
62    Endpoint::try_from("http://[::]:50051")?
63        .connect_with_connector(service_fn(move |_: Uri| {
64            let path = socket_path.clone();
65            async move { UnixStream::connect(path).await.map(TokioIo::new) }
66        }))
67        .await
68}
69
70#[async_trait]
71/// Provider trait for serving the Gestalt workflow-provider protocol.
72pub trait WorkflowProvider:
73    pb::workflow_provider_server::WorkflowProvider + Send + Sync + 'static
74{
75    /// Configures the provider before it starts serving requests.
76    async fn configure(
77        &self,
78        _name: &str,
79        _config: serde_json::Map<String, serde_json::Value>,
80    ) -> ProviderResult<()> {
81        Ok(())
82    }
83
84    /// Returns runtime metadata that should augment the static manifest.
85    fn metadata(&self) -> Option<RuntimeMetadata> {
86        None
87    }
88
89    /// Returns non-fatal warnings the host should surface to users.
90    fn warnings(&self) -> Vec<String> {
91        Vec::new()
92    }
93
94    /// Performs an optional health check.
95    async fn health_check(&self) -> ProviderResult<()> {
96        Ok(())
97    }
98
99    /// Starts provider-owned background work after configuration.
100    async fn start(&self) -> ProviderResult<()> {
101        Ok(())
102    }
103
104    /// Shuts the provider down before the runtime exits.
105    async fn close(&self) -> ProviderResult<()> {
106        Ok(())
107    }
108}
109
/// Wrapper that holds a provider behind a shared [`Arc`].
pub(crate) struct WorkflowServer<P> {
    /// The wrapped provider; shared, never cloned.
    provider: Arc<P>,
}

// Implemented by hand on purpose: `#[derive(Clone)]` would add a `P: Clone`
// bound, making the wrapper clonable only for clonable providers even though
// cloning it only bumps the `Arc` reference count.
impl<P> Clone for WorkflowServer<P> {
    fn clone(&self) -> Self {
        Self {
            provider: Arc::clone(&self.provider),
        }
    }
}
114
115impl<P> WorkflowServer<P> {
116    pub(crate) fn new(provider: Arc<P>) -> Self {
117        Self { provider }
118    }
119}
120
121#[async_trait]
122impl<P> pb::workflow_provider_server::WorkflowProvider for WorkflowServer<P>
123where
124    P: WorkflowProvider,
125{
126    async fn start_run(
127        &self,
128        request: GrpcRequest<pb::StartWorkflowProviderRunRequest>,
129    ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowRun>, Status> {
130        self.provider.start_run(request).await
131    }
132
133    async fn get_run(
134        &self,
135        request: GrpcRequest<pb::GetWorkflowProviderRunRequest>,
136    ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowRun>, Status> {
137        self.provider.get_run(request).await
138    }
139
140    async fn list_runs(
141        &self,
142        request: GrpcRequest<pb::ListWorkflowProviderRunsRequest>,
143    ) -> std::result::Result<GrpcResponse<pb::ListWorkflowProviderRunsResponse>, Status> {
144        self.provider.list_runs(request).await
145    }
146
147    async fn cancel_run(
148        &self,
149        request: GrpcRequest<pb::CancelWorkflowProviderRunRequest>,
150    ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowRun>, Status> {
151        self.provider.cancel_run(request).await
152    }
153
154    async fn signal_run(
155        &self,
156        request: GrpcRequest<pb::SignalWorkflowProviderRunRequest>,
157    ) -> std::result::Result<GrpcResponse<pb::SignalWorkflowRunResponse>, Status> {
158        self.provider.signal_run(request).await
159    }
160
161    async fn signal_or_start_run(
162        &self,
163        request: GrpcRequest<pb::SignalOrStartWorkflowProviderRunRequest>,
164    ) -> std::result::Result<GrpcResponse<pb::SignalWorkflowRunResponse>, Status> {
165        self.provider.signal_or_start_run(request).await
166    }
167
168    async fn upsert_schedule(
169        &self,
170        request: GrpcRequest<pb::UpsertWorkflowProviderScheduleRequest>,
171    ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowSchedule>, Status> {
172        self.provider.upsert_schedule(request).await
173    }
174
175    async fn get_schedule(
176        &self,
177        request: GrpcRequest<pb::GetWorkflowProviderScheduleRequest>,
178    ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowSchedule>, Status> {
179        self.provider.get_schedule(request).await
180    }
181
182    async fn list_schedules(
183        &self,
184        request: GrpcRequest<pb::ListWorkflowProviderSchedulesRequest>,
185    ) -> std::result::Result<GrpcResponse<pb::ListWorkflowProviderSchedulesResponse>, Status> {
186        self.provider.list_schedules(request).await
187    }
188
189    async fn delete_schedule(
190        &self,
191        request: GrpcRequest<pb::DeleteWorkflowProviderScheduleRequest>,
192    ) -> std::result::Result<GrpcResponse<()>, Status> {
193        self.provider.delete_schedule(request).await
194    }
195
196    async fn pause_schedule(
197        &self,
198        request: GrpcRequest<pb::PauseWorkflowProviderScheduleRequest>,
199    ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowSchedule>, Status> {
200        self.provider.pause_schedule(request).await
201    }
202
203    async fn resume_schedule(
204        &self,
205        request: GrpcRequest<pb::ResumeWorkflowProviderScheduleRequest>,
206    ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowSchedule>, Status> {
207        self.provider.resume_schedule(request).await
208    }
209
210    async fn upsert_event_trigger(
211        &self,
212        request: GrpcRequest<pb::UpsertWorkflowProviderEventTriggerRequest>,
213    ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowEventTrigger>, Status> {
214        self.provider.upsert_event_trigger(request).await
215    }
216
217    async fn get_event_trigger(
218        &self,
219        request: GrpcRequest<pb::GetWorkflowProviderEventTriggerRequest>,
220    ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowEventTrigger>, Status> {
221        self.provider.get_event_trigger(request).await
222    }
223
224    async fn list_event_triggers(
225        &self,
226        request: GrpcRequest<pb::ListWorkflowProviderEventTriggersRequest>,
227    ) -> std::result::Result<GrpcResponse<pb::ListWorkflowProviderEventTriggersResponse>, Status>
228    {
229        self.provider.list_event_triggers(request).await
230    }
231
232    async fn delete_event_trigger(
233        &self,
234        request: GrpcRequest<pb::DeleteWorkflowProviderEventTriggerRequest>,
235    ) -> std::result::Result<GrpcResponse<()>, Status> {
236        self.provider.delete_event_trigger(request).await
237    }
238
239    async fn pause_event_trigger(
240        &self,
241        request: GrpcRequest<pb::PauseWorkflowProviderEventTriggerRequest>,
242    ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowEventTrigger>, Status> {
243        self.provider.pause_event_trigger(request).await
244    }
245
246    async fn resume_event_trigger(
247        &self,
248        request: GrpcRequest<pb::ResumeWorkflowProviderEventTriggerRequest>,
249    ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowEventTrigger>, Status> {
250        self.provider.resume_event_trigger(request).await
251    }
252
253    async fn publish_event(
254        &self,
255        request: GrpcRequest<pb::PublishWorkflowProviderEventRequest>,
256    ) -> std::result::Result<GrpcResponse<()>, Status> {
257        self.provider.publish_event(request).await
258    }
259
260    async fn put_execution_reference(
261        &self,
262        request: GrpcRequest<pb::PutWorkflowExecutionReferenceRequest>,
263    ) -> std::result::Result<GrpcResponse<pb::WorkflowExecutionReference>, Status> {
264        self.provider.put_execution_reference(request).await
265    }
266
267    async fn get_execution_reference(
268        &self,
269        request: GrpcRequest<pb::GetWorkflowExecutionReferenceRequest>,
270    ) -> std::result::Result<GrpcResponse<pb::WorkflowExecutionReference>, Status> {
271        self.provider.get_execution_reference(request).await
272    }
273
274    async fn list_execution_references(
275        &self,
276        request: GrpcRequest<pb::ListWorkflowExecutionReferencesRequest>,
277    ) -> std::result::Result<GrpcResponse<pb::ListWorkflowExecutionReferencesResponse>, Status>
278    {
279        self.provider.list_execution_references(request).await
280    }
281}