Skip to main content

docbox_database/models/
tasks.rs

1use super::document_box::DocumentBoxScopeRaw;
2use crate::{DbExecutor, DbResult, models::document_box::DocumentBoxScopeRawRef};
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use sqlx::{Database, Decode, error::BoxDynError, postgres::PgQueryResult, prelude::FromRow};
6use utoipa::ToSchema;
7use uuid::Uuid;
8
/// Unique identifier of a [`Task`] (alias of [`Uuid`])
pub type TaskId = Uuid;
10
11/// Represents a stored asynchronous task progress
12#[derive(Debug, Clone, FromRow, Serialize, ToSchema)]
13pub struct Task {
14    /// Unique ID of the task
15    pub id: Uuid,
16
17    /// ID of the document box the task belongs to
18    pub document_box: DocumentBoxScopeRaw,
19
20    /// Status of the task
21    pub status: TaskStatus,
22
23    /// Output data from the task completion
24    pub output_data: Option<serde_json::Value>,
25
26    /// When the task was created
27    pub created_at: DateTime<Utc>,
28
29    // When execution of the task completed
30    pub completed_at: Option<DateTime<Utc>>,
31}
32
33impl Eq for Task {}
34
35impl PartialEq for Task {
36    fn eq(&self, other: &Self) -> bool {
37        let complete_eq = match (&self.completed_at, &other.completed_at) {
38            (Some(a), Some(b)) => a.timestamp_millis().eq(&b.timestamp_millis()),
39            (None, None) => true,
40            _ => false,
41        };
42
43        self.id.eq(&other.id)
44            && self.document_box.eq(&other.document_box)
45            && self.status.eq(&other.status)
46            && self.output_data.eq(&self.output_data)
47            // Reduce precision when checking creation timestamp
48            // (Database does not store the full precision)
49            && self
50                .created_at
51                .timestamp_millis()
52                .eq(&other.created_at.timestamp_millis())
53            // Reduce precision when checking creation timestamp
54            // (Database does not store the full precision)
55            && complete_eq
56    }
57}
58
/// Status of a [`Task`]
///
/// Written to the database as a string through its `Display`
/// implementation and parsed back from a string when decoding a row.
#[derive(
    Debug,
    Clone,
    Copy,
    strum::EnumString,
    strum::Display,
    Deserialize,
    Serialize,
    ToSchema,
    PartialEq,
    Eq,
)]
pub enum TaskStatus {
    /// Status given to a task when it is first created
    Pending,
    /// Presumably: the task finished successfully (inferred from the name, confirm with callers)
    Completed,
    /// Presumably: the task finished with an error (inferred from the name, confirm with callers)
    Failed,
}
76
77impl<DB: Database> sqlx::Type<DB> for TaskStatus
78where
79    String: sqlx::Type<DB>,
80{
81    fn type_info() -> DB::TypeInfo {
82        String::type_info()
83    }
84}
85
86impl<'r, DB: Database> Decode<'r, DB> for TaskStatus
87where
88    String: Decode<'r, DB>,
89{
90    fn decode(value: <DB as Database>::ValueRef<'r>) -> Result<Self, BoxDynError> {
91        let value = <String as Decode<DB>>::decode(value)?;
92        Ok(value.parse()?)
93    }
94}
95
96impl Task {
97    /// Stores / updates the stored user data, returns back the user ID
98    pub async fn create(
99        db: impl DbExecutor<'_>,
100        document_box: DocumentBoxScopeRaw,
101    ) -> DbResult<Task> {
102        let task_id = Uuid::new_v4();
103        let status = TaskStatus::Pending;
104        let created_at = Utc::now();
105
106        sqlx::query(
107            r#"
108            INSERT INTO "docbox_tasks" ("id", "document_box", "status", "created_at")
109            VALUES ($1, $2, $3, $4)
110        "#,
111        )
112        .bind(task_id)
113        .bind(document_box.as_str())
114        .bind(status.to_string())
115        .bind(created_at)
116        .execute(db)
117        .await?;
118
119        Ok(Task {
120            id: task_id,
121            document_box,
122            status,
123            output_data: None,
124            created_at,
125            completed_at: None,
126        })
127    }
128
129    pub async fn find(
130        db: impl DbExecutor<'_>,
131        id: TaskId,
132        document_box: DocumentBoxScopeRawRef<'_>,
133    ) -> DbResult<Option<Task>> {
134        sqlx::query_as(r#"SELECT * FROM "docbox_tasks" WHERE "id" = $1 AND "document_box" = $2"#)
135            .bind(id)
136            .bind(document_box)
137            .fetch_optional(db)
138            .await
139    }
140
141    /// Mark the task as completed and set its output data
142    pub async fn complete_task(
143        &mut self,
144        db: impl DbExecutor<'_>,
145        status: TaskStatus,
146        output_data: Option<serde_json::Value>,
147    ) -> DbResult<()> {
148        let completed_at = Utc::now();
149
150        sqlx::query(
151            r#"UPDATE "docbox_tasks" SET
152            "status" = $1,
153            "output_data" = $2,
154            "completed_at" = $3
155            WHERE "id" = $4"#,
156        )
157        .bind(status.to_string())
158        .bind(output_data.clone())
159        .bind(completed_at)
160        .bind(self.id)
161        .execute(db)
162        .await?;
163
164        self.status = status;
165        self.output_data = output_data.clone();
166        self.completed_at = Some(completed_at);
167
168        Ok(())
169    }
170
171    /// Deletes all tasks where the creation date is older than the `before` date
172    pub async fn delete_expired(
173        db: impl DbExecutor<'_>,
174        before: DateTime<Utc>,
175    ) -> DbResult<PgQueryResult> {
176        sqlx::query(r#"DELETE FROM "docbox_tasks" WHERE "created_at" < $1"#)
177            .bind(before)
178            .execute(db)
179            .await
180    }
181}