use crate::database::Db;
use serde::{Serialize, Deserialize};
use surrealdb::RecordId;
use serde_json::{Value, Number};
use crate::cron_schedule::CronSchedule;
use chrono::{DateTime, Duration, Utc};
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ScheduleStatus {
Enabled,
Disabled,
}
impl std::fmt::Display for ScheduleStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Enabled => write!(f,"Enabled"),
Self::Disabled => write!(f,"Disabled"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduleData {
pub id: Option<RecordId>,
pub name: Option<String>,
pub queue: Option<String>,
pub cron_expression: Option<CronSchedule>,
pub handler: Option<String>,
pub parameters: Option<HashMap<String,Value>>,
pub status: Option<ScheduleStatus>,
pub one_time: bool,
pub start_schedule: Option<DateTime<Utc>>,
pub until_schedule: Option<DateTime<Utc>>,
pub next_schedule: Option<DateTime<Utc>>,
pub date_created: Option<DateTime<Utc>>,
pub date_modified: Option<DateTime<Utc>>
}
impl Default for ScheduleData {
fn default() -> Self {
Self {
id: None,
name: None,
parameters: None,
cron_expression: None,
queue: None,
handler: None,
status: Some(ScheduleStatus::Disabled),
one_time: false,
start_schedule: None,
next_schedule: None,
until_schedule: None,
date_created: None,
date_modified: None
}
}
}
#[derive(Debug,Clone)]
pub struct Schedule<'a>{
db: Arc<Db>,
pub table: &'a str
}
#[derive(Debug, Clone)]
pub struct ScheduleListConditions {
pub status: Option<Vec<String>>,
pub queue: Option<Vec<String>>,
pub handler: Option<Vec<String>>,
pub name: Option<Vec<String>>,
pub start_schedule: Option<DateTime<Utc>>,
pub until_schedule: Option<DateTime<Utc>>,
pub one_time: Option<bool>,
pub upcoming: Option<bool>,
pub limit: Option<usize>
}
impl Default for ScheduleListConditions {
fn default() -> Self {
Self {
status: None,
queue: None,
handler: None,
name: None,
one_time: None,
start_schedule: None,
until_schedule: None,
upcoming: Some(true),
limit: None
}
}
}
impl<'a> Schedule<'a>{
pub async fn new(db: Option<Arc<Db>>) -> Self {
let db: Arc<Db> = if let Some(value) = db {
value.clone()
}
else {
Arc::new(Db::new(None).await.unwrap())
};
Self {
db,
table: "kafru_schedules"
}
}
pub async fn list(&self,conditions: ScheduleListConditions) -> Result<Vec<ScheduleData>,String> {
let mut bindings: HashMap<&str,Value> = HashMap::new();
bindings.insert("table", Value::String(self.table.to_string()));
let mut stmt = String::from("SELECT * FROM type::table($table)");
let mut stmt_where: Vec<&str> = Vec::new();
if conditions.status.is_some() {
let values: Vec<Value> = conditions.status.unwrap().into_iter().map(|value| Value::String(value)).collect();
bindings.insert("status", Value::Array(values));
stmt_where.push("status IN $status");
}
if conditions.queue.is_some() {
let values: Vec<Value> = conditions.queue.unwrap().into_iter().map(|value| Value::String(value)).collect();
bindings.insert("queue", Value::Array(values));
stmt_where.push("queue IN $queue");
}
if conditions.handler.is_some() {
let values: Vec<Value> = conditions.handler.unwrap().into_iter().map(|value| Value::String(value)).collect();
bindings.insert("handler", Value::Array(values));
stmt_where.push("handler IN $handler");
}
if conditions.one_time.is_some() {
bindings.insert("one_time", Value::Bool(conditions.one_time.unwrap()));
stmt_where.push("one_time=$one_time");
}
if conditions.name.is_some() {
let values: Vec<Value> = conditions.name.unwrap().into_iter().map(|value| Value::String(value)).collect();
bindings.insert("name", Value::Array(values));
stmt_where.push("name IN $name");
}
if let Some(start_schedule) = conditions.start_schedule {
bindings.insert("start_schedule", Value::String(start_schedule.to_rfc3339()));
stmt_where.push("start_schedule <= $start_schedule");
}
if let Some(until_schedule) = conditions.until_schedule {
bindings.insert("until_schedule", Value::String(until_schedule.to_rfc3339()));
stmt_where.push("until_schedule >= $until_schedule");
}
if let Some(upcoming) = conditions.upcoming {
if upcoming {
let today: String = Utc::now().to_rfc3339();
bindings.insert("next_schedule", Value::String(today));
stmt_where.push("next_schedule <= $next_schedule");
}
}
if stmt_where.len() > 0 {
stmt.push_str(format!(" WHERE {}",stmt_where.join(" AND ")).as_str());
}
stmt.push_str(" ORDER BY next_schedule ASC");
if conditions.limit.is_some() {
bindings.insert("limit", Value::Number(Number::from(conditions.limit.unwrap())));
stmt.push_str(" LIMIT $limit");
}
match self.db.client.query(stmt)
.bind(bindings).await {
Ok(mut response) => {
match response.take::<Vec<ScheduleData>>(0) {
Ok(data)=> {
Ok(data)
},
Err(error) => Err(error.to_string())
}
},
Err(error) => Err(error.to_string())
}
}
pub async fn create(&self, mut data: ScheduleData ) -> Result<ScheduleData,String> {
let id: String = uuid::Uuid::new_v4().to_string();
data.date_created = Some(Utc::now());
data.start_schedule = Some(data.start_schedule.unwrap_or(Utc::now()));
if let Some(cron_expression) = data.cron_expression.clone() {
let cron_expression = cron_expression;
data.next_schedule = cron_expression.get_upcoming(data.start_schedule).unwrap();
}
else {
if data.next_schedule.is_none() {
return Err("cron expression is required when the next scheduled time is not provided".to_string());
}
}
match self.db.client.create::<Option<ScheduleData>>((self.table,id)).content(data).await {
Ok(result) => {
if let Some(record) = result {
return Ok(record);
}
Err("unable to push record".to_string())
}
Err(error) => Err(error.to_string())
}
}
pub async fn remove(&self, id: RecordId ) -> Result<ScheduleData,String> {
match self.db.client.delete::<Option<ScheduleData>>(id).await {
Ok(result) => {
if let Some(record) = result {
return Ok(record);
}
Err("unable to remove record".to_string())
}
Err(error) => Err(error.to_string())
}
}
pub async fn purge(&self) -> Result<u64,String> {
match self.db.client.query("
SELECT count() as total FROM type::table($table) GROUP ALL;
DELETE FROM type::table($table);
").bind(("table",self.table.to_owned())).await {
Ok(mut response) => {
match response.take::<Option<Value>>(0){
Ok(data) => {
let mut total: u64 = 0;
if let Some(item) = data {
total = item.get("total").unwrap().as_u64().unwrap();
}
Ok(total)
}
Err(error) => Err(error.to_string())
}
}
Err(error) => Err(error.to_string())
}
}
pub async fn get(&self, id: RecordId) -> Result<ScheduleData,String> {
match self.db.client.select::<Option<ScheduleData>>(id).await {
Ok(data) => {
if let Some(record) = data {
return Ok(record)
}
Err("unable to retrieve record".to_string())
}
Err(error) => Err(error.to_string())
}
}
pub async fn update(&self, id: RecordId, data: ScheduleData) -> Result<ScheduleData,String> {
match self.get(id.clone()).await {
Ok(record)=> {
let cron_expression: Option<CronSchedule> = if data.cron_expression.is_none() { record.cron_expression } else { data.cron_expression };
let start_schedule: DateTime<Utc> = if data.start_schedule.is_none() { record.start_schedule.unwrap() } else { data.start_schedule.unwrap() };
let next_schedule: Option<DateTime<Utc>> = if data.one_time { None } else {
if data.next_schedule.is_none() {
if let Some(cron_expression) = cron_expression.clone() {
let cron_upcoming_start = if start_schedule > Utc::now() { start_schedule } else { Utc::now() + Duration::minutes(1) };
cron_expression.get_upcoming(Some(cron_upcoming_start)).unwrap()
}
else {
return Err("cron expression is required when the next scheduled time is not provided".to_string())
}
}
else {
data.next_schedule
}
};
let data: ScheduleData = ScheduleData {
name: if data.name.is_none() { record.name } else { data.name },
parameters: if data.parameters.is_none() { record.parameters } else { data.parameters },
handler: if data.handler.is_none() { record.handler } else { data.handler },
status: if data.status.is_none() { record.status } else { data.status },
until_schedule: if data.until_schedule.is_none() { record.until_schedule } else { data.until_schedule },
cron_expression: cron_expression.clone(),
one_time: data.one_time,
start_schedule: Some(start_schedule),
next_schedule,
queue: if data.queue.is_none() { record.queue } else { data.queue },
date_created: if data.date_created.is_none() { record.date_created } else { data.date_created },
date_modified: Some(Utc::now()),
..Default::default()
};
match self.db.client.update::<Option<ScheduleData>>(record.id.unwrap()).content(data).await {
Ok(result) => {
if let Some(record) = result {
return Ok(record);
}
Err("unable to update record".to_string())
}
Err(error) => Err(error.to_string())
}
}
Err(error) => Err(error.to_string())
}
}
}