Skip to main content

hatchet_sdk/runnables/
runnable.rs

1use serde::Serialize;
2use serde::de::DeserializeOwned;
3
4use super::TriggerWorkflowOptions;
5use crate::error::HatchetError;
6use crate::{GetWorkflowRunResponse, WorkflowStatus};
7
8#[async_trait::async_trait]
9pub trait Runnable<I, O>: ExtractRunnableOutput<O> + Send + Sync
10where
11    I: Serialize + Send + Sync + DeserializeOwned + 'static,
12    O: DeserializeOwned + Send + Sync + 'static,
13{
14    async fn get_run(&self, run_id: &str) -> Result<GetWorkflowRunResponse, HatchetError>;
15
16    async fn run(
17        &self,
18        input: &I,
19        options: Option<&TriggerWorkflowOptions>,
20    ) -> Result<O, HatchetError> {
21        let run_id = self.run_no_wait(input, options).await?;
22
23        // Wait 2 seconds for eventual consistency
24        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
25
26        loop {
27            let workflow = self.get_run(&run_id).await?;
28            match workflow.run.status {
29                WorkflowStatus::Running => {}
30                WorkflowStatus::Completed => {
31                    return self.extract_output(workflow);
32                }
33                WorkflowStatus::Failed => {
34                    return Err(HatchetError::WorkflowFailed(
35                        workflow.run.error_message.clone(),
36                    ));
37                }
38                WorkflowStatus::Cancelled => {
39                    return Err(HatchetError::WorkflowCancelled);
40                }
41                _ => {}
42            }
43
44            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
45        }
46    }
47
48    async fn run_no_wait(
49        &self,
50        input: &I,
51        options: Option<&TriggerWorkflowOptions>,
52    ) -> Result<String, HatchetError>;
53}
54
55pub trait ExtractRunnableOutput<O> {
56    fn extract_output(&self, runnable: GetWorkflowRunResponse) -> Result<O, HatchetError>;
57}