use crate::resources::{NewTaskStep, Pipeline, TaskStep, TaskStepStatus, VariableValue};
use crate::schema::tasks;
use crate::Database;
use automaat_core::Context;
use diesel::prelude::*;
use juniper::GraphQLEnum;
use serde::{Deserialize, Serialize};
use std::convert::{Into, TryInto};
use std::error;
use std::thread;
pub(crate) mod step;
// Lifecycle status of a task.
//
// Persisted using the Postgres type named by `PgType` ("TaskStatus") and
// exposed over GraphQL under the name `TaskStatus`.
//
// NOTE(review): plain `//` comments are used on purpose; the GraphQL derive
// presumably turns `///` docs into schema descriptions — confirm before
// converting these into doc comments.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, GraphQLEnum, DbEnum)]
#[PgType = "TaskStatus"]
#[graphql(name = "TaskStatus")]
pub(crate) enum Status {
// Task-level counterpart of `TaskStepStatus::Initialized`.
Scheduled,
// Status given to tasks built by `NewTask::new` and set by `Task::enqueue`;
// `poll` selects tasks in this status.
Pending,
// Set by `Task::as_running`.
Running,
// Set by `Task::as_failed`; also the counterpart of `TaskStepStatus::Failed`.
Failed,
// Counterpart of `TaskStepStatus::Cancelled`.
Cancelled,
// Counterpart of `TaskStepStatus::Ok`.
Ok,
}
impl From<TaskStepStatus> for Status {
fn from(status: TaskStepStatus) -> Self {
use Status::*;
match status {
TaskStepStatus::Initialized => Scheduled,
TaskStepStatus::Pending => Pending,
TaskStepStatus::Running => Running,
TaskStepStatus::Failed => Failed,
TaskStepStatus::Cancelled => Cancelled,
TaskStepStatus::Ok => Ok,
}
}
}
/// A task as stored in the `tasks` table.
///
/// A task optionally belongs to a [`Pipeline`] through `pipeline_reference`.
///
/// NOTE(review): `Queryable` presumably maps columns by position, so the field
/// order here should match the table definition — confirm before reordering.
#[derive(
Clone, Debug, Deserialize, Serialize, AsChangeset, Associations, Identifiable, Queryable,
)]
#[belongs_to(Pipeline, foreign_key = "pipeline_reference")]
#[table_name = "tasks"]
pub(crate) struct Task {
/// Primary key of the task.
pub(crate) id: i32,
/// Name of the task. For tasks created with a pipeline reference,
/// `NewTask::create` stores `"<pipeline name> #<n>"` here.
pub(crate) name: String,
/// Optional description.
pub(crate) description: Option<String>,
/// Current status of the task.
pub(crate) status: Status,
/// Id of the pipeline this task references, if any.
pub(crate) pipeline_reference: Option<i32>,
}
impl Task {
    /// Marks the task as [`Status::Running`] and saves it.
    ///
    /// Returns whatever `save_changes` yields for the updated task.
    pub(crate) fn as_running(&mut self, conn: &Database) -> QueryResult<Self> {
        self.status = Status::Running;

        self.save_changes::<Self>(&**conn)
    }

    /// Marks the task as [`Status::Failed`] and saves it.
    ///
    /// Returns whatever `save_changes` yields for the updated task.
    pub(crate) fn as_failed(&mut self, conn: &Database) -> QueryResult<Self> {
        self.status = Status::Failed;

        self.save_changes::<Self>(&**conn)
    }

    /// Looks up the pipeline referenced by this task.
    ///
    /// Returns `Ok(None)` when the task has no pipeline reference, or when no
    /// pipeline with the referenced id exists.
    pub(crate) fn pipeline(&self, conn: &Database) -> QueryResult<Option<Pipeline>> {
        use crate::schema::pipelines::dsl::*;

        // Without a reference there is nothing to query.
        let pipeline_id = match self.pipeline_reference {
            Some(pipeline_id) => pipeline_id,
            None => return Ok(None),
        };

        pipelines
            .filter(id.eq(pipeline_id))
            .first(&**conn)
            .optional()
    }

    /// Loads the steps belonging to this task, ordered by ascending position.
    pub(crate) fn steps(&self, conn: &Database) -> QueryResult<Vec<TaskStep>> {
        use crate::schema::task_steps::dsl::*;

        TaskStep::belonging_to(self)
            .order(position.asc())
            .load::<TaskStep>(&**conn)
    }

    /// Marks the task as [`Status::Pending`] and saves it.
    ///
    /// `poll` selects tasks in that status.
    pub(crate) fn enqueue(&mut self, conn: &Database) -> QueryResult<Self> {
        self.status = Status::Pending;

        self.save_changes::<Self>(&**conn)
    }

    /// Runs all steps of the task in order.
    ///
    /// The output of each step is handed to the next step as its input; the
    /// first step receives no input. Once all steps have run, the task's
    /// stored status is set to the status of the last step.
    ///
    /// # Errors
    ///
    /// Returns an error if the context cannot be created, the steps cannot be
    /// loaded, a step fails, or the status update fails. When a step fails the
    /// remaining steps are skipped and this method does not touch the task's
    /// status.
    pub(crate) fn run(&self, conn: &Database) -> Result<(), Box<dyn error::Error>> {
        use crate::schema::tasks::dsl::*;

        let context = Context::new()?;
        let mut steps = self.steps(conn)?;

        // Thread the output of each step into the next one.
        let mut input: Option<String> = None;
        for step in steps.iter_mut() {
            input = step.run(conn, &context, input.as_ref().map(String::as_str))?;
        }

        // NOTE(review): a task without steps keeps its current status here —
        // confirm that this is intended.
        if let Some(last) = steps.last() {
            let _ = diesel::update(self)
                .set(status.eq(Status::from(last.status)))
                .execute(&**conn)?;
        }

        Ok(())
    }
}
pub(crate) fn poll(conn: &Database) {
loop {
let result = conn
.transaction(|| {
use crate::schema::tasks::dsl::*;
tasks
.filter(status.eq(Status::Pending))
.load::<Task>(&**conn)?
.into_iter()
.map(|mut task| task.as_running(conn))
.collect::<Result<Vec<_>, _>>()
})
.map_err(Into::into)
.and_then(|tasks| {
tasks.into_iter().try_for_each(|mut task| {
task.run(conn).or_else(|err| {
let _ = task.as_failed(conn)?;
Err(err)
})
})
});
if let Err(err) = result {
eprintln!("failed to run task: {}", err);
}
thread::sleep(std::time::Duration::from_millis(1000));
}
}
/// A task that has not been stored yet.
///
/// Built with `NewTask::new` or `NewTask::create_from_pipeline`, and turned
/// into a stored [`Task`] by `NewTask::create`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct NewTask<'a> {
/// Name of the task. `create` replaces it with `"<pipeline name> #<n>"`
/// when a pipeline reference is set.
name: &'a str,
/// Optional description.
description: Option<&'a str>,
/// Status the task is inserted with; `new` sets it to `Status::Pending`.
status: Status,
/// Id of the pipeline the task references, if any.
pipeline_reference: Option<i32>,
/// Steps added to the task when it is created.
steps: Vec<NewTaskStep<'a>>,
}
impl<'a> NewTask<'a> {
pub(crate) fn new(name: &'a str, description: Option<&'a str>) -> Self {
Self {
name,
description,
status: Status::Pending,
pipeline_reference: None,
steps: vec![],
}
}
pub(crate) fn create_from_pipeline(
conn: &Database,
pipeline: &'a Pipeline,
variable_values: &[VariableValue],
) -> Result<Task, Box<dyn error::Error>> {
let steps = pipeline.steps(conn)?;
let steps = steps
.iter()
.map(|s| (s, variable_values))
.map(TryInto::try_into)
.collect::<Result<_, _>>()?;
let mut task = Self::new(
&pipeline.name,
pipeline.description.as_ref().map(String::as_ref),
);
task.with_pipeline_reference(pipeline.id);
task.with_steps(steps);
task.create(conn).map_err(Into::into)
}
pub(crate) fn with_pipeline_reference(&mut self, pipeline_id: i32) {
self.pipeline_reference = Some(pipeline_id)
}
fn with_steps(&mut self, mut steps: Vec<NewTaskStep<'a>>) {
self.steps.append(&mut steps)
}
pub(crate) fn create(self, conn: &Database) -> Result<Task, Box<dyn error::Error>> {
use crate::schema::tasks::dsl::*;
let mut task_name = self.name.to_owned();
if let Some(pipeline_id) = self.pipeline_reference {
use crate::schema::tasks::dsl::*;
let pipeline: Pipeline = {
use crate::schema::pipelines::dsl::*;
pipelines.filter(id.eq(pipeline_id)).first(&**conn)
}?;
let total = tasks
.filter(pipeline_reference.eq(pipeline_id))
.count()
.get_result::<i64>(&**conn)?;
task_name = format!("{} #{}", pipeline.name, total + 1);
}
conn.transaction(|| {
let values = (
name.eq(&task_name),
description.eq(&self.description),
status.eq(self.status),
pipeline_reference.eq(self.pipeline_reference),
);
let task = diesel::insert_into(tasks)
.values(&values)
.get_result(&**conn)?;
self.steps
.into_iter()
.try_for_each(|s| s.add_to_task(conn, &task))?;
Ok(task)
})
}
}
pub(crate) mod graphql {
use super::*;
use crate::resources::VariableValueInput;
use juniper::{object, FieldResult, GraphQLInputObject, ID};
#[derive(Clone, Debug, Deserialize, Serialize, GraphQLInputObject)]
pub(crate) struct CreateTaskFromPipelineInput {
#[serde(with = "juniper_serde")]
pub(crate) pipeline_id: ID,
pub(crate) variables: Vec<VariableValueInput>,
}
#[derive(Clone, Debug, Deserialize, Serialize, GraphQLInputObject)]
pub(crate) struct TaskVariableInput {
pub(crate) key: String,
pub(crate) value: String,
}
#[object(Context = Database)]
impl Task {
fn id() -> ID {
ID::new(self.id.to_string())
}
fn name() -> &str {
self.name.as_ref()
}
fn description() -> Option<&str> {
self.description.as_ref().map(String::as_ref)
}
fn status() -> Status {
self.status
}
fn steps(context: &Database) -> FieldResult<Option<Vec<TaskStep>>> {
self.steps(context).map(Some).map_err(Into::into)
}
fn pipeline(context: &Database) -> FieldResult<Option<Pipeline>> {
self.pipeline(context).map_err(Into::into)
}
}
}