use std::{convert::TryFrom, str::FromStr};
use anyhow::{Result, anyhow};
use chrono::{DateTime, Utc};
use cron::Schedule;
use restate_sdk::{context::RequestTarget, prelude::*};
use rhai::packages::Package;
use rhai_chrono::ChronoPackage;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::error::TerminalExt;
use crate::utils::RequestExt;
// Handler interface of the cron-job object, registered under the name
// "CronJob" via the `restate_sdk::object` macro. The object key passed by the
// caller identifies one cron job.
#[restate_sdk::object]
#[name = "CronJob"]
pub trait Object {
// Stores a new job spec and schedules its first run.
async fn create(job: Json<JobSpec>) -> HandlerResult<()>;
// Cancels the current job (if any) and creates a new one from `job`.
async fn replace(job: Json<JobSpec>) -> HandlerResult<()>;
// Clears the stored state and cancels the pending scheduled run, if any.
async fn cancel() -> HandlerResult<()>;
// Executes one occurrence of the job and schedules the following one.
async fn run() -> HandlerResult<()>;
// Returns the stored job spec. Marked `#[shared]`.
#[shared]
async fn get() -> HandlerResult<Json<JobSpec>>;
// Returns the recorded next run. Marked `#[shared]` and exposed under the
// handler name "getNextRun".
#[shared]
#[name = "getNextRun"]
async fn get_next_run() -> HandlerResult<Json<NextRun>>;
}
// Definition of a cron job: when it fires, what it calls, and with which body.
// Serialized with camelCase field names; `example_cron_job()` supplies the
// schema example.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
#[schemars(example = example_cron_job())]
pub struct JobSpec {
// Cron expression; it must be parseable by `cron::Schedule` (see `validate`).
pub schedule: String,
// Handler that is invoked on every run.
pub target: ServiceType,
// Optional request body; when absent the target is called with an empty body.
pub payload: Option<Payload>,
}
/// Builds the sample [`JobSpec`] referenced by the `schemars` example attribute:
/// a job targeting the `greet` handler of the `Greeter` service with the JSON
/// string `"World"` as payload.
fn example_cron_job() -> JobSpec {
    let target = ServiceType::Service {
        name: String::from("Greeter"),
        handler: String::from("greet"),
    };
    let payload = Payload::Json {
        content: serde_json::json!("World"),
    };

    JobSpec {
        schedule: String::from("0 */1 * * * *"),
        target,
        payload: Some(payload),
    }
}
impl JobSpec {
    /// Checks that `schedule` is a parseable cron expression.
    ///
    /// # Errors
    /// Returns whatever error `parse_schedule` produces for an invalid expression.
    fn validate(&self) -> HandlerResult<()> {
        // Only the parse outcome matters here; the parsed schedule is discarded.
        parse_schedule(self.schedule.clone()).map(|_parsed| ())
    }
}
// Request body sent to the target on each run. Serialized as an internally
// tagged enum (`"type"` field) with camelCase variant names.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum Payload {
// A fixed JSON value that is sent as-is.
Json { content: serde_json::Value },
// A Rhai script; `run` evaluates it and sends the result converted to JSON.
Rhai { content: String },
}
// Address of the handler a cron job invokes. Serialized as an internally
// tagged enum (`"type"` field) with camelCase variant names; each variant maps
// one-to-one onto the `RequestTarget` variant of the same name.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum ServiceType {
// A handler addressed by service name only.
Service {
name: String,
handler: String,
},
// A handler addressed by name plus an object key.
Object {
name: String,
key: String,
handler: String,
},
// A handler addressed by name plus a workflow key.
Workflow {
name: String,
key: String,
handler: String,
},
}
impl From<ServiceType> for RequestTarget {
fn from(val: ServiceType) -> Self {
match val {
ServiceType::Service { name, handler } => RequestTarget::Service { name, handler },
ServiceType::Object { name, key, handler } => {
RequestTarget::Object { name, key, handler }
}
ServiceType::Workflow { name, key, handler } => {
RequestTarget::Workflow { name, key, handler }
}
}
}
}
// Bookkeeping for the pending scheduled run, stored in object state under
// `NEXT_RUN`. Serialized with camelCase field names.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct NextRun {
// Invocation id of the delayed `run` call; `_cancel` uses it to cancel the call.
invocation_id: String,
// Point in time the run was scheduled for.
timestamp: DateTime<Utc>,
}
/// Implementation of the `Object` handler trait.
pub struct ObjectImpl {
/// Engine used by `run` to evaluate `Payload::Rhai` scripts.
rhai_engine: rhai::Engine,
}
impl ObjectImpl {
pub fn new(rhai_engine: rhai::Engine) -> Self {
Self { rhai_engine }
}
async fn schedule_next(ctx: &ObjectContext<'_>, schedule: String) -> Result<()> {
let (next_time, schedule) = ctx
.run(async || {
let next_time = parse_schedule(schedule)?
.upcoming(Utc)
.next()
.ok_or_else(|| anyhow!("No upcoming schedule found"))
.terminal_with_code(404)?;
let duration = (next_time - Utc::now())
.to_std()
.map_err(|err| anyhow!("Failed to convert duration: {}", err))
.terminal_with_code(422)?;
Ok(Json((next_time, duration)))
})
.await
.map_err(|err| anyhow!("Error: {}", err))?
.into_inner();
let handle = ctx
.object_client::<ObjectClient>(ctx.key())
.run()
.send_after(schedule);
let next_run = NextRun {
invocation_id: handle
.invocation_id()
.await
.map_err(|err| anyhow!("Failed to get invocation ID: {}", err))?
.to_string(),
timestamp: next_time,
};
ctx.set::<Json<NextRun>>(NEXT_RUN, Json(next_run));
Ok(())
}
async fn _create(&self, ctx: &ObjectContext<'_>, job: Json<JobSpec>) -> HandlerResult<()> {
if ctx.get::<Json<JobSpec>>(JOB_SPEC).await?.is_some() {
return Err(TerminalError::new_with_code(409, "Cron job already exists").into());
}
let job = job.into_inner();
job.validate()?;
ctx.set::<Json<JobSpec>>(JOB_SPEC, Json(job.clone()));
ObjectImpl::schedule_next(ctx, job.schedule).await?;
Ok(())
}
async fn _cancel(&self, ctx: &ObjectContext<'_>) -> HandlerResult<()> {
let next_run = ctx.get::<Json<NextRun>>(NEXT_RUN).await?;
ctx.clear_all();
if let Some(next_run) = next_run.map(|s| s.into_inner()) {
ctx.invocation_handle(next_run.invocation_id)
.cancel()
.await?;
}
Ok(())
}
}
impl Default for ObjectImpl {
    /// Builds an `ObjectImpl` around a fresh Rhai engine that has the
    /// `ChronoPackage` registered.
    fn default() -> Self {
        let mut rhai_engine = rhai::Engine::new();
        ChronoPackage::new().register_into_engine(&mut rhai_engine);
        Self { rhai_engine }
    }
}
/// State key under which the `JobSpec` of a cron job is stored.
const JOB_SPEC: &str = "job_spec";
/// State key under which the `NextRun` record of the pending run is stored.
const NEXT_RUN: &str = "next_run";
impl Object for ObjectImpl {
async fn create(&self, ctx: ObjectContext<'_>, job: Json<JobSpec>) -> HandlerResult<()> {
self._create(&ctx, job).await
}
async fn replace(&self, ctx: ObjectContext<'_>, job: Json<JobSpec>) -> HandlerResult<()> {
self._cancel(&ctx).await?;
self._create(&ctx, job).await
}
async fn cancel(&self, ctx: ObjectContext<'_>) -> HandlerResult<()> {
self._cancel(&ctx).await
}
async fn run(&self, ctx: ObjectContext<'_>) -> HandlerResult<()> {
let job = ctx.get::<Json<JobSpec>>(JOB_SPEC).await?;
if job.is_none() {
return Ok(());
}
let job = job.unwrap().into_inner();
let target = job.target.clone();
let content_type = "application/json".to_string();
let idempotency_key = ctx.headers().get("x-restate-id").map(|v| v.to_string());
if let Some(payload) = &job.payload {
let data = match payload {
Payload::Json { content: data } => Json(data.clone()),
Payload::Rhai { content: script } => {
let result = self.rhai_engine.eval::<rhai::Dynamic>(script.as_str())?;
let value = rhai::serde::from_dynamic::<serde_json::Value>(&result)?;
Json(value)
}
};
ctx.request::<_, ()>(target.into(), data)
.header("Content-Type".to_string(), content_type)
.idempotency_key_maybe(idempotency_key)
.call()
.await?;
} else {
ctx.request::<(), ()>(target.into(), ())
.idempotency_key_maybe(idempotency_key)
.call()
.await?;
}
ObjectImpl::schedule_next(&ctx, job.schedule).await?;
Ok(())
}
async fn get(&self, ctx: SharedObjectContext<'_>) -> HandlerResult<Json<JobSpec>> {
ctx.get::<Json<JobSpec>>(JOB_SPEC)
.await?
.ok_or_else(|| TerminalError::new_with_code(404, "Cron job not found").into())
}
async fn get_next_run(&self, ctx: SharedObjectContext<'_>) -> HandlerResult<Json<NextRun>> {
ctx.get::<Json<NextRun>>(NEXT_RUN)
.await?
.ok_or_else(|| TerminalError::new_with_code(404, "Cron job not found").into())
}
}
/// Parses `schedule` as a cron expression.
///
/// # Errors
/// A parse failure is reported with the message `Failed to parse schedule: …`
/// and passed through `terminal_with_code(422)`.
fn parse_schedule(schedule: String) -> Result<Schedule, HandlerError> {
    let parsed = Schedule::from_str(&schedule);
    parsed
        .map_err(|cause| anyhow!("Failed to parse schedule: {}", cause))
        .terminal_with_code(422)
}