1mod config;
2mod current;
3mod cursor;
4mod entity;
5mod executor;
6mod registry;
7mod repo;
8mod traits;
9
10pub mod error;
11
12use cala_ledger::LedgerOperation;
13use chrono::{DateTime, Utc};
14use sqlx::PgPool;
15use tracing::instrument;
16
17pub use config::*;
18pub use current::*;
19pub use cursor::*;
20pub use entity::*;
21pub use registry::*;
22pub use traits::*;
23
24use error::*;
25use executor::*;
26use repo::*;
27
28#[derive(Clone)]
29pub struct Jobs {
30 _pool: PgPool,
31 repo: JobRepo,
32 executor: JobExecutor,
33}
34
35impl Jobs {
36 pub fn new(pool: &PgPool, config: JobExecutorConfig, registry: JobRegistry) -> Self {
37 let repo = JobRepo::new(pool);
38 let executor = JobExecutor::new(pool, config, registry, &repo);
39 Self {
40 _pool: pool.clone(),
41 repo,
42 executor,
43 }
44 }
45
46 #[instrument(name = "cala_server.jobs.create_and_spawn", skip(self, op, data))]
47 pub async fn create_and_spawn_in_op<I: JobInitializer + Default, D: serde::Serialize>(
48 &self,
49 op: &mut LedgerOperation<'_>,
50 id: impl Into<JobId> + std::fmt::Debug,
51 name: String,
52 description: Option<String>,
53 data: D,
54 ) -> Result<Job, JobError> {
55 let new_job = Job::new(name, <I as JobInitializer>::job_type(), description, data);
56 let job = self.repo.create_in_tx(op.tx(), new_job).await?;
57 self.executor.spawn_job::<I>(op.tx(), &job, None).await?;
58 Ok(job)
59 }
60
61 #[instrument(name = "cala_server.jobs.create_and_spawn_at", skip(self, op, data))]
62 pub async fn create_and_spawn_at_in_op<I: JobInitializer + Default, D: serde::Serialize>(
63 &self,
64 op: &mut LedgerOperation<'_>,
65 name: String,
66 description: Option<String>,
67 data: D,
68 schedule_at: DateTime<Utc>,
69 ) -> Result<Job, JobError> {
70 let new_job = Job::new(name, <I as JobInitializer>::job_type(), description, data);
71 let job = self.repo.create_in_tx(op.tx(), new_job).await?;
72 self.executor
73 .spawn_job::<I>(op.tx(), &job, Some(schedule_at))
74 .await?;
75 Ok(job)
76 }
77
78 #[instrument(name = "cala_server.jobs.find", skip(self))]
79 pub async fn find(&self, id: JobId) -> Result<Job, JobError> {
80 self.repo.find_by_id(id).await
81 }
82
83 pub(crate) async fn start_poll(&mut self) -> Result<(), JobError> {
84 self.executor.start_poll().await
85 }
86}