automaat-server 0.1.0

HTTP API for the Automaat automation utility.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
//! A [`Task`] is a job that is scheduled to run, or that has already run in
//! the past.
//!
//! It is similar to a [`Pipeline`], but a pipeline represents a set of steps
//! that _can be run_ by providing a set of variables, whereas a task represents
//! a set of steps that are _ready to run_ and have their variables swapped for
//! real values.

use crate::resources::{NewTaskStep, Pipeline, TaskStep, TaskStepStatus, VariableValue};
use crate::schema::tasks;
use crate::Database;
use automaat_core::Context;
use diesel::prelude::*;
use juniper::GraphQLEnum;
use serde::{Deserialize, Serialize};
use std::convert::{Into, TryInto};
use std::error;
use std::thread;

pub(crate) mod step;

/// The status of the [`Task`].
// Stored in Postgres as the enum type `TaskStatus` (see the `DbEnum` derive
// and the `PgType` attribute), and exposed in the GraphQL schema under the
// name `TaskStatus` (see the `GraphQLEnum` derive and `graphql` attribute).
// The type is `Copy`, so it can be moved out of borrowed tasks freely.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, GraphQLEnum, DbEnum)]
#[PgType = "TaskStatus"]
#[graphql(name = "TaskStatus")]
pub(crate) enum Status {
    /// The task is scheduled for execution in the future.
    Scheduled,

    /// The task is ready to be executed and waiting for the scheduler to pick
    /// it up.
    Pending,

    /// The task is currently running its steps one by one.
    Running,

    /// One of the task steps failed, resulting in the task itself to fail.
    Failed,

    /// The task was cancelled.
    Cancelled,

    /// All task steps ran successfully.
    Ok,
}

impl From<TaskStepStatus> for Status {
    fn from(status: TaskStepStatus) -> Self {
        use Status::*;

        match status {
            TaskStepStatus::Initialized => Scheduled,
            TaskStepStatus::Pending => Pending,
            TaskStepStatus::Running => Running,
            TaskStepStatus::Failed => Failed,
            TaskStepStatus::Cancelled => Cancelled,
            TaskStepStatus::Ok => Ok,
        }
    }
}

#[derive(
    Clone, Debug, Deserialize, Serialize, AsChangeset, Associations, Identifiable, Queryable,
)]
#[belongs_to(Pipeline, foreign_key = "pipeline_reference")]
#[table_name = "tasks"]
/// The model representing a task stored in the database.
///
/// Rows of the `tasks` table are loaded into this struct (`Queryable`), and
/// changes made to it can be written back to its row (`AsChangeset` together
/// with `Identifiable`).
pub(crate) struct Task {
    /// The identifier of the task row.
    pub(crate) id: i32,
    /// The name of the task.
    pub(crate) name: String,
    /// An optional, detailed description of the task.
    pub(crate) description: Option<String>,
    /// The current status of the task.
    pub(crate) status: Status,

    /// This is a weak reference, meaning that pipelines can be removed, which
    /// breaks the link between a task and the pipeline it was created from.
    /// This is acceptable, it just means the UI can't link back to the
    /// pipeline.
    ///
    /// Similarly, a task can be created separately from a reference, in which
    /// case this field is also `None`.
    pub(crate) pipeline_reference: Option<i32>,
}

impl Task {
    pub(crate) fn as_running(&mut self, conn: &Database) -> QueryResult<Self> {
        self.status = Status::Running;
        self.save_changes(&**conn)
    }

    pub(crate) fn as_failed(&mut self, conn: &Database) -> QueryResult<Self> {
        self.status = Status::Failed;
        self.save_changes(&**conn)
    }

    pub(crate) fn pipeline(&self, conn: &Database) -> QueryResult<Option<Pipeline>> {
        use crate::schema::pipelines::dsl::*;

        match self.pipeline_reference {
            None => Ok(None),
            Some(pipeline_id) => pipelines
                .filter(id.eq(pipeline_id))
                .first(&**conn)
                .optional(),
        }
    }

    pub(crate) fn steps(&self, conn: &Database) -> QueryResult<Vec<TaskStep>> {
        use crate::schema::task_steps::dsl::*;

        TaskStep::belonging_to(self)
            .order(position.asc())
            .load(&**conn)
    }

    /// Mark a task ready to run by changing its status to `Pending`.
    pub(crate) fn enqueue(&mut self, conn: &Database) -> QueryResult<Self> {
        self.status = Status::Pending;
        self.save_changes(&**conn)
    }

    // TODO: implement some kind of `TaskRunner`, that has a reference to
    // &Database, and then impl `Drop` so that if the runner stops, we can check
    // the result, and update the database based on the final status.
    pub(crate) fn run(&self, conn: &Database) -> Result<(), Box<dyn error::Error>> {
        use crate::schema::tasks::dsl::*;

        let data: Option<String> = None;
        let context = Context::new()?;
        let mut steps = self.steps(conn)?;

        let _ = steps.iter_mut().try_fold(data, |input, step| {
            step.run(conn, &context, input.as_ref().map(String::as_str))
        })?;

        // TODO: need to test this, I believe this will always take the status
        // of the last step, which might not be the step that failed.
        match steps.last() {
            Some(step) => diesel::update(self)
                .set(status.eq(Status::from(step.status)))
                .execute(&**conn)
                .map(|_| ())
                .map_err(Into::into),
            None => Ok(()),
        }
    }
}

/// This is the top-level task runner that gets executed when the server is
/// booted. It continuously polls the database for new tasks with status
/// `Pending`, and will run them.
///
/// Each iteration works in two phases:
///
/// 1. All pending tasks are claimed by switching them to `Running` in a
///    single transaction.
/// 2. The claimed tasks are run one by one.
///
/// A task that fails to run is logged and marked as `Failed`, and the runner
/// continues with the remaining claimed tasks. Between iterations the runner
/// sleeps for one second.
///
/// This function never returns.
pub(crate) fn poll(conn: &Database) {
    loop {
        // Fetch all pending tasks, and set them to running in one transaction,
        // after that, we'll start running them one by one...
        let claimed = conn.transaction(|| {
            use crate::schema::tasks::dsl::*;
            tasks
                .filter(status.eq(Status::Pending))
                .load::<Task>(&**conn)?
                .into_iter()
                .map(|mut task| task.as_running(conn))
                .collect::<Result<Vec<_>, _>>()
        });

        match claimed {
            // Every claimed task is already `Running`, and only `Pending`
            // tasks are ever picked up again. So one failing task must not
            // stop the others from running, or they'd be stuck in `Running`
            // forever.
            Ok(claimed_tasks) => {
                for mut task in claimed_tasks {
                    if let Err(err) = task.run(conn) {
                        eprintln!("failed to run task: {}", err);

                        // Report a failure to store the `Failed` status
                        // separately, so the original error isn't lost.
                        if let Err(err) = task.as_failed(conn) {
                            eprintln!("failed to mark task as failed: {}", err);
                        }
                    }
                }
            }
            Err(err) => eprintln!("failed to fetch pending tasks: {}", err),
        }

        thread::sleep(std::time::Duration::from_millis(1000));
    }
}

/// Contains all the details needed to store a task in the database.
///
/// The fields are private, use [`NewTask::new`] to initialize this struct.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct NewTask<'a> {
    /// The requested task name.
    ///
    /// When a pipeline reference is set, `NewTask::create` stores a name
    /// derived from the pipeline instead.
    name: &'a str,
    /// An optional, detailed description of the task.
    description: Option<&'a str>,
    /// The status the task is stored with (`Pending` when built through
    /// `NewTask::new`).
    status: Status,
    /// The ID of the pipeline this task was created from, if any.
    pipeline_reference: Option<i32>,
    /// The steps to store alongside the task.
    steps: Vec<NewTaskStep<'a>>,
}

impl<'a> NewTask<'a> {
    /// Initialize a `NewTask` struct, which can be inserted into the
    /// database using the [`NewTask::create`] method.
    ///
    /// The task starts with status `Pending`, without a pipeline reference,
    /// and without any steps.
    pub(crate) fn new(name: &'a str, description: Option<&'a str>) -> Self {
        Self {
            name,
            description,
            status: Status::Pending,
            pipeline_reference: None,
            steps: vec![],
        }
    }

    /// Create and persist a new task based on an existing `pipeline`.
    ///
    /// Each pipeline step is converted into a task step using the provided
    /// `variable_values`. The task, which references the pipeline, is then
    /// stored together with its steps via [`NewTask::create`].
    ///
    /// # Errors
    ///
    /// Returns an error if the pipeline steps can't be loaded, if converting
    /// any pipeline step into a task step fails, or if persisting the task
    /// fails.
    pub(crate) fn create_from_pipeline(
        conn: &Database,
        pipeline: &'a Pipeline,
        variable_values: &[VariableValue],
    ) -> Result<Task, Box<dyn error::Error>> {
        let steps = pipeline.steps(conn)?;
        let steps = steps
            .iter()
            .map(|s| (s, variable_values))
            .map(TryInto::try_into)
            .collect::<Result<_, _>>()?;

        let mut task = Self::new(
            &pipeline.name,
            pipeline.description.as_ref().map(String::as_ref),
        );
        task.with_pipeline_reference(pipeline.id);
        task.with_steps(steps);

        task.create(conn).map_err(Into::into)
    }

    /// Link this task to the pipeline with ID `pipeline_id`.
    ///
    /// The reference is stored alongside the task by [`NewTask::create`],
    /// which also uses it to derive the name of the task.
    pub(crate) fn with_pipeline_reference(&mut self, pipeline_id: i32) {
        self.pipeline_reference = Some(pipeline_id)
    }

    /// Attach zero or more steps to this task.
    ///
    /// `NewTask` takes ownership of the steps, but you are required to
    /// call [`NewTask::create`] to persist the task and its steps.
    ///
    /// Can be called multiple times to append more steps.
    fn with_steps(&mut self, mut steps: Vec<NewTaskStep<'a>>) {
        self.steps.append(&mut steps)
    }

    /// Persist the task, and all its steps, into the database.
    ///
    /// The task row and its steps are inserted in a single transaction. If
    /// adding any step fails, the task is not stored either.
    ///
    /// # Errors
    ///
    /// Returns an error in these cases:
    ///
    /// - the referenced pipeline does not exist;
    /// - any query fails (for example because the task name is not unique);
    /// - adding a step to the task fails.
    pub(crate) fn create(self, conn: &Database) -> Result<Task, Box<dyn error::Error>> {
        use crate::schema::tasks::dsl::*;

        let mut task_name = self.name.to_owned();

        // Task names are unique over (name, pipeline_reference). If a reference
        // exists, we add a count (such as "My Task #3") to the name of the
        // pipeline, based on the total amount of tasks for that pipeline ID.
        //
        // Non-pipeline based tasks will simply return an error if their name
        // isn't unique.
        //
        // NOTE(review): a count-based suffix can collide with an existing name
        // if tasks of a pipeline can ever be deleted, or if two tasks for the
        // same pipeline are created concurrently (the count is read outside
        // the insert transaction). Confirm whether either can happen.
        if let Some(pipeline_id) = self.pipeline_reference {
            let pipeline: Pipeline = {
                use crate::schema::pipelines::dsl::*;
                pipelines.filter(id.eq(pipeline_id)).first(&**conn)
            }?;

            let total = tasks
                .filter(pipeline_reference.eq(pipeline_id))
                .count()
                .get_result::<i64>(&**conn)?;

            task_name = format!("{} #{}", pipeline.name, total + 1);
        }

        conn.transaction(|| {
            // waiting on https://github.com/diesel-rs/diesel/issues/860
            let values = (
                name.eq(&task_name),
                description.eq(&self.description),
                status.eq(self.status),
                pipeline_reference.eq(self.pipeline_reference),
            );

            let task = diesel::insert_into(tasks)
                .values(&values)
                .get_result(&**conn)?;

            self.steps
                .into_iter()
                .try_for_each(|s| s.add_to_task(conn, &task))?;

            Ok(task)
        })
    }
}

pub(crate) mod graphql {
    //! All GraphQL related functionality is encapsulated in this module. The
    //! relevant functions and structs are re-exported through
    //! [`crate::graphql`].
    //!
    //! API documentation in this module is also used in the GraphQL API itself
    //! as documentation for the clients.
    //!
    //! You can browse to `/graphql/playground` to see all relevant query,
    //! mutation, and type documentation.

    use super::*;
    use crate::resources::VariableValueInput;
    use juniper::{object, FieldResult, GraphQLInputObject, ID};

    /// Contains all the data needed to create a new `Task` from an existing
    /// `Pipeline`.
    #[derive(Clone, Debug, Deserialize, Serialize, GraphQLInputObject)]
    pub(crate) struct CreateTaskFromPipelineInput {
        /// The `id` of the pipeline from which to create this task.
        #[serde(with = "juniper_serde")]
        pub(crate) pipeline_id: ID,

        /// An optional list of variable values required by the pipeline.
        ///
        /// Note that the eventual `Task` object has no concept of "variables".
        ///
        /// The provided variable values are used in-place of the templated
        /// variables in the pipeline before creating the task. The final step
        /// configs are then stored alongside the task in the database.
        pub(crate) variables: Vec<VariableValueInput>,
    }

    /// Contains all the data needed to replace templated processor
    /// configurations.
    #[derive(Clone, Debug, Deserialize, Serialize, GraphQLInputObject)]
    pub(crate) struct TaskVariableInput {
        /// The key of the templated variable to replace.
        pub(crate) key: String,

        /// The value to use in place of the templated variable.
        pub(crate) value: String,
    }

    #[object(Context = Database)]
    impl Task {
        /// The unique identifier for a specific task.
        fn id() -> ID {
            ID::new(self.id.to_string())
        }

        /// A unique and descriptive name of the task.
        fn name() -> &str {
            self.name.as_ref()
        }

        /// An (optional) detailed description of the functionality provided by
        /// this task.
        ///
        /// A description _might_ be markdown formatted, and should be parsed
        /// accordingly by the client.
        fn description() -> Option<&str> {
            self.description.as_ref().map(String::as_ref)
        }

        /// The status of the task.
        fn status() -> Status {
            self.status
        }

        /// The steps belonging to the task.
        ///
        /// This field can return `null`, but _only_ if a database error
        /// prevents the data from being retrieved.
        ///
        /// If no steps are attached to a task, an empty array is returned
        /// instead.
        ///
        /// If a `null` value is returned, it is up to the client to decide the
        /// best course of action. The following actions are advised, sorted by
        /// preference:
        ///
        /// 1. continue execution if the information is not critical to success,
        /// 2. retry the request to try and get the relevant information,
        /// 3. disable parts of the application reliant on the information,
        /// 4. show a global error, and ask the user to retry.
        fn steps(context: &Database) -> FieldResult<Option<Vec<TaskStep>>> {
            self.steps(context).map(Some).map_err(Into::into)
        }

        /// The pipeline from which the task was created.
        ///
        /// A task _can_ but _does not have to_ be created from an existing
        /// pipeline.
        ///
        /// If a task was created from a pipeline, this will return the relevant
        /// `Pipeline` object.
        ///
        /// If a task was not created from an existing pipeline, this will
        /// return `null`.
        ///
        /// If a pipeline has been removed since the task was created, this will
        /// also return `null`.
        ///
        /// There is also the possibility of this task being created from a
        /// pipeline, but the database lookup to fetch the pipeline details
        /// failed. In this case, the value will also be `null`, but an `errors`
        /// object will be attached to the result, explaining the problem that
        /// occurred.
        ///
        /// If a `null` value is returned as the result of a lookup error, it is
        /// up to the client to decide the best course of action. The following
        /// actions are advised, sorted by preference:
        ///
        /// 1. continue execution if the information is not critical to success,
        /// 2. retry the request to try and get the relevant information,
        /// 3. disable parts of the application reliant on the information,
        /// 4. show a global error, and ask the user to retry.
        fn pipeline(context: &Database) -> FieldResult<Option<Pipeline>> {
            self.pipeline(context).map_err(Into::into)
        }
    }
}