Skip to main content

restate_cron/
service.rs

1use std::{convert::TryFrom, str::FromStr};
2
3use anyhow::{Result, anyhow};
4use chrono::{DateTime, Utc};
5use cron::Schedule;
6use restate_sdk::{context::RequestTarget, prelude::*};
7use rhai::packages::Package;
8use rhai_chrono::ChronoPackage;
9use schemars::JsonSchema;
10use serde::{Deserialize, Serialize};
11
12use crate::error::TerminalExt;
13use crate::utils::RequestExt;
14
// Handler interface of the cron job object, registered under the name
// "CronJob". Handlers marked `#[shared]` receive a `SharedObjectContext` in the
// implementation; all others receive an `ObjectContext`.
#[restate_sdk::object]
#[name = "CronJob"]
pub trait Object {
    /// Create a new cron job.
    async fn create(job: Json<JobSpec>) -> HandlerResult<()>;
    /// Create a new or replace an existing cron job.
    async fn replace(job: Json<JobSpec>) -> HandlerResult<()>;
    /// Cancel an existing cron job.
    async fn cancel() -> HandlerResult<()>;
    /// Internal handler for running the cron job.
    // Scheduled by the object itself after `create` and after every completed run.
    async fn run() -> HandlerResult<()>;
    /// Get the details of an existing cron job.
    #[shared]
    async fn get() -> HandlerResult<Json<JobSpec>>;
    /// Get the next run time of an existing cron job.
    // Exposed under the handler name "getNextRun" rather than the Rust method name.
    #[shared]
    #[name = "getNextRun"]
    async fn get_next_run() -> HandlerResult<Json<NextRun>>;
}
34
// Specification of a cron job as accepted by `create`/`replace` and returned by
// `get`. Field names are (de)serialized in camelCase; the schema example comes
// from `example_cron_job`.
// NOTE(review): the `///` field docs below presumably end up as descriptions in
// the generated JSON schema — keep them user-facing.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
#[schemars(example = example_cron_job())]
pub struct JobSpec {
    /// Cron schedule for the job (eg. "*/1 * * * * *").
    pub schedule: String,
    /// Target service to be called.
    pub target: ServiceType,
    /// Payload to be sent to the target service.
    // When `None`, `run` calls the target with a unit `()` body instead.
    pub payload: Option<Payload>,
}
46
47fn example_cron_job() -> JobSpec {
48    JobSpec {
49        schedule: "0 */1 * * * *".to_string(),
50        target: ServiceType::Service {
51            name: "Greeter".to_string(),
52            handler: "greet".to_string(),
53        },
54        payload: Some(Payload::Json {
55            content: serde_json::json!("World"),
56        }),
57    }
58}
59
60impl JobSpec {
61    fn validate(&self) -> HandlerResult<()> {
62        parse_schedule(self.schedule.clone())?;
63
64        Ok(())
65    }
66}
67
68#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
69#[serde(rename_all = "camelCase", tag = "type")]
70pub enum Payload {
71    Json { content: serde_json::Value },
72    Rhai { content: String },
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
76#[serde(rename_all = "camelCase", tag = "type")]
77pub enum ServiceType {
78    Service {
79        name: String,
80        handler: String,
81    },
82    Object {
83        name: String,
84        key: String,
85        handler: String,
86    },
87    Workflow {
88        name: String,
89        key: String,
90        handler: String,
91    },
92}
93
94impl From<ServiceType> for RequestTarget {
95    fn from(val: ServiceType) -> Self {
96        match val {
97            ServiceType::Service { name, handler } => RequestTarget::Service { name, handler },
98            ServiceType::Object { name, key, handler } => {
99                RequestTarget::Object { name, key, handler }
100            }
101            ServiceType::Workflow { name, key, handler } => {
102                RequestTarget::Workflow { name, key, handler }
103            }
104        }
105    }
106}
107
// Record of the next scheduled run. Written to object state by
// `schedule_next`, read by `_cancel` (to cancel the pending invocation) and
// returned by `get_next_run`. Field names are (de)serialized in camelCase.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct NextRun {
    /// Invocation ID of the next run.
    invocation_id: String,
    /// Timestamp of the next run.
    timestamp: DateTime<Utc>,
}
116
/// Implementation of the [`Object`] handlers.
pub struct ObjectImpl {
    /// Engine used by `run` to evaluate `Payload::Rhai` scripts.
    rhai_engine: rhai::Engine,
}
120
121impl ObjectImpl {
122    pub fn new(rhai_engine: rhai::Engine) -> Self {
123        Self { rhai_engine }
124    }
125
126    async fn schedule_next(ctx: &ObjectContext<'_>, schedule: String) -> Result<()> {
127        let (next_time, schedule) = ctx
128            .run(async || {
129                let next_time = parse_schedule(schedule)?
130                    .upcoming(Utc)
131                    .next()
132                    .ok_or_else(|| anyhow!("No upcoming schedule found"))
133                    .terminal_with_code(404)?;
134
135                let duration = (next_time - Utc::now())
136                    .to_std()
137                    .map_err(|err| anyhow!("Failed to convert duration: {}", err))
138                    .terminal_with_code(422)?;
139
140                Ok(Json((next_time, duration)))
141            })
142            .await
143            .map_err(|err| anyhow!("Error: {}", err))?
144            .into_inner();
145
146        let handle = ctx
147            .object_client::<ObjectClient>(ctx.key())
148            .run()
149            .send_after(schedule);
150
151        let next_run = NextRun {
152            invocation_id: handle
153                .invocation_id()
154                .await
155                .map_err(|err| anyhow!("Failed to get invocation ID: {}", err))?
156                .to_string(),
157            timestamp: next_time,
158        };
159
160        ctx.set::<Json<NextRun>>(NEXT_RUN, Json(next_run));
161
162        Ok(())
163    }
164
165    async fn _create(&self, ctx: &ObjectContext<'_>, job: Json<JobSpec>) -> HandlerResult<()> {
166        // Check if job already exists
167        if ctx.get::<Json<JobSpec>>(JOB_SPEC).await?.is_some() {
168            return Err(TerminalError::new_with_code(409, "Cron job already exists").into());
169        }
170
171        let job = job.into_inner();
172
173        // Validate job specification and return early if invalid
174        job.validate()?;
175
176        ctx.set::<Json<JobSpec>>(JOB_SPEC, Json(job.clone()));
177
178        ObjectImpl::schedule_next(ctx, job.schedule).await?;
179
180        Ok(())
181    }
182
183    async fn _cancel(&self, ctx: &ObjectContext<'_>) -> HandlerResult<()> {
184        // Get next run
185        let next_run = ctx.get::<Json<NextRun>>(NEXT_RUN).await?;
186
187        // Clear state
188        ctx.clear_all();
189
190        // Cancel the next scheduled invocation
191        if let Some(next_run) = next_run.map(|s| s.into_inner()) {
192            ctx.invocation_handle(next_run.invocation_id)
193                .cancel()
194                .await?;
195        }
196
197        Ok(())
198    }
199}
200
201impl Default for ObjectImpl {
202    fn default() -> Self {
203        let mut engine = rhai::Engine::new();
204
205        {
206            let package = ChronoPackage::new();
207            package.register_into_engine(&mut engine);
208        }
209
210        Self {
211            rhai_engine: engine,
212        }
213    }
214}
215
/// State key under which the job's `JobSpec` is stored.
const JOB_SPEC: &str = "job_spec";
/// State key under which the pending `NextRun` is stored.
const NEXT_RUN: &str = "next_run";
218
219impl Object for ObjectImpl {
220    async fn create(&self, ctx: ObjectContext<'_>, job: Json<JobSpec>) -> HandlerResult<()> {
221        self._create(&ctx, job).await
222    }
223
224    async fn replace(&self, ctx: ObjectContext<'_>, job: Json<JobSpec>) -> HandlerResult<()> {
225        self._cancel(&ctx).await?;
226        self._create(&ctx, job).await
227    }
228
229    async fn cancel(&self, ctx: ObjectContext<'_>) -> HandlerResult<()> {
230        self._cancel(&ctx).await
231    }
232
233    async fn run(&self, ctx: ObjectContext<'_>) -> HandlerResult<()> {
234        let job = ctx.get::<Json<JobSpec>>(JOB_SPEC).await?;
235
236        // Job is not scheduled, do nothing
237        if job.is_none() {
238            return Ok(());
239        }
240
241        let job = job.unwrap().into_inner();
242        let target = job.target.clone();
243        let content_type = "application/json".to_string();
244        let idempotency_key = ctx.headers().get("x-restate-id").map(|v| v.to_string());
245
246        if let Some(payload) = &job.payload {
247            let data = match payload {
248                Payload::Json { content: data } => Json(data.clone()),
249                Payload::Rhai { content: script } => {
250                    let result = self.rhai_engine.eval::<rhai::Dynamic>(script.as_str())?;
251
252                    let value = rhai::serde::from_dynamic::<serde_json::Value>(&result)?;
253
254                    Json(value)
255                }
256            };
257
258            ctx.request::<_, ()>(target.into(), data)
259                .header("Content-Type".to_string(), content_type)
260                .idempotency_key_maybe(idempotency_key)
261                .call()
262                .await?;
263        } else {
264            ctx.request::<(), ()>(target.into(), ())
265                .idempotency_key_maybe(idempotency_key)
266                .call()
267                .await?;
268        }
269
270        // Schedule the next invocation
271        ObjectImpl::schedule_next(&ctx, job.schedule).await?;
272
273        Ok(())
274    }
275
276    async fn get(&self, ctx: SharedObjectContext<'_>) -> HandlerResult<Json<JobSpec>> {
277        ctx.get::<Json<JobSpec>>(JOB_SPEC)
278            .await?
279            .ok_or_else(|| TerminalError::new_with_code(404, "Cron job not found").into())
280    }
281
282    async fn get_next_run(&self, ctx: SharedObjectContext<'_>) -> HandlerResult<Json<NextRun>> {
283        ctx.get::<Json<NextRun>>(NEXT_RUN)
284            .await?
285            .ok_or_else(|| TerminalError::new_with_code(404, "Cron job not found").into())
286    }
287}
288
289fn parse_schedule(schedule: String) -> Result<Schedule, HandlerError> {
290    Schedule::from_str(schedule.as_str())
291        .map_err(|err| anyhow!("Failed to parse schedule: {}", err))
292        .terminal_with_code(422)
293}