docbox_database/models/
tasks.rs1use std::future::Future;
2
3use crate::{DbExecutor, DbPool, DbResult};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use sqlx::{Database, Decode, error::BoxDynError, prelude::FromRow};
7use utoipa::ToSchema;
8use uuid::Uuid;
9
10use super::document_box::DocumentBoxScopeRaw;
11
12pub type TaskId = Uuid;
13
14#[derive(Debug, FromRow, Serialize, ToSchema)]
16pub struct Task {
17 pub id: Uuid,
19
20 pub document_box: DocumentBoxScopeRaw,
22
23 pub status: TaskStatus,
25
26 pub output_data: Option<serde_json::Value>,
28
29 pub created_at: DateTime<Utc>,
31
32 pub completed_at: Option<DateTime<Utc>>,
34}
35
36#[derive(
37 Debug, Clone, Copy, strum::EnumString, strum::Display, Deserialize, Serialize, ToSchema,
38)]
39pub enum TaskStatus {
40 Pending,
41 Completed,
42 Failed,
43}
44
45impl<DB: Database> sqlx::Type<DB> for TaskStatus
46where
47 String: sqlx::Type<DB>,
48{
49 fn type_info() -> DB::TypeInfo {
50 String::type_info()
51 }
52}
53
54impl<'r, DB: Database> Decode<'r, DB> for TaskStatus
55where
56 String: Decode<'r, DB>,
57{
58 fn decode(value: <DB as Database>::ValueRef<'r>) -> Result<Self, BoxDynError> {
59 let value = <String as Decode<DB>>::decode(value)?;
60 Ok(value.parse()?)
61 }
62}
63
64pub async fn background_task<Fut>(
65 db: DbPool,
66 scope: DocumentBoxScopeRaw,
67 future: Fut,
68) -> DbResult<(TaskId, DateTime<Utc>)>
69where
70 Fut: Future<Output = (TaskStatus, serde_json::Value)> + Send + 'static,
71{
72 let task = Task::create(&db, scope).await?;
74
75 let task_id = task.id;
76 let created_at = task.created_at;
77
78 tokio::spawn(async move {
80 let (status, output) = future.await;
81
82 if let Err(cause) = task.complete_task(&db, status, Some(output)).await {
84 tracing::error!(?cause, "failed to mark task as complete");
85 }
86 });
87
88 Ok((task_id, created_at))
89}
90
91impl Task {
92 pub async fn create(
94 db: impl DbExecutor<'_>,
95 document_box: DocumentBoxScopeRaw,
96 ) -> DbResult<Task> {
97 let task_id = Uuid::new_v4();
98 let status = TaskStatus::Pending;
99 let created_at = Utc::now();
100
101 sqlx::query(
102 r#"
103 INSERT INTO "docbox_tasks" ("id", "document_box", "status", "created_at")
104 VALUES ($1, $2, $3, $4)
105 "#,
106 )
107 .bind(task_id)
108 .bind(document_box.as_str())
109 .bind(status.to_string())
110 .bind(created_at)
111 .execute(db)
112 .await?;
113
114 Ok(Task {
115 id: task_id,
116 document_box,
117 status,
118 output_data: None,
119 created_at,
120 completed_at: None,
121 })
122 }
123
124 pub async fn find(
125 db: impl DbExecutor<'_>,
126 id: TaskId,
127 document_box: &DocumentBoxScopeRaw,
128 ) -> DbResult<Option<Task>> {
129 sqlx::query_as(r#"SELECT * FROM "docbox_tasks" WHERE "id" = $1 AND "document_box" = $2"#)
130 .bind(id)
131 .bind(document_box)
132 .fetch_optional(db)
133 .await
134 }
135
136 pub async fn complete_task(
138 mut self,
139 db: impl DbExecutor<'_>,
140 status: TaskStatus,
141 output_data: Option<serde_json::Value>,
142 ) -> DbResult<Task> {
143 let completed_at = Utc::now();
144
145 sqlx::query(
146 r#"UPDATE "docbox_tasks" SET
147 "status" = $1,
148 "output_data" = $2,
149 "completed_at" = $3
150 WHERE "id" = $4"#,
151 )
152 .bind(status.to_string())
153 .bind(output_data.clone())
154 .bind(completed_at)
155 .bind(self.id)
156 .execute(db)
157 .await?;
158
159 self.status = status;
160 self.output_data = output_data.clone();
161 self.completed_at = Some(completed_at);
162
163 Ok(self)
164 }
165}