Skip to main content

runledger_postgres/jobs/workflows/mutate/
append.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use runledger_core::jobs::{
4    StepKey, WorkflowDependencyReleaseMode, WorkflowRunStatus, WorkflowStepEnqueue,
5    WorkflowStepExecutionKind, WorkflowStepStatus, validate_workflow_step_append,
6};
7use sqlx::types::Uuid;
8
9use crate::jobs::row_decode::{
10    parse_job_stage, parse_job_type_name, parse_step_key_name, parse_workflow_step_execution_kind,
11    parse_workflow_step_status,
12};
13use crate::jobs::transaction_isolation::ensure_read_committed_tx;
14use crate::jobs::workflow_types::{
15    AppendWorkflowStepsInput as AppendWorkflowStepsInputRecord,
16    AppendWorkflowStepsOutcome as AppendOutcome, AppendWorkflowStepsResult as AppendResult,
17    WorkflowStepDbRecord,
18};
19use crate::{DbPool, DbTx, Error, Result};
20
21use super::super::errors::{
22    workflow_append_blank_mutation_key_error, workflow_append_conflicting_retry_error,
23    workflow_append_terminal_run_error, workflow_append_window_missing_error,
24    workflow_append_window_not_external_error, workflow_append_window_not_open_error,
25    workflow_internal_state_error, workflow_release_conflict_error,
26};
27use super::super::locking::{
28    LockedWorkflowStepState, lock_workflow_run_for_update_tx, lock_workflow_steps_for_update_tx,
29    try_lock_workflow_run_release_shared_tx,
30};
31use super::super::read::load_workflow_run_by_id_tx;
32use super::super::release::{
33    StepReleaseCandidate, StepReleaseCandidateInit, release_candidate_step_tx,
34};
35use super::super::runtime::{recompute_workflow_run_statuses_tx, resolve_terminal_step_queue_tx};
36use super::super::steps::{
37    WorkflowStepIdsByKey, dependency_count_total, fetch_job_definition_defaults_tx,
38    insert_workflow_step_dependency_record_tx, insert_workflow_step_record_tx, step_id_for_key,
39    workflow_step_defaults, workflow_step_effective_organization_id,
40};
41use super::super::validation::workflow_dag_validation_error;
42use super::idempotency::{
43    canonical_append_request, deserialize_stored_append_request, insert_workflow_mutation_row_tx,
44    load_existing_mutation_request_tx, stored_append_request_matches_tx,
45};
46
/// State of a newly appended workflow step as re-read from `workflow_steps` by
/// `load_appended_step_states_tx`, used to decide whether the step is released, canceled,
/// or left waiting right after the append.
#[derive(Debug)]
struct AppendedStepState {
    /// Release candidate built from the step row; handed to `release_candidate_step_tx`
    /// and used as the source of the step id when the step has to be canceled.
    candidate: StepReleaseCandidate,
    /// Value of the `dependency_count_pending` column; the append only resolves steps
    /// for which this is zero.
    dependency_count_pending: i32,
    /// Value of the `dependency_count_unsatisfied` column; zero means the step is released,
    /// non-zero means it is canceled as dependency-unsatisfied.
    dependency_count_unsatisfied: i32,
}
53
54pub async fn append_workflow_steps(
55    pool: &DbPool,
56    input: &AppendWorkflowStepsInputRecord<'_>,
57) -> Result<AppendResult> {
58    let mut tx = pool
59        .begin()
60        .await
61        .map_err(|error| Error::ConnectionError(error.to_string()))?;
62    let result = append_workflow_steps_tx(&mut tx, input).await?;
63    tx.commit()
64        .await
65        .map_err(|error| Error::ConnectionError(error.to_string()))?;
66    Ok(result)
67}
68
69pub async fn append_workflow_steps_tx(
70    tx: &mut DbTx<'_>,
71    input: &AppendWorkflowStepsInputRecord<'_>,
72) -> Result<AppendResult> {
73    if input.mutation_key.trim().is_empty() {
74        return Err(workflow_append_blank_mutation_key_error());
75    }
76    ensure_read_committed_tx(
77        tx,
78        "workflow append mutation",
79        "workflow.append_unsupported_isolation",
80        "Workflow append mutation requires READ COMMITTED transaction isolation.",
81    )
82    .await?;
83
84    let locked_steps =
85        lock_workflow_steps_for_update_tx(tx, input.workflow_run_id, input.organization_id).await?;
86    let workflow_run =
87        lock_workflow_run_for_update_tx(tx, input.workflow_run_id, input.organization_id).await?;
88    let canonical_request = canonical_append_request(
89        input.append_window_step_key,
90        workflow_run.organization_id,
91        &input.steps,
92    )?;
93    let comparable_request =
94        deserialize_stored_append_request(&canonical_request, workflow_run.organization_id)?;
95
96    if let Some(existing_request) =
97        load_existing_mutation_request_tx(tx, workflow_run.id, input.mutation_key).await?
98    {
99        if !stored_append_request_matches_tx(
100            tx,
101            &existing_request,
102            workflow_run.organization_id,
103            &comparable_request,
104        )
105        .await?
106        {
107            return Err(workflow_append_conflicting_retry_error(input.mutation_key));
108        }
109
110        return load_append_result_tx(
111            tx,
112            workflow_run.id,
113            &input.steps,
114            workflow_run.organization_id,
115            AppendOutcome::AlreadyApplied,
116        )
117        .await;
118    }
119
120    ensure_append_window_is_open(&locked_steps, input.append_window_step_key)?;
121
122    if matches!(
123        workflow_run.status,
124        WorkflowRunStatus::Succeeded
125            | WorkflowRunStatus::CompletedWithErrors
126            | WorkflowRunStatus::Canceled
127    ) {
128        return Err(workflow_append_terminal_run_error(workflow_run.status));
129    }
130
131    let existing_step_keys = locked_steps
132        .iter()
133        .map(|step| step.step_key.clone())
134        .collect::<BTreeSet<_>>();
135    validate_workflow_step_append(&existing_step_keys, &input.steps)
136        .map_err(workflow_dag_validation_error)?;
137
138    if !try_lock_workflow_run_release_shared_tx(tx, workflow_run.id).await? {
139        return Err(workflow_release_conflict_error(workflow_run.id));
140    }
141
142    let defaults_by_job_type = fetch_job_definition_defaults_tx(tx, &input.steps).await?;
143    let existing_statuses_by_key = locked_steps
144        .iter()
145        .map(|step| (step.step_key.as_str().to_owned(), step.status))
146        .collect::<BTreeMap<_, _>>();
147    let new_step_keys = input
148        .steps
149        .iter()
150        .map(|step| step.step_key().as_str().to_owned())
151        .collect::<BTreeSet<_>>();
152
153    let mut step_id_by_key = locked_steps
154        .iter()
155        .map(|step| (step.step_key.as_str().to_owned(), step.id))
156        .collect::<WorkflowStepIdsByKey>();
157    let mut appended_step_ids = Vec::with_capacity(input.steps.len());
158
159    for step in &input.steps {
160        let defaults = match step.execution_kind() {
161            WorkflowStepExecutionKind::Job => {
162                Some(workflow_step_defaults(&defaults_by_job_type, step)?)
163            }
164            WorkflowStepExecutionKind::External => None,
165        };
166        let (dependency_count_pending, dependency_count_unsatisfied) =
167            initial_dependency_counters(&existing_statuses_by_key, &new_step_keys, step)?;
168        let step_id = insert_workflow_step_record_tx(
169            tx,
170            workflow_run.id,
171            workflow_step_effective_organization_id(workflow_run.organization_id, step),
172            step,
173            defaults,
174            dependency_count_pending,
175            dependency_count_unsatisfied,
176        )
177        .await?;
178        step_id_by_key.insert(step.step_key().as_str().to_owned(), step_id);
179        appended_step_ids.push(step_id);
180    }
181
182    insert_appended_workflow_step_dependencies_tx(
183        tx,
184        workflow_run.id,
185        &input.steps,
186        &step_id_by_key,
187    )
188    .await?;
189    insert_workflow_mutation_row_tx(
190        tx,
191        workflow_run.id,
192        input.mutation_key,
193        input.mutation_metadata,
194        &canonical_request,
195    )
196    .await?;
197
198    let appended_step_states = load_appended_step_states_tx(tx, &appended_step_ids).await?;
199    let mut touched_run_ids = BTreeSet::from([workflow_run.id]);
200    for step_id in &appended_step_ids {
201        let Some(step_state) = appended_step_states.get(step_id) else {
202            return Err(workflow_internal_state_error(format!(
203                "missing appended workflow step state for step id {step_id}"
204            )));
205        };
206        if step_state.dependency_count_pending != 0 {
207            continue;
208        }
209
210        resolve_appended_step_state_tx(tx, step_state, &mut touched_run_ids).await?;
211    }
212
213    recompute_workflow_run_statuses_tx(tx, &touched_run_ids).await?;
214
215    load_append_result_tx(
216        tx,
217        workflow_run.id,
218        &input.steps,
219        workflow_run.organization_id,
220        AppendOutcome::Appended,
221    )
222    .await
223}
224
225fn ensure_append_window_is_open(
226    locked_steps: &[LockedWorkflowStepState],
227    append_window_step_key: StepKey<'_>,
228) -> Result<()> {
229    let Some(append_window) = locked_steps
230        .iter()
231        .find(|step| step.step_key.as_str() == append_window_step_key.as_str())
232    else {
233        return Err(workflow_append_window_missing_error(
234            append_window_step_key.as_str(),
235        ));
236    };
237
238    if append_window.execution_kind != WorkflowStepExecutionKind::External {
239        return Err(workflow_append_window_not_external_error(
240            append_window.step_key.as_str(),
241        ));
242    }
243
244    if append_window.status != WorkflowStepStatus::WaitingForExternal {
245        return Err(workflow_append_window_not_open_error(
246            append_window.step_key.as_str(),
247            append_window.status,
248        ));
249    }
250
251    Ok(())
252}
253
254fn initial_dependency_counters(
255    existing_statuses_by_key: &BTreeMap<String, WorkflowStepStatus>,
256    new_step_keys: &BTreeSet<String>,
257    step: &WorkflowStepEnqueue<'_>,
258) -> Result<(i32, i32)> {
259    let _ = dependency_count_total(step)?;
260    let mut dependency_count_pending = 0i32;
261    let mut dependency_count_unsatisfied = 0i32;
262
263    for dependency in step.dependencies() {
264        let release_mode = dependency
265            .release_mode
266            .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal);
267        if let Some(status) =
268            existing_statuses_by_key.get(dependency.prerequisite_step_key.as_str())
269        {
270            if !status.is_terminal() {
271                dependency_count_pending += 1;
272            } else if matches!(release_mode, WorkflowDependencyReleaseMode::OnSuccess)
273                && *status != WorkflowStepStatus::Succeeded
274            {
275                dependency_count_unsatisfied += 1;
276            }
277            continue;
278        }
279
280        if new_step_keys.contains(dependency.prerequisite_step_key.as_str()) {
281            dependency_count_pending += 1;
282            continue;
283        }
284
285        return Err(workflow_internal_state_error(format!(
286            "append dependency '{}' for step '{}' was not present in the existing or new step key set",
287            dependency.prerequisite_step_key.as_str(),
288            step.step_key().as_str()
289        )));
290    }
291
292    Ok((dependency_count_pending, dependency_count_unsatisfied))
293}
294
295async fn resolve_appended_step_state_tx(
296    tx: &mut DbTx<'_>,
297    step_state: &AppendedStepState,
298    touched_run_ids: &mut BTreeSet<Uuid>,
299) -> Result<()> {
300    if step_state.dependency_count_unsatisfied == 0 {
301        return release_candidate_step_tx(tx, &step_state.candidate).await;
302    }
303
304    let canceled = sqlx::query!(
305        "UPDATE workflow_steps
306             SET status = 'CANCELED',
307                 finished_at = COALESCE(finished_at, now()),
308                 status_reason = 'workflow.dependency_unsatisfied',
309                 last_error_code = 'workflow.dependency_unsatisfied',
310                 last_error_message = 'Step dependency requirements were not satisfied.',
311                 updated_at = now()
312             WHERE id = $1
313               AND status = 'BLOCKED'
314             RETURNING workflow_run_id",
315        step_state.candidate.id(),
316    )
317    .fetch_optional(&mut **tx)
318    .await
319    .map_err(|error| {
320        Error::from_query_sqlx_with_context("cancel born-unsatisfied appended workflow step", error)
321    })?;
322
323    if canceled.is_some() {
324        resolve_terminal_step_queue_tx(
325            tx,
326            step_state.candidate.id(),
327            WorkflowStepStatus::Canceled,
328            touched_run_ids,
329        )
330        .await?;
331    }
332
333    Ok(())
334}
335
336async fn insert_appended_workflow_step_dependencies_tx(
337    tx: &mut DbTx<'_>,
338    workflow_run_id: Uuid,
339    steps: &[WorkflowStepEnqueue<'_>],
340    step_id_by_key: &WorkflowStepIdsByKey,
341) -> Result<()> {
342    for step in steps {
343        let dependent_step_id = step_id_for_key(
344            step_id_by_key,
345            step.step_key().as_str(),
346            "missing dependent appended workflow step id",
347        )?;
348        for dependency in step.dependencies() {
349            let prerequisite_step_id = step_id_for_key(
350                step_id_by_key,
351                dependency.prerequisite_step_key.as_str(),
352                "missing appended workflow prerequisite step id",
353            )?;
354            let release_mode = dependency
355                .release_mode
356                .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal)
357                .as_db_value();
358            insert_workflow_step_dependency_record_tx(
359                tx,
360                workflow_run_id,
361                prerequisite_step_id,
362                dependent_step_id,
363                release_mode,
364            )
365            .await?;
366        }
367    }
368
369    Ok(())
370}
371
372async fn load_appended_step_states_tx(
373    tx: &mut DbTx<'_>,
374    appended_step_ids: &[Uuid],
375) -> Result<BTreeMap<Uuid, AppendedStepState>> {
376    let rows = sqlx::query!(
377        "SELECT
378            id,
379            workflow_run_id,
380            execution_kind::text AS \"execution_kind!\",
381            job_type,
382            organization_id,
383            payload,
384            priority,
385            max_attempts,
386            timeout_seconds,
387            stage,
388            dependency_count_pending,
389            dependency_count_unsatisfied
390         FROM workflow_steps
391         WHERE id = ANY($1::uuid[])
392         FOR UPDATE",
393        appended_step_ids,
394    )
395    .fetch_all(&mut **tx)
396    .await
397    .map_err(|error| {
398        Error::from_query_sqlx_with_context("load appended workflow step states", error)
399    })?;
400
401    rows.into_iter()
402        .map(|row| {
403            let execution_kind = parse_workflow_step_execution_kind(row.execution_kind)?;
404            let job_type = row.job_type.map(parse_job_type_name).transpose()?;
405            let stage = row.stage.map(parse_job_stage).transpose()?;
406            Ok((
407                row.id,
408                AppendedStepState {
409                    candidate: StepReleaseCandidate::from_init(StepReleaseCandidateInit {
410                        id: row.id,
411                        workflow_run_id: row.workflow_run_id,
412                        execution_kind,
413                        job_type,
414                        organization_id: row.organization_id,
415                        payload: row.payload,
416                        priority: row.priority,
417                        max_attempts: row.max_attempts,
418                        timeout_seconds: row.timeout_seconds,
419                        stage,
420                    }),
421                    dependency_count_pending: row.dependency_count_pending,
422                    dependency_count_unsatisfied: row.dependency_count_unsatisfied,
423                },
424            ))
425        })
426        .collect()
427}
428
429async fn load_workflow_steps_by_keys_tx(
430    tx: &mut DbTx<'_>,
431    workflow_run_id: Uuid,
432    input_steps: &[WorkflowStepEnqueue<'_>],
433    organization_id: Option<Uuid>,
434) -> Result<Vec<WorkflowStepDbRecord>> {
435    let step_keys = input_steps
436        .iter()
437        .map(|step| step.step_key().as_str().to_owned())
438        .collect::<Vec<_>>();
439    let rows = sqlx::query!(
440        "SELECT
441            ws.id,
442            ws.workflow_run_id,
443            ws.step_key,
444            ws.execution_kind::text AS \"execution_kind!\",
445            ws.job_type,
446            ws.organization_id,
447            ws.payload,
448            ws.priority,
449            ws.max_attempts,
450            ws.timeout_seconds,
451            ws.stage,
452            ws.status::text AS \"status!\",
453            ws.job_id,
454            ws.released_at,
455            ws.started_at,
456            ws.finished_at,
457            ws.dependency_count_total,
458            ws.dependency_count_pending,
459            ws.dependency_count_unsatisfied,
460            ws.status_reason,
461            ws.last_error_code,
462            ws.last_error_message,
463            ws.created_at,
464            ws.updated_at
465         FROM workflow_steps ws
466         JOIN workflow_runs wr ON wr.id = ws.workflow_run_id
467         WHERE ws.workflow_run_id = $1
468           AND ws.step_key = ANY($2::text[])
469           AND ($3::uuid IS NULL OR wr.organization_id = $3)",
470        workflow_run_id,
471        &step_keys,
472        organization_id,
473    )
474    .fetch_all(&mut **tx)
475    .await
476    .map_err(|error| {
477        Error::from_query_sqlx_with_context("load appended workflow steps by key", error)
478    })?;
479
480    let steps_by_key = rows
481        .into_iter()
482        .map(|row| {
483            let step_key = parse_step_key_name(row.step_key)?;
484            Ok((
485                step_key.clone(),
486                WorkflowStepDbRecord {
487                    id: row.id,
488                    workflow_run_id: row.workflow_run_id,
489                    step_key,
490                    execution_kind: parse_workflow_step_execution_kind(row.execution_kind)?,
491                    job_type: row.job_type.map(parse_job_type_name).transpose()?,
492                    organization_id: row.organization_id,
493                    payload: row.payload,
494                    priority: row.priority,
495                    max_attempts: row.max_attempts,
496                    timeout_seconds: row.timeout_seconds,
497                    stage: row.stage.map(parse_job_stage).transpose()?,
498                    status: parse_workflow_step_status(row.status)?,
499                    job_id: row.job_id,
500                    released_at: row.released_at,
501                    started_at: row.started_at,
502                    finished_at: row.finished_at,
503                    dependency_count_total: row.dependency_count_total,
504                    dependency_count_pending: row.dependency_count_pending,
505                    dependency_count_unsatisfied: row.dependency_count_unsatisfied,
506                    status_reason: row.status_reason,
507                    last_error_code: row.last_error_code,
508                    last_error_message: row.last_error_message,
509                    created_at: row.created_at,
510                    updated_at: row.updated_at,
511                },
512            ))
513        })
514        .collect::<Result<BTreeMap<_, _>>>()?;
515
516    input_steps
517        .iter()
518        .map(|step| {
519            steps_by_key
520                .get(step.step_key().as_str())
521                .cloned()
522                .ok_or_else(|| {
523                    workflow_internal_state_error(format!(
524                        "workflow append result missing step '{}'",
525                        step.step_key().as_str()
526                    ))
527                })
528        })
529        .collect()
530}
531
532async fn load_append_result_tx(
533    tx: &mut DbTx<'_>,
534    workflow_run_id: Uuid,
535    input_steps: &[WorkflowStepEnqueue<'_>],
536    organization_id: Option<Uuid>,
537    outcome: AppendOutcome,
538) -> Result<AppendResult> {
539    let appended_steps =
540        load_workflow_steps_by_keys_tx(tx, workflow_run_id, input_steps, organization_id).await?;
541
542    Ok(AppendResult {
543        workflow_run: load_workflow_run_by_id_tx(
544            tx,
545            workflow_run_id,
546            "load workflow run after append",
547        )
548        .await?,
549        appended_steps,
550        outcome,
551    })
552}