hatchet_sdk/clients/rest/features/
crons.rs1use std::sync::Arc;
2
3use super::pagination::{PaginationResponse, hash_map_to_value};
4use crate::clients::rest::apis::workflow_api::{
5 cron_workflow_list, workflow_cron_delete, workflow_cron_get,
6};
7use crate::clients::rest::apis::workflow_run_api::cron_workflow_trigger_create;
8use crate::clients::rest::models::{
9 CronWorkflowList200Response, CronWorkflowTriggerCreate200Response,
10 CronWorkflowTriggerCreateRequest,
11};
12use crate::{Configuration, HatchetError};
13
14#[derive(Clone, Debug)]
19pub struct CronsClient {
20 configuration: Arc<Configuration>,
21 tenant_id: Option<String>,
22}
23
24impl CronsClient {
25 pub(crate) fn new(configuration: Arc<Configuration>, tenant_id: Option<String>) -> Self {
26 Self {
27 configuration,
28 tenant_id,
29 }
30 }
31
32 fn tenant_id(&self) -> Result<&str, HatchetError> {
33 self.tenant_id
34 .as_deref()
35 .ok_or_else(|| HatchetError::MissingTokenField("sub"))
36 }
37
38 pub async fn create(
40 &self,
41 workflow_name: &str,
42 opts: CreateCronOpts,
43 ) -> Result<CronTrigger, HatchetError> {
44 let normalized = if opts.expression.split_whitespace().count() == 5 {
46 format!("0 {}", opts.expression)
47 } else {
48 opts.expression.clone()
49 };
50 normalized
51 .parse::<cron::Schedule>()
52 .map_err(|_| HatchetError::InvalidCronExpression(opts.expression.clone()))?;
53
54 let request = CronWorkflowTriggerCreateRequest {
55 input: opts.input,
56 additional_metadata: opts
57 .additional_metadata
58 .unwrap_or_else(|| serde_json::json!({})),
59 cron_name: opts.name,
60 cron_expression: opts.expression,
61 priority: opts.priority,
62 };
63
64 let tenant = self.tenant_id()?;
65 cron_workflow_trigger_create(&self.configuration, tenant, workflow_name, request)
66 .await
67 .map(Into::into)
68 .map_err(HatchetError::from_rest)
69 }
70
71 pub async fn get(&self, cron_id: &str) -> Result<CronTrigger, HatchetError> {
73 let tenant = self.tenant_id()?;
74 workflow_cron_get(&self.configuration, tenant, cron_id)
75 .await
76 .map(Into::into)
77 .map_err(HatchetError::from_rest)
78 }
79
80 pub async fn list(&self, opts: ListCronsOpts) -> Result<CronTriggerList, HatchetError> {
82 let tenant = self.tenant_id()?;
83 cron_workflow_list(
84 &self.configuration,
85 tenant,
86 opts.offset,
87 opts.limit,
88 opts.workflow_id.as_deref(),
89 opts.workflow_name.as_deref(),
90 opts.cron_name.as_deref(),
91 opts.additional_metadata,
92 opts.order_by_field.as_deref(),
93 opts.order_by_direction.as_deref(),
94 )
95 .await
96 .map(Into::into)
97 .map_err(HatchetError::from_rest)
98 }
99
100 pub async fn delete(&self, cron_id: &str) -> Result<(), HatchetError> {
102 let tenant = self.tenant_id()?;
103 workflow_cron_delete(&self.configuration, tenant, cron_id)
104 .await
105 .map_err(HatchetError::from_rest)
106 }
107}
108
109#[derive(Clone, Debug)]
111pub struct CreateCronOpts {
112 pub name: String,
113 pub expression: String,
114 pub input: serde_json::Value,
115 pub additional_metadata: Option<serde_json::Value>,
116 pub priority: Option<i32>,
117}
118
119#[derive(Clone, Debug, Default)]
121pub struct ListCronsOpts {
122 pub offset: Option<i64>,
123 pub limit: Option<i64>,
124 pub workflow_id: Option<String>,
125 pub workflow_name: Option<String>,
126 pub cron_name: Option<String>,
127 pub additional_metadata: Option<Vec<String>>,
128 pub order_by_field: Option<String>,
129 pub order_by_direction: Option<String>,
130}
131
132#[derive(Clone, Debug)]
134pub struct CronTrigger {
135 pub metadata_id: String,
136 pub cron: String,
137 pub name: Option<String>,
138 pub workflow_id: String,
139 pub workflow_name: String,
140 pub input: serde_json::Value,
141 pub additional_metadata: serde_json::Value,
142 pub enabled: bool,
143 pub priority: Option<i32>,
144}
145
146impl From<CronWorkflowTriggerCreate200Response> for CronTrigger {
147 fn from(r: CronWorkflowTriggerCreate200Response) -> Self {
148 Self {
149 metadata_id: r.metadata.id,
150 cron: r.cron,
151 name: r.name,
152 workflow_id: r.workflow_id,
153 workflow_name: r.workflow_name,
154 input: hash_map_to_value(r.input),
155 additional_metadata: hash_map_to_value(r.additional_metadata),
156 enabled: r.enabled,
157 priority: r.priority,
158 }
159 }
160}
161
162#[derive(Clone, Debug)]
164pub struct CronTriggerList {
165 pub rows: Vec<CronTrigger>,
166 pub pagination: Option<PaginationResponse>,
167}
168
169impl From<CronWorkflowList200Response> for CronTriggerList {
170 fn from(r: CronWorkflowList200Response) -> Self {
171 Self {
172 rows: r
173 .rows
174 .unwrap_or_default()
175 .into_iter()
176 .map(Into::into)
177 .collect(),
178 pagination: r.pagination.map(Into::into),
179 }
180 }
181}
182
183#[derive(Clone, Debug, Default)]
185pub struct CronOptions {
186 pub additional_metadata: Option<serde_json::Value>,
187 pub priority: Option<i32>,
188}
189
190#[cfg(test)]
191mod tests {
192 #[test]
193 fn test_six_field_cron_expression_parses() {
194 assert!("0 */2 * * * *".parse::<cron::Schedule>().is_ok());
195 }
196
197 #[test]
198 fn test_five_field_cron_with_prepended_seconds_parses() {
199 let five_field = "*/2 * * * *";
200 let normalized = format!("0 {}", five_field);
201 assert!(normalized.parse::<cron::Schedule>().is_ok());
202 }
203
204 #[test]
205 fn test_invalid_cron_expression_fails() {
206 assert!("not a cron".parse::<cron::Schedule>().is_err());
207 }
208
209 #[test]
210 fn test_missing_tenant_returns_error() {
211 let client = super::CronsClient::new(
212 std::sync::Arc::new(crate::Configuration {
213 base_path: String::new(),
214 client: reqwest::Client::new(),
215 basic_auth: None,
216 oauth_access_token: None,
217 bearer_access_token: None,
218 user_agent: None,
219 api_key: None,
220 }),
221 None,
222 );
223 let err = client.tenant_id().unwrap_err();
224 assert!(matches!(err, crate::HatchetError::MissingTokenField("sub")));
225 }
226}