1use serde::{Deserialize, Serialize};
2use serde_json::value::RawValue;
3use smallvec::SmallVec;
4use std::{
5 fmt::{Debug, Display},
6 str::FromStr,
7};
8use time::{Duration, OffsetDateTime};
9use uuid::Uuid;
10
11use crate::{Error, Queue, Result};
12
13#[derive(Debug, Serialize, Deserialize)]
15pub struct RunInfo<T: Send + Debug> {
16 pub success: bool,
18 #[serde(with = "time::serde::timestamp")]
20 pub start: OffsetDateTime,
21 #[serde(with = "time::serde::timestamp")]
23 pub end: OffsetDateTime,
24 pub info: T,
26}
27
28#[derive(Debug, Copy, Clone, PartialEq, Eq)]
30pub enum JobState {
31 Pending,
33 Running,
35 Succeeded,
37 Failed,
39}
40
41impl JobState {
42 pub fn as_str(&self) -> &'static str {
44 match self {
45 JobState::Pending => "pending",
46 JobState::Running => "running",
47 JobState::Succeeded => "succeeded",
48 JobState::Failed => "failed",
49 }
50 }
51}
52
53impl Display for JobState {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 write!(f, "{}", self.as_str())
56 }
57}
58
59impl FromStr for JobState {
60 type Err = Error;
61
62 fn from_str(s: &str) -> Result<Self> {
63 match s {
64 "pending" => Ok(JobState::Pending),
65 "running" => Ok(JobState::Running),
66 "succeeded" => Ok(JobState::Succeeded),
67 "failed" => Ok(JobState::Failed),
68 _ => Err(Error::InvalidJobState(s.to_string())),
69 }
70 }
71}
72
73#[derive(Debug)]
75pub struct JobStatus {
76 pub id: Uuid,
78 pub job_type: String,
80 pub state: JobState,
82 pub priority: i32,
84 pub weight: u16,
86 pub orig_run_at: OffsetDateTime,
88 pub run_at: Option<OffsetDateTime>,
90 pub payload: Vec<u8>,
92 pub current_try: Option<i32>,
94 pub max_retries: i32,
96 pub backoff_multiplier: f64,
98 pub backoff_randomization: f64,
100 pub backoff_initial_interval: Duration,
102 pub added_at: OffsetDateTime,
104 pub started_at: Option<OffsetDateTime>,
106 pub finished_at: Option<OffsetDateTime>,
108 pub expires_at: Option<OffsetDateTime>,
110 pub run_info: SmallVec<[RunInfo<Box<RawValue>>; 4]>,
112}
113
114#[derive(Serialize)]
115pub struct NumActiveJobs {
116 pub pending: u64,
117 pub running: u64,
118}
119
120impl Queue {
121 pub async fn get_job_status(&self, external_id: Uuid) -> Result<JobStatus> {
123 let conn = self.state.read_conn_pool.get().await?;
124
125 let status = conn
126 .interact(move |conn| {
127 let mut stmt = conn.prepare_cached(
128 r##"
129 SELECT jobs.job_type,
130 CASE
131 WHEN active_worker_id IS NOT NULL THEN 'running'
132 WHEN active_jobs.priority IS NOT NULL THEN 'pending'
133 ELSE jobs.status
134 END AS status,
135 jobs.priority, weight, orig_run_at, run_at, payload, current_try,
136 max_retries, backoff_multiplier, backoff_randomization, backoff_initial_interval,
137 added_at,
138 COALESCE(active_jobs.started_at, jobs.started_at) AS started_at,
139 finished_at, expires_at, run_info
140 FROM jobs
141 LEFT JOIN active_jobs USING(job_id)
142 WHERE external_id=?1
143 "##,
144 )?;
145
146 let mut rows = stmt.query_and_then([external_id], |row| {
147 let started_at = row
148 .get_ref(13)?
149 .as_i64_or_null()
150 .map_err(|e| Error::ColumnType(e.into(), "started_at"))?
151 .map(|i| {
152 OffsetDateTime::from_unix_timestamp(i)
153 .map_err(|_| Error::TimestampOutOfRange("started_at"))
154 })
155 .transpose()?;
156
157 let finished_at = row
158 .get_ref(14)?
159 .as_i64_or_null()
160 .map_err(|e| Error::ColumnType(e.into(), "finished_at"))?
161 .map(|i| {
162 OffsetDateTime::from_unix_timestamp(i)
163 .map_err(|_| Error::TimestampOutOfRange("finished_at"))
164 })
165 .transpose()?;
166
167 let expires_at = row
168 .get_ref(15)?
169 .as_i64_or_null()
170 .map_err(|e| Error::ColumnType(e.into(), "expires_at"))?
171 .map(|i| {
172 OffsetDateTime::from_unix_timestamp(i)
173 .map_err(|_| Error::TimestampOutOfRange("expires_at"))
174 })
175 .transpose()?;
176
177 let run_info_str = row
178 .get_ref(16)?
179 .as_str_or_null()
180 .map_err(|e| Error::ColumnType(e.into(), "run_info"))?;
181 let run_info: SmallVec<[RunInfo<Box<RawValue>>; 4]> = match run_info_str {
182 Some(run_info_str) => {
183 serde_json::from_str(run_info_str).map_err(Error::InvalidJobRunInfo)?
184 }
185 None => SmallVec::new(),
186 };
187
188 let status = JobStatus {
189 id: external_id,
190 job_type: row.get(0).map_err(|e| Error::ColumnType(e, "job_type"))?,
191 state: row
192 .get_ref(1)?
193 .as_str()
194 .map_err(|e| Error::ColumnType(e.into(), "state"))?
195 .parse()?,
196 priority: row.get(2)?,
197 weight: row.get(3)?,
198 orig_run_at: OffsetDateTime::from_unix_timestamp(row.get(4)?)
199 .map_err(|_| Error::TimestampOutOfRange("orig_run_at"))?,
200 run_at: row
201 .get_ref(5)?
202 .as_i64_or_null()
203 .map_err(|e| Error::ColumnType(e.into(), "run_at"))?
204 .map(OffsetDateTime::from_unix_timestamp)
205 .transpose()
206 .map_err(|_| Error::TimestampOutOfRange("run_at"))?,
207 payload: row.get(6)?,
208 current_try: row.get(7)?,
209 max_retries: row.get(8)?,
210 backoff_multiplier: row.get(9)?,
211 backoff_randomization: row.get(10)?,
212 backoff_initial_interval: Duration::seconds(row.get(11)?),
213 added_at: OffsetDateTime::from_unix_timestamp(row.get(12)?)
214 .map_err(|_| Error::TimestampOutOfRange("added_at"))?,
215 started_at,
216 finished_at,
217 expires_at,
218 run_info,
219 };
220
221 Ok::<_, Error>(status)
222 })?;
223
224 let status = rows.next().ok_or(Error::NotFound)??;
225
226 Ok::<_, Error>(status)
227 })
228 .await??;
229
230 Ok(status)
231 }
232
233 pub async fn num_active_jobs(&self) -> Result<NumActiveJobs> {
235 let conn = self.state.read_conn_pool.get().await?;
236 let (total, running): (i64, i64) = conn
237 .interact(move |conn| {
238 let mut stmt = conn.prepare_cached(
239 r##"SELECT COUNT(*) as total, COUNT(active_worker_id) AS running
240 FROM active_jobs"##,
241 )?;
242 stmt.query_row([], |row| Ok((row.get(0)?, row.get(1)?)))
243 })
244 .await??;
245
246 Ok(NumActiveJobs {
247 pending: (total - running) as u64,
248 running: running as u64,
249 })
250 }
251}