cala_server/job/
current.rs1use serde::{de::DeserializeOwned, Serialize};
2use sqlx::{PgPool, Postgres, Transaction};
3
4use super::error::JobError;
5use crate::{integration::*, primitives::JobId};
6
7pub struct CurrentJob {
8 id: JobId,
9 attempt: u32,
10 pool: PgPool,
11 state_json: Option<serde_json::Value>,
12}
13
14impl CurrentJob {
15 pub(super) fn new(
16 id: JobId,
17 attempt: u32,
18 pool: PgPool,
19 state: Option<serde_json::Value>,
20 ) -> Self {
21 Self {
22 id,
23 attempt,
24 pool,
25 state_json: state,
26 }
27 }
28
29 pub fn attempt(&self) -> u32 {
30 self.attempt
31 }
32
33 pub fn state<T: DeserializeOwned>(&self) -> Result<Option<T>, serde_json::Error> {
34 if let Some(state) = self.state_json.as_ref() {
35 serde_json::from_value(state.clone()).map(Some)
36 } else {
37 Ok(None)
38 }
39 }
40
41 pub async fn update_state<T: Serialize>(
42 &mut self,
43 db: &mut Transaction<'_, Postgres>,
44 state: T,
45 ) -> Result<(), JobError> {
46 let state_json = serde_json::to_value(state).map_err(JobError::CouldNotSerializeState)?;
47 sqlx::query!(
48 r#"
49 UPDATE jobs
50 SET state_json = $1
51 WHERE id = $2
52 "#,
53 state_json,
54 self.id as JobId
55 )
56 .execute(&mut **db)
57 .await?;
58 self.state_json = Some(state_json);
59 Ok(())
60 }
61
62 pub fn id(&self) -> JobId {
63 self.id
64 }
65
66 pub fn pool(&self) -> &PgPool {
67 &self.pool
68 }
69
70 pub async fn integration(&self, id: IntegrationId) -> Result<Integration, sqlx::Error> {
71 Integrations::new(self.pool()).find_by_id(id).await
72 }
73}