1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use runledger_postgres::DbPool;
use runledger_postgres::jobs::{
JobDefinitionCatalogSyncMode, sync_catalog_job_definitions_exact_tx,
sync_catalog_job_definitions_tx,
};
use super::{
CatalogError, JobCatalog, JobCatalogExactSyncReport, JobCatalogSyncReport, JobCatalogSyncScope,
};
impl JobCatalog {
/// Upserts every catalog job into `job_definitions`.
///
/// The catalog owns the synced definition fields: repeated syncs overwrite
/// `version`, `max_attempts`, `default_timeout_seconds`, and
/// `default_priority` for registered jobs. Enabled catalogs preserve an
/// existing disabled row so operator pauses survive worker restarts; disabled
/// catalogs explicitly write `is_enabled = false`.
/// Safe to call repeatedly. Does not delete or disable definitions absent from the catalog.
/// Use [`Self::sync_definitions_exact`] with an explicit scope when removed
/// catalog entries should be disabled.
///
/// Disabled catalog sync briefly locks `job_schedules` and `job_definitions`
/// so active schedule checks and definition disables are evaluated against a
/// stable write boundary.
///
/// # Errors
/// Returns [`CatalogError`] when defaults are invalid or persistence fails.
pub async fn sync_definitions(
&self,
pool: &DbPool,
) -> Result<JobCatalogSyncReport, CatalogError> {
self.validate_defaults()?;
let mut tx = pool.begin().await.map_err(|error| {
CatalogError::SyncFailure(runledger_postgres::Error::from_query_sqlx_with_context(
"begin job catalog definition sync",
error,
))
})?;
let definitions = self.catalog_definitions();
let mode = if self.defaults.is_enabled {
JobDefinitionCatalogSyncMode::PreserveExistingEnabledForEnabledDefinitions
} else {
JobDefinitionCatalogSyncMode::RestoreCatalogEnabledState
};
let report = sync_catalog_job_definitions_tx(&mut tx, &definitions, mode)
.await
.map_err(CatalogError::from_definition_catalog_sync_error)?;
tx.commit().await.map_err(CatalogError::CommitFailure)?;
Ok(JobCatalogSyncReport {
disabled_catalog_job_types: report.disabled_catalog_job_types,
})
}
/// Upserts catalog jobs, then disables enabled `job_definitions` rows in
/// `scope` whose job type is absent from the catalog.
///
/// This is the stricter startup mode for applications that want the catalog
/// to be the active job-definition source of truth. It never deletes rows,
/// never operates outside the supplied owned job-type set, and rejects active
/// schedules that still reference a job type it would disable. Unlike
/// [`Self::sync_definitions`], exact sync restores catalog entries'
/// `is_enabled` value from catalog defaults.
///
/// Exact sync briefly locks `job_schedules` and `job_definitions` before it
/// checks active schedules or disables definitions, so it is heavier than the
/// additive sync path.
///
/// # Errors
/// Returns [`CatalogError`] when the scope is invalid, the catalog is empty,
/// a catalog job is outside the scope, active schedules still reference
/// absent scoped jobs, or persistence fails.
pub async fn sync_definitions_exact(
&self,
pool: &DbPool,
scope: &JobCatalogSyncScope,
) -> Result<JobCatalogExactSyncReport, CatalogError> {
self.validate_defaults()?;
self.validate_exact_sync_scope(scope)?;
let mut tx = pool.begin().await.map_err(|error| {
CatalogError::SyncFailure(runledger_postgres::Error::from_query_sqlx_with_context(
"begin exact job catalog definition sync",
error,
))
})?;
let scope_job_types = scope.job_types_for_storage();
let definitions = self.catalog_definitions();
let report = sync_catalog_job_definitions_exact_tx(&mut tx, &definitions, &scope_job_types)
.await
.map_err(CatalogError::from_definition_catalog_sync_error)?;
tx.commit().await.map_err(CatalogError::CommitFailure)?;
Ok(JobCatalogExactSyncReport {
disabled_absent_job_types: report.disabled_absent_job_types,
disabled_catalog_job_types: report.disabled_catalog_job_types,
})
}
fn catalog_definitions(&self) -> Vec<runledger_postgres::jobs::JobDefinitionUpsert<'static>> {
self.jobs
.values()
.map(|entry| self.materialize_definition(entry))
.collect()
}
}