Skip to main content

runledger_postgres/jobs/queue/
definitions.rs

1use std::{error::Error as StdError, fmt};
2
3use crate::{DbPool, DbTx, Error, QueryError, QueryErrorCategory, Result};
4use runledger_core::jobs::{JobType, JobTypeName};
5
6use super::super::row_decode::parse_job_type_name;
7use super::super::types::{
8    JobDefinitionListFilter, JobDefinitionRecord, JobDefinitionUpdate, JobDefinitionUpsert,
9    JobScheduleJobTypeReference,
10};
11
/// Upper bound for `lock_timeout`, as PostgreSQL setting text, used while
/// taking the table locks that guard a definition disable.
const DEFINITION_DISABLE_LOCK_TIMEOUT: &str = "5s";
/// Millisecond form of [`DEFINITION_DISABLE_LOCK_TIMEOUT`]; the two values must
/// describe the same duration.
const DEFINITION_DISABLE_LOCK_TIMEOUT_MS: i64 = 5_000;
/// Upper bound for `statement_timeout`, as PostgreSQL setting text, applied
/// when the definition-disable critical section is prepared.
const DEFINITION_DISABLE_STATEMENT_TIMEOUT: &str = "30s";
/// Millisecond form of [`DEFINITION_DISABLE_STATEMENT_TIMEOUT`]; the two values
/// must describe the same duration.
const DEFINITION_DISABLE_STATEMENT_TIMEOUT_MS: i64 = 30_000;
16
17/// Summary of definition rows changed by a catalog sync.
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct JobDefinitionCatalogSyncReport {
20    /// Enabled definitions in the exact-sync scope that were absent from the
21    /// catalog and changed to disabled.
22    pub disabled_absent_job_types: Vec<JobTypeName>,
23    /// Catalog definitions changed to disabled because the catalog synced them
24    /// with `is_enabled = false`.
25    pub disabled_catalog_job_types: Vec<JobTypeName>,
26}
27
/// Selects how an additive catalog sync treats the stored `is_enabled` flag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobDefinitionCatalogSyncMode {
    /// On conflict, leave the stored `is_enabled` untouched for catalog
    /// definitions that are enabled in the payload.
    ///
    /// An operator pause therefore survives a worker restart that syncs the
    /// same enabled catalog. Payload definitions that are disabled still write
    /// `is_enabled = false`.
    PreserveExistingEnabledForEnabledDefinitions,
    /// Always write the payload's `is_enabled`, both when inserting and when
    /// resolving a conflict.
    RestoreCatalogEnabledState,
}
40
41/// Error returned while applying a catalog-owned job-definition sync.
42#[non_exhaustive]
43#[derive(Debug)]
44pub enum JobDefinitionCatalogSyncError {
45    /// An active schedule references an enabled scoped definition absent from
46    /// the catalog.
47    ActiveScheduleForAbsentJobType(JobScheduleJobTypeReference),
48    /// An active schedule references a catalog definition that would be disabled.
49    ActiveScheduleForDisabledJobType(JobScheduleJobTypeReference),
50    /// Applying transaction-local statement timeout bounds failed.
51    CriticalSectionTimeoutFailure(Error),
52    /// Locking `job_schedules` before disabling definitions failed.
53    ScheduleLockFailure(Error),
54    /// Locking `job_definitions` before disabling definitions failed.
55    DefinitionLockFailure(Error),
56    /// Checking active schedules before disabling definitions failed.
57    ScheduleCheckFailure(Error),
58    /// Sync input failed validation before any catalog writes.
59    ValidationFailure(Error),
60    /// Inspecting existing definitions before sync failed.
61    DefinitionInspectFailure(Error),
62    /// Syncing one catalog definition failed.
63    DefinitionSyncFailure {
64        /// Job type whose definition failed to sync.
65        job_type: String,
66        /// Persistence-layer failure returned by the definition write.
67        source: Error,
68    },
69    /// Disabling absent scoped definitions failed.
70    DisableAbsentFailure(Error),
71}
72
73impl fmt::Display for JobDefinitionCatalogSyncError {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        match self {
76            Self::ActiveScheduleForAbsentJobType(reference) => write!(
77                f,
78                "active schedule {} still references absent catalog job type {}",
79                reference.schedule_name, reference.job_type
80            ),
81            Self::ActiveScheduleForDisabledJobType(reference) => write!(
82                f,
83                "active schedule {} still references disabled catalog job type {}",
84                reference.schedule_name, reference.job_type
85            ),
86            Self::CriticalSectionTimeoutFailure(error) => {
87                write!(
88                    f,
89                    "failed to bound job definition disable critical section: {error}"
90                )
91            }
92            Self::ScheduleLockFailure(error) => write!(
93                f,
94                "failed to lock job schedules before disabling job definitions: {error}"
95            ),
96            Self::DefinitionLockFailure(error) => write!(
97                f,
98                "failed to lock job definitions before disabling job definitions: {error}"
99            ),
100            Self::ScheduleCheckFailure(error) => write!(
101                f,
102                "failed to check active schedules before disabling job definitions: {error}"
103            ),
104            Self::ValidationFailure(error) => {
105                write!(f, "job definition catalog sync input is invalid: {error}")
106            }
107            Self::DefinitionInspectFailure(error) => {
108                write!(
109                    f,
110                    "failed to inspect job definitions before catalog sync: {error}"
111                )
112            }
113            Self::DefinitionSyncFailure { job_type, source } => {
114                write!(f, "failed to sync job definition {job_type}: {source}")
115            }
116            Self::DisableAbsentFailure(error) => {
117                write!(f, "failed to disable absent job definitions: {error}")
118            }
119        }
120    }
121}
122
123impl StdError for JobDefinitionCatalogSyncError {
124    fn source(&self) -> Option<&(dyn StdError + 'static)> {
125        match self {
126            Self::CriticalSectionTimeoutFailure(error)
127            | Self::ScheduleLockFailure(error)
128            | Self::DefinitionLockFailure(error)
129            | Self::ScheduleCheckFailure(error)
130            | Self::ValidationFailure(error)
131            | Self::DefinitionInspectFailure(error)
132            | Self::DefinitionSyncFailure { source: error, .. }
133            | Self::DisableAbsentFailure(error) => Some(error),
134            Self::ActiveScheduleForAbsentJobType(_) | Self::ActiveScheduleForDisabledJobType(_) => {
135                None
136            }
137        }
138    }
139}
140
141pub async fn sync_catalog_job_definitions_tx(
142    tx: &mut DbTx<'_>,
143    definitions: &[JobDefinitionUpsert<'_>],
144    mode: JobDefinitionCatalogSyncMode,
145) -> std::result::Result<JobDefinitionCatalogSyncReport, JobDefinitionCatalogSyncError> {
146    let disabled_job_types = definition_job_type_names(
147        definitions
148            .iter()
149            .filter(|definition| !definition.is_enabled),
150    )?;
151    let disabled_catalog_job_types = if disabled_job_types.is_empty() {
152        Vec::new()
153    } else {
154        prepare_definition_disable_critical_section_tx(tx).await?;
155        reject_active_schedules_for_disabled_job_types_tx(tx, &disabled_job_types).await?;
156        // Report rows that this sync will newly create as disabled or change
157        // from enabled to disabled. Already-disabled rows are intentionally
158        // omitted from the report.
159        list_job_types_missing_or_enabled_definitions_tx(tx, &disabled_job_types)
160            .await
161            .map_err(JobDefinitionCatalogSyncError::DefinitionInspectFailure)?
162    };
163
164    for definition in definitions {
165        let upsert_result = match (mode, definition.is_enabled) {
166            (JobDefinitionCatalogSyncMode::PreserveExistingEnabledForEnabledDefinitions, true) => {
167                upsert_job_definition_preserving_enabled_tx(tx, definition).await
168            }
169            (JobDefinitionCatalogSyncMode::PreserveExistingEnabledForEnabledDefinitions, false)
170            | (JobDefinitionCatalogSyncMode::RestoreCatalogEnabledState, _) => {
171                upsert_job_definition_tx(tx, definition).await
172            }
173        };
174        upsert_result.map_err(
175            |source| JobDefinitionCatalogSyncError::DefinitionSyncFailure {
176                job_type: definition.job_type.as_str().to_owned(),
177                source,
178            },
179        )?;
180    }
181
182    Ok(JobDefinitionCatalogSyncReport {
183        disabled_absent_job_types: Vec::new(),
184        disabled_catalog_job_types,
185    })
186}
187
188pub async fn sync_catalog_job_definitions_exact_tx(
189    tx: &mut DbTx<'_>,
190    definitions: &[JobDefinitionUpsert<'_>],
191    scope_job_types: &[JobTypeName],
192) -> std::result::Result<JobDefinitionCatalogSyncReport, JobDefinitionCatalogSyncError> {
193    let catalog_job_types = definition_job_type_names(definitions.iter())?;
194    validate_non_empty_job_types("exact catalog sync job definitions", &catalog_job_types)
195        .map_err(JobDefinitionCatalogSyncError::ValidationFailure)?;
196    validate_non_empty_job_types("exact catalog sync scope", scope_job_types)
197        .map_err(JobDefinitionCatalogSyncError::ValidationFailure)?;
198
199    let disabled_job_types = definition_job_type_names(
200        definitions
201            .iter()
202            .filter(|definition| !definition.is_enabled),
203    )?;
204    let has_absent_scope_job_types = scope_job_types
205        .iter()
206        .any(|job_type| !catalog_job_types.contains(job_type));
207    let requires_disable_guard = !disabled_job_types.is_empty() || has_absent_scope_job_types;
208    if requires_disable_guard {
209        prepare_definition_disable_critical_section_tx(tx).await?;
210    }
211
212    let disabled_catalog_job_types = if disabled_job_types.is_empty() {
213        Vec::new()
214    } else {
215        reject_active_schedules_for_disabled_job_types_tx(tx, &disabled_job_types).await?;
216        list_job_types_missing_or_enabled_definitions_tx(tx, &disabled_job_types)
217            .await
218            .map_err(JobDefinitionCatalogSyncError::DefinitionInspectFailure)?
219    };
220
221    if has_absent_scope_job_types {
222        if let Some(reference) = find_active_schedule_for_enabled_absent_job_types_tx(
223            tx,
224            &catalog_job_types,
225            scope_job_types,
226        )
227        .await
228        .map_err(JobDefinitionCatalogSyncError::ScheduleCheckFailure)?
229        {
230            return Err(JobDefinitionCatalogSyncError::ActiveScheduleForAbsentJobType(reference));
231        }
232    }
233
234    // Re-enabling catalog definitions does not need the disable guard because it
235    // cannot orphan active schedules or authorize work for a row being disabled.
236    for definition in definitions {
237        upsert_job_definition_tx(tx, definition)
238            .await
239            .map_err(
240                |source| JobDefinitionCatalogSyncError::DefinitionSyncFailure {
241                    job_type: definition.job_type.as_str().to_owned(),
242                    source,
243                },
244            )?;
245    }
246
247    let disabled_absent_job_types = if has_absent_scope_job_types {
248        disable_enabled_job_definitions_except_tx(tx, &catalog_job_types, scope_job_types)
249            .await
250            .map_err(JobDefinitionCatalogSyncError::DisableAbsentFailure)?
251    } else {
252        Vec::new()
253    };
254
255    Ok(JobDefinitionCatalogSyncReport {
256        disabled_absent_job_types,
257        disabled_catalog_job_types,
258    })
259}
260
261pub async fn upsert_job_definition_tx(
262    tx: &mut DbTx<'_>,
263    payload: &JobDefinitionUpsert<'_>,
264) -> Result<()> {
265    sqlx::query!(
266        "INSERT INTO job_definitions (
267            job_type,
268            version,
269            max_attempts,
270            default_timeout_seconds,
271            default_priority,
272            is_enabled
273         )
274         VALUES ($1, $2, $3, $4, $5, $6)
275         ON CONFLICT (job_type)
276         DO UPDATE
277            SET version = EXCLUDED.version,
278                max_attempts = EXCLUDED.max_attempts,
279                default_timeout_seconds = EXCLUDED.default_timeout_seconds,
280                default_priority = EXCLUDED.default_priority,
281                is_enabled = EXCLUDED.is_enabled,
282                updated_at = now()
283          WHERE job_definitions.version IS DISTINCT FROM EXCLUDED.version
284             OR job_definitions.max_attempts IS DISTINCT FROM EXCLUDED.max_attempts
285             OR job_definitions.default_timeout_seconds IS DISTINCT FROM EXCLUDED.default_timeout_seconds
286             OR job_definitions.default_priority IS DISTINCT FROM EXCLUDED.default_priority
287             OR job_definitions.is_enabled IS DISTINCT FROM EXCLUDED.is_enabled",
288        payload.job_type as _,
289        payload.version,
290        payload.max_attempts,
291        payload.default_timeout_seconds,
292        payload.default_priority,
293        payload.is_enabled,
294    )
295    .execute(&mut **tx)
296    .await
297    .map_err(|error| Error::from_query_sqlx_with_context("upsert job definition", error))?;
298
299    Ok(())
300}
301
302/// Upserts a job definition while preserving an existing row's `is_enabled`.
303///
304/// Inserts use `payload.is_enabled`; updates keep the stored enabled state and
305/// refresh only the catalog-owned version, retry, timeout, and priority fields.
306async fn upsert_job_definition_preserving_enabled_tx(
307    tx: &mut DbTx<'_>,
308    payload: &JobDefinitionUpsert<'_>,
309) -> Result<()> {
310    sqlx::query!(
311        "INSERT INTO job_definitions (
312            job_type,
313            version,
314            max_attempts,
315            default_timeout_seconds,
316            default_priority,
317            is_enabled
318         )
319         VALUES ($1, $2, $3, $4, $5, $6)
320         ON CONFLICT (job_type)
321         DO UPDATE
322            SET version = EXCLUDED.version,
323                max_attempts = EXCLUDED.max_attempts,
324                default_timeout_seconds = EXCLUDED.default_timeout_seconds,
325                default_priority = EXCLUDED.default_priority,
326                is_enabled = job_definitions.is_enabled,
327                updated_at = now()
328          WHERE job_definitions.version IS DISTINCT FROM EXCLUDED.version
329             OR job_definitions.max_attempts IS DISTINCT FROM EXCLUDED.max_attempts
330             OR job_definitions.default_timeout_seconds IS DISTINCT FROM EXCLUDED.default_timeout_seconds
331             OR job_definitions.default_priority IS DISTINCT FROM EXCLUDED.default_priority",
332        payload.job_type as _,
333        payload.version,
334        payload.max_attempts,
335        payload.default_timeout_seconds,
336        payload.default_priority,
337        // Used only by the INSERT path; conflicts preserve the stored value.
338        payload.is_enabled,
339    )
340    .execute(&mut **tx)
341    .await
342    .map_err(|error| {
343        Error::from_query_sqlx_with_context("upsert job definition preserving enabled", error)
344    })?;
345
346    Ok(())
347}
348
349async fn lock_job_schedules_for_definition_disable_tx(tx: &mut DbTx<'_>) -> Result<()> {
350    let previous_lock_timeout = cap_local_lock_timeout_tx(
351        tx,
352        DEFINITION_DISABLE_LOCK_TIMEOUT,
353        DEFINITION_DISABLE_LOCK_TIMEOUT_MS,
354        "set job definition disable schedule lock timeout",
355    )
356    .await?;
357
358    let lock_result = sqlx::query!("LOCK TABLE job_schedules IN SHARE ROW EXCLUSIVE MODE")
359        .execute(&mut **tx)
360        .await;
361
362    match lock_result {
363        Ok(_) => {
364            set_local_lock_timeout_tx(
365                tx,
366                &previous_lock_timeout,
367                "restore job definition disable schedule lock timeout",
368            )
369            .await
370        }
371        Err(error) => {
372            // No restore is needed on the error path: the caller rolls back the
373            // transaction and PostgreSQL discards the SET LOCAL lock_timeout.
374            Err(Error::from_query_sqlx_with_context(
375                "lock job schedules before disabling job definitions",
376                error,
377            ))
378        }
379    }
380}
381
382async fn lock_job_definitions_for_definition_disable_tx(tx: &mut DbTx<'_>) -> Result<()> {
383    let previous_lock_timeout = cap_local_lock_timeout_tx(
384        tx,
385        DEFINITION_DISABLE_LOCK_TIMEOUT,
386        DEFINITION_DISABLE_LOCK_TIMEOUT_MS,
387        "set job definition disable definition lock timeout",
388    )
389    .await?;
390
391    let lock_result = sqlx::query("LOCK TABLE job_definitions IN SHARE ROW EXCLUSIVE MODE")
392        .execute(&mut **tx)
393        .await;
394
395    match lock_result {
396        Ok(_) => {
397            set_local_lock_timeout_tx(
398                tx,
399                &previous_lock_timeout,
400                "restore job definition disable definition lock timeout",
401            )
402            .await
403        }
404        Err(error) => {
405            // No restore is needed on the error path: the caller rolls back the
406            // transaction and PostgreSQL discards the SET LOCAL lock_timeout.
407            Err(Error::from_query_sqlx_with_context(
408                "lock job definitions before disabling job definitions",
409                error,
410            ))
411        }
412    }
413}
414
415async fn find_active_schedule_for_job_types_tx(
416    tx: &mut DbTx<'_>,
417    job_types: &[JobTypeName],
418) -> Result<Option<JobScheduleJobTypeReference>> {
419    let job_types = job_type_strings(job_types);
420    let row = sqlx::query!(
421        "SELECT name, job_type
422         FROM job_schedules
423         WHERE is_active = true
424           AND job_type = ANY($1::text[])
425         ORDER BY name ASC
426         LIMIT 1",
427        job_types.as_slice(),
428    )
429    .fetch_optional(&mut **tx)
430    .await
431    .map_err(|error| {
432        Error::from_query_sqlx_with_context("find active schedule for job definitions", error)
433    })?;
434
435    row.map(|row| parse_schedule_job_type_reference(row.name, row.job_type))
436        .transpose()
437}
438
439async fn find_active_schedule_for_enabled_absent_job_types_tx(
440    tx: &mut DbTx<'_>,
441    catalog_job_types: &[JobTypeName],
442    scope_job_types: &[JobTypeName],
443) -> Result<Option<JobScheduleJobTypeReference>> {
444    let catalog_job_types = job_type_strings(catalog_job_types);
445    let scope_job_types = job_type_strings(scope_job_types);
446    let row = sqlx::query!(
447        "SELECT job_schedules.name, job_schedules.job_type
448         FROM job_schedules
449         INNER JOIN job_definitions
450            ON job_definitions.job_type = job_schedules.job_type
451         WHERE job_schedules.is_active = true
452           AND job_schedules.job_type <> ALL($1::text[])
453           AND job_schedules.job_type = ANY($2::text[])
454           AND job_definitions.is_enabled = true
455         ORDER BY job_schedules.name ASC
456         LIMIT 1",
457        catalog_job_types.as_slice(),
458        scope_job_types.as_slice(),
459    )
460    .fetch_optional(&mut **tx)
461    .await
462    .map_err(|error| {
463        Error::from_query_sqlx_with_context(
464            "find active schedule for enabled absent job definitions",
465            error,
466        )
467    })?;
468
469    row.map(|row| parse_schedule_job_type_reference(row.name, row.job_type))
470        .transpose()
471}
472
473async fn list_job_types_missing_or_enabled_definitions_tx(
474    tx: &mut DbTx<'_>,
475    job_types: &[JobTypeName],
476) -> Result<Vec<JobTypeName>> {
477    let job_types = job_type_strings(job_types);
478    let rows = sqlx::query_scalar!(
479        "SELECT catalog.job_type as \"job_type!\"
480         FROM unnest($1::text[]) AS catalog(job_type)
481         LEFT JOIN job_definitions
482            ON job_definitions.job_type = catalog.job_type
483         WHERE job_definitions.job_type IS NULL
484            OR job_definitions.is_enabled = true",
485        job_types.as_slice(),
486    )
487    .fetch_all(&mut **tx)
488    .await
489    .map_err(|error| {
490        Error::from_query_sqlx_with_context("list missing or enabled job definitions", error)
491    })?;
492
493    parse_job_type_rows(rows)
494}
495
496async fn disable_enabled_job_definitions_except_tx(
497    tx: &mut DbTx<'_>,
498    keep_job_types: &[JobTypeName],
499    scope_job_types: &[JobTypeName],
500) -> Result<Vec<JobTypeName>> {
501    validate_non_empty_job_types("disable enabled job definitions keep list", keep_job_types)?;
502    validate_non_empty_job_types("disable enabled job definitions scope", scope_job_types)?;
503
504    let keep_job_types = job_type_strings(keep_job_types);
505    let scope_job_types = job_type_strings(scope_job_types);
506    let rows = sqlx::query_scalar!(
507        "UPDATE job_definitions
508         SET is_enabled = false,
509             updated_at = now()
510         WHERE is_enabled = true
511           AND job_type <> ALL($1::text[])
512           AND job_type = ANY($2::text[])
513         RETURNING job_type",
514        keep_job_types.as_slice(),
515        scope_job_types.as_slice(),
516    )
517    .fetch_all(&mut **tx)
518    .await
519    .map_err(|error| {
520        Error::from_query_sqlx_with_context("disable enabled job definitions except list", error)
521    })?;
522
523    parse_job_type_rows(rows)
524}
525
526pub async fn insert_job_definition_if_missing_tx(
527    tx: &mut DbTx<'_>,
528    payload: &JobDefinitionUpsert<'_>,
529) -> Result<()> {
530    sqlx::query!(
531        "INSERT INTO job_definitions (
532            job_type,
533            version,
534            max_attempts,
535            default_timeout_seconds,
536            default_priority,
537            is_enabled
538         )
539         VALUES ($1, $2, $3, $4, $5, $6)
540         ON CONFLICT (job_type)
541         DO NOTHING",
542        payload.job_type as _,
543        payload.version,
544        payload.max_attempts,
545        payload.default_timeout_seconds,
546        payload.default_priority,
547        payload.is_enabled,
548    )
549    .execute(&mut **tx)
550    .await
551    .map_err(|error| {
552        Error::from_query_sqlx_with_context("insert job definition if missing", error)
553    })?;
554
555    Ok(())
556}
557
558pub async fn list_job_definitions(
559    pool: &DbPool,
560    filter: &JobDefinitionListFilter<'_>,
561) -> Result<Vec<JobDefinitionRecord>> {
562    let escaped_job_type = filter.job_type.map(escape_ilike_pattern);
563
564    let rows = sqlx::query!(
565        "SELECT
566            job_type,
567            version,
568            max_attempts,
569            default_timeout_seconds,
570            default_priority,
571            is_enabled,
572            created_at,
573            updated_at
574         FROM job_definitions
575         WHERE ($1::text IS NULL OR job_type ILIKE '%' || $1 || '%')
576         ORDER BY job_type ASC
577         LIMIT $2
578         OFFSET $3",
579        escaped_job_type.as_deref(),
580        filter.limit,
581        filter.offset,
582    )
583    .fetch_all(pool)
584    .await
585    .map_err(|error| Error::from_query_sqlx_with_context("list job definitions", error))?;
586
587    rows.into_iter()
588        .map(|row| {
589            Ok(JobDefinitionRecord {
590                job_type: parse_job_type_name(row.job_type)?,
591                version: row.version,
592                max_attempts: row.max_attempts,
593                default_timeout_seconds: row.default_timeout_seconds,
594                default_priority: row.default_priority,
595                is_enabled: row.is_enabled,
596                created_at: row.created_at,
597                updated_at: row.updated_at,
598            })
599        })
600        .collect()
601}
602
/// Escapes `\`, `%`, and `_` in `input` with a leading backslash so the text
/// matches literally inside a `LIKE`/`ILIKE` pattern.
fn escape_ilike_pattern(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len());
    for ch in input.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
609
610fn job_type_strings(job_types: &[JobTypeName]) -> Vec<String> {
611    job_types
612        .iter()
613        .map(|job_type| job_type.as_str().to_owned())
614        .collect()
615}
616
617fn definition_job_type_names<'definition, 'payload, I>(
618    definitions: I,
619) -> std::result::Result<Vec<JobTypeName>, JobDefinitionCatalogSyncError>
620where
621    'payload: 'definition,
622    I: IntoIterator<Item = &'definition JobDefinitionUpsert<'payload>>,
623{
624    // JobType::new is intentionally lightweight, so the catalog sync boundary
625    // revalidates names before using them in scope comparisons or reports.
626    let mut job_types = definitions
627        .into_iter()
628        .map(|definition| parse_job_type_name(definition.job_type.as_str().to_owned()))
629        .collect::<Result<Vec<_>>>()
630        .map_err(JobDefinitionCatalogSyncError::DefinitionInspectFailure)?;
631    job_types.sort();
632    Ok(job_types)
633}
634
635async fn prepare_definition_disable_critical_section_tx(
636    tx: &mut DbTx<'_>,
637) -> std::result::Result<(), JobDefinitionCatalogSyncError> {
638    cap_local_statement_timeout_tx(
639        tx,
640        DEFINITION_DISABLE_STATEMENT_TIMEOUT,
641        DEFINITION_DISABLE_STATEMENT_TIMEOUT_MS,
642        "set job definition disable statement timeout",
643    )
644    .await
645    .map_err(JobDefinitionCatalogSyncError::CriticalSectionTimeoutFailure)?;
646    lock_job_schedules_for_definition_disable_tx(tx)
647        .await
648        .map_err(JobDefinitionCatalogSyncError::ScheduleLockFailure)?;
649    lock_job_definitions_for_definition_disable_tx(tx)
650        .await
651        .map_err(JobDefinitionCatalogSyncError::DefinitionLockFailure)
652}
653
654async fn reject_active_schedules_for_disabled_job_types_tx(
655    tx: &mut DbTx<'_>,
656    job_types: &[JobTypeName],
657) -> std::result::Result<(), JobDefinitionCatalogSyncError> {
658    if let Some(reference) = find_active_schedule_for_job_types_tx(tx, job_types)
659        .await
660        .map_err(JobDefinitionCatalogSyncError::ScheduleCheckFailure)?
661    {
662        return Err(JobDefinitionCatalogSyncError::ActiveScheduleForDisabledJobType(reference));
663    }
664
665    Ok(())
666}
667
668fn parse_schedule_job_type_reference(
669    schedule_name: String,
670    job_type: String,
671) -> Result<JobScheduleJobTypeReference> {
672    Ok(JobScheduleJobTypeReference {
673        schedule_name,
674        job_type: parse_job_type_name(job_type)?,
675    })
676}
677
678fn parse_job_type_rows(rows: Vec<String>) -> Result<Vec<JobTypeName>> {
679    let mut job_types = rows
680        .into_iter()
681        .map(parse_job_type_name)
682        .collect::<Result<Vec<_>>>()?;
683    job_types.sort();
684    Ok(job_types)
685}
686
687fn validate_non_empty_job_types(context: &'static str, job_types: &[JobTypeName]) -> Result<()> {
688    if job_types.is_empty() {
689        return Err(Error::QueryError(QueryError::from_classified(
690            QueryErrorCategory::Validation,
691            "job_definition.empty_job_type_list",
692            "Job type list must not be empty.",
693            format!("{context}: job type list must not be empty"),
694        )));
695    }
696    Ok(())
697}
698
699async fn cap_local_lock_timeout_tx(
700    tx: &mut DbTx<'_>,
701    lock_timeout: &str,
702    lock_timeout_ms: i64,
703    context: &'static str,
704) -> Result<String> {
705    // Preserve PostgreSQL's reported GUC text so restore keeps units and special
706    // values such as "0" exactly as the connection reported them.
707    sqlx::query_scalar::<_, String>(
708        "WITH previous AS MATERIALIZED (
709             SELECT
710                current_setting('lock_timeout') AS lock_timeout,
711                setting::bigint AS lock_timeout_ms
712             FROM pg_settings
713             WHERE name = 'lock_timeout'
714         )
715         SELECT previous.lock_timeout
716         FROM previous,
717              LATERAL (
718                SELECT set_config(
719                    'lock_timeout',
720                    CASE
721                        WHEN previous.lock_timeout_ms = 0 THEN $1
722                        WHEN previous.lock_timeout_ms <= $2 THEN previous.lock_timeout
723                        ELSE $1
724                    END,
725                    true
726                )
727              ) AS applied",
728    )
729    .bind(lock_timeout)
730    .bind(lock_timeout_ms)
731    .fetch_one(&mut **tx)
732    .await
733    .map_err(|error| Error::from_query_sqlx_with_context(context, error))
734}
735
736async fn cap_local_statement_timeout_tx(
737    tx: &mut DbTx<'_>,
738    statement_timeout: &str,
739    statement_timeout_ms: i64,
740    context: &'static str,
741) -> Result<String> {
742    // Cap statement_timeout while the transaction holds definition-disable locks
743    // so a stalled check, upsert, or disable statement cannot hold table locks
744    // indefinitely. Preserve stricter caller settings.
745    sqlx::query_scalar::<_, String>(
746        "WITH previous AS MATERIALIZED (
747             SELECT
748                current_setting('statement_timeout') AS statement_timeout,
749                setting::bigint AS statement_timeout_ms
750             FROM pg_settings
751             WHERE name = 'statement_timeout'
752         )
753         SELECT previous.statement_timeout
754         FROM previous,
755              LATERAL (
756                SELECT set_config(
757                    'statement_timeout',
758                    CASE
759                        WHEN previous.statement_timeout_ms = 0 THEN $1
760                        WHEN previous.statement_timeout_ms <= $2 THEN previous.statement_timeout
761                        ELSE $1
762                    END,
763                    true
764                )
765              ) AS applied",
766    )
767    .bind(statement_timeout)
768    .bind(statement_timeout_ms)
769    .fetch_one(&mut **tx)
770    .await
771    .map_err(|error| Error::from_query_sqlx_with_context(context, error))
772}
773
774async fn set_local_lock_timeout_tx(
775    tx: &mut DbTx<'_>,
776    lock_timeout: &str,
777    context: &'static str,
778) -> Result<()> {
779    sqlx::query_scalar::<_, String>("SELECT set_config('lock_timeout', $1, true)")
780        .bind(lock_timeout)
781        .fetch_one(&mut **tx)
782        .await
783        .map_err(|error| Error::from_query_sqlx_with_context(context, error))?;
784
785    Ok(())
786}
787
788pub async fn get_job_definition_by_type(
789    pool: &DbPool,
790    job_type: JobType<'_>,
791) -> Result<Option<JobDefinitionRecord>> {
792    let row = sqlx::query!(
793        "SELECT
794            job_type,
795            version,
796            max_attempts,
797            default_timeout_seconds,
798            default_priority,
799            is_enabled,
800            created_at,
801            updated_at
802         FROM job_definitions
803         WHERE job_type = $1
804         LIMIT 1",
805        job_type as _,
806    )
807    .fetch_optional(pool)
808    .await
809    .map_err(|error| Error::from_query_sqlx_with_context("get job definition by type", error))?;
810
811    row.map(|row| {
812        Ok(JobDefinitionRecord {
813            job_type: parse_job_type_name(row.job_type)?,
814            version: row.version,
815            max_attempts: row.max_attempts,
816            default_timeout_seconds: row.default_timeout_seconds,
817            default_priority: row.default_priority,
818            is_enabled: row.is_enabled,
819            created_at: row.created_at,
820            updated_at: row.updated_at,
821        })
822    })
823    .transpose()
824}
825
826pub async fn update_job_definition(
827    pool: &DbPool,
828    job_type: JobType<'_>,
829    payload: &JobDefinitionUpdate,
830) -> Result<Option<JobDefinitionRecord>> {
831    let row = sqlx::query!(
832        "UPDATE job_definitions
833         SET max_attempts = COALESCE($2, max_attempts),
834             default_timeout_seconds = COALESCE($3, default_timeout_seconds),
835             default_priority = COALESCE($4, default_priority),
836             is_enabled = COALESCE($5, is_enabled),
837             updated_at = now()
838         WHERE job_type = $1
839         RETURNING
840            job_type,
841            version,
842            max_attempts,
843            default_timeout_seconds,
844            default_priority,
845            is_enabled,
846            created_at,
847            updated_at",
848        job_type as _,
849        payload.max_attempts,
850        payload.default_timeout_seconds,
851        payload.default_priority,
852        payload.is_enabled,
853    )
854    .fetch_optional(pool)
855    .await
856    .map_err(|error| Error::from_query_sqlx_with_context("update job definition", error))?;
857
858    row.map(|row| {
859        Ok(JobDefinitionRecord {
860            job_type: parse_job_type_name(row.job_type)?,
861            version: row.version,
862            max_attempts: row.max_attempts,
863            default_timeout_seconds: row.default_timeout_seconds,
864            default_priority: row.default_priority,
865            is_enabled: row.is_enabled,
866            created_at: row.created_at,
867            updated_at: row.updated_at,
868        })
869    })
870    .transpose()
871}