Skip to main content

runledger_postgres/jobs/workflows/
enqueue.rs

1use runledger_core::jobs::{
2    WorkflowDependencyReleaseMode, WorkflowRunEnqueue, validate_workflow_run_enqueue,
3};
4use serde::Serialize;
5use serde_json::Value as JsonValue;
6use sqlx::types::Uuid;
7
8use crate::jobs::transaction_isolation::ensure_read_committed_tx;
9use crate::{DbPool, DbTx, Error, Result};
10
11use super::super::row_decode::{
12    parse_job_stage, parse_job_type_name, parse_workflow_run_status,
13    parse_workflow_step_execution_kind, parse_workflow_type_name,
14};
15use super::super::workflow_types::WorkflowRunDbRecord;
16use super::errors::{
17    workflow_enqueue_conflicting_retry_error, workflow_internal_state_error,
18    workflow_legacy_idempotency_snapshot_missing_error,
19};
20use super::read::load_workflow_run_by_id_tx;
21use super::release::{StepReleaseCandidate, StepReleaseCandidateInit, release_candidate_step_tx};
22use super::runtime::recompute_workflow_run_statuses_tx;
23use super::steps::{
24    fetch_job_definition_defaults_tx, insert_workflow_step_dependencies_tx,
25    insert_workflow_steps_tx, workflow_step_effective_organization_id,
26    workflow_step_effective_stage,
27};
28use super::validation::workflow_dag_validation_error;
29
30struct WorkflowRunInsertOutcome {
31    record: WorkflowRunDbRecord,
32    inserted: bool,
33}
34
35#[derive(sqlx::FromRow)]
36struct WorkflowRunRow {
37    id: Uuid,
38    workflow_type: String,
39    organization_id: Option<Uuid>,
40    status: String,
41    idempotency_key: Option<String>,
42    metadata: JsonValue,
43    enqueue_request_matches: Option<bool>,
44    started_at: chrono::DateTime<chrono::Utc>,
45    finished_at: Option<chrono::DateTime<chrono::Utc>>,
46    created_at: chrono::DateTime<chrono::Utc>,
47    updated_at: chrono::DateTime<chrono::Utc>,
48}
49
50#[derive(Serialize)]
51struct CanonicalWorkflowRunEnqueueRequest<'a> {
52    metadata: &'a JsonValue,
53    steps: Vec<CanonicalWorkflowStep<'a>>,
54}
55
56#[derive(Serialize)]
57struct CanonicalWorkflowStep<'a> {
58    step_key: &'a str,
59    execution_kind: &'static str,
60    job_type: Option<&'a str>,
61    organization_id: Option<Uuid>,
62    payload: &'a JsonValue,
63    priority: Option<i32>,
64    max_attempts: Option<i32>,
65    timeout_seconds: Option<i32>,
66    stage: Option<&'static str>,
67    dependencies: Vec<CanonicalWorkflowDependency<'a>>,
68}
69
70#[derive(Serialize)]
71struct CanonicalWorkflowDependency<'a> {
72    prerequisite_step_key: &'a str,
73    release_mode: &'static str,
74}
75
76/// Enqueues a workflow run in its own transaction.
77///
78/// Calls without an idempotency key always create a new workflow run. Calls
79/// with an idempotency key return the existing run only when the canonical
80/// enqueue request snapshot matches. Keyed rows without snapshots are rejected
81/// by the idempotency cutover.
82pub async fn enqueue_workflow_run(
83    pool: &DbPool,
84    payload: &WorkflowRunEnqueue<'_>,
85) -> Result<WorkflowRunDbRecord> {
86    let mut tx = pool
87        .begin()
88        .await
89        .map_err(|error| Error::ConnectionError(error.to_string()))?;
90    let workflow_run = enqueue_workflow_run_tx(&mut tx, payload).await?;
91    tx.commit()
92        .await
93        .map_err(|error| Error::ConnectionError(error.to_string()))?;
94    Ok(workflow_run)
95}
96
97/// Enqueues a workflow run and returns the existing run for an identical keyed retry.
98///
99/// Idempotency is strict for the submitted request snapshot. The snapshot is
100/// compared instead of live workflow step rows because steps and dependencies
101/// can be legitimately appended or mutated after initial enqueue. Strict
102/// workflow idempotency applies to keyed rows with an `enqueue_request`
103/// snapshot; unkeyed rows do not store snapshots, and keyed rows without
104/// snapshots are rejected by the idempotency cutover.
105/// Job-step stage is part of the canonical initial request after normalizing an
106/// omitted stage to `Queued`; changing the requested initial stage is treated as
107/// a different enqueue request.
108pub async fn enqueue_workflow_run_tx(
109    tx: &mut DbTx<'_>,
110    payload: &WorkflowRunEnqueue<'_>,
111) -> Result<WorkflowRunDbRecord> {
112    validate_workflow_run_enqueue(payload).map_err(workflow_dag_validation_error)?;
113    if payload.idempotency_key().is_some() {
114        ensure_read_committed_tx(
115            tx,
116            "workflow idempotent enqueue",
117            "workflow.enqueue_idempotency_unsupported_isolation",
118            "Workflow idempotent enqueue requires READ COMMITTED transaction isolation.",
119        )
120        .await?;
121    }
122
123    let workflow_run_insert = insert_workflow_run_record_tx(tx, payload).await?;
124    let workflow_run = workflow_run_insert.record;
125    if !workflow_run_insert.inserted {
126        // Existing idempotent runs already have their steps and initial releases
127        // committed; never replay workflow initialization for a retry.
128        return Ok(workflow_run);
129    }
130
131    let defaults_by_job_type = fetch_job_definition_defaults_tx(tx, payload.steps()).await?;
132    let step_id_by_key =
133        insert_workflow_steps_tx(tx, payload, workflow_run.id, &defaults_by_job_type).await?;
134    insert_workflow_step_dependencies_tx(tx, payload, workflow_run.id, &step_id_by_key).await?;
135
136    enqueue_root_steps_tx(tx, workflow_run.id).await?;
137    recompute_workflow_run_statuses_tx(tx, &std::collections::BTreeSet::from([workflow_run.id]))
138        .await?;
139
140    load_workflow_run_by_id_tx(
141        tx,
142        workflow_run.id,
143        "load workflow run after enqueue recompute",
144    )
145    .await
146}
147
148async fn insert_workflow_run_record_tx(
149    tx: &mut DbTx<'_>,
150    payload: &WorkflowRunEnqueue<'_>,
151) -> Result<WorkflowRunInsertOutcome> {
152    let enqueue_request = payload
153        .idempotency_key()
154        .map(|_| canonical_workflow_enqueue_request(payload))
155        .transpose()?;
156    // The conflict clause is selected from static literals only; all request
157    // data remains bound below. This dynamic SQL is not SQLx macro-checked, so
158    // keep the returned columns and bind order aligned with WorkflowRunRow and
159    // the workflow_runs insert list.
160    let insert_sql = format!(
161        "INSERT INTO workflow_runs (
162            workflow_type,
163            organization_id,
164            status,
165            idempotency_key,
166            metadata,
167            enqueue_request,
168            started_at
169         )
170         VALUES ($1, $2, 'RUNNING', $3, $4::jsonb, $5::jsonb, now())
171         {}
172         RETURNING
173            id,
174            workflow_type,
175            organization_id,
176            status::text AS status,
177            idempotency_key,
178            metadata,
179            NULL::boolean AS enqueue_request_matches,
180            started_at,
181            finished_at,
182            created_at,
183            updated_at",
184        enqueue_workflow_run_idempotency_conflict_clause(payload),
185    );
186    let run_row = sqlx::query_as::<_, WorkflowRunRow>(&insert_sql)
187        .bind(payload.workflow_type())
188        .bind(payload.organization_id())
189        .bind(payload.idempotency_key())
190        .bind(payload.metadata())
191        .bind(enqueue_request.as_ref())
192        .fetch_optional(&mut **tx)
193        .await
194        .map_err(|error| Error::from_query_sqlx_with_context("enqueue workflow run", error))?;
195
196    if let Some(run_row) = run_row {
197        return Ok(WorkflowRunInsertOutcome {
198            record: workflow_run_record_from_row(run_row)?,
199            inserted: true,
200        });
201    }
202
203    let (Some(idempotency_key), Some(enqueue_request)) =
204        (payload.idempotency_key(), enqueue_request.as_ref())
205    else {
206        return Err(workflow_internal_state_error(
207            "workflow run insert returned no row without an idempotency key conflict",
208        ));
209    };
210
211    let existing =
212        load_existing_idempotent_workflow_run_tx(tx, payload, idempotency_key, enqueue_request)
213            .await?;
214    validate_existing_idempotent_workflow_run(&existing)?;
215
216    Ok(WorkflowRunInsertOutcome {
217        record: workflow_run_record_from_row(existing)?,
218        inserted: false,
219    })
220}
221
222fn workflow_run_record_from_row(run_row: WorkflowRunRow) -> Result<WorkflowRunDbRecord> {
223    // The retry-match flags are only validation scratch fields for idempotent
224    // conflict resolution; they are not part of the persisted public run record.
225    Ok(WorkflowRunDbRecord {
226        id: run_row.id,
227        workflow_type: parse_workflow_type_name(run_row.workflow_type)?,
228        organization_id: run_row.organization_id,
229        status: parse_workflow_run_status(run_row.status)?,
230        idempotency_key: run_row.idempotency_key,
231        metadata: run_row.metadata,
232        started_at: run_row.started_at,
233        finished_at: run_row.finished_at,
234        created_at: run_row.created_at,
235        updated_at: run_row.updated_at,
236    })
237}
238
239async fn load_existing_idempotent_workflow_run_tx(
240    tx: &mut DbTx<'_>,
241    payload: &WorkflowRunEnqueue<'_>,
242    idempotency_key: &str,
243    enqueue_request: &JsonValue,
244) -> Result<WorkflowRunRow> {
245    // After INSERT ... ON CONFLICT reports an existing keyed run, this locks the
246    // matched committed row while the enqueue transaction compares and returns
247    // the idempotent result.
248    let run = if let Some(organization_id) = payload.organization_id() {
249        sqlx::query_as!(
250            WorkflowRunRow,
251            r#"SELECT
252                id,
253                workflow_type,
254                organization_id,
255                status::text AS "status!",
256                idempotency_key,
257                metadata,
258                enqueue_request = $4::jsonb AS "enqueue_request_matches?",
259                started_at,
260                finished_at,
261                created_at,
262                updated_at
263             FROM workflow_runs
264             WHERE workflow_type = $1
265               AND organization_id = $2
266               AND idempotency_key = $3
267             LIMIT 1
268             FOR SHARE"#,
269            payload.workflow_type() as _,
270            organization_id,
271            idempotency_key,
272            enqueue_request,
273        )
274        .fetch_optional(&mut **tx)
275        .await
276    } else {
277        sqlx::query_as!(
278            WorkflowRunRow,
279            r#"SELECT
280                id,
281                workflow_type,
282                organization_id,
283                status::text AS "status!",
284                idempotency_key,
285                metadata,
286                enqueue_request = $3::jsonb AS "enqueue_request_matches?",
287                started_at,
288                finished_at,
289                created_at,
290                updated_at
291             FROM workflow_runs
292             WHERE workflow_type = $1
293               AND organization_id IS NULL
294               AND idempotency_key = $2
295             LIMIT 1
296             FOR SHARE"#,
297            payload.workflow_type() as _,
298            idempotency_key,
299            enqueue_request,
300        )
301        .fetch_optional(&mut **tx)
302        .await
303    };
304
305    run.map_err(|error| {
306        Error::from_query_sqlx_with_context("load idempotent workflow enqueue", error)
307    })?
308    .ok_or_else(|| {
309        workflow_internal_state_error(
310            "workflow run insert conflicted but matching idempotent workflow run was not found",
311        )
312    })
313}
314
315fn enqueue_workflow_run_idempotency_conflict_clause(
316    payload: &WorkflowRunEnqueue<'_>,
317) -> &'static str {
318    // Keep these predicates aligned with the partial unique indexes
319    // uq_workflow_runs_type_idempotency_org and uq_workflow_runs_type_idempotency_global.
320    match (payload.idempotency_key(), payload.organization_id()) {
321        (Some(_), Some(_)) => {
322            "ON CONFLICT (workflow_type, organization_id, idempotency_key)
323             WHERE idempotency_key IS NOT NULL
324               AND organization_id IS NOT NULL
325             DO NOTHING"
326        }
327        (Some(_), None) => {
328            "ON CONFLICT (workflow_type, idempotency_key)
329             WHERE idempotency_key IS NOT NULL
330               AND organization_id IS NULL
331             DO NOTHING"
332        }
333        (None, _) => "",
334    }
335}
336
337fn validate_existing_idempotent_workflow_run(existing: &WorkflowRunRow) -> Result<()> {
338    match existing.enqueue_request_matches {
339        Some(true) => Ok(()),
340        Some(false) => Err(workflow_enqueue_conflicting_retry_error("request")),
341        None => Err(workflow_legacy_idempotency_snapshot_missing_error(
342            existing.workflow_type.as_str(),
343            existing.id,
344        )),
345    }
346}
347
348fn canonical_workflow_enqueue_request(payload: &WorkflowRunEnqueue<'_>) -> Result<JsonValue> {
349    let mut steps = payload
350        .steps()
351        .iter()
352        .map(|step| {
353            let mut dependencies = step
354                .dependencies()
355                .iter()
356                .map(|dependency| CanonicalWorkflowDependency {
357                    prerequisite_step_key: dependency.prerequisite_step_key.as_str(),
358                    release_mode: dependency
359                        .release_mode
360                        .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal)
361                        .as_db_value(),
362                })
363                .collect::<Vec<_>>();
364            dependencies.sort_by(|left, right| {
365                left.prerequisite_step_key
366                    .cmp(right.prerequisite_step_key)
367                    .then(left.release_mode.cmp(right.release_mode))
368            });
369
370            CanonicalWorkflowStep {
371                step_key: step.step_key().as_str(),
372                execution_kind: step.execution_kind().as_db_value(),
373                job_type: step.job_type().map(|job_type| job_type.as_str()),
374                organization_id: workflow_step_effective_organization_id(
375                    payload.organization_id(),
376                    step,
377                ),
378                payload: step.payload(),
379                priority: step.priority(),
380                max_attempts: step.max_attempts(),
381                timeout_seconds: step.timeout_seconds(),
382                stage: workflow_step_effective_stage(step),
383                dependencies,
384            }
385        })
386        .collect::<Vec<_>>();
387    steps.sort_by(|left, right| left.step_key.cmp(right.step_key));
388
389    serde_json::to_value(CanonicalWorkflowRunEnqueueRequest {
390        metadata: payload.metadata(),
391        steps,
392    })
393    .map_err(|error| {
394        workflow_internal_state_error(format!(
395            "failed to serialize canonical workflow enqueue request: {error}"
396        ))
397    })
398}
399
400pub(crate) async fn enqueue_root_steps_tx(tx: &mut DbTx<'_>, workflow_run_id: Uuid) -> Result<()> {
401    let rows = sqlx::query!(
402        "SELECT
403            id,
404            workflow_run_id,
405            execution_kind::text AS \"execution_kind!\",
406            job_type,
407            organization_id,
408            payload,
409            priority,
410            max_attempts,
411            timeout_seconds,
412            stage
413         FROM workflow_steps
414         WHERE workflow_run_id = $1
415           AND status = 'BLOCKED'
416           AND dependency_count_pending = 0
417         ORDER BY created_at ASC
418         FOR UPDATE",
419        workflow_run_id,
420    )
421    .fetch_all(&mut **tx)
422    .await
423    .map_err(|error| {
424        Error::from_query_sqlx_with_context("lookup root workflow steps for enqueue", error)
425    })?;
426
427    for row in rows {
428        let candidate = StepReleaseCandidate::from_init(StepReleaseCandidateInit {
429            id: row.id,
430            workflow_run_id: row.workflow_run_id,
431            execution_kind: parse_workflow_step_execution_kind(row.execution_kind)?,
432            job_type: row.job_type.map(parse_job_type_name).transpose()?,
433            organization_id: row.organization_id,
434            payload: row.payload,
435            priority: row.priority,
436            max_attempts: row.max_attempts,
437            timeout_seconds: row.timeout_seconds,
438            stage: row.stage.map(parse_job_stage).transpose()?,
439        });
440        release_candidate_step_tx(tx, &candidate).await?;
441    }
442
443    Ok(())
444}
445
#[cfg(test)]
mod tests {
    use runledger_core::jobs::{
        JobStage, JobType, StepKey, WorkflowRunEnqueueBuilder, WorkflowStepEnqueueBuilder,
        WorkflowType,
    };
    use serde_json::json;
    use sqlx::types::Uuid;

    use super::canonical_workflow_enqueue_request;

    /// Pins the exact canonical snapshot shape.
    ///
    /// Steps are submitted child-first, so a passing test shows the snapshot
    /// orders them by step key. The external root step shows the run
    /// organization carried into the snapshot, with the job-only fields null.
    #[test]
    fn canonical_workflow_enqueue_request_matches_golden_snapshot() {
        let run_organization = Uuid::now_v7();
        let child_organization = Uuid::now_v7();
        let metadata = json!({"kind": "golden"});
        let root_payload = json!({"step": "root"});
        let child_payload = json!({"step": "child"});

        let root_step =
            WorkflowStepEnqueueBuilder::new_external(StepKey::new("root"), &root_payload)
                .try_build()
                .expect("build root step");
        let child_step = WorkflowStepEnqueueBuilder::new(
            StepKey::new("child"),
            JobType::new("jobs.test.child"),
            &child_payload,
        )
        .organization_id(child_organization)
        .priority(7)
        .max_attempts(2)
        .timeout_seconds(45)
        .stage(JobStage::Scheduled)
        .depends_on_success(&[StepKey::new("root")])
        .try_build()
        .expect("build child step");
        let workflow =
            WorkflowRunEnqueueBuilder::new(WorkflowType::new("workflow.test.golden"), &metadata)
                .organization_id(run_organization)
                .step(child_step)
                .step(root_step)
                .try_build()
                .expect("build workflow");

        let expected = json!({
            "metadata": {"kind": "golden"},
            "steps": [
                {
                    "step_key": "child",
                    "execution_kind": "JOB",
                    "job_type": "jobs.test.child",
                    "organization_id": child_organization,
                    "payload": {"step": "child"},
                    "priority": 7,
                    "max_attempts": 2,
                    "timeout_seconds": 45,
                    "stage": "scheduled",
                    "dependencies": [
                        {
                            "prerequisite_step_key": "root",
                            "release_mode": "ON_SUCCESS"
                        }
                    ]
                },
                {
                    "step_key": "root",
                    "execution_kind": "EXTERNAL",
                    "job_type": null,
                    "organization_id": run_organization,
                    "payload": {"step": "root"},
                    "priority": null,
                    "max_attempts": null,
                    "timeout_seconds": null,
                    "stage": null,
                    "dependencies": []
                }
            ]
        });

        let canonical =
            canonical_workflow_enqueue_request(&workflow).expect("canonicalize workflow enqueue");
        assert_eq!(canonical, expected);
    }
}