cala_server/job/
current.rs

1use serde::{de::DeserializeOwned, Serialize};
2use sqlx::{PgPool, Postgres, Transaction};
3
4use super::error::JobError;
5use crate::{integration::*, primitives::JobId};
6
7pub struct CurrentJob {
8    id: JobId,
9    attempt: u32,
10    pool: PgPool,
11    state_json: Option<serde_json::Value>,
12}
13
14impl CurrentJob {
15    pub(super) fn new(
16        id: JobId,
17        attempt: u32,
18        pool: PgPool,
19        state: Option<serde_json::Value>,
20    ) -> Self {
21        Self {
22            id,
23            attempt,
24            pool,
25            state_json: state,
26        }
27    }
28
29    pub fn attempt(&self) -> u32 {
30        self.attempt
31    }
32
33    pub fn state<T: DeserializeOwned>(&self) -> Result<Option<T>, serde_json::Error> {
34        if let Some(state) = self.state_json.as_ref() {
35            serde_json::from_value(state.clone()).map(Some)
36        } else {
37            Ok(None)
38        }
39    }
40
41    pub async fn update_state<T: Serialize>(
42        &mut self,
43        db: &mut Transaction<'_, Postgres>,
44        state: T,
45    ) -> Result<(), JobError> {
46        let state_json = serde_json::to_value(state).map_err(JobError::CouldNotSerializeState)?;
47        sqlx::query!(
48            r#"
49          UPDATE jobs
50          SET state_json = $1
51          WHERE id = $2
52        "#,
53            state_json,
54            self.id as JobId
55        )
56        .execute(&mut **db)
57        .await?;
58        self.state_json = Some(state_json);
59        Ok(())
60    }
61
62    pub fn id(&self) -> JobId {
63        self.id
64    }
65
66    pub fn pool(&self) -> &PgPool {
67        &self.pool
68    }
69
70    pub async fn integration(&self, id: IntegrationId) -> Result<Integration, sqlx::Error> {
71        Integrations::new(self.pool()).find_by_id(id).await
72    }
73}