1use std::{convert::TryFrom, str::FromStr};
2
3use anyhow::{Result, anyhow};
4use chrono::{DateTime, Utc};
5use cron::Schedule;
6use restate_sdk::{context::RequestTarget, prelude::*};
7use rhai::packages::Package;
8use rhai_chrono::ChronoPackage;
9use schemars::JsonSchema;
10use serde::{Deserialize, Serialize};
11
12use crate::error::TerminalExt;
13use crate::utils::RequestExt;
14
// Handler interface of the Restate object registered under the name "CronJob".
// The implementation in this file stores at most one job spec per object key.
#[restate_sdk::object]
#[name = "CronJob"]
pub trait Object {
    // Stores `job` and schedules its first run. The implementation rejects the
    // call with a 409 terminal error when a job is already stored.
    async fn create(job: Json<JobSpec>) -> HandlerResult<()>;
    // Cancels whatever is currently stored, then creates `job` in its place.
    async fn replace(job: Json<JobSpec>) -> HandlerResult<()>;
    // Clears the stored state and cancels the pending `run` invocation, if any.
    async fn cancel() -> HandlerResult<()>;
    // Executes one tick: calls the job's target, then schedules the next tick.
    // Does nothing when no job is stored.
    async fn run() -> HandlerResult<()>;
    // Returns the stored job spec, or a 404 terminal error when there is none.
    #[shared]
    async fn get() -> HandlerResult<Json<JobSpec>>;
    // Returns the stored next-run record, or a 404 terminal error when there is none.
    #[shared]
    #[name = "getNextRun"]
    async fn get_next_run() -> HandlerResult<Json<NextRun>>;
}
34
// Description of a cron job: when to fire, what to call, and what to send.
// Serialized with camelCase field names.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
#[schemars(example = example_cron_job())]
pub struct JobSpec {
    // Cron expression, parsed with `cron::Schedule::from_str`
    // (the schema example uses the six-field form "0 */1 * * * *").
    pub schedule: String,
    // Handler that is invoked on every run.
    pub target: ServiceType,
    // Optional request body; when `None` the target is called with `()`.
    pub payload: Option<Payload>,
}
46
47fn example_cron_job() -> JobSpec {
48 JobSpec {
49 schedule: "0 */1 * * * *".to_string(),
50 target: ServiceType::Service {
51 name: "Greeter".to_string(),
52 handler: "greet".to_string(),
53 },
54 payload: Some(Payload::Json {
55 content: serde_json::json!("World"),
56 }),
57 }
58}
59
60impl JobSpec {
61 fn validate(&self) -> HandlerResult<()> {
62 parse_schedule(self.schedule.clone())?;
63
64 Ok(())
65 }
66}
67
// Request body sent to the job's target. Serialized as an internally tagged
// enum: the variant is carried in a camelCase `type` field next to `content`.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum Payload {
    // Static JSON value, sent unchanged on every run.
    Json { content: serde_json::Value },
    // Rhai script evaluated on every run; its result is converted to a
    // `serde_json::Value` and sent as the body.
    Rhai { content: String },
}
74
// Address of the handler a job calls. Mirrors the `Service`, `Object` and
// `Workflow` variants of `RequestTarget` (see the `From` impl in this file).
// Serialized as an internally tagged enum with a camelCase `type` field.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum ServiceType {
    // A handler addressed by service name only.
    Service {
        name: String,
        handler: String,
    },
    // A handler on an object, addressed by name and key.
    Object {
        name: String,
        key: String,
        handler: String,
    },
    // A handler on a workflow, addressed by name and key.
    Workflow {
        name: String,
        key: String,
        handler: String,
    },
}
93
94impl From<ServiceType> for RequestTarget {
95 fn from(val: ServiceType) -> Self {
96 match val {
97 ServiceType::Service { name, handler } => RequestTarget::Service { name, handler },
98 ServiceType::Object { name, key, handler } => {
99 RequestTarget::Object { name, key, handler }
100 }
101 ServiceType::Workflow { name, key, handler } => {
102 RequestTarget::Workflow { name, key, handler }
103 }
104 }
105 }
106}
107
// Record of the pending `run` invocation, stored in object state after each
// scheduling step. Serialized with camelCase field names.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct NextRun {
    // Invocation id of the delayed `run` call; used to cancel it.
    invocation_id: String,
    // UTC instant the schedule yielded for that run.
    timestamp: DateTime<Utc>,
}
116
/// Implementation of the `Object` handlers.
pub struct ObjectImpl {
    /// Engine used to evaluate `Payload::Rhai` scripts when a job runs.
    rhai_engine: rhai::Engine,
}
120
121impl ObjectImpl {
122 pub fn new(rhai_engine: rhai::Engine) -> Self {
123 Self { rhai_engine }
124 }
125
126 async fn schedule_next(ctx: &ObjectContext<'_>, schedule: String) -> Result<()> {
127 let (next_time, schedule) = ctx
128 .run(async || {
129 let next_time = parse_schedule(schedule)?
130 .upcoming(Utc)
131 .next()
132 .ok_or_else(|| anyhow!("No upcoming schedule found"))
133 .terminal_with_code(404)?;
134
135 let duration = (next_time - Utc::now())
136 .to_std()
137 .map_err(|err| anyhow!("Failed to convert duration: {}", err))
138 .terminal_with_code(422)?;
139
140 Ok(Json((next_time, duration)))
141 })
142 .await
143 .map_err(|err| anyhow!("Error: {}", err))?
144 .into_inner();
145
146 let handle = ctx
147 .object_client::<ObjectClient>(ctx.key())
148 .run()
149 .send_after(schedule);
150
151 let next_run = NextRun {
152 invocation_id: handle
153 .invocation_id()
154 .await
155 .map_err(|err| anyhow!("Failed to get invocation ID: {}", err))?
156 .to_string(),
157 timestamp: next_time,
158 };
159
160 ctx.set::<Json<NextRun>>(NEXT_RUN, Json(next_run));
161
162 Ok(())
163 }
164
165 async fn _create(&self, ctx: &ObjectContext<'_>, job: Json<JobSpec>) -> HandlerResult<()> {
166 if ctx.get::<Json<JobSpec>>(JOB_SPEC).await?.is_some() {
168 return Err(TerminalError::new_with_code(409, "Cron job already exists").into());
169 }
170
171 let job = job.into_inner();
172
173 job.validate()?;
175
176 ctx.set::<Json<JobSpec>>(JOB_SPEC, Json(job.clone()));
177
178 ObjectImpl::schedule_next(ctx, job.schedule).await?;
179
180 Ok(())
181 }
182
183 async fn _cancel(&self, ctx: &ObjectContext<'_>) -> HandlerResult<()> {
184 let next_run = ctx.get::<Json<NextRun>>(NEXT_RUN).await?;
186
187 ctx.clear_all();
189
190 if let Some(next_run) = next_run.map(|s| s.into_inner()) {
192 ctx.invocation_handle(next_run.invocation_id)
193 .cancel()
194 .await?;
195 }
196
197 Ok(())
198 }
199}
200
201impl Default for ObjectImpl {
202 fn default() -> Self {
203 let mut engine = rhai::Engine::new();
204
205 {
206 let package = ChronoPackage::new();
207 package.register_into_engine(&mut engine);
208 }
209
210 Self {
211 rhai_engine: engine,
212 }
213 }
214}
215
/// State key under which the job's `JobSpec` is stored.
const JOB_SPEC: &str = "job_spec";
/// State key under which the pending `NextRun` record is stored.
const NEXT_RUN: &str = "next_run";
218
219impl Object for ObjectImpl {
220 async fn create(&self, ctx: ObjectContext<'_>, job: Json<JobSpec>) -> HandlerResult<()> {
221 self._create(&ctx, job).await
222 }
223
224 async fn replace(&self, ctx: ObjectContext<'_>, job: Json<JobSpec>) -> HandlerResult<()> {
225 self._cancel(&ctx).await?;
226 self._create(&ctx, job).await
227 }
228
229 async fn cancel(&self, ctx: ObjectContext<'_>) -> HandlerResult<()> {
230 self._cancel(&ctx).await
231 }
232
233 async fn run(&self, ctx: ObjectContext<'_>) -> HandlerResult<()> {
234 let job = ctx.get::<Json<JobSpec>>(JOB_SPEC).await?;
235
236 if job.is_none() {
238 return Ok(());
239 }
240
241 let job = job.unwrap().into_inner();
242 let target = job.target.clone();
243 let content_type = "application/json".to_string();
244 let idempotency_key = ctx.headers().get("x-restate-id").map(|v| v.to_string());
245
246 if let Some(payload) = &job.payload {
247 let data = match payload {
248 Payload::Json { content: data } => Json(data.clone()),
249 Payload::Rhai { content: script } => {
250 let result = self.rhai_engine.eval::<rhai::Dynamic>(script.as_str())?;
251
252 let value = rhai::serde::from_dynamic::<serde_json::Value>(&result)?;
253
254 Json(value)
255 }
256 };
257
258 ctx.request::<_, ()>(target.into(), data)
259 .header("Content-Type".to_string(), content_type)
260 .idempotency_key_maybe(idempotency_key)
261 .call()
262 .await?;
263 } else {
264 ctx.request::<(), ()>(target.into(), ())
265 .idempotency_key_maybe(idempotency_key)
266 .call()
267 .await?;
268 }
269
270 ObjectImpl::schedule_next(&ctx, job.schedule).await?;
272
273 Ok(())
274 }
275
276 async fn get(&self, ctx: SharedObjectContext<'_>) -> HandlerResult<Json<JobSpec>> {
277 ctx.get::<Json<JobSpec>>(JOB_SPEC)
278 .await?
279 .ok_or_else(|| TerminalError::new_with_code(404, "Cron job not found").into())
280 }
281
282 async fn get_next_run(&self, ctx: SharedObjectContext<'_>) -> HandlerResult<Json<NextRun>> {
283 ctx.get::<Json<NextRun>>(NEXT_RUN)
284 .await?
285 .ok_or_else(|| TerminalError::new_with_code(404, "Cron job not found").into())
286 }
287}
288
289fn parse_schedule(schedule: String) -> Result<Schedule, HandlerError> {
290 Schedule::from_str(schedule.as_str())
291 .map_err(|err| anyhow!("Failed to parse schedule: {}", err))
292 .terminal_with_code(422)
293}