runledger_runtime/catalog/sync.rs
1use runledger_postgres::DbPool;
2use runledger_postgres::jobs::{
3 JobDefinitionCatalogSyncMode, sync_catalog_job_definitions_exact_tx,
4 sync_catalog_job_definitions_tx,
5};
6
7use super::{
8 CatalogError, JobCatalog, JobCatalogExactSyncReport, JobCatalogSyncReport, JobCatalogSyncScope,
9};
10
11impl JobCatalog {
12 /// Upserts every catalog job into `job_definitions`.
13 ///
14 /// The catalog owns the synced definition fields: repeated syncs overwrite
15 /// `version`, `max_attempts`, `default_timeout_seconds`, and
16 /// `default_priority` for registered jobs. Enabled catalogs preserve an
17 /// existing disabled row so operator pauses survive worker restarts; disabled
18 /// catalogs explicitly write `is_enabled = false`.
19 /// Safe to call repeatedly. Does not delete or disable definitions absent from the catalog.
20 /// Use [`Self::sync_definitions_exact`] with an explicit scope when removed
21 /// catalog entries should be disabled.
22 ///
23 /// Disabled catalog sync briefly locks `job_schedules` and `job_definitions`
24 /// so active schedule checks and definition disables are evaluated against a
25 /// stable write boundary.
26 ///
27 /// # Errors
28 /// Returns [`CatalogError`] when defaults are invalid or persistence fails.
29 pub async fn sync_definitions(
30 &self,
31 pool: &DbPool,
32 ) -> Result<JobCatalogSyncReport, CatalogError> {
33 self.validate_defaults()?;
34
35 let mut tx = pool.begin().await.map_err(|error| {
36 CatalogError::SyncFailure(runledger_postgres::Error::from_query_sqlx_with_context(
37 "begin job catalog definition sync",
38 error,
39 ))
40 })?;
41
42 let definitions = self.catalog_definitions();
43 let mode = if self.defaults.is_enabled {
44 JobDefinitionCatalogSyncMode::PreserveExistingEnabledForEnabledDefinitions
45 } else {
46 JobDefinitionCatalogSyncMode::RestoreCatalogEnabledState
47 };
48 let report = sync_catalog_job_definitions_tx(&mut tx, &definitions, mode)
49 .await
50 .map_err(CatalogError::from_definition_catalog_sync_error)?;
51
52 tx.commit().await.map_err(CatalogError::CommitFailure)?;
53
54 Ok(JobCatalogSyncReport {
55 disabled_catalog_job_types: report.disabled_catalog_job_types,
56 })
57 }
58
59 /// Upserts catalog jobs, then disables enabled `job_definitions` rows in
60 /// `scope` whose job type is absent from the catalog.
61 ///
62 /// This is the stricter startup mode for applications that want the catalog
63 /// to be the active job-definition source of truth. It never deletes rows,
64 /// never operates outside the supplied owned job-type set, and rejects active
65 /// schedules that still reference a job type it would disable. Unlike
66 /// [`Self::sync_definitions`], exact sync restores catalog entries'
67 /// `is_enabled` value from catalog defaults.
68 ///
69 /// Exact sync briefly locks `job_schedules` and `job_definitions` before it
70 /// checks active schedules or disables definitions, so it is heavier than the
71 /// additive sync path.
72 ///
73 /// # Errors
74 /// Returns [`CatalogError`] when the scope is invalid, the catalog is empty,
75 /// a catalog job is outside the scope, active schedules still reference
76 /// absent scoped jobs, or persistence fails.
77 pub async fn sync_definitions_exact(
78 &self,
79 pool: &DbPool,
80 scope: &JobCatalogSyncScope,
81 ) -> Result<JobCatalogExactSyncReport, CatalogError> {
82 self.validate_defaults()?;
83 self.validate_exact_sync_scope(scope)?;
84
85 let mut tx = pool.begin().await.map_err(|error| {
86 CatalogError::SyncFailure(runledger_postgres::Error::from_query_sqlx_with_context(
87 "begin exact job catalog definition sync",
88 error,
89 ))
90 })?;
91
92 let scope_job_types = scope.job_types_for_storage();
93 let definitions = self.catalog_definitions();
94 let report = sync_catalog_job_definitions_exact_tx(&mut tx, &definitions, &scope_job_types)
95 .await
96 .map_err(CatalogError::from_definition_catalog_sync_error)?;
97
98 tx.commit().await.map_err(CatalogError::CommitFailure)?;
99
100 Ok(JobCatalogExactSyncReport {
101 disabled_absent_job_types: report.disabled_absent_job_types,
102 disabled_catalog_job_types: report.disabled_catalog_job_types,
103 })
104 }
105
106 fn catalog_definitions(&self) -> Vec<runledger_postgres::jobs::JobDefinitionUpsert<'static>> {
107 self.jobs
108 .values()
109 .map(|entry| self.materialize_definition(entry))
110 .collect()
111 }
112}