Skip to main content

hatchet_sdk/clients/rest/features/
runs.rs

1use super::super::apis::workflow_runs_api::v1_workflow_run_get;
2use crate::Configuration;
3use crate::HatchetError;
4use crate::clients::grpc::dispatcher_client::DispatcherClient;
5use crate::clients::grpc::v0::dispatcher::ResourceEventType;
6use futures::stream::Stream;
7use models::*;
8use std::sync::Arc;
9
/// The runs client is a client for interacting with task and workflow runs within Hatchet.
///
/// It bundles the shared REST [`Configuration`] used by [`RunsClient::get`] with a
/// [`DispatcherClient`], which [`RunsClient::subscribe_to_stream`] clones for each
/// workflow-event subscription.
#[derive(Clone, Debug)]
pub struct RunsClient {
    /// REST API configuration, shared behind an `Arc`.
    configuration: Arc<Configuration>,
    /// gRPC dispatcher client; cloned for every stream subscription.
    dispatcher_client: DispatcherClient,
}
16
17impl RunsClient {
18    pub(crate) fn new(
19        configuration: Arc<Configuration>,
20        dispatcher_client: DispatcherClient,
21    ) -> Self {
22        Self {
23            configuration,
24            dispatcher_client,
25        }
26    }
27
28    /// Get a workflow run by its ID.
29    ///
30    /// ```no_run
31    /// use hatchet_sdk::{Hatchet, EmptyModel};
32    /// #[tokio::main]
33    /// async fn main() {
34    ///     let hatchet = Hatchet::from_env().await.unwrap();
35    ///     let workflow_run = hatchet.workflow_rest_client.get("123").await.unwrap();
36    /// }
37    /// ```
38    pub async fn get(&self, workflow_run_id: &str) -> Result<GetWorkflowRunResponse, HatchetError> {
39        v1_workflow_run_get(&self.configuration, workflow_run_id)
40            .await
41            .map(GetWorkflowRunResponse::from)
42            .map_err(HatchetError::from_rest)
43    }
44
45    /// Subscribe to stream events for a workflow run. Returns an async Stream of byte chunks
46    /// emitted by tasks via `ctx.put_stream()`.
47    ///
48    /// ```no_run
49    /// use hatchet_sdk::Hatchet;
50    /// use futures::StreamExt;
51    /// #[tokio::main]
52    /// async fn main() {
53    ///     let mut hatchet = Hatchet::from_env().await.unwrap();
54    ///     let mut stream = hatchet.workflow_rest_client.subscribe_to_stream("run-id").await.unwrap();
55    ///     while let Some(chunk) = stream.next().await {
56    ///         println!("Got chunk: {:?}", chunk.unwrap());
57    ///     }
58    /// }
59    /// ```
60    pub async fn subscribe_to_stream(
61        &mut self,
62        workflow_run_id: &str,
63    ) -> Result<
64        std::pin::Pin<Box<dyn Stream<Item = Result<Vec<u8>, HatchetError>> + Send>>,
65        HatchetError,
66    > {
67        let (tx, rx) = tokio::sync::mpsc::channel::<Result<Vec<u8>, HatchetError>>(100);
68
69        let mut dispatcher = self.dispatcher_client.clone();
70        let run_id = workflow_run_id.to_string();
71
72        tokio::spawn(async move {
73            let grpc_stream = match dispatcher.subscribe_to_workflow_events(&run_id).await {
74                Ok(s) => s,
75                Err(e) => {
76                    let _ = tx.send(Err(e)).await;
77                    return;
78                }
79            };
80
81            let mut grpc_stream = grpc_stream;
82            loop {
83                match grpc_stream.message().await {
84                    Ok(Some(event)) => {
85                        if event.hangup {
86                            break;
87                        }
88                        if event.event_type == ResourceEventType::Stream as i32 {
89                            let payload = event.event_payload.into_bytes();
90                            if tx.send(Ok(payload)).await.is_err() {
91                                break;
92                            }
93                        }
94                    }
95                    Ok(None) => break,
96                    Err(e) => {
97                        let _ = tx
98                            .send(Err(HatchetError::GrpcErrorStatus(e.message().to_string())))
99                            .await;
100                        break;
101                    }
102                }
103            }
104        });
105
106        Ok(Box::pin(futures::stream::unfold(rx, |mut rx| async {
107            rx.recv().await.map(|item| (item, rx))
108        })))
109    }
110}
111
112pub mod models {
113    use crate::clients::rest::models::V1WorkflowRunCreate200Response;
114    use serde::Deserialize;
115    use serde_json::Value;
116    use std::collections::HashMap;
117
118    #[derive(Debug, Deserialize)]
119    pub struct GetWorkflowRunResponse {
120        pub tasks: Vec<Task>,
121        pub run: Run,
122    }
123
124    #[derive(Debug, Deserialize)]
125    pub struct TaskParent(pub Value);
126
127    #[derive(Debug, Deserialize)]
128    pub struct Triggers {
129        pub filter_payload: serde_json::Value,
130    }
131
132    #[derive(Debug, Deserialize)]
133    pub struct TaskInput {
134        pub parents: HashMap<String, TaskParent>,
135        pub triggers: Triggers,
136    }
137
138    #[derive(Debug, Deserialize)]
139    pub struct Task {
140        pub output: Option<Value>,
141        pub input: TaskInput,
142        #[serde(rename = "taskExternalId")]
143        pub task_external_id: String,
144        #[serde(rename = "actionId")]
145        pub action_id: Option<String>,
146    }
147
148    #[derive(Debug, Deserialize)]
149    pub struct Run {
150        pub status: WorkflowStatus,
151        #[serde(rename = "errorMessage")]
152        pub error_message: String,
153    }
154
155    #[derive(Debug, Deserialize)]
156    #[serde(rename_all = "UPPERCASE")]
157    pub enum WorkflowStatus {
158        Running,
159        Failed,
160        Completed,
161        Queued,
162        Cancelled,
163        #[serde(other)]
164        Unknown,
165    }
166
167    #[derive(Debug, Deserialize)]
168    pub struct Workflow;
169
170    impl From<V1WorkflowRunCreate200Response> for GetWorkflowRunResponse {
171        fn from(response: V1WorkflowRunCreate200Response) -> Self {
172            let json_str = serde_json::to_string(&response)
173                .expect("Failed to serialize V1WorkflowRunCreate200Response");
174            serde_json::from_str(&json_str)
175                .expect("Failed to deserialize to GetWorkflowRunResponse")
176        }
177    }
178}