use crate::resources::{Step, Task, VariableValue};
use crate::schema::task_steps;
use crate::Database;
use crate::Processor;
use automaat_core::Context;
use chrono::prelude::*;
use chrono::NaiveDateTime;
use diesel::prelude::*;
use juniper::GraphQLEnum;
use serde::{Deserialize, Serialize};
use std::convert::{AsRef, TryFrom};
use std::error;
// Lifecycle status of a task step, stored in the `TaskStepStatus` Postgres
// enum and exposed over GraphQL under the same name.
//
// Plain `//` comments are used on purpose: juniper's derive turns `///` doc
// comments into GraphQL descriptions, which would alter the public schema.
#[derive(Clone, Copy, Debug, DbEnum, GraphQLEnum, Serialize, Deserialize)]
#[PgType = "TaskStepStatus"]
#[graphql(name = "TaskStepStatus")]
pub enum Status {
// Default status assigned by `NewTaskStep::new`.
Initialized,
// Status written to the database by `NewTaskStep::add_to_task`.
Pending,
// Set by `TaskStep::start` when a run begins.
Running,
// Set when the processor returns an error, or when persisting the start of
// a run fails.
Failed,
// Not assigned anywhere in this file.
Cancelled,
// Set when the processor finishes without an error.
Ok,
}
/// A persisted step of a task, backed by the `task_steps` table.
///
/// Each step belongs to exactly one `Task` (see `task_id`). Field order must
/// match the column order of the table because of `Queryable`.
#[derive(
Clone, Debug, Deserialize, Serialize, AsChangeset, Associations, Identifiable, Queryable,
)]
#[belongs_to(Task)]
#[table_name = "task_steps"]
pub(crate) struct TaskStep {
/// Primary key of the step.
pub(crate) id: i32,
/// Name of the step.
pub(crate) name: String,
/// Optional description of the step.
pub(crate) description: Option<String>,
/// Processor configuration as raw JSON; `TaskStep::processor` deserializes
/// it into a `Processor`.
pub(crate) processor: serde_json::Value,
/// Position of the step (presumably its order within the task — confirm
/// against the code that schedules steps).
pub(crate) position: i32,
/// UTC time at which the step was started, set when a run begins.
pub(crate) started_at: Option<NaiveDateTime>,
/// UTC time at which the step finished, set when a run ends.
pub(crate) finished_at: Option<NaiveDateTime>,
/// Current lifecycle status of the step.
pub(crate) status: Status,
/// Output of the processor on success, or the error message on failure.
pub(crate) output: Option<String>,
/// Id of the `Task` this step belongs to.
pub(crate) task_id: i32,
}
impl TaskStep {
pub(crate) fn processor(&self) -> Option<Processor> {
serde_json::from_value(self.processor.clone()).ok()
}
pub(crate) fn task(&self, conn: &Database) -> QueryResult<Task> {
use crate::schema::tasks::dsl::*;
tasks.filter(id.eq(self.task_id)).first(&**conn)
}
pub(crate) fn run(
&mut self,
conn: &Database,
context: &Context,
input: Option<&str>,
) -> Result<Option<String>, Box<dyn error::Error>> {
self.start(conn)?;
let result = match self.processor_with_input_and_context(input, context) {
Ok(p) => p.run(context),
Err(err) => Err(format!("task processor cannot be deserialized: {}", err).into()),
};
match result {
Ok(output) => {
self.finished(conn, Status::Ok, output.clone())?;
Ok(output)
}
Err(err) => {
self.finished(conn, Status::Failed, Some(err.to_string()))?;
Err(err)
}
}
}
fn start(&mut self, conn: &Database) -> QueryResult<()> {
self.status = Status::Running;
self.started_at = Some(Utc::now().naive_utc());
match self.save_changes::<Self>(&**conn) {
Ok(_) => Ok(()),
Err(err) => {
self.status = Status::Failed;
Err(err)
}
}
}
fn finished(
&mut self,
conn: &Database,
status: Status,
output: Option<String>,
) -> QueryResult<()> {
self.finished_at = Some(Utc::now().naive_utc());
self.status = status;
self.output = output;
self.save_changes::<Self>(&**conn).map(|_| ())
}
fn value_replace(&self, value: &mut serde_json::Value, find: &str, replace: &str) {
if value.is_array() {
value
.as_array_mut()
.unwrap()
.iter_mut()
.for_each(|v| self.value_replace(v, find, replace));
};
if !value.is_string() {
return;
}
let string = value.as_str().unwrap().to_owned();
let string = string.replace(find, replace);
*value = string.into();
}
fn processor_with_input_and_context(
&mut self,
input: Option<&str>,
context: &Context,
) -> Result<Processor, serde_json::Error> {
let mut processor = self.processor.clone();
let workspace = context.workspace_path().to_str().expect("valid path");
processor
.as_object_mut()
.expect("unexpected serialized data stored in database")
.values_mut()
.for_each(|v| {
v.as_object_mut()
.expect("unexpected serialized data stored in database")
.values_mut()
.for_each(|v| {
self.value_replace(v, "{$input}", input.as_ref().unwrap_or(&""));
self.value_replace(v, "${$workspace}", workspace)
})
});
serde_json::from_value(processor)
}
}
/// Values for a task step that has not been stored yet.
///
/// Built with `NewTaskStep::new` or converted from a `Step` plus variable
/// values, and inserted into the database with `add_to_task`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct NewTaskStep<'a> {
/// Name of the step.
name: &'a str,
/// Optional description of the step.
description: Option<&'a str>,
/// Processor to run for this step; serialized to JSON on insert.
processor: Processor,
/// Position of the step (presumably its order within the task — confirm).
position: i32,
/// Start time; `None` for a freshly created step.
started_at: Option<NaiveDateTime>,
/// Finish time; `None` for a freshly created step.
finished_at: Option<NaiveDateTime>,
/// Output; `None` for a freshly created step.
output: Option<&'a str>,
/// Status; `Status::Initialized` for a freshly created step.
/// NOTE(review): `add_to_task` inserts `Status::Pending` rather than this
/// value — confirm that is intended.
status: Status,
}
impl<'a> NewTaskStep<'a> {
    /// Creates a step that has not run yet: no timestamps, no output, and
    /// status `Status::Initialized`.
    pub(crate) const fn new(
        name: &'a str,
        description: Option<&'a str>,
        processor: Processor,
        position: i32,
    ) -> Self {
        Self {
            status: Status::Initialized,
            started_at: None,
            finished_at: None,
            output: None,
            name,
            description,
            processor,
            position,
        }
    }

    /// Validates the processor and inserts this step into `task_steps`,
    /// attached to `task`.
    ///
    /// The row is always written with `Status::Pending`; the `status` field of
    /// `self` is not used.
    ///
    /// # Errors
    ///
    /// Returns an error when processor validation fails, when the processor
    /// cannot be serialized to JSON, or when the insert fails.
    pub(crate) fn add_to_task(
        self,
        conn: &Database,
        task: &Task,
    ) -> Result<(), Box<dyn error::Error>> {
        use crate::schema::task_steps::dsl::*;

        self.processor.validate()?;

        // Serialize up front so the row tuple below only holds ready values.
        let processor_json = serde_json::to_value(self.processor)?;

        let row = (
            name.eq(&self.name),
            description.eq(&self.description),
            processor.eq(processor_json),
            position.eq(self.position),
            started_at.eq(self.started_at),
            finished_at.eq(self.finished_at),
            status.eq(Status::Pending),
            output.eq(&self.output),
            task_id.eq(task.id),
        );

        diesel::insert_into(task_steps)
            .values(row)
            .execute(&**conn)?;

        Ok(())
    }
}
// GraphQL bindings for `TaskStep`.
//
// Only plain `//` comments are used inside this module: juniper turns `///`
// doc comments on the impl and its methods into schema descriptions, so adding
// them would change the public GraphQL schema.
pub(crate) mod graphql {
    use super::*;
    use juniper::{object, FieldResult, ID};

    #[object(Context = Database)]
    impl TaskStep {
        // The step id, rendered as a string-based GraphQL `ID`.
        fn id() -> ID {
            let raw = self.id.to_string();
            ID::new(raw)
        }

        // The name of the step.
        fn name() -> &str {
            self.name.as_str()
        }

        // The optional description of the step.
        fn description() -> Option<&str> {
            self.description.as_ref().map(String::as_str)
        }

        // The deserialized processor, or null when the stored JSON cannot be
        // deserialized.
        fn processor() -> Option<Processor> {
            self.processor()
        }

        // The position of the step.
        fn position() -> i32 {
            self.position
        }

        // The stored naive start time, interpreted as UTC.
        fn started_at() -> Option<DateTime<Utc>> {
            self.started_at
                .map(|naive| DateTime::<Utc>::from_utc(naive, Utc))
        }

        // The stored naive finish time, interpreted as UTC.
        fn finished_at() -> Option<DateTime<Utc>> {
            self.finished_at
                .map(|naive| DateTime::<Utc>::from_utc(naive, Utc))
        }

        // The current status of the step.
        fn status() -> Status {
            self.status
        }

        // The output of the step, if any.
        fn output() -> Option<&str> {
            self.output.as_ref().map(String::as_str)
        }

        // The task this step belongs to; a failed lookup becomes a field error.
        fn task(context: &Database) -> FieldResult<Option<Task>> {
            match self.task(context) {
                Ok(task) => Ok(Some(task)),
                Err(err) => Err(err.into()),
            }
        }
    }
}
impl<'a> TryFrom<(&'a Step, &[VariableValue])> for NewTaskStep<'a> {
type Error = serde_json::Error;
fn try_from(
(step, variable_values): (&'a Step, &[VariableValue]),
) -> Result<Self, Self::Error> {
use serde_json::{from_value, Value};
fn replace(value: &mut Value, variable_values: &[VariableValue]) {
if value.is_array() {
value
.as_array_mut()
.unwrap()
.iter_mut()
.for_each(|v| replace(v, variable_values));
};
if !value.is_string() {
return;
}
variable_values.iter().for_each(|vv| {
let string = value.as_str().unwrap().to_owned();
let string = string.replace(&format!("{{{}}}", vv.key), &vv.value);
*value = string.into();
});
}
let mut processor: Value = step.processor.clone();
processor
.as_object_mut()
.expect("unexpected serialized data stored in database")
.values_mut()
.for_each(|v| {
v.as_object_mut()
.expect("unexpected serialized data stored in database")
.values_mut()
.for_each(|v| replace(v, variable_values))
});
Ok(Self::new(
&step.name,
step.description.as_ref().map(String::as_ref),
from_value(processor)?,
step.position,
))
}
}