1use std::sync::Arc;
2
3use hyper_util::rt::TokioIo;
4use tokio::net::UnixStream;
5use tonic::codegen::async_trait;
6use tonic::transport::{Channel, Endpoint, Uri};
7use tonic::{Request as GrpcRequest, Response as GrpcResponse, Status};
8use tower::service_fn;
9
10use crate::api::RuntimeMetadata;
11use crate::error::Result as ProviderResult;
12use crate::generated::v1::{
13 self as pb, workflow_host_client::WorkflowHostClient as ProtoWorkflowHostClient,
14};
15
16pub const ENV_WORKFLOW_HOST_SOCKET: &str = "GESTALT_WORKFLOW_HOST_SOCKET";
18
19#[derive(Debug, thiserror::Error)]
20pub enum WorkflowHostError {
22 #[error("{0}")]
24 Transport(#[from] tonic::transport::Error),
25 #[error("{0}")]
27 Status(#[from] tonic::Status),
28 #[error("{0}")]
30 Env(String),
31}
32
33pub struct WorkflowHost {
35 client: ProtoWorkflowHostClient<Channel>,
36}
37
38impl WorkflowHost {
39 pub async fn connect() -> std::result::Result<Self, WorkflowHostError> {
41 let socket_path = std::env::var(ENV_WORKFLOW_HOST_SOCKET).map_err(|_| {
42 WorkflowHostError::Env(format!("{ENV_WORKFLOW_HOST_SOCKET} is not set"))
43 })?;
44 let channel = connect_unix(socket_path).await?;
45 Ok(Self {
46 client: ProtoWorkflowHostClient::new(channel),
47 })
48 }
49
50 pub async fn invoke_operation(
52 &mut self,
53 request: pb::InvokeWorkflowOperationRequest,
54 ) -> std::result::Result<pb::InvokeWorkflowOperationResponse, WorkflowHostError> {
55 Ok(self.client.invoke_operation(request).await?.into_inner())
56 }
57}
58
59async fn connect_unix(
60 socket_path: String,
61) -> std::result::Result<Channel, tonic::transport::Error> {
62 Endpoint::try_from("http://[::]:50051")?
63 .connect_with_connector(service_fn(move |_: Uri| {
64 let path = socket_path.clone();
65 async move { UnixStream::connect(path).await.map(TokioIo::new) }
66 }))
67 .await
68}
69
70#[async_trait]
71pub trait WorkflowProvider:
73 pb::workflow_provider_server::WorkflowProvider + Send + Sync + 'static
74{
75 async fn configure(
77 &self,
78 _name: &str,
79 _config: serde_json::Map<String, serde_json::Value>,
80 ) -> ProviderResult<()> {
81 Ok(())
82 }
83
84 fn metadata(&self) -> Option<RuntimeMetadata> {
86 None
87 }
88
89 fn warnings(&self) -> Vec<String> {
91 Vec::new()
92 }
93
94 async fn health_check(&self) -> ProviderResult<()> {
96 Ok(())
97 }
98
99 async fn start(&self) -> ProviderResult<()> {
101 Ok(())
102 }
103
104 async fn close(&self) -> ProviderResult<()> {
106 Ok(())
107 }
108}
109
110#[derive(Clone)]
111pub(crate) struct WorkflowServer<P> {
112 provider: Arc<P>,
113}
114
115impl<P> WorkflowServer<P> {
116 pub(crate) fn new(provider: Arc<P>) -> Self {
117 Self { provider }
118 }
119}
120
121#[async_trait]
122impl<P> pb::workflow_provider_server::WorkflowProvider for WorkflowServer<P>
123where
124 P: WorkflowProvider,
125{
126 async fn start_run(
127 &self,
128 request: GrpcRequest<pb::StartWorkflowProviderRunRequest>,
129 ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowRun>, Status> {
130 self.provider.start_run(request).await
131 }
132
133 async fn get_run(
134 &self,
135 request: GrpcRequest<pb::GetWorkflowProviderRunRequest>,
136 ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowRun>, Status> {
137 self.provider.get_run(request).await
138 }
139
140 async fn list_runs(
141 &self,
142 request: GrpcRequest<pb::ListWorkflowProviderRunsRequest>,
143 ) -> std::result::Result<GrpcResponse<pb::ListWorkflowProviderRunsResponse>, Status> {
144 self.provider.list_runs(request).await
145 }
146
147 async fn cancel_run(
148 &self,
149 request: GrpcRequest<pb::CancelWorkflowProviderRunRequest>,
150 ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowRun>, Status> {
151 self.provider.cancel_run(request).await
152 }
153
154 async fn signal_run(
155 &self,
156 request: GrpcRequest<pb::SignalWorkflowProviderRunRequest>,
157 ) -> std::result::Result<GrpcResponse<pb::SignalWorkflowRunResponse>, Status> {
158 self.provider.signal_run(request).await
159 }
160
161 async fn signal_or_start_run(
162 &self,
163 request: GrpcRequest<pb::SignalOrStartWorkflowProviderRunRequest>,
164 ) -> std::result::Result<GrpcResponse<pb::SignalWorkflowRunResponse>, Status> {
165 self.provider.signal_or_start_run(request).await
166 }
167
168 async fn upsert_schedule(
169 &self,
170 request: GrpcRequest<pb::UpsertWorkflowProviderScheduleRequest>,
171 ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowSchedule>, Status> {
172 self.provider.upsert_schedule(request).await
173 }
174
175 async fn get_schedule(
176 &self,
177 request: GrpcRequest<pb::GetWorkflowProviderScheduleRequest>,
178 ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowSchedule>, Status> {
179 self.provider.get_schedule(request).await
180 }
181
182 async fn list_schedules(
183 &self,
184 request: GrpcRequest<pb::ListWorkflowProviderSchedulesRequest>,
185 ) -> std::result::Result<GrpcResponse<pb::ListWorkflowProviderSchedulesResponse>, Status> {
186 self.provider.list_schedules(request).await
187 }
188
189 async fn delete_schedule(
190 &self,
191 request: GrpcRequest<pb::DeleteWorkflowProviderScheduleRequest>,
192 ) -> std::result::Result<GrpcResponse<()>, Status> {
193 self.provider.delete_schedule(request).await
194 }
195
196 async fn pause_schedule(
197 &self,
198 request: GrpcRequest<pb::PauseWorkflowProviderScheduleRequest>,
199 ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowSchedule>, Status> {
200 self.provider.pause_schedule(request).await
201 }
202
203 async fn resume_schedule(
204 &self,
205 request: GrpcRequest<pb::ResumeWorkflowProviderScheduleRequest>,
206 ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowSchedule>, Status> {
207 self.provider.resume_schedule(request).await
208 }
209
210 async fn upsert_event_trigger(
211 &self,
212 request: GrpcRequest<pb::UpsertWorkflowProviderEventTriggerRequest>,
213 ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowEventTrigger>, Status> {
214 self.provider.upsert_event_trigger(request).await
215 }
216
217 async fn get_event_trigger(
218 &self,
219 request: GrpcRequest<pb::GetWorkflowProviderEventTriggerRequest>,
220 ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowEventTrigger>, Status> {
221 self.provider.get_event_trigger(request).await
222 }
223
224 async fn list_event_triggers(
225 &self,
226 request: GrpcRequest<pb::ListWorkflowProviderEventTriggersRequest>,
227 ) -> std::result::Result<GrpcResponse<pb::ListWorkflowProviderEventTriggersResponse>, Status>
228 {
229 self.provider.list_event_triggers(request).await
230 }
231
232 async fn delete_event_trigger(
233 &self,
234 request: GrpcRequest<pb::DeleteWorkflowProviderEventTriggerRequest>,
235 ) -> std::result::Result<GrpcResponse<()>, Status> {
236 self.provider.delete_event_trigger(request).await
237 }
238
239 async fn pause_event_trigger(
240 &self,
241 request: GrpcRequest<pb::PauseWorkflowProviderEventTriggerRequest>,
242 ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowEventTrigger>, Status> {
243 self.provider.pause_event_trigger(request).await
244 }
245
246 async fn resume_event_trigger(
247 &self,
248 request: GrpcRequest<pb::ResumeWorkflowProviderEventTriggerRequest>,
249 ) -> std::result::Result<GrpcResponse<pb::BoundWorkflowEventTrigger>, Status> {
250 self.provider.resume_event_trigger(request).await
251 }
252
253 async fn publish_event(
254 &self,
255 request: GrpcRequest<pb::PublishWorkflowProviderEventRequest>,
256 ) -> std::result::Result<GrpcResponse<()>, Status> {
257 self.provider.publish_event(request).await
258 }
259
260 async fn put_execution_reference(
261 &self,
262 request: GrpcRequest<pb::PutWorkflowExecutionReferenceRequest>,
263 ) -> std::result::Result<GrpcResponse<pb::WorkflowExecutionReference>, Status> {
264 self.provider.put_execution_reference(request).await
265 }
266
267 async fn get_execution_reference(
268 &self,
269 request: GrpcRequest<pb::GetWorkflowExecutionReferenceRequest>,
270 ) -> std::result::Result<GrpcResponse<pb::WorkflowExecutionReference>, Status> {
271 self.provider.get_execution_reference(request).await
272 }
273
274 async fn list_execution_references(
275 &self,
276 request: GrpcRequest<pb::ListWorkflowExecutionReferencesRequest>,
277 ) -> std::result::Result<GrpcResponse<pb::ListWorkflowExecutionReferencesResponse>, Status>
278 {
279 self.provider.list_execution_references(request).await
280 }
281}