hatchet_sdk/clients/rest/features/
runs.rs1use super::super::apis::workflow_runs_api::v1_workflow_run_get;
2use crate::Configuration;
3use crate::HatchetError;
4use crate::clients::grpc::dispatcher_client::DispatcherClient;
5use crate::clients::grpc::v0::dispatcher::ResourceEventType;
6use futures::stream::Stream;
7use models::*;
8use std::sync::Arc;
9
10#[derive(Clone, Debug)]
12pub struct RunsClient {
13 configuration: Arc<Configuration>,
14 dispatcher_client: DispatcherClient,
15}
16
17impl RunsClient {
18 pub(crate) fn new(
19 configuration: Arc<Configuration>,
20 dispatcher_client: DispatcherClient,
21 ) -> Self {
22 Self {
23 configuration,
24 dispatcher_client,
25 }
26 }
27
28 pub async fn get(&self, workflow_run_id: &str) -> Result<GetWorkflowRunResponse, HatchetError> {
39 v1_workflow_run_get(&self.configuration, workflow_run_id)
40 .await
41 .map(GetWorkflowRunResponse::from)
42 .map_err(HatchetError::from_rest)
43 }
44
45 pub async fn subscribe_to_stream(
61 &mut self,
62 workflow_run_id: &str,
63 ) -> Result<
64 std::pin::Pin<Box<dyn Stream<Item = Result<Vec<u8>, HatchetError>> + Send>>,
65 HatchetError,
66 > {
67 let (tx, rx) = tokio::sync::mpsc::channel::<Result<Vec<u8>, HatchetError>>(100);
68
69 let mut dispatcher = self.dispatcher_client.clone();
70 let run_id = workflow_run_id.to_string();
71
72 tokio::spawn(async move {
73 let grpc_stream = match dispatcher.subscribe_to_workflow_events(&run_id).await {
74 Ok(s) => s,
75 Err(e) => {
76 let _ = tx.send(Err(e)).await;
77 return;
78 }
79 };
80
81 let mut grpc_stream = grpc_stream;
82 loop {
83 match grpc_stream.message().await {
84 Ok(Some(event)) => {
85 if event.hangup {
86 break;
87 }
88 if event.event_type == ResourceEventType::Stream as i32 {
89 let payload = event.event_payload.into_bytes();
90 if tx.send(Ok(payload)).await.is_err() {
91 break;
92 }
93 }
94 }
95 Ok(None) => break,
96 Err(e) => {
97 let _ = tx
98 .send(Err(HatchetError::GrpcErrorStatus(e.message().to_string())))
99 .await;
100 break;
101 }
102 }
103 }
104 });
105
106 Ok(Box::pin(futures::stream::unfold(rx, |mut rx| async {
107 rx.recv().await.map(|item| (item, rx))
108 })))
109 }
110}
111
112pub mod models {
113 use crate::clients::rest::models::V1WorkflowRunCreate200Response;
114 use serde::Deserialize;
115 use serde_json::Value;
116 use std::collections::HashMap;
117
118 #[derive(Debug, Deserialize)]
119 pub struct GetWorkflowRunResponse {
120 pub tasks: Vec<Task>,
121 pub run: Run,
122 }
123
124 #[derive(Debug, Deserialize)]
125 pub struct TaskParent(pub Value);
126
127 #[derive(Debug, Deserialize)]
128 pub struct Triggers {
129 pub filter_payload: serde_json::Value,
130 }
131
132 #[derive(Debug, Deserialize)]
133 pub struct TaskInput {
134 pub parents: HashMap<String, TaskParent>,
135 pub triggers: Triggers,
136 }
137
138 #[derive(Debug, Deserialize)]
139 pub struct Task {
140 pub output: Option<Value>,
141 pub input: TaskInput,
142 #[serde(rename = "taskExternalId")]
143 pub task_external_id: String,
144 #[serde(rename = "actionId")]
145 pub action_id: Option<String>,
146 }
147
148 #[derive(Debug, Deserialize)]
149 pub struct Run {
150 pub status: WorkflowStatus,
151 #[serde(rename = "errorMessage")]
152 pub error_message: String,
153 }
154
155 #[derive(Debug, Deserialize)]
156 #[serde(rename_all = "UPPERCASE")]
157 pub enum WorkflowStatus {
158 Running,
159 Failed,
160 Completed,
161 Queued,
162 Cancelled,
163 #[serde(other)]
164 Unknown,
165 }
166
167 #[derive(Debug, Deserialize)]
168 pub struct Workflow;
169
170 impl From<V1WorkflowRunCreate200Response> for GetWorkflowRunResponse {
171 fn from(response: V1WorkflowRunCreate200Response) -> Self {
172 let json_str = serde_json::to_string(&response)
173 .expect("Failed to serialize V1WorkflowRunCreate200Response");
174 serde_json::from_str(&json_str)
175 .expect("Failed to deserialize to GetWorkflowRunResponse")
176 }
177 }
178}