1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
mod config;
mod error;

use sqlx::PgPool;
use tracing::instrument;

use cala_ledger::{query::*, CalaLedger};

use crate::job::*;
pub use config::*;
pub use error::*;

#[derive(Clone)]
pub struct CalaApp {
    pool: PgPool,
    ledger: CalaLedger,
    jobs: Jobs,
    job_executor: JobExecutor,
}

impl CalaApp {
    pub(crate) async fn run(
        pool: PgPool,
        config: AppConfig,
        ledger: CalaLedger,
        registry: JobRegistry,
    ) -> Result<Self, ApplicationError> {
        let jobs = Jobs::new(&pool);
        let mut job_executor =
            JobExecutor::new(&pool, config.job_execution.clone(), registry, &jobs);
        job_executor.start_poll().await?;
        Ok(Self {
            pool,
            ledger,
            job_executor,
            jobs,
        })
    }

    pub fn ledger(&self) -> &CalaLedger {
        &self.ledger
    }

    #[instrument(name = "cala_server.create_and_spawn_job", skip(self, config))]
    pub async fn create_and_spawn_job<I: JobInitializer + Default, C: serde::Serialize>(
        &self,
        name: String,
        description: Option<String>,
        config: C,
    ) -> Result<Job, ApplicationError> {
        let new_job = NewJob::builder()
            .name(name)
            .description(description)
            .config(config)?
            .job_type(<I as JobInitializer>::job_type())
            .build()
            .expect("Could not build job");
        let mut tx = self.pool.begin().await?;
        let job = self.jobs.create_in_tx(&mut tx, new_job).await?;
        self.job_executor.spawn_job::<I>(&mut tx, &job).await?;
        tx.commit().await?;
        Ok(job)
    }

    #[instrument(name = "cala_server.list_jobs", skip(self))]
    pub(crate) async fn list_jobs(
        &self,
        query: PaginatedQueryArgs<JobByNameCursor>,
    ) -> Result<PaginatedQueryRet<Job, JobByNameCursor>, ApplicationError> {
        Ok(self.jobs.list(query).await?)
    }
}