cala_server/job/
mod.rs

1mod config;
2mod current;
3mod cursor;
4mod entity;
5mod executor;
6mod registry;
7mod repo;
8mod traits;
9
10pub mod error;
11
12use cala_ledger::LedgerOperation;
13use chrono::{DateTime, Utc};
14use sqlx::PgPool;
15use tracing::instrument;
16
17pub use config::*;
18pub use current::*;
19pub use cursor::*;
20pub use entity::*;
21pub use registry::*;
22pub use traits::*;
23
24use error::*;
25use executor::*;
26use repo::*;
27
28#[derive(Clone)]
29pub struct Jobs {
30    _pool: PgPool,
31    repo: JobRepo,
32    executor: JobExecutor,
33}
34
35impl Jobs {
36    pub fn new(pool: &PgPool, config: JobExecutorConfig, registry: JobRegistry) -> Self {
37        let repo = JobRepo::new(pool);
38        let executor = JobExecutor::new(pool, config, registry, &repo);
39        Self {
40            _pool: pool.clone(),
41            repo,
42            executor,
43        }
44    }
45
46    #[instrument(name = "cala_server.jobs.create_and_spawn", skip(self, op, data))]
47    pub async fn create_and_spawn_in_op<I: JobInitializer + Default, D: serde::Serialize>(
48        &self,
49        op: &mut LedgerOperation<'_>,
50        id: impl Into<JobId> + std::fmt::Debug,
51        name: String,
52        description: Option<String>,
53        data: D,
54    ) -> Result<Job, JobError> {
55        let new_job = Job::new(name, <I as JobInitializer>::job_type(), description, data);
56        let job = self.repo.create_in_tx(op.tx(), new_job).await?;
57        self.executor.spawn_job::<I>(op.tx(), &job, None).await?;
58        Ok(job)
59    }
60
61    #[instrument(name = "cala_server.jobs.create_and_spawn_at", skip(self, op, data))]
62    pub async fn create_and_spawn_at_in_op<I: JobInitializer + Default, D: serde::Serialize>(
63        &self,
64        op: &mut LedgerOperation<'_>,
65        name: String,
66        description: Option<String>,
67        data: D,
68        schedule_at: DateTime<Utc>,
69    ) -> Result<Job, JobError> {
70        let new_job = Job::new(name, <I as JobInitializer>::job_type(), description, data);
71        let job = self.repo.create_in_tx(op.tx(), new_job).await?;
72        self.executor
73            .spawn_job::<I>(op.tx(), &job, Some(schedule_at))
74            .await?;
75        Ok(job)
76    }
77
78    #[instrument(name = "cala_server.jobs.find", skip(self))]
79    pub async fn find(&self, id: JobId) -> Result<Job, JobError> {
80        self.repo.find_by_id(id).await
81    }
82
83    pub(crate) async fn start_poll(&mut self) -> Result<(), JobError> {
84        self.executor.start_poll().await
85    }
86}