// runledger_postgres/jobs/workflows/enqueue.rs
1use std::collections::{BTreeMap, BTreeSet};
2
3use runledger_core::jobs::WorkflowDependencyReleaseMode;
4use sqlx::types::Uuid;
5
6use crate::{DbPool, DbTx, Error, Result};
7
8use super::super::row_decode::{
9    parse_job_stage, parse_job_type_name, parse_workflow_run_status,
10    parse_workflow_step_execution_kind, parse_workflow_type_name,
11};
12use super::super::workflow_types::WorkflowRunDbRecord;
13use super::runtime::{recompute_workflow_run_statuses_tx, release_candidate_step_tx};
14use super::{
15    JobDefinitionDefaults, workflow_dag_validation_error, workflow_definition_not_available_error,
16    workflow_dependency_count_overflow_error, workflow_internal_state_error,
17};
18use runledger_core::jobs::{
19    WorkflowRunEnqueue, WorkflowStepEnqueue, WorkflowStepExecutionKind,
20    validate_workflow_run_enqueue,
21};
22
/// Job-definition defaults keyed by job type name, as loaded from `job_definitions`.
type DefaultsByJobType = BTreeMap<String, JobDefinitionDefaults>;
/// Newly inserted workflow step ids keyed by the step's `step_key`.
pub(crate) type WorkflowStepIdsByKey = BTreeMap<String, Uuid>;
25
26pub async fn enqueue_workflow_run(
27    pool: &DbPool,
28    payload: &WorkflowRunEnqueue<'_>,
29) -> Result<WorkflowRunDbRecord> {
30    let mut tx = pool
31        .begin()
32        .await
33        .map_err(|error| Error::ConnectionError(error.to_string()))?;
34    let workflow_run = enqueue_workflow_run_tx(&mut tx, payload).await?;
35    tx.commit()
36        .await
37        .map_err(|error| Error::ConnectionError(error.to_string()))?;
38    Ok(workflow_run)
39}
40
41pub async fn enqueue_workflow_run_tx(
42    tx: &mut DbTx<'_>,
43    payload: &WorkflowRunEnqueue<'_>,
44) -> Result<WorkflowRunDbRecord> {
45    validate_workflow_run_enqueue(payload).map_err(workflow_dag_validation_error)?;
46
47    let defaults_by_job_type = fetch_job_definition_defaults_tx(tx, payload.steps()).await?;
48    let workflow_run = insert_workflow_run_record_tx(tx, payload).await?;
49    let step_id_by_key =
50        insert_workflow_steps_tx(tx, payload, workflow_run.id, &defaults_by_job_type).await?;
51    insert_workflow_step_dependencies_tx(tx, payload, workflow_run.id, &step_id_by_key).await?;
52
53    enqueue_root_steps_tx(tx, workflow_run.id).await?;
54    recompute_workflow_run_statuses_tx(tx, &std::collections::BTreeSet::from([workflow_run.id]))
55        .await?;
56
57    load_workflow_run_by_id_tx(tx, workflow_run.id).await
58}
59
/// Inserts the `workflow_runs` row for this enqueue in status `RUNNING`,
/// with `started_at` set to the database clock (`now()`), and returns the
/// decoded record.
///
/// The textual `workflow_type` and `status` columns are parsed back into
/// their typed counterparts before returning.
async fn insert_workflow_run_record_tx(
    tx: &mut DbTx<'_>,
    payload: &WorkflowRunEnqueue<'_>,
) -> Result<WorkflowRunDbRecord> {
    let run_row = sqlx::query!(
        "INSERT INTO workflow_runs (
            workflow_type,
            organization_id,
            status,
            idempotency_key,
            metadata,
            started_at
         )
         VALUES ($1, $2, 'RUNNING', $3, $4::jsonb, now())
         RETURNING
            id,
            workflow_type,
            organization_id,
            status::text AS \"status!\",
            idempotency_key,
            metadata,
            started_at,
            finished_at,
            created_at,
            updated_at",
        payload.workflow_type() as _,
        payload.organization_id(),
        payload.idempotency_key(),
        payload.metadata(),
    )
    .fetch_one(&mut **tx)
    .await
    .map_err(|error| Error::from_query_sqlx_with_context("enqueue workflow run", error))?;

    Ok(WorkflowRunDbRecord {
        id: run_row.id,
        workflow_type: parse_workflow_type_name(run_row.workflow_type)?,
        organization_id: run_row.organization_id,
        status: parse_workflow_run_status(run_row.status)?,
        idempotency_key: run_row.idempotency_key,
        metadata: run_row.metadata,
        started_at: run_row.started_at,
        finished_at: run_row.finished_at,
        created_at: run_row.created_at,
        updated_at: run_row.updated_at,
    })
}
107
108pub(crate) fn dependency_count_total(step: &WorkflowStepEnqueue<'_>) -> Result<i32> {
109    i32::try_from(step.dependencies().len())
110        .map_err(|_| workflow_dependency_count_overflow_error(step.step_key().as_str()))
111}
112
113pub(crate) fn workflow_step_effective_organization_id(
114    workflow_organization_id: Option<Uuid>,
115    step: &WorkflowStepEnqueue<'_>,
116) -> Option<Uuid> {
117    step.organization_id().or(workflow_organization_id)
118}
119
120pub(crate) fn workflow_step_defaults<'a>(
121    defaults_by_job_type: &'a DefaultsByJobType,
122    step: &WorkflowStepEnqueue<'_>,
123) -> Result<&'a JobDefinitionDefaults> {
124    let job_type = step
125        .job_type()
126        .ok_or_else(|| workflow_internal_state_error("job workflow step missing job_type"))?;
127
128    defaults_by_job_type
129        .get(job_type.as_str())
130        .ok_or_else(|| workflow_definition_not_available_error(job_type.as_str()))
131}
132
/// Inserts one `workflow_steps` row in status `BLOCKED` and returns its id.
///
/// For `Job` steps, the per-step overrides (priority, max attempts, timeout,
/// stage) fall back to the job definition's defaults; `defaults` must be
/// `Some` in that case. For `External` steps all job-related columns are
/// NULL.
pub(crate) async fn insert_workflow_step_record_tx(
    tx: &mut DbTx<'_>,
    workflow_run_id: Uuid,
    organization_id: Option<Uuid>,
    step: &WorkflowStepEnqueue<'_>,
    defaults: Option<&JobDefinitionDefaults>,
    dependency_count_pending: i32,
    dependency_count_unsatisfied: i32,
) -> Result<Uuid> {
    let dependency_count_total = dependency_count_total(step)?;
    // Job steps carry job metadata; external steps carry none of it.
    let (job_type, priority, max_attempts, timeout_seconds, stage) = match step.execution_kind() {
        WorkflowStepExecutionKind::Job => {
            let defaults = defaults.ok_or_else(|| {
                workflow_internal_state_error("missing job definition defaults for job step")
            })?;
            let job_type = step.job_type().ok_or_else(|| {
                workflow_internal_state_error("missing job_type for job workflow step")
            })?;

            (
                Some(job_type.as_str()),
                Some(step.priority().unwrap_or(defaults.default_priority)),
                Some(step.max_attempts().unwrap_or(defaults.max_attempts)),
                Some(
                    step.timeout_seconds()
                        .unwrap_or(defaults.default_timeout_seconds),
                ),
                // Steps default to the Queued stage unless the caller asked
                // for a specific one.
                Some(
                    step.stage()
                        .unwrap_or(runledger_core::jobs::JobStage::Queued)
                        .as_db_value(),
                ),
            )
        }
        WorkflowStepExecutionKind::External => (None, None, None, None, None),
    };
    let step_id: Uuid = sqlx::query_scalar!(
        "INSERT INTO workflow_steps (
            workflow_run_id,
            step_key,
            execution_kind,
            job_type,
            organization_id,
            payload,
            priority,
            max_attempts,
            timeout_seconds,
            stage,
            status,
            dependency_count_total,
            dependency_count_pending,
            dependency_count_unsatisfied
         )
         VALUES (
            $1,
            $2,
            $3::text::workflow_step_execution_kind,
            $4,
            $5,
            $6::jsonb,
            $7,
            $8,
            $9,
            $10,
            'BLOCKED',
            $11,
            $12,
            $13
         )
         RETURNING id",
        workflow_run_id,
        step.step_key() as _,
        step.execution_kind().as_db_value(),
        job_type,
        organization_id,
        step.payload(),
        priority,
        max_attempts,
        timeout_seconds,
        stage,
        dependency_count_total,
        dependency_count_pending,
        dependency_count_unsatisfied,
    )
    .fetch_one(&mut **tx)
    .await
    .map_err(|error| Error::from_query_sqlx_with_context("insert workflow step", error))?;

    Ok(step_id)
}
223
224pub(crate) async fn insert_workflow_steps_tx(
225    tx: &mut DbTx<'_>,
226    payload: &WorkflowRunEnqueue<'_>,
227    workflow_run_id: Uuid,
228    defaults_by_job_type: &DefaultsByJobType,
229) -> Result<WorkflowStepIdsByKey> {
230    let mut step_id_by_key = WorkflowStepIdsByKey::new();
231    for step in payload.steps() {
232        let defaults = match step.execution_kind() {
233            WorkflowStepExecutionKind::Job => {
234                Some(workflow_step_defaults(defaults_by_job_type, step)?)
235            }
236            WorkflowStepExecutionKind::External => None,
237        };
238        let step_id = insert_workflow_step_record_tx(
239            tx,
240            workflow_run_id,
241            workflow_step_effective_organization_id(payload.organization_id(), step),
242            step,
243            defaults,
244            dependency_count_total(step)?,
245            0,
246        )
247        .await?;
248        step_id_by_key.insert(step.step_key().as_str().to_owned(), step_id);
249    }
250
251    Ok(step_id_by_key)
252}
253
254pub(crate) fn step_id_for_key(
255    step_id_by_key: &WorkflowStepIdsByKey,
256    step_key: &str,
257    missing_error: &'static str,
258) -> Result<Uuid> {
259    step_id_by_key
260        .get(step_key)
261        .copied()
262        .ok_or_else(|| workflow_internal_state_error(missing_error))
263}
264
/// Inserts a single dependency edge into `workflow_step_dependencies`.
///
/// `release_mode` must be a valid DB value for the
/// `workflow_dependency_release_mode` enum (the query casts the text form).
pub(crate) async fn insert_workflow_step_dependency_record_tx(
    tx: &mut DbTx<'_>,
    workflow_run_id: Uuid,
    prerequisite_step_id: Uuid,
    dependent_step_id: Uuid,
    release_mode: &str,
) -> Result<()> {
    sqlx::query!(
        "INSERT INTO workflow_step_dependencies (
            workflow_run_id,
            prerequisite_step_id,
            dependent_step_id,
            release_mode
         )
         VALUES ($1, $2, $3, $4::text::workflow_dependency_release_mode)",
        workflow_run_id,
        prerequisite_step_id,
        dependent_step_id,
        release_mode,
    )
    .execute(&mut **tx)
    .await
    .map_err(|error| {
        Error::from_query_sqlx_with_context("insert workflow step dependency", error)
    })?;

    Ok(())
}
293
294pub(crate) async fn insert_workflow_step_dependencies_tx(
295    tx: &mut DbTx<'_>,
296    payload: &WorkflowRunEnqueue<'_>,
297    workflow_run_id: Uuid,
298    step_id_by_key: &WorkflowStepIdsByKey,
299) -> Result<()> {
300    for step in payload.steps() {
301        let dependent_step_id = step_id_for_key(
302            step_id_by_key,
303            step.step_key().as_str(),
304            "missing dependent workflow step id",
305        )?;
306        for dependency in step.dependencies() {
307            let prerequisite_step_id = step_id_for_key(
308                step_id_by_key,
309                dependency.prerequisite_step_key.as_str(),
310                "missing prerequisite workflow step id",
311            )?;
312            let release_mode = dependency
313                .release_mode
314                .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal)
315                .as_db_value();
316            insert_workflow_step_dependency_record_tx(
317                tx,
318                workflow_run_id,
319                prerequisite_step_id,
320                dependent_step_id,
321                release_mode,
322            )
323            .await?;
324        }
325    }
326
327    Ok(())
328}
329
/// Loads enabled job-definition defaults for every distinct job type used by
/// the payload's job-kind steps.
///
/// Errors if any job step is missing its `job_type`, or if any referenced
/// job type has no enabled definition row — this guarantees that every job
/// step can be inserted with resolved defaults.
pub(crate) async fn fetch_job_definition_defaults_tx(
    tx: &mut DbTx<'_>,
    steps: &[WorkflowStepEnqueue<'_>],
) -> Result<DefaultsByJobType> {
    // Collect into a BTreeSet first to deduplicate job types (and fail fast
    // on a job step without a job_type), then flatten into the Vec sqlx needs.
    let job_types: Vec<String> = steps
        .iter()
        .filter(|step| step.execution_kind() == WorkflowStepExecutionKind::Job)
        .map(|step| {
            step.job_type()
                .map(|job_type| job_type.as_str().to_owned())
                .ok_or_else(|| workflow_internal_state_error("job workflow step missing job_type"))
        })
        .collect::<Result<BTreeSet<_>>>()?
        .into_iter()
        .collect();

    let rows = sqlx::query!(
        "SELECT job_type, default_priority, max_attempts, default_timeout_seconds
         FROM job_definitions
         WHERE is_enabled = true
           AND job_type = ANY($1::text[])",
        &job_types,
    )
    .fetch_all(&mut **tx)
    .await
    .map_err(|error| {
        Error::from_query_sqlx_with_context("lookup workflow step job definition defaults", error)
    })?;

    let defaults_by_job_type: DefaultsByJobType = rows
        .into_iter()
        .map(|row| {
            (
                row.job_type,
                JobDefinitionDefaults {
                    default_priority: row.default_priority,
                    max_attempts: row.max_attempts,
                    default_timeout_seconds: row.default_timeout_seconds,
                },
            )
        })
        .collect();

    // Reject the enqueue if any job step references a job type that the
    // query above did not return (disabled or nonexistent definition).
    if let Some(step) = steps
        .iter()
        .filter(|step| step.execution_kind() == WorkflowStepExecutionKind::Job)
        .find(|step| {
            step.job_type()
                .is_none_or(|job_type| !defaults_by_job_type.contains_key(job_type.as_str()))
        })
    {
        return Err(workflow_definition_not_available_error(
            step.job_type()
                .map(|job_type| job_type.as_str())
                .unwrap_or("<missing-job-type>"),
        ));
    }

    Ok(defaults_by_job_type)
}
390
/// Releases the run's root steps: `BLOCKED` steps with no pending
/// dependencies are selected with `FOR UPDATE` (row-locked within this
/// transaction) and handed to `release_candidate_step_tx` one at a time in
/// creation order.
pub(crate) async fn enqueue_root_steps_tx(tx: &mut DbTx<'_>, workflow_run_id: Uuid) -> Result<()> {
    let rows = sqlx::query!(
        "SELECT
            id,
            workflow_run_id,
            execution_kind::text AS \"execution_kind!\",
            job_type,
            organization_id,
            payload,
            priority,
            max_attempts,
            timeout_seconds,
            stage
         FROM workflow_steps
         WHERE workflow_run_id = $1
           AND status = 'BLOCKED'
           AND dependency_count_pending = 0
         ORDER BY created_at ASC
         FOR UPDATE",
        workflow_run_id,
    )
    .fetch_all(&mut **tx)
    .await
    .map_err(|error| {
        Error::from_query_sqlx_with_context("lookup root workflow steps for enqueue", error)
    })?;

    for row in rows {
        // Decode textual enum columns; job_type/stage are NULL for external
        // steps, hence the Option::map + transpose.
        let candidate = super::StepReleaseCandidate {
            id: row.id,
            workflow_run_id: row.workflow_run_id,
            execution_kind: parse_workflow_step_execution_kind(row.execution_kind)?,
            job_type: row.job_type.map(parse_job_type_name).transpose()?,
            organization_id: row.organization_id,
            payload: row.payload,
            priority: row.priority,
            max_attempts: row.max_attempts,
            timeout_seconds: row.timeout_seconds,
            stage: row.stage.map(parse_job_stage).transpose()?,
        };
        release_candidate_step_tx(tx, &candidate).await?;
    }

    Ok(())
}
436
/// Reloads a workflow run by id within the transaction, decoding the textual
/// `workflow_type` and `status` columns into their typed forms.
///
/// Used after enqueue + status recompute so callers see the final persisted
/// state; errors if the row is missing (fetch_one).
pub(crate) async fn load_workflow_run_by_id_tx(
    tx: &mut DbTx<'_>,
    workflow_run_id: Uuid,
) -> Result<WorkflowRunDbRecord> {
    let run_row = sqlx::query!(
        "SELECT
            id,
            workflow_type,
            organization_id,
            status::text AS \"status!\",
            idempotency_key,
            metadata,
            started_at,
            finished_at,
            created_at,
            updated_at
         FROM workflow_runs
         WHERE id = $1",
        workflow_run_id,
    )
    .fetch_one(&mut **tx)
    .await
    .map_err(|error| {
        Error::from_query_sqlx_with_context("load workflow run after enqueue recompute", error)
    })?;

    Ok(WorkflowRunDbRecord {
        id: run_row.id,
        workflow_type: parse_workflow_type_name(run_row.workflow_type)?,
        organization_id: run_row.organization_id,
        status: parse_workflow_run_status(run_row.status)?,
        idempotency_key: run_row.idempotency_key,
        metadata: run_row.metadata,
        started_at: run_row.started_at,
        finished_at: run_row.finished_at,
        created_at: run_row.created_at,
        updated_at: run_row.updated_at,
    })
}