Skip to main content

hatchet_sdk/clients/rest/features/
schedules.rs

1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4
5use super::pagination::{PaginationResponse, hash_map_to_value};
6use crate::clients::rest::apis::workflow_api::{
7    workflow_scheduled_delete, workflow_scheduled_get, workflow_scheduled_list,
8};
9use crate::clients::rest::apis::workflow_run_api::scheduled_workflow_run_create;
10use crate::clients::rest::models::{
11    ScheduledWorkflowRunCreate200Response, ScheduledWorkflowRunCreateRequest,
12    WorkflowScheduledList200Response,
13};
14use crate::{Configuration, HatchetError};
15
16/// Client for managing scheduled (one-time) workflow runs. Accessed via [`Hatchet::schedules`](crate::Hatchet).
17///
18/// Requires a token with a `sub` (tenant ID) claim.
19#[derive(Clone, Debug)]
20pub struct SchedulesClient {
21    configuration: Arc<Configuration>,
22    tenant_id: Option<String>,
23}
24
25impl SchedulesClient {
26    pub(crate) fn new(configuration: Arc<Configuration>, tenant_id: Option<String>) -> Self {
27        Self {
28            configuration,
29            tenant_id,
30        }
31    }
32
33    fn tenant_id(&self) -> Result<&str, HatchetError> {
34        self.tenant_id
35            .as_deref()
36            .ok_or_else(|| HatchetError::MissingTokenField("sub"))
37    }
38
39    /// Schedule a workflow to run at a specific future time.
40    pub async fn create(
41        &self,
42        workflow_name: &str,
43        opts: CreateScheduleOpts,
44    ) -> Result<ScheduledRun, HatchetError> {
45        let tenant = self.tenant_id()?;
46        let request = ScheduledWorkflowRunCreateRequest {
47            input: opts.input,
48            additional_metadata: opts
49                .additional_metadata
50                .unwrap_or_else(|| serde_json::json!({})),
51            trigger_at: opts.trigger_at.to_rfc3339(),
52            priority: opts.priority,
53        };
54
55        scheduled_workflow_run_create(&self.configuration, tenant, workflow_name, request)
56            .await
57            .map(Into::into)
58            .map_err(HatchetError::from_rest)
59    }
60
61    /// Retrieve a scheduled run by ID.
62    pub async fn get(&self, scheduled_id: &str) -> Result<ScheduledRun, HatchetError> {
63        let tenant = self.tenant_id()?;
64        workflow_scheduled_get(&self.configuration, tenant, scheduled_id)
65            .await
66            .map(Into::into)
67            .map_err(HatchetError::from_rest)
68    }
69
70    /// List scheduled runs, optionally filtered by workflow, status, or metadata.
71    pub async fn list(&self, opts: ListSchedulesOpts) -> Result<ScheduledRunList, HatchetError> {
72        let tenant = self.tenant_id()?;
73        workflow_scheduled_list(
74            &self.configuration,
75            tenant,
76            opts.offset,
77            opts.limit,
78            opts.order_by_field.as_deref(),
79            opts.order_by_direction.as_deref(),
80            opts.workflow_id.as_deref(),
81            opts.parent_workflow_run_id.as_deref(),
82            opts.parent_task_run_external_id.as_deref(),
83            opts.additional_metadata,
84            opts.statuses,
85        )
86        .await
87        .map(Into::into)
88        .map_err(HatchetError::from_rest)
89    }
90
91    /// Delete a scheduled run by ID.
92    pub async fn delete(&self, scheduled_id: &str) -> Result<(), HatchetError> {
93        let tenant = self.tenant_id()?;
94        workflow_scheduled_delete(&self.configuration, tenant, scheduled_id)
95            .await
96            .map_err(HatchetError::from_rest)
97    }
98}
99
100/// Options for creating a scheduled run via [`SchedulesClient::create`].
101#[derive(Clone, Debug)]
102pub struct CreateScheduleOpts {
103    pub trigger_at: DateTime<Utc>,
104    pub input: serde_json::Value,
105    pub additional_metadata: Option<serde_json::Value>,
106    pub priority: Option<i32>,
107}
108
/// Filter and pagination options for [`SchedulesClient::list`].
///
/// Every field is optional and is forwarded unchanged to the list endpoint; the `Default`
/// value leaves all of them unset. The type is comparable (`PartialEq`/`Eq`) so callers can
/// check whether two option sets are identical.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ListSchedulesOpts {
    /// Pagination offset.
    pub offset: Option<i64>,
    /// Pagination limit.
    pub limit: Option<i64>,
    /// Restrict results to this workflow ID.
    pub workflow_id: Option<String>,
    /// Restrict results to this parent workflow run ID.
    pub parent_workflow_run_id: Option<String>,
    /// Restrict results to this parent task run external ID.
    pub parent_task_run_external_id: Option<String>,
    /// Metadata filters, passed through as raw strings.
    // NOTE(review): presumably `key:value` pairs — confirm against the REST API.
    pub additional_metadata: Option<Vec<String>>,
    /// Status filters, passed through as raw strings.
    pub statuses: Option<Vec<String>>,
    /// Field to order results by.
    pub order_by_field: Option<String>,
    /// Ordering direction.
    // NOTE(review): accepted values are defined by the REST API, not visible here.
    pub order_by_direction: Option<String>,
}
122
123/// A scheduled workflow run returned by the Hatchet API. The run will be enqueued at `trigger_at`.
124#[derive(Clone, Debug)]
125pub struct ScheduledRun {
126    pub metadata_id: String,
127    pub trigger_at: String,
128    pub workflow_id: String,
129    pub workflow_name: String,
130    pub workflow_version_id: String,
131    pub input: serde_json::Value,
132    pub additional_metadata: serde_json::Value,
133    pub workflow_run_id: Option<String>,
134    pub workflow_run_status: Option<String>,
135    pub priority: Option<i32>,
136}
137
138impl From<ScheduledWorkflowRunCreate200Response> for ScheduledRun {
139    fn from(r: ScheduledWorkflowRunCreate200Response) -> Self {
140        Self {
141            metadata_id: r.metadata.id,
142            trigger_at: r.trigger_at,
143            workflow_id: r.workflow_id,
144            workflow_name: r.workflow_name,
145            workflow_version_id: r.workflow_version_id,
146            input: hash_map_to_value(r.input),
147            additional_metadata: hash_map_to_value(r.additional_metadata),
148            workflow_run_id: r.workflow_run_id.map(|id| id.to_string()),
149            workflow_run_status: r.workflow_run_status.and_then(|status| {
150                serde_json::to_value(status)
151                    .ok()
152                    .and_then(|v| v.as_str().map(str::to_string))
153            }),
154            priority: r.priority,
155        }
156    }
157}
158
159/// Paginated list of scheduled runs returned by [`SchedulesClient::list`].
160#[derive(Clone, Debug)]
161pub struct ScheduledRunList {
162    pub rows: Vec<ScheduledRun>,
163    pub pagination: Option<PaginationResponse>,
164}
165
166impl From<WorkflowScheduledList200Response> for ScheduledRunList {
167    fn from(r: WorkflowScheduledList200Response) -> Self {
168        Self {
169            rows: r
170                .rows
171                .unwrap_or_default()
172                .into_iter()
173                .map(Into::into)
174                .collect(),
175            pagination: r.pagination.map(Into::into),
176        }
177    }
178}
179
180/// Optional parameters for [`Task::schedule`](crate::Task::schedule) and [`Workflow::schedule`](crate::Workflow::schedule).
181#[derive(Clone, Debug, Default)]
182pub struct ScheduleOptions {
183    pub additional_metadata: Option<serde_json::Value>,
184    pub priority: Option<i32>,
185}