Skip to main content

runledger_postgres/jobs/workflows/
enqueue.rs

1use runledger_core::jobs::{
2    WorkflowDependencyReleaseMode, WorkflowRunEnqueue, validate_workflow_run_enqueue,
3};
4use serde::Serialize;
5use serde_json::Value as JsonValue;
6use sqlx::types::Uuid;
7
8use crate::jobs::transaction_isolation::ensure_read_committed_tx;
9use crate::{DbPool, DbTx, Error, Result};
10
11use super::super::row_decode::{
12    parse_job_stage, parse_job_type_name, parse_workflow_run_status,
13    parse_workflow_step_execution_kind, parse_workflow_type_name,
14};
15use super::super::workflow_types::WorkflowRunDbRecord;
16use super::errors::{
17    workflow_enqueue_conflicting_retry_error, workflow_internal_state_error,
18    workflow_legacy_idempotency_snapshot_missing_error,
19};
20use super::read::load_workflow_run_by_id_tx;
21use super::release::{StepReleaseCandidate, StepReleaseCandidateInit, release_candidate_step_tx};
22use super::runtime::recompute_workflow_run_statuses_tx;
23use super::steps::{
24    fetch_job_definition_defaults_tx, insert_workflow_step_dependencies_tx,
25    insert_workflow_steps_tx, workflow_step_effective_organization_id,
26    workflow_step_effective_stage,
27};
28use super::validation::workflow_dag_validation_error;
29
/// Result of attempting to insert a workflow run row.
///
/// Produced by `insert_workflow_run_record_tx` so the caller can tell a freshly
/// created run apart from an existing run returned for an idempotent retry.
struct WorkflowRunInsertOutcome {
    /// The workflow run, either newly inserted or loaded from the existing row.
    record: WorkflowRunDbRecord,
    /// `true` when the INSERT returned a row; `false` when the insert hit the
    /// idempotency conflict and the existing run was loaded instead.
    inserted: bool,
}
34
/// Raw `workflow_runs` row as read by both the insert `RETURNING` clause and
/// the idempotent-retry lookup in this module.
///
/// String-typed columns are parsed into domain types by
/// `workflow_run_record_from_row`.
#[derive(sqlx::FromRow)]
struct WorkflowRunRow {
    id: Uuid,
    workflow_type: String,
    organization_id: Option<Uuid>,
    // Selected as `status::text`; parsed with `parse_workflow_run_status`.
    status: String,
    idempotency_key: Option<String>,
    metadata: JsonValue,
    // Scratch column, not persisted state: the insert path selects
    // `NULL::boolean`, the idempotent lookup selects
    // `enqueue_request = <submitted snapshot>`. `None` on the lookup path is
    // reported as a missing legacy snapshot.
    enqueue_request_matches: Option<bool>,
    started_at: chrono::DateTime<chrono::Utc>,
    finished_at: Option<chrono::DateTime<chrono::Utc>>,
    created_at: chrono::DateTime<chrono::Utc>,
    updated_at: chrono::DateTime<chrono::Utc>,
}
49
/// Canonical, order-normalized view of an enqueue request.
///
/// Serialized to JSON by `canonical_workflow_enqueue_request` and bound as the
/// `enqueue_request` column for keyed runs, where it is compared against the
/// stored snapshot on idempotent retries.
#[derive(Serialize)]
struct CanonicalWorkflowRunEnqueueRequest<'a> {
    /// Run-level metadata exactly as submitted.
    metadata: &'a JsonValue,
    /// Steps sorted by `step_key`.
    steps: Vec<CanonicalWorkflowStep<'a>>,
}
55
/// Canonical form of one submitted workflow step inside the enqueue snapshot.
#[derive(Serialize)]
struct CanonicalWorkflowStep<'a> {
    step_key: &'a str,
    // Database value of the step's execution kind (`as_db_value()`).
    execution_kind: &'static str,
    job_type: Option<&'a str>,
    // Effective organization as resolved by
    // `workflow_step_effective_organization_id` from the run and the step.
    organization_id: Option<Uuid>,
    payload: &'a JsonValue,
    priority: Option<i32>,
    max_attempts: Option<i32>,
    timeout_seconds: Option<i32>,
    // Effective stage as resolved by `workflow_step_effective_stage`.
    stage: Option<&'static str>,
    // Sorted by prerequisite step key, then release mode.
    dependencies: Vec<CanonicalWorkflowDependency<'a>>,
}
69
/// Canonical form of one step dependency inside the enqueue snapshot.
#[derive(Serialize)]
struct CanonicalWorkflowDependency<'a> {
    /// Key of the step that must be reached before the dependent step releases.
    prerequisite_step_key: &'a str,
    /// Database value of the release mode; an omitted mode is canonicalized to
    /// `WorkflowDependencyReleaseMode::OnTerminal`.
    release_mode: &'static str,
}
75
76/// Enqueues a workflow run in its own transaction.
77///
78/// Use this API for multi-step work with dependencies, fan-out/fan-in, external
79/// gates, cancellation as one logical run, or workflow-level idempotency. Build
80/// the payload with `WorkflowRunEnqueueBuilder` and
81/// `WorkflowStepEnqueueBuilder`.
82///
83/// Calls without an idempotency key always create a new workflow run. Calls
84/// with an idempotency key return the existing run only when the canonical
85/// enqueue request snapshot matches. Keyed rows without snapshots are rejected
86/// by the idempotency cutover.
87#[doc(alias = "dag")]
88#[doc(alias = "orchestration")]
89#[doc(alias = "dependencies")]
90pub async fn enqueue_workflow_run(
91    pool: &DbPool,
92    payload: &WorkflowRunEnqueue<'_>,
93) -> Result<WorkflowRunDbRecord> {
94    let mut tx = pool
95        .begin()
96        .await
97        .map_err(|error| Error::ConnectionError(error.to_string()))?;
98    let workflow_run = enqueue_workflow_run_tx(&mut tx, payload).await?;
99    tx.commit()
100        .await
101        .map_err(|error| Error::ConnectionError(error.to_string()))?;
102    Ok(workflow_run)
103}
104
105/// Enqueues a workflow run and returns the existing run for an identical keyed retry.
106///
107/// Use this API when composing a workflow enqueue with other database writes in
108/// one transaction. For ordinary dependent work, prefer this workflow path over
109/// direct-job polling or handler-chained follow-up jobs.
110///
111/// Idempotency is strict for the submitted request snapshot. The snapshot is
112/// compared instead of live workflow step rows because steps and dependencies
113/// can be legitimately appended or mutated after initial enqueue. Strict
114/// workflow idempotency applies to keyed rows with an `enqueue_request`
115/// snapshot; unkeyed rows do not store snapshots, and keyed rows without
116/// snapshots are rejected by the idempotency cutover.
117/// Job-step stage is part of the canonical initial request after normalizing an
118/// omitted stage to `Queued`; changing the requested initial stage is treated as
119/// a different enqueue request.
120#[doc(alias = "dag")]
121#[doc(alias = "orchestration")]
122#[doc(alias = "dependencies")]
123pub async fn enqueue_workflow_run_tx(
124    tx: &mut DbTx<'_>,
125    payload: &WorkflowRunEnqueue<'_>,
126) -> Result<WorkflowRunDbRecord> {
127    validate_workflow_run_enqueue(payload).map_err(workflow_dag_validation_error)?;
128    if payload.idempotency_key().is_some() {
129        ensure_read_committed_tx(
130            tx,
131            "workflow idempotent enqueue",
132            "workflow.enqueue_idempotency_unsupported_isolation",
133            "Workflow idempotent enqueue requires READ COMMITTED transaction isolation.",
134        )
135        .await?;
136    }
137
138    let workflow_run_insert = insert_workflow_run_record_tx(tx, payload).await?;
139    let workflow_run = workflow_run_insert.record;
140    if !workflow_run_insert.inserted {
141        // Existing idempotent runs already have their steps and initial releases
142        // committed; never replay workflow initialization for a retry.
143        return Ok(workflow_run);
144    }
145
146    let defaults_by_job_type = fetch_job_definition_defaults_tx(tx, payload.steps()).await?;
147    let step_id_by_key =
148        insert_workflow_steps_tx(tx, payload, workflow_run.id, &defaults_by_job_type).await?;
149    insert_workflow_step_dependencies_tx(tx, payload, workflow_run.id, &step_id_by_key).await?;
150
151    enqueue_root_steps_tx(tx, workflow_run.id).await?;
152    recompute_workflow_run_statuses_tx(tx, &std::collections::BTreeSet::from([workflow_run.id]))
153        .await?;
154
155    load_workflow_run_by_id_tx(
156        tx,
157        workflow_run.id,
158        "load workflow run after enqueue recompute",
159    )
160    .await
161}
162
163async fn insert_workflow_run_record_tx(
164    tx: &mut DbTx<'_>,
165    payload: &WorkflowRunEnqueue<'_>,
166) -> Result<WorkflowRunInsertOutcome> {
167    let enqueue_request = payload
168        .idempotency_key()
169        .map(|_| canonical_workflow_enqueue_request(payload))
170        .transpose()?;
171    // The conflict clause is selected from static literals only; all request
172    // data remains bound below. This dynamic SQL is not SQLx macro-checked, so
173    // keep the returned columns and bind order aligned with WorkflowRunRow and
174    // the workflow_runs insert list.
175    let insert_sql = format!(
176        "INSERT INTO workflow_runs (
177            workflow_type,
178            organization_id,
179            status,
180            idempotency_key,
181            metadata,
182            enqueue_request,
183            started_at
184         )
185         VALUES ($1, $2, 'RUNNING', $3, $4::jsonb, $5::jsonb, now())
186         {}
187         RETURNING
188            id,
189            workflow_type,
190            organization_id,
191            status::text AS status,
192            idempotency_key,
193            metadata,
194            NULL::boolean AS enqueue_request_matches,
195            started_at,
196            finished_at,
197            created_at,
198            updated_at",
199        enqueue_workflow_run_idempotency_conflict_clause(payload),
200    );
201    let run_row = sqlx::query_as::<_, WorkflowRunRow>(&insert_sql)
202        .bind(payload.workflow_type())
203        .bind(payload.organization_id())
204        .bind(payload.idempotency_key())
205        .bind(payload.metadata())
206        .bind(enqueue_request.as_ref())
207        .fetch_optional(&mut **tx)
208        .await
209        .map_err(|error| Error::from_query_sqlx_with_context("enqueue workflow run", error))?;
210
211    if let Some(run_row) = run_row {
212        return Ok(WorkflowRunInsertOutcome {
213            record: workflow_run_record_from_row(run_row)?,
214            inserted: true,
215        });
216    }
217
218    let (Some(idempotency_key), Some(enqueue_request)) =
219        (payload.idempotency_key(), enqueue_request.as_ref())
220    else {
221        return Err(workflow_internal_state_error(
222            "workflow run insert returned no row without an idempotency key conflict",
223        ));
224    };
225
226    let existing =
227        load_existing_idempotent_workflow_run_tx(tx, payload, idempotency_key, enqueue_request)
228            .await?;
229    validate_existing_idempotent_workflow_run(&existing)?;
230
231    Ok(WorkflowRunInsertOutcome {
232        record: workflow_run_record_from_row(existing)?,
233        inserted: false,
234    })
235}
236
237fn workflow_run_record_from_row(run_row: WorkflowRunRow) -> Result<WorkflowRunDbRecord> {
238    // The retry-match flags are only validation scratch fields for idempotent
239    // conflict resolution; they are not part of the persisted public run record.
240    Ok(WorkflowRunDbRecord {
241        id: run_row.id,
242        workflow_type: parse_workflow_type_name(run_row.workflow_type)?,
243        organization_id: run_row.organization_id,
244        status: parse_workflow_run_status(run_row.status)?,
245        idempotency_key: run_row.idempotency_key,
246        metadata: run_row.metadata,
247        started_at: run_row.started_at,
248        finished_at: run_row.finished_at,
249        created_at: run_row.created_at,
250        updated_at: run_row.updated_at,
251    })
252}
253
254async fn load_existing_idempotent_workflow_run_tx(
255    tx: &mut DbTx<'_>,
256    payload: &WorkflowRunEnqueue<'_>,
257    idempotency_key: &str,
258    enqueue_request: &JsonValue,
259) -> Result<WorkflowRunRow> {
260    // After INSERT ... ON CONFLICT reports an existing keyed run, this locks the
261    // matched committed row while the enqueue transaction compares and returns
262    // the idempotent result.
263    let run = if let Some(organization_id) = payload.organization_id() {
264        sqlx::query_as!(
265            WorkflowRunRow,
266            r#"SELECT
267                id,
268                workflow_type,
269                organization_id,
270                status::text AS "status!",
271                idempotency_key,
272                metadata,
273                enqueue_request = $4::jsonb AS "enqueue_request_matches?",
274                started_at,
275                finished_at,
276                created_at,
277                updated_at
278             FROM workflow_runs
279             WHERE workflow_type = $1
280               AND organization_id = $2
281               AND idempotency_key = $3
282             LIMIT 1
283             FOR SHARE"#,
284            payload.workflow_type() as _,
285            organization_id,
286            idempotency_key,
287            enqueue_request,
288        )
289        .fetch_optional(&mut **tx)
290        .await
291    } else {
292        sqlx::query_as!(
293            WorkflowRunRow,
294            r#"SELECT
295                id,
296                workflow_type,
297                organization_id,
298                status::text AS "status!",
299                idempotency_key,
300                metadata,
301                enqueue_request = $3::jsonb AS "enqueue_request_matches?",
302                started_at,
303                finished_at,
304                created_at,
305                updated_at
306             FROM workflow_runs
307             WHERE workflow_type = $1
308               AND organization_id IS NULL
309               AND idempotency_key = $2
310             LIMIT 1
311             FOR SHARE"#,
312            payload.workflow_type() as _,
313            idempotency_key,
314            enqueue_request,
315        )
316        .fetch_optional(&mut **tx)
317        .await
318    };
319
320    run.map_err(|error| {
321        Error::from_query_sqlx_with_context("load idempotent workflow enqueue", error)
322    })?
323    .ok_or_else(|| {
324        workflow_internal_state_error(
325            "workflow run insert conflicted but matching idempotent workflow run was not found",
326        )
327    })
328}
329
330fn enqueue_workflow_run_idempotency_conflict_clause(
331    payload: &WorkflowRunEnqueue<'_>,
332) -> &'static str {
333    // Keep these predicates aligned with the partial unique indexes
334    // uq_workflow_runs_type_idempotency_org and uq_workflow_runs_type_idempotency_global.
335    match (payload.idempotency_key(), payload.organization_id()) {
336        (Some(_), Some(_)) => {
337            "ON CONFLICT (workflow_type, organization_id, idempotency_key)
338             WHERE idempotency_key IS NOT NULL
339               AND organization_id IS NOT NULL
340             DO NOTHING"
341        }
342        (Some(_), None) => {
343            "ON CONFLICT (workflow_type, idempotency_key)
344             WHERE idempotency_key IS NOT NULL
345               AND organization_id IS NULL
346             DO NOTHING"
347        }
348        (None, _) => "",
349    }
350}
351
352fn validate_existing_idempotent_workflow_run(existing: &WorkflowRunRow) -> Result<()> {
353    match existing.enqueue_request_matches {
354        Some(true) => Ok(()),
355        Some(false) => Err(workflow_enqueue_conflicting_retry_error("request")),
356        None => Err(workflow_legacy_idempotency_snapshot_missing_error(
357            existing.workflow_type.as_str(),
358            existing.id,
359        )),
360    }
361}
362
363fn canonical_workflow_enqueue_request(payload: &WorkflowRunEnqueue<'_>) -> Result<JsonValue> {
364    let mut steps = payload
365        .steps()
366        .iter()
367        .map(|step| {
368            let mut dependencies = step
369                .dependencies()
370                .iter()
371                .map(|dependency| CanonicalWorkflowDependency {
372                    prerequisite_step_key: dependency.prerequisite_step_key.as_str(),
373                    release_mode: dependency
374                        .release_mode
375                        .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal)
376                        .as_db_value(),
377                })
378                .collect::<Vec<_>>();
379            dependencies.sort_by(|left, right| {
380                left.prerequisite_step_key
381                    .cmp(right.prerequisite_step_key)
382                    .then(left.release_mode.cmp(right.release_mode))
383            });
384
385            CanonicalWorkflowStep {
386                step_key: step.step_key().as_str(),
387                execution_kind: step.execution_kind().as_db_value(),
388                job_type: step.job_type().map(|job_type| job_type.as_str()),
389                organization_id: workflow_step_effective_organization_id(
390                    payload.organization_id(),
391                    step,
392                ),
393                payload: step.payload(),
394                priority: step.priority(),
395                max_attempts: step.max_attempts(),
396                timeout_seconds: step.timeout_seconds(),
397                stage: workflow_step_effective_stage(step),
398                dependencies,
399            }
400        })
401        .collect::<Vec<_>>();
402    steps.sort_by(|left, right| left.step_key.cmp(right.step_key));
403
404    serde_json::to_value(CanonicalWorkflowRunEnqueueRequest {
405        metadata: payload.metadata(),
406        steps,
407    })
408    .map_err(|error| {
409        workflow_internal_state_error(format!(
410            "failed to serialize canonical workflow enqueue request: {error}"
411        ))
412    })
413}
414
415pub(crate) async fn enqueue_root_steps_tx(tx: &mut DbTx<'_>, workflow_run_id: Uuid) -> Result<()> {
416    let rows = sqlx::query!(
417        "SELECT
418            id,
419            workflow_run_id,
420            execution_kind::text AS \"execution_kind!\",
421            job_type,
422            organization_id,
423            payload,
424            priority,
425            max_attempts,
426            timeout_seconds,
427            stage
428         FROM workflow_steps
429         WHERE workflow_run_id = $1
430           AND status = 'BLOCKED'
431           AND dependency_count_pending = 0
432         ORDER BY created_at ASC
433         FOR UPDATE",
434        workflow_run_id,
435    )
436    .fetch_all(&mut **tx)
437    .await
438    .map_err(|error| {
439        Error::from_query_sqlx_with_context("lookup root workflow steps for enqueue", error)
440    })?;
441
442    for row in rows {
443        let candidate = StepReleaseCandidate::from_init(StepReleaseCandidateInit {
444            id: row.id,
445            workflow_run_id: row.workflow_run_id,
446            execution_kind: parse_workflow_step_execution_kind(row.execution_kind)?,
447            job_type: row.job_type.map(parse_job_type_name).transpose()?,
448            organization_id: row.organization_id,
449            payload: row.payload,
450            priority: row.priority,
451            max_attempts: row.max_attempts,
452            timeout_seconds: row.timeout_seconds,
453            stage: row.stage.map(parse_job_stage).transpose()?,
454        });
455        release_candidate_step_tx(tx, &candidate).await?;
456    }
457
458    Ok(())
459}
460
#[cfg(test)]
mod tests {
    use runledger_core::jobs::{
        JobStage, JobType, StepKey, WorkflowRunEnqueueBuilder, WorkflowStepEnqueueBuilder,
        WorkflowType,
    };
    use serde_json::json;
    use sqlx::types::Uuid;

    use super::canonical_workflow_enqueue_request;

    /// Pins the exact JSON layout of the canonical enqueue snapshot: field
    /// names, effective organization per step, and the dependency shape.
    #[test]
    fn canonical_workflow_enqueue_request_matches_golden_snapshot() {
        let run_organization_id = Uuid::now_v7();
        let step_organization_id = Uuid::now_v7();
        let run_metadata = json!({"kind": "golden"});
        let root_step_payload = json!({"step": "root"});
        let child_step_payload = json!({"step": "child"});

        // External root step with every optional setting left unset.
        let root_step =
            WorkflowStepEnqueueBuilder::new_external(StepKey::new("root"), &root_step_payload)
                .try_build()
                .expect("build root step");
        // Job step overriding every optional setting and depending on the root.
        let child_step = WorkflowStepEnqueueBuilder::new(
            StepKey::new("child"),
            JobType::new("jobs.test.child"),
            &child_step_payload,
        )
        .organization_id(step_organization_id)
        .priority(7)
        .max_attempts(2)
        .timeout_seconds(45)
        .stage(JobStage::Scheduled)
        .depends_on_success(&[StepKey::new("root")])
        .try_build()
        .expect("build child step");

        let request = WorkflowRunEnqueueBuilder::new(
            WorkflowType::new("workflow.test.golden"),
            &run_metadata,
        )
        .organization_id(run_organization_id)
        .step(child_step)
        .step(root_step)
        .try_build()
        .expect("build workflow");

        let canonical =
            canonical_workflow_enqueue_request(&request).expect("canonicalize workflow enqueue");

        let expected = json!({
            "metadata": {"kind": "golden"},
            "steps": [
                {
                    "step_key": "child",
                    "execution_kind": "JOB",
                    "job_type": "jobs.test.child",
                    "organization_id": step_organization_id,
                    "payload": {"step": "child"},
                    "priority": 7,
                    "max_attempts": 2,
                    "timeout_seconds": 45,
                    "stage": "scheduled",
                    "dependencies": [
                        {
                            "prerequisite_step_key": "root",
                            "release_mode": "ON_SUCCESS"
                        }
                    ]
                },
                {
                    "step_key": "root",
                    "execution_kind": "EXTERNAL",
                    "job_type": null,
                    "organization_id": run_organization_id,
                    "payload": {"step": "root"},
                    "priority": null,
                    "max_attempts": null,
                    "timeout_seconds": null,
                    "stage": null,
                    "dependencies": []
                }
            ]
        });
        assert_eq!(canonical, expected);
    }
}