docbox_database/models/
tasks.rs

1use std::future::Future;
2
3use crate::{DbExecutor, DbPool, DbResult};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use sqlx::{Database, Decode, error::BoxDynError, prelude::FromRow};
7use utoipa::ToSchema;
8use uuid::Uuid;
9
10use super::document_box::DocumentBoxScopeRaw;
11
12pub type TaskId = Uuid;
13
14/// Represents a stored asynchronous task progress
15#[derive(Debug, FromRow, Serialize, ToSchema)]
16pub struct Task {
17    /// Unique ID of the task
18    pub id: Uuid,
19
20    /// ID of the document box the task belongs to
21    pub document_box: DocumentBoxScopeRaw,
22
23    /// Status of the task
24    pub status: TaskStatus,
25
26    /// Output data from the task completion
27    pub output_data: Option<serde_json::Value>,
28
29    /// When the task was created
30    pub created_at: DateTime<Utc>,
31
32    // When execution of the task completed
33    pub completed_at: Option<DateTime<Utc>>,
34}
35
36#[derive(
37    Debug, Clone, Copy, strum::EnumString, strum::Display, Deserialize, Serialize, ToSchema,
38)]
39pub enum TaskStatus {
40    Pending,
41    Completed,
42    Failed,
43}
44
45impl<DB: Database> sqlx::Type<DB> for TaskStatus
46where
47    String: sqlx::Type<DB>,
48{
49    fn type_info() -> DB::TypeInfo {
50        String::type_info()
51    }
52}
53
54impl<'r, DB: Database> Decode<'r, DB> for TaskStatus
55where
56    String: Decode<'r, DB>,
57{
58    fn decode(value: <DB as Database>::ValueRef<'r>) -> Result<Self, BoxDynError> {
59        let value = <String as Decode<DB>>::decode(value)?;
60        Ok(value.parse()?)
61    }
62}
63
64pub async fn background_task<Fut>(
65    db: DbPool,
66    scope: DocumentBoxScopeRaw,
67    future: Fut,
68) -> DbResult<(TaskId, DateTime<Utc>)>
69where
70    Fut: Future<Output = (TaskStatus, serde_json::Value)> + Send + 'static,
71{
72    // Create task for progression
73    let task = Task::create(&db, scope).await?;
74
75    let task_id = task.id;
76    let created_at = task.created_at;
77
78    // Swap background task
79    tokio::spawn(async move {
80        let (status, output) = future.await;
81
82        // Update task completion
83        if let Err(cause) = task.complete_task(&db, status, Some(output)).await {
84            tracing::error!(?cause, "failed to mark task as complete");
85        }
86    });
87
88    Ok((task_id, created_at))
89}
90
91impl Task {
92    /// Stores / updates the stored user data, returns back the user ID
93    pub async fn create(
94        db: impl DbExecutor<'_>,
95        document_box: DocumentBoxScopeRaw,
96    ) -> DbResult<Task> {
97        let task_id = Uuid::new_v4();
98        let status = TaskStatus::Pending;
99        let created_at = Utc::now();
100
101        sqlx::query(
102            r#"
103            INSERT INTO "docbox_tasks" ("id", "document_box", "status", "created_at") 
104            VALUES ($1, $2, $3, $4)
105        "#,
106        )
107        .bind(task_id)
108        .bind(document_box.as_str())
109        .bind(status.to_string())
110        .bind(created_at)
111        .execute(db)
112        .await?;
113
114        Ok(Task {
115            id: task_id,
116            document_box,
117            status,
118            output_data: None,
119            created_at,
120            completed_at: None,
121        })
122    }
123
124    pub async fn find(
125        db: impl DbExecutor<'_>,
126        id: TaskId,
127        document_box: &DocumentBoxScopeRaw,
128    ) -> DbResult<Option<Task>> {
129        sqlx::query_as(r#"SELECT * FROM "docbox_tasks" WHERE "id" = $1 AND "document_box" = $2"#)
130            .bind(id)
131            .bind(document_box)
132            .fetch_optional(db)
133            .await
134    }
135
136    /// Mark the task as completed and set its output data
137    pub async fn complete_task(
138        mut self,
139        db: impl DbExecutor<'_>,
140        status: TaskStatus,
141        output_data: Option<serde_json::Value>,
142    ) -> DbResult<Task> {
143        let completed_at = Utc::now();
144
145        sqlx::query(
146            r#"UPDATE "docbox_tasks" SET 
147            "status" = $1, 
148            "output_data" = $2, 
149            "completed_at" = $3
150            WHERE "id" = $4"#,
151        )
152        .bind(status.to_string())
153        .bind(output_data.clone())
154        .bind(completed_at)
155        .bind(self.id)
156        .execute(db)
157        .await?;
158
159        self.status = status;
160        self.output_data = output_data.clone();
161        self.completed_at = Some(completed_at);
162
163        Ok(self)
164    }
165}