Skip to main content

runledger_runtime/catalog/
sync.rs

1use runledger_postgres::DbPool;
2use runledger_postgres::jobs::{
3    JobDefinitionCatalogSyncMode, sync_catalog_job_definitions_exact_tx,
4    sync_catalog_job_definitions_tx,
5};
6
7use super::{
8    CatalogError, JobCatalog, JobCatalogExactSyncReport, JobCatalogSyncReport, JobCatalogSyncScope,
9};
10
11impl JobCatalog {
12    /// Upserts every catalog job into `job_definitions`.
13    ///
14    /// The catalog owns the synced definition fields: repeated syncs overwrite
15    /// `version`, `max_attempts`, `default_timeout_seconds`, and
16    /// `default_priority` for registered jobs. Enabled catalogs preserve an
17    /// existing disabled row so operator pauses survive worker restarts; disabled
18    /// catalogs explicitly write `is_enabled = false`.
19    /// Safe to call repeatedly. Does not delete or disable definitions absent from the catalog.
20    /// Use [`Self::sync_definitions_exact`] with an explicit scope when removed
21    /// catalog entries should be disabled.
22    ///
23    /// Disabled catalog sync briefly locks `job_schedules` and `job_definitions`
24    /// so active schedule checks and definition disables are evaluated against a
25    /// stable write boundary.
26    ///
27    /// # Errors
28    /// Returns [`CatalogError`] when defaults are invalid or persistence fails.
29    pub async fn sync_definitions(
30        &self,
31        pool: &DbPool,
32    ) -> Result<JobCatalogSyncReport, CatalogError> {
33        self.validate_defaults()?;
34
35        let mut tx = pool.begin().await.map_err(|error| {
36            CatalogError::SyncFailure(runledger_postgres::Error::from_query_sqlx_with_context(
37                "begin job catalog definition sync",
38                error,
39            ))
40        })?;
41
42        let definitions = self.catalog_definitions();
43        let mode = if self.defaults.is_enabled {
44            JobDefinitionCatalogSyncMode::PreserveExistingEnabledForEnabledDefinitions
45        } else {
46            JobDefinitionCatalogSyncMode::RestoreCatalogEnabledState
47        };
48        let report = sync_catalog_job_definitions_tx(&mut tx, &definitions, mode)
49            .await
50            .map_err(CatalogError::from_definition_catalog_sync_error)?;
51
52        tx.commit().await.map_err(CatalogError::CommitFailure)?;
53
54        Ok(JobCatalogSyncReport {
55            disabled_catalog_job_types: report.disabled_catalog_job_types,
56        })
57    }
58
59    /// Upserts catalog jobs, then disables enabled `job_definitions` rows in
60    /// `scope` whose job type is absent from the catalog.
61    ///
62    /// This is the stricter startup mode for applications that want the catalog
63    /// to be the active job-definition source of truth. It never deletes rows,
64    /// never operates outside the supplied owned job-type set, and rejects active
65    /// schedules that still reference a job type it would disable. Unlike
66    /// [`Self::sync_definitions`], exact sync restores catalog entries'
67    /// `is_enabled` value from catalog defaults.
68    ///
69    /// Exact sync briefly locks `job_schedules` and `job_definitions` before it
70    /// checks active schedules or disables definitions, so it is heavier than the
71    /// additive sync path.
72    ///
73    /// # Errors
74    /// Returns [`CatalogError`] when the scope is invalid, the catalog is empty,
75    /// a catalog job is outside the scope, active schedules still reference
76    /// absent scoped jobs, or persistence fails.
77    pub async fn sync_definitions_exact(
78        &self,
79        pool: &DbPool,
80        scope: &JobCatalogSyncScope,
81    ) -> Result<JobCatalogExactSyncReport, CatalogError> {
82        self.validate_defaults()?;
83        self.validate_exact_sync_scope(scope)?;
84
85        let mut tx = pool.begin().await.map_err(|error| {
86            CatalogError::SyncFailure(runledger_postgres::Error::from_query_sqlx_with_context(
87                "begin exact job catalog definition sync",
88                error,
89            ))
90        })?;
91
92        let scope_job_types = scope.job_types_for_storage();
93        let definitions = self.catalog_definitions();
94        let report = sync_catalog_job_definitions_exact_tx(&mut tx, &definitions, &scope_job_types)
95            .await
96            .map_err(CatalogError::from_definition_catalog_sync_error)?;
97
98        tx.commit().await.map_err(CatalogError::CommitFailure)?;
99
100        Ok(JobCatalogExactSyncReport {
101            disabled_absent_job_types: report.disabled_absent_job_types,
102            disabled_catalog_job_types: report.disabled_catalog_job_types,
103        })
104    }
105
106    fn catalog_definitions(&self) -> Vec<runledger_postgres::jobs::JobDefinitionUpsert<'static>> {
107        self.jobs
108            .values()
109            .map(|entry| self.materialize_definition(entry))
110            .collect()
111    }
112}