Skip to main content

hatchet_sdk/clients/rest/features/
crons.rs

1use std::sync::Arc;
2
3use super::pagination::{PaginationResponse, hash_map_to_value};
4use crate::clients::rest::apis::workflow_api::{
5    cron_workflow_list, workflow_cron_delete, workflow_cron_get,
6};
7use crate::clients::rest::apis::workflow_run_api::cron_workflow_trigger_create;
8use crate::clients::rest::models::{
9    CronWorkflowList200Response, CronWorkflowTriggerCreate200Response,
10    CronWorkflowTriggerCreateRequest,
11};
12use crate::{Configuration, HatchetError};
13
14/// Client for managing cron workflow triggers. Accessed via [`Hatchet::crons`](crate::Hatchet).
15///
16/// Supports both 5-field (`* * * * *`) and 6-field (`* * * * * *`) cron expressions.
17/// Requires a token with a `sub` (tenant ID) claim.
18#[derive(Clone, Debug)]
19pub struct CronsClient {
20    configuration: Arc<Configuration>,
21    tenant_id: Option<String>,
22}
23
24impl CronsClient {
25    pub(crate) fn new(configuration: Arc<Configuration>, tenant_id: Option<String>) -> Self {
26        Self {
27            configuration,
28            tenant_id,
29        }
30    }
31
32    fn tenant_id(&self) -> Result<&str, HatchetError> {
33        self.tenant_id
34            .as_deref()
35            .ok_or_else(|| HatchetError::MissingTokenField("sub"))
36    }
37
38    /// Create a cron trigger for the given workflow. Validates the expression client-side before calling the API.
39    pub async fn create(
40        &self,
41        workflow_name: &str,
42        opts: CreateCronOpts,
43    ) -> Result<CronTrigger, HatchetError> {
44        // cron crate requires seconds field; prepend "0" for 5-field expressions
45        let normalized = if opts.expression.split_whitespace().count() == 5 {
46            format!("0 {}", opts.expression)
47        } else {
48            opts.expression.clone()
49        };
50        normalized
51            .parse::<cron::Schedule>()
52            .map_err(|_| HatchetError::InvalidCronExpression(opts.expression.clone()))?;
53
54        let request = CronWorkflowTriggerCreateRequest {
55            input: opts.input,
56            additional_metadata: opts
57                .additional_metadata
58                .unwrap_or_else(|| serde_json::json!({})),
59            cron_name: opts.name,
60            cron_expression: opts.expression,
61            priority: opts.priority,
62        };
63
64        let tenant = self.tenant_id()?;
65        cron_workflow_trigger_create(&self.configuration, tenant, workflow_name, request)
66            .await
67            .map(Into::into)
68            .map_err(HatchetError::from_rest)
69    }
70
71    /// Retrieve a cron trigger by ID.
72    pub async fn get(&self, cron_id: &str) -> Result<CronTrigger, HatchetError> {
73        let tenant = self.tenant_id()?;
74        workflow_cron_get(&self.configuration, tenant, cron_id)
75            .await
76            .map(Into::into)
77            .map_err(HatchetError::from_rest)
78    }
79
80    /// List cron triggers, optionally filtered by workflow, name, or metadata.
81    pub async fn list(&self, opts: ListCronsOpts) -> Result<CronTriggerList, HatchetError> {
82        let tenant = self.tenant_id()?;
83        cron_workflow_list(
84            &self.configuration,
85            tenant,
86            opts.offset,
87            opts.limit,
88            opts.workflow_id.as_deref(),
89            opts.workflow_name.as_deref(),
90            opts.cron_name.as_deref(),
91            opts.additional_metadata,
92            opts.order_by_field.as_deref(),
93            opts.order_by_direction.as_deref(),
94        )
95        .await
96        .map(Into::into)
97        .map_err(HatchetError::from_rest)
98    }
99
100    /// Delete a cron trigger by ID.
101    pub async fn delete(&self, cron_id: &str) -> Result<(), HatchetError> {
102        let tenant = self.tenant_id()?;
103        workflow_cron_delete(&self.configuration, tenant, cron_id)
104            .await
105            .map_err(HatchetError::from_rest)
106    }
107}
108
109/// Options for creating a cron trigger via [`CronsClient::create`].
110#[derive(Clone, Debug)]
111pub struct CreateCronOpts {
112    pub name: String,
113    pub expression: String,
114    pub input: serde_json::Value,
115    pub additional_metadata: Option<serde_json::Value>,
116    pub priority: Option<i32>,
117}
118
119/// Filter and pagination options for [`CronsClient::list`].
120#[derive(Clone, Debug, Default)]
121pub struct ListCronsOpts {
122    pub offset: Option<i64>,
123    pub limit: Option<i64>,
124    pub workflow_id: Option<String>,
125    pub workflow_name: Option<String>,
126    pub cron_name: Option<String>,
127    pub additional_metadata: Option<Vec<String>>,
128    pub order_by_field: Option<String>,
129    pub order_by_direction: Option<String>,
130}
131
132/// A cron workflow trigger returned by the Hatchet API.
133#[derive(Clone, Debug)]
134pub struct CronTrigger {
135    pub metadata_id: String,
136    pub cron: String,
137    pub name: Option<String>,
138    pub workflow_id: String,
139    pub workflow_name: String,
140    pub input: serde_json::Value,
141    pub additional_metadata: serde_json::Value,
142    pub enabled: bool,
143    pub priority: Option<i32>,
144}
145
146impl From<CronWorkflowTriggerCreate200Response> for CronTrigger {
147    fn from(r: CronWorkflowTriggerCreate200Response) -> Self {
148        Self {
149            metadata_id: r.metadata.id,
150            cron: r.cron,
151            name: r.name,
152            workflow_id: r.workflow_id,
153            workflow_name: r.workflow_name,
154            input: hash_map_to_value(r.input),
155            additional_metadata: hash_map_to_value(r.additional_metadata),
156            enabled: r.enabled,
157            priority: r.priority,
158        }
159    }
160}
161
162/// Paginated list of cron triggers returned by [`CronsClient::list`].
163#[derive(Clone, Debug)]
164pub struct CronTriggerList {
165    pub rows: Vec<CronTrigger>,
166    pub pagination: Option<PaginationResponse>,
167}
168
169impl From<CronWorkflowList200Response> for CronTriggerList {
170    fn from(r: CronWorkflowList200Response) -> Self {
171        Self {
172            rows: r
173                .rows
174                .unwrap_or_default()
175                .into_iter()
176                .map(Into::into)
177                .collect(),
178            pagination: r.pagination.map(Into::into),
179        }
180    }
181}
182
183/// Optional parameters for [`Task::cron`](crate::Task::cron) and [`Workflow::cron`](crate::Workflow::cron).
184#[derive(Clone, Debug, Default)]
185pub struct CronOptions {
186    pub additional_metadata: Option<serde_json::Value>,
187    pub priority: Option<i32>,
188}
189
190#[cfg(test)]
191mod tests {
192    #[test]
193    fn test_six_field_cron_expression_parses() {
194        assert!("0 */2 * * * *".parse::<cron::Schedule>().is_ok());
195    }
196
197    #[test]
198    fn test_five_field_cron_with_prepended_seconds_parses() {
199        let five_field = "*/2 * * * *";
200        let normalized = format!("0 {}", five_field);
201        assert!(normalized.parse::<cron::Schedule>().is_ok());
202    }
203
204    #[test]
205    fn test_invalid_cron_expression_fails() {
206        assert!("not a cron".parse::<cron::Schedule>().is_err());
207    }
208
209    #[test]
210    fn test_missing_tenant_returns_error() {
211        let client = super::CronsClient::new(
212            std::sync::Arc::new(crate::Configuration {
213                base_path: String::new(),
214                client: reqwest::Client::new(),
215                basic_auth: None,
216                oauth_access_token: None,
217                bearer_access_token: None,
218                user_agent: None,
219                api_key: None,
220            }),
221            None,
222        );
223        let err = client.tenant_id().unwrap_err();
224        assert!(matches!(err, crate::HatchetError::MissingTokenField("sub")));
225    }
226}