1use std::{error::Error as StdError, fmt};
2
3use crate::{DbPool, DbTx, Error, QueryError, QueryErrorCategory, Result};
4use runledger_core::jobs::{JobType, JobTypeName};
5
6use super::super::row_decode::parse_job_type_name;
7use super::super::types::{
8 JobDefinitionListFilter, JobDefinitionRecord, JobDefinitionUpdate, JobDefinitionUpsert,
9 JobScheduleJobTypeReference,
10};
11
// Timeout caps used while preparing the definition-disable critical section.
// Each cap is stored twice: as the text handed to `set_config`, and in
// milliseconds for comparison against the current setting. The two forms of
// a cap must be kept in sync by hand.

/// Cap applied to `lock_timeout` while taking the disable table locks.
const DEFINITION_DISABLE_LOCK_TIMEOUT: &str = "5s";
/// `DEFINITION_DISABLE_LOCK_TIMEOUT` expressed in milliseconds.
const DEFINITION_DISABLE_LOCK_TIMEOUT_MS: i64 = 5_000;
/// Cap applied to `statement_timeout` for the disable critical section.
const DEFINITION_DISABLE_STATEMENT_TIMEOUT: &str = "30s";
/// `DEFINITION_DISABLE_STATEMENT_TIMEOUT` expressed in milliseconds.
const DEFINITION_DISABLE_STATEMENT_TIMEOUT_MS: i64 = 30_000;
16
17#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct JobDefinitionCatalogSyncReport {
20 pub disabled_absent_job_types: Vec<JobTypeName>,
23 pub disabled_catalog_job_types: Vec<JobTypeName>,
26}
27
/// Selects how the non-exact catalog sync treats `is_enabled` on definitions
/// that already exist in the database.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JobDefinitionCatalogSyncMode {
    /// Definitions the catalog marks as enabled keep whatever `is_enabled`
    /// value is already stored; definitions the catalog marks as disabled are
    /// written with the catalog value.
    PreserveExistingEnabledForEnabledDefinitions,
    /// Every definition is written with the catalog's `is_enabled` value.
    RestoreCatalogEnabledState,
}
40
41#[non_exhaustive]
43#[derive(Debug)]
44pub enum JobDefinitionCatalogSyncError {
45 ActiveScheduleForAbsentJobType(JobScheduleJobTypeReference),
48 ActiveScheduleForDisabledJobType(JobScheduleJobTypeReference),
50 CriticalSectionTimeoutFailure(Error),
52 ScheduleLockFailure(Error),
54 DefinitionLockFailure(Error),
56 ScheduleCheckFailure(Error),
58 ValidationFailure(Error),
60 DefinitionInspectFailure(Error),
62 DefinitionSyncFailure {
64 job_type: String,
66 source: Error,
68 },
69 DisableAbsentFailure(Error),
71}
72
73impl fmt::Display for JobDefinitionCatalogSyncError {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 match self {
76 Self::ActiveScheduleForAbsentJobType(reference) => write!(
77 f,
78 "active schedule {} still references absent catalog job type {}",
79 reference.schedule_name, reference.job_type
80 ),
81 Self::ActiveScheduleForDisabledJobType(reference) => write!(
82 f,
83 "active schedule {} still references disabled catalog job type {}",
84 reference.schedule_name, reference.job_type
85 ),
86 Self::CriticalSectionTimeoutFailure(error) => {
87 write!(
88 f,
89 "failed to bound job definition disable critical section: {error}"
90 )
91 }
92 Self::ScheduleLockFailure(error) => write!(
93 f,
94 "failed to lock job schedules before disabling job definitions: {error}"
95 ),
96 Self::DefinitionLockFailure(error) => write!(
97 f,
98 "failed to lock job definitions before disabling job definitions: {error}"
99 ),
100 Self::ScheduleCheckFailure(error) => write!(
101 f,
102 "failed to check active schedules before disabling job definitions: {error}"
103 ),
104 Self::ValidationFailure(error) => {
105 write!(f, "job definition catalog sync input is invalid: {error}")
106 }
107 Self::DefinitionInspectFailure(error) => {
108 write!(
109 f,
110 "failed to inspect job definitions before catalog sync: {error}"
111 )
112 }
113 Self::DefinitionSyncFailure { job_type, source } => {
114 write!(f, "failed to sync job definition {job_type}: {source}")
115 }
116 Self::DisableAbsentFailure(error) => {
117 write!(f, "failed to disable absent job definitions: {error}")
118 }
119 }
120 }
121}
122
123impl StdError for JobDefinitionCatalogSyncError {
124 fn source(&self) -> Option<&(dyn StdError + 'static)> {
125 match self {
126 Self::CriticalSectionTimeoutFailure(error)
127 | Self::ScheduleLockFailure(error)
128 | Self::DefinitionLockFailure(error)
129 | Self::ScheduleCheckFailure(error)
130 | Self::ValidationFailure(error)
131 | Self::DefinitionInspectFailure(error)
132 | Self::DefinitionSyncFailure { source: error, .. }
133 | Self::DisableAbsentFailure(error) => Some(error),
134 Self::ActiveScheduleForAbsentJobType(_) | Self::ActiveScheduleForDisabledJobType(_) => {
135 None
136 }
137 }
138 }
139}
140
141pub async fn sync_catalog_job_definitions_tx(
142 tx: &mut DbTx<'_>,
143 definitions: &[JobDefinitionUpsert<'_>],
144 mode: JobDefinitionCatalogSyncMode,
145) -> std::result::Result<JobDefinitionCatalogSyncReport, JobDefinitionCatalogSyncError> {
146 let disabled_job_types = definition_job_type_names(
147 definitions
148 .iter()
149 .filter(|definition| !definition.is_enabled),
150 )?;
151 let disabled_catalog_job_types = if disabled_job_types.is_empty() {
152 Vec::new()
153 } else {
154 prepare_definition_disable_critical_section_tx(tx).await?;
155 reject_active_schedules_for_disabled_job_types_tx(tx, &disabled_job_types).await?;
156 list_job_types_missing_or_enabled_definitions_tx(tx, &disabled_job_types)
160 .await
161 .map_err(JobDefinitionCatalogSyncError::DefinitionInspectFailure)?
162 };
163
164 for definition in definitions {
165 let upsert_result = match (mode, definition.is_enabled) {
166 (JobDefinitionCatalogSyncMode::PreserveExistingEnabledForEnabledDefinitions, true) => {
167 upsert_job_definition_preserving_enabled_tx(tx, definition).await
168 }
169 (JobDefinitionCatalogSyncMode::PreserveExistingEnabledForEnabledDefinitions, false)
170 | (JobDefinitionCatalogSyncMode::RestoreCatalogEnabledState, _) => {
171 upsert_job_definition_tx(tx, definition).await
172 }
173 };
174 upsert_result.map_err(
175 |source| JobDefinitionCatalogSyncError::DefinitionSyncFailure {
176 job_type: definition.job_type.as_str().to_owned(),
177 source,
178 },
179 )?;
180 }
181
182 Ok(JobDefinitionCatalogSyncReport {
183 disabled_absent_job_types: Vec::new(),
184 disabled_catalog_job_types,
185 })
186}
187
188pub async fn sync_catalog_job_definitions_exact_tx(
189 tx: &mut DbTx<'_>,
190 definitions: &[JobDefinitionUpsert<'_>],
191 scope_job_types: &[JobTypeName],
192) -> std::result::Result<JobDefinitionCatalogSyncReport, JobDefinitionCatalogSyncError> {
193 let catalog_job_types = definition_job_type_names(definitions.iter())?;
194 validate_non_empty_job_types("exact catalog sync job definitions", &catalog_job_types)
195 .map_err(JobDefinitionCatalogSyncError::ValidationFailure)?;
196 validate_non_empty_job_types("exact catalog sync scope", scope_job_types)
197 .map_err(JobDefinitionCatalogSyncError::ValidationFailure)?;
198
199 let disabled_job_types = definition_job_type_names(
200 definitions
201 .iter()
202 .filter(|definition| !definition.is_enabled),
203 )?;
204 let has_absent_scope_job_types = scope_job_types
205 .iter()
206 .any(|job_type| !catalog_job_types.contains(job_type));
207 let requires_disable_guard = !disabled_job_types.is_empty() || has_absent_scope_job_types;
208 if requires_disable_guard {
209 prepare_definition_disable_critical_section_tx(tx).await?;
210 }
211
212 let disabled_catalog_job_types = if disabled_job_types.is_empty() {
213 Vec::new()
214 } else {
215 reject_active_schedules_for_disabled_job_types_tx(tx, &disabled_job_types).await?;
216 list_job_types_missing_or_enabled_definitions_tx(tx, &disabled_job_types)
217 .await
218 .map_err(JobDefinitionCatalogSyncError::DefinitionInspectFailure)?
219 };
220
221 if has_absent_scope_job_types {
222 if let Some(reference) = find_active_schedule_for_enabled_absent_job_types_tx(
223 tx,
224 &catalog_job_types,
225 scope_job_types,
226 )
227 .await
228 .map_err(JobDefinitionCatalogSyncError::ScheduleCheckFailure)?
229 {
230 return Err(JobDefinitionCatalogSyncError::ActiveScheduleForAbsentJobType(reference));
231 }
232 }
233
234 for definition in definitions {
237 upsert_job_definition_tx(tx, definition)
238 .await
239 .map_err(
240 |source| JobDefinitionCatalogSyncError::DefinitionSyncFailure {
241 job_type: definition.job_type.as_str().to_owned(),
242 source,
243 },
244 )?;
245 }
246
247 let disabled_absent_job_types = if has_absent_scope_job_types {
248 disable_enabled_job_definitions_except_tx(tx, &catalog_job_types, scope_job_types)
249 .await
250 .map_err(JobDefinitionCatalogSyncError::DisableAbsentFailure)?
251 } else {
252 Vec::new()
253 };
254
255 Ok(JobDefinitionCatalogSyncReport {
256 disabled_absent_job_types,
257 disabled_catalog_job_types,
258 })
259}
260
261pub async fn upsert_job_definition_tx(
262 tx: &mut DbTx<'_>,
263 payload: &JobDefinitionUpsert<'_>,
264) -> Result<()> {
265 sqlx::query!(
266 "INSERT INTO job_definitions (
267 job_type,
268 version,
269 max_attempts,
270 default_timeout_seconds,
271 default_priority,
272 is_enabled
273 )
274 VALUES ($1, $2, $3, $4, $5, $6)
275 ON CONFLICT (job_type)
276 DO UPDATE
277 SET version = EXCLUDED.version,
278 max_attempts = EXCLUDED.max_attempts,
279 default_timeout_seconds = EXCLUDED.default_timeout_seconds,
280 default_priority = EXCLUDED.default_priority,
281 is_enabled = EXCLUDED.is_enabled,
282 updated_at = now()
283 WHERE job_definitions.version IS DISTINCT FROM EXCLUDED.version
284 OR job_definitions.max_attempts IS DISTINCT FROM EXCLUDED.max_attempts
285 OR job_definitions.default_timeout_seconds IS DISTINCT FROM EXCLUDED.default_timeout_seconds
286 OR job_definitions.default_priority IS DISTINCT FROM EXCLUDED.default_priority
287 OR job_definitions.is_enabled IS DISTINCT FROM EXCLUDED.is_enabled",
288 payload.job_type as _,
289 payload.version,
290 payload.max_attempts,
291 payload.default_timeout_seconds,
292 payload.default_priority,
293 payload.is_enabled,
294 )
295 .execute(&mut **tx)
296 .await
297 .map_err(|error| Error::from_query_sqlx_with_context("upsert job definition", error))?;
298
299 Ok(())
300}
301
302async fn upsert_job_definition_preserving_enabled_tx(
307 tx: &mut DbTx<'_>,
308 payload: &JobDefinitionUpsert<'_>,
309) -> Result<()> {
310 sqlx::query!(
311 "INSERT INTO job_definitions (
312 job_type,
313 version,
314 max_attempts,
315 default_timeout_seconds,
316 default_priority,
317 is_enabled
318 )
319 VALUES ($1, $2, $3, $4, $5, $6)
320 ON CONFLICT (job_type)
321 DO UPDATE
322 SET version = EXCLUDED.version,
323 max_attempts = EXCLUDED.max_attempts,
324 default_timeout_seconds = EXCLUDED.default_timeout_seconds,
325 default_priority = EXCLUDED.default_priority,
326 is_enabled = job_definitions.is_enabled,
327 updated_at = now()
328 WHERE job_definitions.version IS DISTINCT FROM EXCLUDED.version
329 OR job_definitions.max_attempts IS DISTINCT FROM EXCLUDED.max_attempts
330 OR job_definitions.default_timeout_seconds IS DISTINCT FROM EXCLUDED.default_timeout_seconds
331 OR job_definitions.default_priority IS DISTINCT FROM EXCLUDED.default_priority",
332 payload.job_type as _,
333 payload.version,
334 payload.max_attempts,
335 payload.default_timeout_seconds,
336 payload.default_priority,
337 payload.is_enabled,
339 )
340 .execute(&mut **tx)
341 .await
342 .map_err(|error| {
343 Error::from_query_sqlx_with_context("upsert job definition preserving enabled", error)
344 })?;
345
346 Ok(())
347}
348
349async fn lock_job_schedules_for_definition_disable_tx(tx: &mut DbTx<'_>) -> Result<()> {
350 let previous_lock_timeout = cap_local_lock_timeout_tx(
351 tx,
352 DEFINITION_DISABLE_LOCK_TIMEOUT,
353 DEFINITION_DISABLE_LOCK_TIMEOUT_MS,
354 "set job definition disable schedule lock timeout",
355 )
356 .await?;
357
358 let lock_result = sqlx::query!("LOCK TABLE job_schedules IN SHARE ROW EXCLUSIVE MODE")
359 .execute(&mut **tx)
360 .await;
361
362 match lock_result {
363 Ok(_) => {
364 set_local_lock_timeout_tx(
365 tx,
366 &previous_lock_timeout,
367 "restore job definition disable schedule lock timeout",
368 )
369 .await
370 }
371 Err(error) => {
372 Err(Error::from_query_sqlx_with_context(
375 "lock job schedules before disabling job definitions",
376 error,
377 ))
378 }
379 }
380}
381
382async fn lock_job_definitions_for_definition_disable_tx(tx: &mut DbTx<'_>) -> Result<()> {
383 let previous_lock_timeout = cap_local_lock_timeout_tx(
384 tx,
385 DEFINITION_DISABLE_LOCK_TIMEOUT,
386 DEFINITION_DISABLE_LOCK_TIMEOUT_MS,
387 "set job definition disable definition lock timeout",
388 )
389 .await?;
390
391 let lock_result = sqlx::query("LOCK TABLE job_definitions IN SHARE ROW EXCLUSIVE MODE")
392 .execute(&mut **tx)
393 .await;
394
395 match lock_result {
396 Ok(_) => {
397 set_local_lock_timeout_tx(
398 tx,
399 &previous_lock_timeout,
400 "restore job definition disable definition lock timeout",
401 )
402 .await
403 }
404 Err(error) => {
405 Err(Error::from_query_sqlx_with_context(
408 "lock job definitions before disabling job definitions",
409 error,
410 ))
411 }
412 }
413}
414
415async fn find_active_schedule_for_job_types_tx(
416 tx: &mut DbTx<'_>,
417 job_types: &[JobTypeName],
418) -> Result<Option<JobScheduleJobTypeReference>> {
419 let job_types = job_type_strings(job_types);
420 let row = sqlx::query!(
421 "SELECT name, job_type
422 FROM job_schedules
423 WHERE is_active = true
424 AND job_type = ANY($1::text[])
425 ORDER BY name ASC
426 LIMIT 1",
427 job_types.as_slice(),
428 )
429 .fetch_optional(&mut **tx)
430 .await
431 .map_err(|error| {
432 Error::from_query_sqlx_with_context("find active schedule for job definitions", error)
433 })?;
434
435 row.map(|row| parse_schedule_job_type_reference(row.name, row.job_type))
436 .transpose()
437}
438
439async fn find_active_schedule_for_enabled_absent_job_types_tx(
440 tx: &mut DbTx<'_>,
441 catalog_job_types: &[JobTypeName],
442 scope_job_types: &[JobTypeName],
443) -> Result<Option<JobScheduleJobTypeReference>> {
444 let catalog_job_types = job_type_strings(catalog_job_types);
445 let scope_job_types = job_type_strings(scope_job_types);
446 let row = sqlx::query!(
447 "SELECT job_schedules.name, job_schedules.job_type
448 FROM job_schedules
449 INNER JOIN job_definitions
450 ON job_definitions.job_type = job_schedules.job_type
451 WHERE job_schedules.is_active = true
452 AND job_schedules.job_type <> ALL($1::text[])
453 AND job_schedules.job_type = ANY($2::text[])
454 AND job_definitions.is_enabled = true
455 ORDER BY job_schedules.name ASC
456 LIMIT 1",
457 catalog_job_types.as_slice(),
458 scope_job_types.as_slice(),
459 )
460 .fetch_optional(&mut **tx)
461 .await
462 .map_err(|error| {
463 Error::from_query_sqlx_with_context(
464 "find active schedule for enabled absent job definitions",
465 error,
466 )
467 })?;
468
469 row.map(|row| parse_schedule_job_type_reference(row.name, row.job_type))
470 .transpose()
471}
472
473async fn list_job_types_missing_or_enabled_definitions_tx(
474 tx: &mut DbTx<'_>,
475 job_types: &[JobTypeName],
476) -> Result<Vec<JobTypeName>> {
477 let job_types = job_type_strings(job_types);
478 let rows = sqlx::query_scalar!(
479 "SELECT catalog.job_type as \"job_type!\"
480 FROM unnest($1::text[]) AS catalog(job_type)
481 LEFT JOIN job_definitions
482 ON job_definitions.job_type = catalog.job_type
483 WHERE job_definitions.job_type IS NULL
484 OR job_definitions.is_enabled = true",
485 job_types.as_slice(),
486 )
487 .fetch_all(&mut **tx)
488 .await
489 .map_err(|error| {
490 Error::from_query_sqlx_with_context("list missing or enabled job definitions", error)
491 })?;
492
493 parse_job_type_rows(rows)
494}
495
496async fn disable_enabled_job_definitions_except_tx(
497 tx: &mut DbTx<'_>,
498 keep_job_types: &[JobTypeName],
499 scope_job_types: &[JobTypeName],
500) -> Result<Vec<JobTypeName>> {
501 validate_non_empty_job_types("disable enabled job definitions keep list", keep_job_types)?;
502 validate_non_empty_job_types("disable enabled job definitions scope", scope_job_types)?;
503
504 let keep_job_types = job_type_strings(keep_job_types);
505 let scope_job_types = job_type_strings(scope_job_types);
506 let rows = sqlx::query_scalar!(
507 "UPDATE job_definitions
508 SET is_enabled = false,
509 updated_at = now()
510 WHERE is_enabled = true
511 AND job_type <> ALL($1::text[])
512 AND job_type = ANY($2::text[])
513 RETURNING job_type",
514 keep_job_types.as_slice(),
515 scope_job_types.as_slice(),
516 )
517 .fetch_all(&mut **tx)
518 .await
519 .map_err(|error| {
520 Error::from_query_sqlx_with_context("disable enabled job definitions except list", error)
521 })?;
522
523 parse_job_type_rows(rows)
524}
525
526pub async fn insert_job_definition_if_missing_tx(
527 tx: &mut DbTx<'_>,
528 payload: &JobDefinitionUpsert<'_>,
529) -> Result<()> {
530 sqlx::query!(
531 "INSERT INTO job_definitions (
532 job_type,
533 version,
534 max_attempts,
535 default_timeout_seconds,
536 default_priority,
537 is_enabled
538 )
539 VALUES ($1, $2, $3, $4, $5, $6)
540 ON CONFLICT (job_type)
541 DO NOTHING",
542 payload.job_type as _,
543 payload.version,
544 payload.max_attempts,
545 payload.default_timeout_seconds,
546 payload.default_priority,
547 payload.is_enabled,
548 )
549 .execute(&mut **tx)
550 .await
551 .map_err(|error| {
552 Error::from_query_sqlx_with_context("insert job definition if missing", error)
553 })?;
554
555 Ok(())
556}
557
558pub async fn list_job_definitions(
559 pool: &DbPool,
560 filter: &JobDefinitionListFilter<'_>,
561) -> Result<Vec<JobDefinitionRecord>> {
562 let escaped_job_type = filter.job_type.map(escape_ilike_pattern);
563
564 let rows = sqlx::query!(
565 "SELECT
566 job_type,
567 version,
568 max_attempts,
569 default_timeout_seconds,
570 default_priority,
571 is_enabled,
572 created_at,
573 updated_at
574 FROM job_definitions
575 WHERE ($1::text IS NULL OR job_type ILIKE '%' || $1 || '%')
576 ORDER BY job_type ASC
577 LIMIT $2
578 OFFSET $3",
579 escaped_job_type.as_deref(),
580 filter.limit,
581 filter.offset,
582 )
583 .fetch_all(pool)
584 .await
585 .map_err(|error| Error::from_query_sqlx_with_context("list job definitions", error))?;
586
587 rows.into_iter()
588 .map(|row| {
589 Ok(JobDefinitionRecord {
590 job_type: parse_job_type_name(row.job_type)?,
591 version: row.version,
592 max_attempts: row.max_attempts,
593 default_timeout_seconds: row.default_timeout_seconds,
594 default_priority: row.default_priority,
595 is_enabled: row.is_enabled,
596 created_at: row.created_at,
597 updated_at: row.updated_at,
598 })
599 })
600 .collect()
601}
602
/// Prefixes every `\`, `%` and `_` in `input` with a backslash so the text
/// matches literally when embedded in a `LIKE`/`ILIKE` pattern.
fn escape_ilike_pattern(input: &str) -> String {
    let mut escaped = String::with_capacity(input.len());
    for ch in input.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
609
610fn job_type_strings(job_types: &[JobTypeName]) -> Vec<String> {
611 job_types
612 .iter()
613 .map(|job_type| job_type.as_str().to_owned())
614 .collect()
615}
616
617fn definition_job_type_names<'definition, 'payload, I>(
618 definitions: I,
619) -> std::result::Result<Vec<JobTypeName>, JobDefinitionCatalogSyncError>
620where
621 'payload: 'definition,
622 I: IntoIterator<Item = &'definition JobDefinitionUpsert<'payload>>,
623{
624 let mut job_types = definitions
627 .into_iter()
628 .map(|definition| parse_job_type_name(definition.job_type.as_str().to_owned()))
629 .collect::<Result<Vec<_>>>()
630 .map_err(JobDefinitionCatalogSyncError::DefinitionInspectFailure)?;
631 job_types.sort();
632 Ok(job_types)
633}
634
635async fn prepare_definition_disable_critical_section_tx(
636 tx: &mut DbTx<'_>,
637) -> std::result::Result<(), JobDefinitionCatalogSyncError> {
638 cap_local_statement_timeout_tx(
639 tx,
640 DEFINITION_DISABLE_STATEMENT_TIMEOUT,
641 DEFINITION_DISABLE_STATEMENT_TIMEOUT_MS,
642 "set job definition disable statement timeout",
643 )
644 .await
645 .map_err(JobDefinitionCatalogSyncError::CriticalSectionTimeoutFailure)?;
646 lock_job_schedules_for_definition_disable_tx(tx)
647 .await
648 .map_err(JobDefinitionCatalogSyncError::ScheduleLockFailure)?;
649 lock_job_definitions_for_definition_disable_tx(tx)
650 .await
651 .map_err(JobDefinitionCatalogSyncError::DefinitionLockFailure)
652}
653
654async fn reject_active_schedules_for_disabled_job_types_tx(
655 tx: &mut DbTx<'_>,
656 job_types: &[JobTypeName],
657) -> std::result::Result<(), JobDefinitionCatalogSyncError> {
658 if let Some(reference) = find_active_schedule_for_job_types_tx(tx, job_types)
659 .await
660 .map_err(JobDefinitionCatalogSyncError::ScheduleCheckFailure)?
661 {
662 return Err(JobDefinitionCatalogSyncError::ActiveScheduleForDisabledJobType(reference));
663 }
664
665 Ok(())
666}
667
668fn parse_schedule_job_type_reference(
669 schedule_name: String,
670 job_type: String,
671) -> Result<JobScheduleJobTypeReference> {
672 Ok(JobScheduleJobTypeReference {
673 schedule_name,
674 job_type: parse_job_type_name(job_type)?,
675 })
676}
677
678fn parse_job_type_rows(rows: Vec<String>) -> Result<Vec<JobTypeName>> {
679 let mut job_types = rows
680 .into_iter()
681 .map(parse_job_type_name)
682 .collect::<Result<Vec<_>>>()?;
683 job_types.sort();
684 Ok(job_types)
685}
686
687fn validate_non_empty_job_types(context: &'static str, job_types: &[JobTypeName]) -> Result<()> {
688 if job_types.is_empty() {
689 return Err(Error::QueryError(QueryError::from_classified(
690 QueryErrorCategory::Validation,
691 "job_definition.empty_job_type_list",
692 "Job type list must not be empty.",
693 format!("{context}: job type list must not be empty"),
694 )));
695 }
696 Ok(())
697}
698
699async fn cap_local_lock_timeout_tx(
700 tx: &mut DbTx<'_>,
701 lock_timeout: &str,
702 lock_timeout_ms: i64,
703 context: &'static str,
704) -> Result<String> {
705 sqlx::query_scalar::<_, String>(
708 "WITH previous AS MATERIALIZED (
709 SELECT
710 current_setting('lock_timeout') AS lock_timeout,
711 setting::bigint AS lock_timeout_ms
712 FROM pg_settings
713 WHERE name = 'lock_timeout'
714 )
715 SELECT previous.lock_timeout
716 FROM previous,
717 LATERAL (
718 SELECT set_config(
719 'lock_timeout',
720 CASE
721 WHEN previous.lock_timeout_ms = 0 THEN $1
722 WHEN previous.lock_timeout_ms <= $2 THEN previous.lock_timeout
723 ELSE $1
724 END,
725 true
726 )
727 ) AS applied",
728 )
729 .bind(lock_timeout)
730 .bind(lock_timeout_ms)
731 .fetch_one(&mut **tx)
732 .await
733 .map_err(|error| Error::from_query_sqlx_with_context(context, error))
734}
735
736async fn cap_local_statement_timeout_tx(
737 tx: &mut DbTx<'_>,
738 statement_timeout: &str,
739 statement_timeout_ms: i64,
740 context: &'static str,
741) -> Result<String> {
742 sqlx::query_scalar::<_, String>(
746 "WITH previous AS MATERIALIZED (
747 SELECT
748 current_setting('statement_timeout') AS statement_timeout,
749 setting::bigint AS statement_timeout_ms
750 FROM pg_settings
751 WHERE name = 'statement_timeout'
752 )
753 SELECT previous.statement_timeout
754 FROM previous,
755 LATERAL (
756 SELECT set_config(
757 'statement_timeout',
758 CASE
759 WHEN previous.statement_timeout_ms = 0 THEN $1
760 WHEN previous.statement_timeout_ms <= $2 THEN previous.statement_timeout
761 ELSE $1
762 END,
763 true
764 )
765 ) AS applied",
766 )
767 .bind(statement_timeout)
768 .bind(statement_timeout_ms)
769 .fetch_one(&mut **tx)
770 .await
771 .map_err(|error| Error::from_query_sqlx_with_context(context, error))
772}
773
774async fn set_local_lock_timeout_tx(
775 tx: &mut DbTx<'_>,
776 lock_timeout: &str,
777 context: &'static str,
778) -> Result<()> {
779 sqlx::query_scalar::<_, String>("SELECT set_config('lock_timeout', $1, true)")
780 .bind(lock_timeout)
781 .fetch_one(&mut **tx)
782 .await
783 .map_err(|error| Error::from_query_sqlx_with_context(context, error))?;
784
785 Ok(())
786}
787
788pub async fn get_job_definition_by_type(
789 pool: &DbPool,
790 job_type: JobType<'_>,
791) -> Result<Option<JobDefinitionRecord>> {
792 let row = sqlx::query!(
793 "SELECT
794 job_type,
795 version,
796 max_attempts,
797 default_timeout_seconds,
798 default_priority,
799 is_enabled,
800 created_at,
801 updated_at
802 FROM job_definitions
803 WHERE job_type = $1
804 LIMIT 1",
805 job_type as _,
806 )
807 .fetch_optional(pool)
808 .await
809 .map_err(|error| Error::from_query_sqlx_with_context("get job definition by type", error))?;
810
811 row.map(|row| {
812 Ok(JobDefinitionRecord {
813 job_type: parse_job_type_name(row.job_type)?,
814 version: row.version,
815 max_attempts: row.max_attempts,
816 default_timeout_seconds: row.default_timeout_seconds,
817 default_priority: row.default_priority,
818 is_enabled: row.is_enabled,
819 created_at: row.created_at,
820 updated_at: row.updated_at,
821 })
822 })
823 .transpose()
824}
825
826pub async fn update_job_definition(
827 pool: &DbPool,
828 job_type: JobType<'_>,
829 payload: &JobDefinitionUpdate,
830) -> Result<Option<JobDefinitionRecord>> {
831 let row = sqlx::query!(
832 "UPDATE job_definitions
833 SET max_attempts = COALESCE($2, max_attempts),
834 default_timeout_seconds = COALESCE($3, default_timeout_seconds),
835 default_priority = COALESCE($4, default_priority),
836 is_enabled = COALESCE($5, is_enabled),
837 updated_at = now()
838 WHERE job_type = $1
839 RETURNING
840 job_type,
841 version,
842 max_attempts,
843 default_timeout_seconds,
844 default_priority,
845 is_enabled,
846 created_at,
847 updated_at",
848 job_type as _,
849 payload.max_attempts,
850 payload.default_timeout_seconds,
851 payload.default_priority,
852 payload.is_enabled,
853 )
854 .fetch_optional(pool)
855 .await
856 .map_err(|error| Error::from_query_sqlx_with_context("update job definition", error))?;
857
858 row.map(|row| {
859 Ok(JobDefinitionRecord {
860 job_type: parse_job_type_name(row.job_type)?,
861 version: row.version,
862 max_attempts: row.max_attempts,
863 default_timeout_seconds: row.default_timeout_seconds,
864 default_priority: row.default_priority,
865 is_enabled: row.is_enabled,
866 created_at: row.created_at,
867 updated_at: row.updated_at,
868 })
869 })
870 .transpose()
871}