use std::sync::Arc;
use super::pagination::{PaginationResponse, hash_map_to_value};
use crate::clients::rest::apis::workflow_api::{
cron_workflow_list, workflow_cron_delete, workflow_cron_get,
};
use crate::clients::rest::apis::workflow_run_api::cron_workflow_trigger_create;
use crate::clients::rest::models::{
CronWorkflowList200Response, CronWorkflowTriggerCreate200Response,
CronWorkflowTriggerCreateRequest,
};
use crate::{Configuration, HatchetError};
#[derive(Clone, Debug)]
pub struct CronsClient {
configuration: Arc<Configuration>,
tenant_id: Option<String>,
}
impl CronsClient {
pub(crate) fn new(configuration: Arc<Configuration>, tenant_id: Option<String>) -> Self {
Self {
configuration,
tenant_id,
}
}
fn tenant_id(&self) -> Result<&str, HatchetError> {
self.tenant_id
.as_deref()
.ok_or_else(|| HatchetError::MissingTokenField("sub"))
}
pub async fn create(
&self,
workflow_name: &str,
opts: CreateCronOpts,
) -> Result<CronTrigger, HatchetError> {
let normalized = if opts.expression.split_whitespace().count() == 5 {
format!("0 {}", opts.expression)
} else {
opts.expression.clone()
};
normalized
.parse::<cron::Schedule>()
.map_err(|_| HatchetError::InvalidCronExpression(opts.expression.clone()))?;
let request = CronWorkflowTriggerCreateRequest {
input: opts.input,
additional_metadata: opts
.additional_metadata
.unwrap_or_else(|| serde_json::json!({})),
cron_name: opts.name,
cron_expression: opts.expression,
priority: opts.priority,
};
let tenant = self.tenant_id()?;
cron_workflow_trigger_create(&self.configuration, tenant, workflow_name, request)
.await
.map(Into::into)
.map_err(HatchetError::from_rest)
}
pub async fn get(&self, cron_id: &str) -> Result<CronTrigger, HatchetError> {
let tenant = self.tenant_id()?;
workflow_cron_get(&self.configuration, tenant, cron_id)
.await
.map(Into::into)
.map_err(HatchetError::from_rest)
}
pub async fn list(&self, opts: ListCronsOpts) -> Result<CronTriggerList, HatchetError> {
let tenant = self.tenant_id()?;
cron_workflow_list(
&self.configuration,
tenant,
opts.offset,
opts.limit,
opts.workflow_id.as_deref(),
opts.workflow_name.as_deref(),
opts.cron_name.as_deref(),
opts.additional_metadata,
opts.order_by_field.as_deref(),
opts.order_by_direction.as_deref(),
)
.await
.map(Into::into)
.map_err(HatchetError::from_rest)
}
pub async fn delete(&self, cron_id: &str) -> Result<(), HatchetError> {
let tenant = self.tenant_id()?;
workflow_cron_delete(&self.configuration, tenant, cron_id)
.await
.map_err(HatchetError::from_rest)
}
}
#[derive(Clone, Debug)]
pub struct CreateCronOpts {
pub name: String,
pub expression: String,
pub input: serde_json::Value,
pub additional_metadata: Option<serde_json::Value>,
pub priority: Option<i32>,
}
#[derive(Clone, Debug, Default)]
pub struct ListCronsOpts {
pub offset: Option<i64>,
pub limit: Option<i64>,
pub workflow_id: Option<String>,
pub workflow_name: Option<String>,
pub cron_name: Option<String>,
pub additional_metadata: Option<Vec<String>>,
pub order_by_field: Option<String>,
pub order_by_direction: Option<String>,
}
#[derive(Clone, Debug)]
pub struct CronTrigger {
pub metadata_id: String,
pub cron: String,
pub name: Option<String>,
pub workflow_id: String,
pub workflow_name: String,
pub input: serde_json::Value,
pub additional_metadata: serde_json::Value,
pub enabled: bool,
pub priority: Option<i32>,
}
impl From<CronWorkflowTriggerCreate200Response> for CronTrigger {
fn from(r: CronWorkflowTriggerCreate200Response) -> Self {
Self {
metadata_id: r.metadata.id,
cron: r.cron,
name: r.name,
workflow_id: r.workflow_id,
workflow_name: r.workflow_name,
input: hash_map_to_value(r.input),
additional_metadata: hash_map_to_value(r.additional_metadata),
enabled: r.enabled,
priority: r.priority,
}
}
}
#[derive(Clone, Debug)]
pub struct CronTriggerList {
pub rows: Vec<CronTrigger>,
pub pagination: Option<PaginationResponse>,
}
impl From<CronWorkflowList200Response> for CronTriggerList {
fn from(r: CronWorkflowList200Response) -> Self {
Self {
rows: r
.rows
.unwrap_or_default()
.into_iter()
.map(Into::into)
.collect(),
pagination: r.pagination.map(Into::into),
}
}
}
#[derive(Clone, Debug, Default)]
pub struct CronOptions {
pub additional_metadata: Option<serde_json::Value>,
pub priority: Option<i32>,
}
#[cfg(test)]
mod tests {
#[test]
fn test_six_field_cron_expression_parses() {
assert!("0 */2 * * * *".parse::<cron::Schedule>().is_ok());
}
#[test]
fn test_five_field_cron_with_prepended_seconds_parses() {
let five_field = "*/2 * * * *";
let normalized = format!("0 {}", five_field);
assert!(normalized.parse::<cron::Schedule>().is_ok());
}
#[test]
fn test_invalid_cron_expression_fails() {
assert!("not a cron".parse::<cron::Schedule>().is_err());
}
#[test]
fn test_missing_tenant_returns_error() {
let client = super::CronsClient::new(
std::sync::Arc::new(crate::Configuration {
base_path: String::new(),
client: reqwest::Client::new(),
basic_auth: None,
oauth_access_token: None,
bearer_access_token: None,
user_agent: None,
api_key: None,
}),
None,
);
let err = client.tenant_id().unwrap_err();
assert!(matches!(err, crate::HatchetError::MissingTokenField("sub")));
}
}