//! Runnable instantiations of jobs on the task queue
pub mod task_req;

use crate::cron::Cron;
use crate::unix_time::get_cur_unix_time;
use postgres::rows::Rows;
use postgres::Connection;

/// A single runnable instance of a job, as stored in a task table.
///
/// `Task::rows_to_tasks` builds this struct from database rows by reading the
/// columns by position, in the same order as the fields are declared here
/// (`table_name` excepted, which is supplied by the caller).
#[derive(Clone, Debug)]
pub struct Task {
    /// ID of the task row; used as the key for lookups, updates and deletes
    pub id: i64,

    /// UUID of associated `Job`
    pub job_uuid: String,

    /// Among:
    /// * `pending` - Task is not running nor completed
    /// * `running` - Currently being executed
    /// * `finished` - Task completed (set by `Task::finished`)
    /// * `failing` - Task failed, but has some retries remaining
    /// * `failed` - Task failed and won't be restarted
    pub status: String,

    /// Stores result of last run
    pub result: Option<String>,

    /// When to run this task (milliseconds since epoch)
    pub run_at: Option<i64>,

    /// Name of queue to which this task belongs
    pub queue: Option<String>,

    /// Number of attempts made to execute this task; incremented each time the
    /// task is popped from the queue and reset to 0 when it is rescheduled
    pub attempts: i32,

    /// Maximum number of attempts allowed
    pub max_attempts: i32,

    /// Creation timestamp. NOTE(review): not written anywhere in this module;
    /// presumably uses the same unit as `updated_at` — confirm against the
    /// code that inserts tasks.
    pub created_at: i64,
    /// Timestamp of the last status change, taken from `get_cur_unix_time()`.
    /// It is compared against `run_at`, so presumably milliseconds since
    /// epoch — confirm against `get_cur_unix_time`.
    pub updated_at: i64,

    /// Cron schedule for task (deserialized from JSON text stored in the row)
    pub cron: Option<Cron>,

    /// How often (in milliseconds) to execute task
    pub interval: Option<i64>,

    /// Serialized `Job`
    pub job: String,

    /// Task table to which this task belongs
    pub table_name: String,
}

impl Task {
    /// Pop next task from the queue
    pub fn pop_queue(conn: &Connection, queues: &[String], table_name: &str) -> Option<Task> {
        let current_time = get_cur_unix_time();

        let transaction = conn.transaction().expect("Error creating transaction");
        let rows = if queues.is_empty() {
            transaction.query(
                &format!(
                    "with top as (
select id from {table_name} where status = 'pending'
AND (run_at IS NULL OR run_at <= $1)
AND attempts < max_attempts
OR (status = 'failing' AND updated_at + 5000 + (attempts ^ 4) * 1000 <= $1)
order by id limit 1
)
update {table_name}
set status = 'running', attempts = attempts + 1, updated_at = $1
from top
where {table_name}.id = top.id
RETURNING {table_name}.*",
                    table_name = table_name
                ),
                &[&current_time],
            )
        } else {
            transaction.query(
                format!(
                    "with top as (
select id from {table_name} where status = 'pending'
AND (run_at IS NULL OR run_at <= $1) AND queue IN ('{queues}')
AND attempts < max_attempts
OR (status = 'failing' AND updated_at + 5000 + (attempts ^ 4) * 1000 <= $1)
order by id limit 1
)
update {table_name}
set status = 'running', attempts = attempts + 1, updated_at = $1
from top
where {table_name}.id = top.id
RETURNING {table_name}.*",
                    table_name = table_name,
                    queues = queues.join("','")
                )
                .as_str(),
                &[&current_time],
            )
        }
        .expect("Error popping queue");
        transaction.commit().expect("Error committing transaction");
        Task::rows_to_tasks(rows, table_name).first().cloned()
    }

    /// Return all pending tasks
    pub fn get_pending(conn: &Connection, queues: &[String], table_name: &str) -> Vec<Task> {
        Task::get_by_status(conn, queues, "pending", table_name)
    }

    /// Return all finished tasks
    pub fn get_finished(conn: &Connection, queues: &[String], table_name: &str) -> Vec<Task> {
        Task::get_by_status(conn, queues, "finished", table_name)
    }

    /// Return all failed tasks
    pub fn get_failed(conn: &Connection, queues: &[String], table_name: &str) -> Vec<Task> {
        Task::get_by_status(conn, queues, "failed", table_name)
    }

    /// Return task with the provided ID
    pub fn get_task_by_id(id: i64, conn: &Connection, table_name: &str) -> Option<Task> {
        Task::rows_to_tasks(
            conn.query(
                &format!("SELECT * FROM {} WHERE id = $1", table_name),
                &[&id],
            )
            .expect("error finding task"),
            table_name,
        )
        .first()
        .cloned()
    }

    pub fn delete(&self, conn: &Connection) {
        conn.execute(
            &format!("DELETE FROM {} WHERE id = $1", self.table_name),
            &[&self.id],
        )
        .expect("error deleting task");
    }

    fn get_by_status(
        conn: &Connection,
        queues: &[String],
        status: &str,
        table_name: &str,
    ) -> Vec<Task> {
        let current_time = get_cur_unix_time();
        if queues.is_empty() {
            Task::rows_to_tasks(
                conn.query(
                    &format!(
                        "SELECT * FROM {} WHERE status = $2 AND (run_at IS NULL OR run_at <= $1)",
                        table_name
                    ),
                    &[&current_time, &status],
                )
                .expect("error loading pending tasks"),
                table_name,
            )
        } else {
            let query = format!("SELECT * FROM {} WHERE status = $1 AND (run_at IS NULL OR run_at <= $2) AND queue IN ('{}')", table_name, queues.join("','"));
            Task::rows_to_tasks(
                conn.query(&query, &[&status, &current_time])
                    .expect("error loading pending tasks"),
                table_name,
            )
        }
    }

    /// Finish task
    pub fn finished(&self, res: &str, conn: &Connection) {
        self.set_status(conn, "finished", Some(res.to_owned()));
    }

    /// Reschedule task to run in the future
    pub fn reschedule(&self, res: &str, when: i64, conn: &Connection) {
        let updated_at = get_cur_unix_time();
        conn.execute(
            &format!("UPDATE {} SET status = 'pending', run_at = $1, updated_at = $2, result = $4, attempts = 0 WHERE id = $3", &self.table_name),
            &[&when, &updated_at, &self.id, &res],
        ).expect("Error rescheduling task");
    }

    /// Fail task
    pub fn failed(&self, err: Option<String>, conn: &Connection) {
        self.set_status(conn, "failed", err);
    }

    /// Mark task as failing to be retried again
    pub fn failing(&self, err: Option<String>, conn: &Connection) {
        self.set_status(conn, "failing", err);
    }

    /// Mark task as pending again
    pub fn release(&self, conn: &Connection) {
        self.set_status(conn, "pending", None);
    }

    fn set_status(&self, conn: &Connection, stat: &str, res: Option<String>) {
        let updated_at = get_cur_unix_time();
        conn.execute(
            &format!(
                "UPDATE {} SET status = $1, result = $3, updated_at = $4 WHERE id = $2",
                &self.table_name
            ),
            &[&stat, &self.id, &res, &updated_at],
        )
        .expect("Error updating task status");
    }

    /// Convert Postgres rows to a list of tasks
    pub fn rows_to_tasks(rows: Rows, table_name: &str) -> Vec<Task> {
        rows.iter()
            .map(|row| {
                let cron_str: String = row.get(10);
                Task {
                    id: row.get(0),
                    job_uuid: row.get(1),
                    status: row.get(2),
                    result: row.get(3),
                    run_at: row.get(4),
                    queue: row.get(5),
                    attempts: row.get(6),
                    max_attempts: row.get(7),
                    created_at: row.get(8),
                    updated_at: row.get(9),
                    cron: serde_json::from_str(&cron_str).unwrap(),
                    interval: row.get(11),
                    job: row.get(12),
                    table_name: table_name.to_string(),
                }
            })
            .collect()
    }

    /// Reload task from DB
    pub fn reload(&self, conn: &Connection) -> Task {
        Task::get_task_by_id(self.id, conn, &self.table_name).unwrap()
    }
}