hatchet_sdk/clients/rest/features/
schedules.rs1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4
5use super::pagination::{PaginationResponse, hash_map_to_value};
6use crate::clients::rest::apis::workflow_api::{
7 workflow_scheduled_delete, workflow_scheduled_get, workflow_scheduled_list,
8};
9use crate::clients::rest::apis::workflow_run_api::scheduled_workflow_run_create;
10use crate::clients::rest::models::{
11 ScheduledWorkflowRunCreate200Response, ScheduledWorkflowRunCreateRequest,
12 WorkflowScheduledList200Response,
13};
14use crate::{Configuration, HatchetError};
15
16#[derive(Clone, Debug)]
20pub struct SchedulesClient {
21 configuration: Arc<Configuration>,
22 tenant_id: Option<String>,
23}
24
25impl SchedulesClient {
26 pub(crate) fn new(configuration: Arc<Configuration>, tenant_id: Option<String>) -> Self {
27 Self {
28 configuration,
29 tenant_id,
30 }
31 }
32
33 fn tenant_id(&self) -> Result<&str, HatchetError> {
34 self.tenant_id
35 .as_deref()
36 .ok_or_else(|| HatchetError::MissingTokenField("sub"))
37 }
38
39 pub async fn create(
41 &self,
42 workflow_name: &str,
43 opts: CreateScheduleOpts,
44 ) -> Result<ScheduledRun, HatchetError> {
45 let tenant = self.tenant_id()?;
46 let request = ScheduledWorkflowRunCreateRequest {
47 input: opts.input,
48 additional_metadata: opts
49 .additional_metadata
50 .unwrap_or_else(|| serde_json::json!({})),
51 trigger_at: opts.trigger_at.to_rfc3339(),
52 priority: opts.priority,
53 };
54
55 scheduled_workflow_run_create(&self.configuration, tenant, workflow_name, request)
56 .await
57 .map(Into::into)
58 .map_err(HatchetError::from_rest)
59 }
60
61 pub async fn get(&self, scheduled_id: &str) -> Result<ScheduledRun, HatchetError> {
63 let tenant = self.tenant_id()?;
64 workflow_scheduled_get(&self.configuration, tenant, scheduled_id)
65 .await
66 .map(Into::into)
67 .map_err(HatchetError::from_rest)
68 }
69
70 pub async fn list(&self, opts: ListSchedulesOpts) -> Result<ScheduledRunList, HatchetError> {
72 let tenant = self.tenant_id()?;
73 workflow_scheduled_list(
74 &self.configuration,
75 tenant,
76 opts.offset,
77 opts.limit,
78 opts.order_by_field.as_deref(),
79 opts.order_by_direction.as_deref(),
80 opts.workflow_id.as_deref(),
81 opts.parent_workflow_run_id.as_deref(),
82 opts.parent_task_run_external_id.as_deref(),
83 opts.additional_metadata,
84 opts.statuses,
85 )
86 .await
87 .map(Into::into)
88 .map_err(HatchetError::from_rest)
89 }
90
91 pub async fn delete(&self, scheduled_id: &str) -> Result<(), HatchetError> {
93 let tenant = self.tenant_id()?;
94 workflow_scheduled_delete(&self.configuration, tenant, scheduled_id)
95 .await
96 .map_err(HatchetError::from_rest)
97 }
98}
99
100#[derive(Clone, Debug)]
102pub struct CreateScheduleOpts {
103 pub trigger_at: DateTime<Utc>,
104 pub input: serde_json::Value,
105 pub additional_metadata: Option<serde_json::Value>,
106 pub priority: Option<i32>,
107}
108
109#[derive(Clone, Debug, Default)]
111pub struct ListSchedulesOpts {
112 pub offset: Option<i64>,
113 pub limit: Option<i64>,
114 pub workflow_id: Option<String>,
115 pub parent_workflow_run_id: Option<String>,
116 pub parent_task_run_external_id: Option<String>,
117 pub additional_metadata: Option<Vec<String>>,
118 pub statuses: Option<Vec<String>>,
119 pub order_by_field: Option<String>,
120 pub order_by_direction: Option<String>,
121}
122
123#[derive(Clone, Debug)]
125pub struct ScheduledRun {
126 pub metadata_id: String,
127 pub trigger_at: String,
128 pub workflow_id: String,
129 pub workflow_name: String,
130 pub workflow_version_id: String,
131 pub input: serde_json::Value,
132 pub additional_metadata: serde_json::Value,
133 pub workflow_run_id: Option<String>,
134 pub workflow_run_status: Option<String>,
135 pub priority: Option<i32>,
136}
137
138impl From<ScheduledWorkflowRunCreate200Response> for ScheduledRun {
139 fn from(r: ScheduledWorkflowRunCreate200Response) -> Self {
140 Self {
141 metadata_id: r.metadata.id,
142 trigger_at: r.trigger_at,
143 workflow_id: r.workflow_id,
144 workflow_name: r.workflow_name,
145 workflow_version_id: r.workflow_version_id,
146 input: hash_map_to_value(r.input),
147 additional_metadata: hash_map_to_value(r.additional_metadata),
148 workflow_run_id: r.workflow_run_id.map(|id| id.to_string()),
149 workflow_run_status: r.workflow_run_status.and_then(|status| {
150 serde_json::to_value(status)
151 .ok()
152 .and_then(|v| v.as_str().map(str::to_string))
153 }),
154 priority: r.priority,
155 }
156 }
157}
158
159#[derive(Clone, Debug)]
161pub struct ScheduledRunList {
162 pub rows: Vec<ScheduledRun>,
163 pub pagination: Option<PaginationResponse>,
164}
165
166impl From<WorkflowScheduledList200Response> for ScheduledRunList {
167 fn from(r: WorkflowScheduledList200Response) -> Self {
168 Self {
169 rows: r
170 .rows
171 .unwrap_or_default()
172 .into_iter()
173 .map(Into::into)
174 .collect(),
175 pagination: r.pagination.map(Into::into),
176 }
177 }
178}
179
180#[derive(Clone, Debug, Default)]
182pub struct ScheduleOptions {
183 pub additional_metadata: Option<serde_json::Value>,
184 pub priority: Option<i32>,
185}