prefect/
job_status.rs

1use serde::{Deserialize, Serialize};
2use serde_json::value::RawValue;
3use smallvec::SmallVec;
4use std::{
5    fmt::{Debug, Display},
6    str::FromStr,
7};
8use time::{Duration, OffsetDateTime};
9use uuid::Uuid;
10
11use crate::{Error, Queue, Result};
12
13/// Information about the results of a job run.
14#[derive(Debug, Serialize, Deserialize)]
15pub struct RunInfo<T: Send + Debug> {
16    /// If this run succeeded or not.
17    pub success: bool,
18    /// When this run started
19    #[serde(with = "time::serde::timestamp")]
20    pub start: OffsetDateTime,
21    /// When this run ended
22    #[serde(with = "time::serde::timestamp")]
23    pub end: OffsetDateTime,
24    /// Information about the run returned from the task runner function.
25    pub info: T,
26}
27
/// Where a job currently is in its lifecycle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JobState {
    /// Waiting to run.
    Pending,
    /// Currently running.
    Running,
    /// Finished successfully.
    Succeeded,
    /// Failed and exceeded its retry limit, so it will not be retried.
    Failed,
}
40
41impl JobState {
42    /// Return a string representation of the state.
43    pub fn as_str(&self) -> &'static str {
44        match self {
45            JobState::Pending => "pending",
46            JobState::Running => "running",
47            JobState::Succeeded => "succeeded",
48            JobState::Failed => "failed",
49        }
50    }
51}
52
53impl Display for JobState {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        write!(f, "{}", self.as_str())
56    }
57}
58
59impl FromStr for JobState {
60    type Err = Error;
61
62    fn from_str(s: &str) -> Result<Self> {
63        match s {
64            "pending" => Ok(JobState::Pending),
65            "running" => Ok(JobState::Running),
66            "succeeded" => Ok(JobState::Succeeded),
67            "failed" => Ok(JobState::Failed),
68            _ => Err(Error::InvalidJobState(s.to_string())),
69        }
70    }
71}
72
73/// Status information about a job.
74#[derive(Debug)]
75pub struct JobStatus {
76    /// The job's ID.
77    pub id: Uuid,
78    /// The type of a job
79    pub job_type: String,
80    /// If the job is waiting, running, or finished
81    pub state: JobState,
82    /// Higher priority jobs will be run first.
83    pub priority: i32,
84    /// Higher weight indicates a job counts more against a worker's concurrency.
85    pub weight: u16,
86    /// The original run_at time, before any retries.
87    pub orig_run_at: OffsetDateTime,
88    /// The current run_at time, if the job is pending.
89    pub run_at: Option<OffsetDateTime>,
90    /// The job's payload
91    pub payload: Vec<u8>,
92    /// The current try count, if the job is running or pending.
93    pub current_try: Option<i32>,
94    /// The limit on the number of retries.
95    pub max_retries: i32,
96    /// The multiplier used when calculating the next retry time. See [Retries](crate::Retries).
97    pub backoff_multiplier: f64,
98    /// The random factor used when calculating the next retry time. See [Retries](crate::Retries).
99    pub backoff_randomization: f64,
100    /// The initial delay used when calculating the next retry time. See [Retries](crate::Retries).
101    pub backoff_initial_interval: Duration,
102    /// When the job was added to the queue.
103    pub added_at: OffsetDateTime,
104    /// When the job's last run started.
105    pub started_at: Option<OffsetDateTime>,
106    /// When the job finished.
107    pub finished_at: Option<OffsetDateTime>,
108    /// If currently running, when the job will time out.
109    pub expires_at: Option<OffsetDateTime>,
110    /// Information about each run of the job.
111    pub run_info: SmallVec<[RunInfo<Box<RawValue>>; 4]>,
112}
113
114#[derive(Serialize)]
115pub struct NumActiveJobs {
116    pub pending: u64,
117    pub running: u64,
118}
119
120impl Queue {
121    /// Return information about a job
122    pub async fn get_job_status(&self, external_id: Uuid) -> Result<JobStatus> {
123        let conn = self.state.read_conn_pool.get().await?;
124
125        let status = conn
126            .interact(move |conn| {
127                let mut stmt = conn.prepare_cached(
128                    r##"
129    SELECT jobs.job_type,
130        CASE
131            WHEN active_worker_id IS NOT NULL THEN 'running'
132            WHEN active_jobs.priority IS NOT NULL THEN 'pending'
133            ELSE jobs.status
134        END AS status,
135        jobs.priority, weight, orig_run_at, run_at, payload, current_try,
136        max_retries, backoff_multiplier, backoff_randomization, backoff_initial_interval,
137        added_at,
138        COALESCE(active_jobs.started_at, jobs.started_at) AS started_at,
139        finished_at, expires_at, run_info
140    FROM jobs
141    LEFT JOIN active_jobs USING(job_id)
142    WHERE external_id=?1
143    "##,
144                )?;
145
146                let mut rows = stmt.query_and_then([external_id], |row| {
147                    let started_at = row
148                        .get_ref(13)?
149                        .as_i64_or_null()
150                        .map_err(|e| Error::ColumnType(e.into(), "started_at"))?
151                        .map(|i| {
152                            OffsetDateTime::from_unix_timestamp(i)
153                                .map_err(|_| Error::TimestampOutOfRange("started_at"))
154                        })
155                        .transpose()?;
156
157                    let finished_at = row
158                        .get_ref(14)?
159                        .as_i64_or_null()
160                        .map_err(|e| Error::ColumnType(e.into(), "finished_at"))?
161                        .map(|i| {
162                            OffsetDateTime::from_unix_timestamp(i)
163                                .map_err(|_| Error::TimestampOutOfRange("finished_at"))
164                        })
165                        .transpose()?;
166
167                    let expires_at = row
168                        .get_ref(15)?
169                        .as_i64_or_null()
170                        .map_err(|e| Error::ColumnType(e.into(), "expires_at"))?
171                        .map(|i| {
172                            OffsetDateTime::from_unix_timestamp(i)
173                                .map_err(|_| Error::TimestampOutOfRange("expires_at"))
174                        })
175                        .transpose()?;
176
177                    let run_info_str = row
178                        .get_ref(16)?
179                        .as_str_or_null()
180                        .map_err(|e| Error::ColumnType(e.into(), "run_info"))?;
181                    let run_info: SmallVec<[RunInfo<Box<RawValue>>; 4]> = match run_info_str {
182                        Some(run_info_str) => {
183                            serde_json::from_str(run_info_str).map_err(Error::InvalidJobRunInfo)?
184                        }
185                        None => SmallVec::new(),
186                    };
187
188                    let status = JobStatus {
189                        id: external_id,
190                        job_type: row.get(0).map_err(|e| Error::ColumnType(e, "job_type"))?,
191                        state: row
192                            .get_ref(1)?
193                            .as_str()
194                            .map_err(|e| Error::ColumnType(e.into(), "state"))?
195                            .parse()?,
196                        priority: row.get(2)?,
197                        weight: row.get(3)?,
198                        orig_run_at: OffsetDateTime::from_unix_timestamp(row.get(4)?)
199                            .map_err(|_| Error::TimestampOutOfRange("orig_run_at"))?,
200                        run_at: row
201                            .get_ref(5)?
202                            .as_i64_or_null()
203                            .map_err(|e| Error::ColumnType(e.into(), "run_at"))?
204                            .map(OffsetDateTime::from_unix_timestamp)
205                            .transpose()
206                            .map_err(|_| Error::TimestampOutOfRange("run_at"))?,
207                        payload: row.get(6)?,
208                        current_try: row.get(7)?,
209                        max_retries: row.get(8)?,
210                        backoff_multiplier: row.get(9)?,
211                        backoff_randomization: row.get(10)?,
212                        backoff_initial_interval: Duration::seconds(row.get(11)?),
213                        added_at: OffsetDateTime::from_unix_timestamp(row.get(12)?)
214                            .map_err(|_| Error::TimestampOutOfRange("added_at"))?,
215                        started_at,
216                        finished_at,
217                        expires_at,
218                        run_info,
219                    };
220
221                    Ok::<_, Error>(status)
222                })?;
223
224                let status = rows.next().ok_or(Error::NotFound)??;
225
226                Ok::<_, Error>(status)
227            })
228            .await??;
229
230        Ok(status)
231    }
232
233    /// Return counts about the number of jobs running and waiting to run.
234    pub async fn num_active_jobs(&self) -> Result<NumActiveJobs> {
235        let conn = self.state.read_conn_pool.get().await?;
236        let (total, running): (i64, i64) = conn
237            .interact(move |conn| {
238                let mut stmt = conn.prepare_cached(
239                    r##"SELECT COUNT(*) as total, COUNT(active_worker_id) AS running
240                    FROM active_jobs"##,
241                )?;
242                stmt.query_row([], |row| Ok((row.get(0)?, row.get(1)?)))
243            })
244            .await??;
245
246        Ok(NumActiveJobs {
247            pending: (total - running) as u64,
248            running: running as u64,
249        })
250    }
251}