Skip to main content

runledger_postgres/jobs/workflows/mutate/
append.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use runledger_core::jobs::{
4    StepKey, WorkflowDependencyReleaseMode, WorkflowRunStatus, WorkflowStepEnqueue,
5    WorkflowStepExecutionKind, WorkflowStepStatus, validate_workflow_step_append,
6};
7use sqlx::types::Uuid;
8
9use crate::jobs::row_decode::{
10    parse_job_stage, parse_job_type_name, parse_step_key_name, parse_workflow_step_execution_kind,
11    parse_workflow_step_status,
12};
13use crate::jobs::workflow_types::{
14    AppendWorkflowStepsInput as AppendWorkflowStepsInputRecord,
15    AppendWorkflowStepsOutcome as AppendOutcome, AppendWorkflowStepsResult as AppendResult,
16    WorkflowStepDbRecord,
17};
18use crate::{DbPool, DbTx, Error, Result};
19
20use super::super::enqueue::{
21    WorkflowStepIdsByKey, dependency_count_total, fetch_job_definition_defaults_tx,
22    insert_workflow_step_dependency_record_tx, insert_workflow_step_record_tx,
23    load_workflow_run_by_id_tx, step_id_for_key, workflow_step_defaults,
24    workflow_step_effective_organization_id,
25};
26use super::super::runtime::{
27    recompute_workflow_run_statuses_tx, release_candidate_step_tx, resolve_terminal_step_queue_tx,
28};
29use super::super::{
30    StepReleaseCandidate, workflow_append_blank_mutation_key_error,
31    workflow_append_conflicting_retry_error, workflow_append_terminal_run_error,
32    workflow_append_window_missing_error, workflow_append_window_not_external_error,
33    workflow_append_window_not_open_error, workflow_dag_validation_error,
34    workflow_internal_state_error,
35};
36use super::idempotency::{
37    canonical_append_request, deserialize_stored_append_request, insert_workflow_mutation_row_tx,
38    load_existing_mutation_request_tx, stored_append_request_matches,
39};
40use super::{
41    LockedWorkflowStepState, lock_workflow_run_for_update_tx, lock_workflow_steps_for_update_tx,
42};
43
44#[derive(Debug)]
45struct AppendedStepState {
46    candidate: StepReleaseCandidate,
47    dependency_count_pending: i32,
48    dependency_count_unsatisfied: i32,
49}
50
51pub async fn append_workflow_steps(
52    pool: &DbPool,
53    input: &AppendWorkflowStepsInputRecord<'_>,
54) -> Result<AppendResult> {
55    let mut tx = pool
56        .begin()
57        .await
58        .map_err(|error| Error::ConnectionError(error.to_string()))?;
59    let result = append_workflow_steps_tx(&mut tx, input).await?;
60    tx.commit()
61        .await
62        .map_err(|error| Error::ConnectionError(error.to_string()))?;
63    Ok(result)
64}
65
66pub async fn append_workflow_steps_tx(
67    tx: &mut DbTx<'_>,
68    input: &AppendWorkflowStepsInputRecord<'_>,
69) -> Result<AppendResult> {
70    if input.mutation_key.trim().is_empty() {
71        return Err(workflow_append_blank_mutation_key_error());
72    }
73
74    let locked_steps =
75        lock_workflow_steps_for_update_tx(tx, input.workflow_run_id, input.organization_id).await?;
76    let workflow_run =
77        lock_workflow_run_for_update_tx(tx, input.workflow_run_id, input.organization_id).await?;
78    let canonical_request = canonical_append_request(
79        input.append_window_step_key,
80        workflow_run.organization_id,
81        &input.steps,
82    )?;
83    let comparable_request =
84        deserialize_stored_append_request(&canonical_request, workflow_run.organization_id)?;
85
86    if let Some(existing_request) =
87        load_existing_mutation_request_tx(tx, workflow_run.id, input.mutation_key).await?
88    {
89        if !stored_append_request_matches(
90            &existing_request,
91            workflow_run.organization_id,
92            &comparable_request,
93        )? {
94            return Err(workflow_append_conflicting_retry_error(input.mutation_key));
95        }
96
97        return load_append_result_tx(
98            tx,
99            workflow_run.id,
100            &input.steps,
101            workflow_run.organization_id,
102            AppendOutcome::AlreadyApplied,
103        )
104        .await;
105    }
106
107    ensure_append_window_is_open(&locked_steps, input.append_window_step_key)?;
108
109    if matches!(
110        workflow_run.status,
111        WorkflowRunStatus::Succeeded
112            | WorkflowRunStatus::CompletedWithErrors
113            | WorkflowRunStatus::Canceled
114    ) {
115        return Err(workflow_append_terminal_run_error(workflow_run.status));
116    }
117
118    let existing_step_keys = locked_steps
119        .iter()
120        .map(|step| step.step_key.clone())
121        .collect::<BTreeSet<_>>();
122    validate_workflow_step_append(&existing_step_keys, &input.steps)
123        .map_err(workflow_dag_validation_error)?;
124
125    let defaults_by_job_type = fetch_job_definition_defaults_tx(tx, &input.steps).await?;
126    let existing_statuses_by_key = locked_steps
127        .iter()
128        .map(|step| (step.step_key.as_str().to_owned(), step.status))
129        .collect::<BTreeMap<_, _>>();
130    let new_step_keys = input
131        .steps
132        .iter()
133        .map(|step| step.step_key().as_str().to_owned())
134        .collect::<BTreeSet<_>>();
135
136    let mut step_id_by_key = locked_steps
137        .iter()
138        .map(|step| (step.step_key.as_str().to_owned(), step.id))
139        .collect::<WorkflowStepIdsByKey>();
140    let mut appended_step_ids = Vec::with_capacity(input.steps.len());
141
142    for step in &input.steps {
143        let defaults = match step.execution_kind() {
144            WorkflowStepExecutionKind::Job => {
145                Some(workflow_step_defaults(&defaults_by_job_type, step)?)
146            }
147            WorkflowStepExecutionKind::External => None,
148        };
149        let (dependency_count_pending, dependency_count_unsatisfied) =
150            initial_dependency_counters(&existing_statuses_by_key, &new_step_keys, step)?;
151        let step_id = insert_workflow_step_record_tx(
152            tx,
153            workflow_run.id,
154            workflow_step_effective_organization_id(workflow_run.organization_id, step),
155            step,
156            defaults,
157            dependency_count_pending,
158            dependency_count_unsatisfied,
159        )
160        .await?;
161        step_id_by_key.insert(step.step_key().as_str().to_owned(), step_id);
162        appended_step_ids.push(step_id);
163    }
164
165    insert_appended_workflow_step_dependencies_tx(
166        tx,
167        workflow_run.id,
168        &input.steps,
169        &step_id_by_key,
170    )
171    .await?;
172    insert_workflow_mutation_row_tx(
173        tx,
174        workflow_run.id,
175        input.mutation_key,
176        input.mutation_metadata,
177        &canonical_request,
178    )
179    .await?;
180
181    let appended_step_states = load_appended_step_states_tx(tx, &appended_step_ids).await?;
182    let mut touched_run_ids = BTreeSet::from([workflow_run.id]);
183    for step_id in &appended_step_ids {
184        let Some(step_state) = appended_step_states.get(step_id) else {
185            return Err(workflow_internal_state_error(format!(
186                "missing appended workflow step state for step id {step_id}"
187            )));
188        };
189        if step_state.dependency_count_pending != 0 {
190            continue;
191        }
192
193        resolve_appended_step_state_tx(tx, step_state, &mut touched_run_ids).await?;
194    }
195
196    recompute_workflow_run_statuses_tx(tx, &touched_run_ids).await?;
197
198    load_append_result_tx(
199        tx,
200        workflow_run.id,
201        &input.steps,
202        workflow_run.organization_id,
203        AppendOutcome::Appended,
204    )
205    .await
206}
207
208fn ensure_append_window_is_open(
209    locked_steps: &[LockedWorkflowStepState],
210    append_window_step_key: StepKey<'_>,
211) -> Result<()> {
212    let Some(append_window) = locked_steps
213        .iter()
214        .find(|step| step.step_key.as_str() == append_window_step_key.as_str())
215    else {
216        return Err(workflow_append_window_missing_error(
217            append_window_step_key.as_str(),
218        ));
219    };
220
221    if append_window.execution_kind != WorkflowStepExecutionKind::External {
222        return Err(workflow_append_window_not_external_error(
223            append_window.step_key.as_str(),
224        ));
225    }
226
227    if append_window.status != WorkflowStepStatus::WaitingForExternal {
228        return Err(workflow_append_window_not_open_error(
229            append_window.step_key.as_str(),
230            append_window.status,
231        ));
232    }
233
234    Ok(())
235}
236
237fn initial_dependency_counters(
238    existing_statuses_by_key: &BTreeMap<String, WorkflowStepStatus>,
239    new_step_keys: &BTreeSet<String>,
240    step: &WorkflowStepEnqueue<'_>,
241) -> Result<(i32, i32)> {
242    let _ = dependency_count_total(step)?;
243    let mut dependency_count_pending = 0i32;
244    let mut dependency_count_unsatisfied = 0i32;
245
246    for dependency in step.dependencies() {
247        let release_mode = dependency
248            .release_mode
249            .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal);
250        if let Some(status) =
251            existing_statuses_by_key.get(dependency.prerequisite_step_key.as_str())
252        {
253            if !status.is_terminal() {
254                dependency_count_pending += 1;
255            } else if matches!(release_mode, WorkflowDependencyReleaseMode::OnSuccess)
256                && *status != WorkflowStepStatus::Succeeded
257            {
258                dependency_count_unsatisfied += 1;
259            }
260            continue;
261        }
262
263        if new_step_keys.contains(dependency.prerequisite_step_key.as_str()) {
264            dependency_count_pending += 1;
265            continue;
266        }
267
268        return Err(workflow_internal_state_error(format!(
269            "append dependency '{}' for step '{}' was not present in the existing or new step key set",
270            dependency.prerequisite_step_key.as_str(),
271            step.step_key().as_str()
272        )));
273    }
274
275    Ok((dependency_count_pending, dependency_count_unsatisfied))
276}
277
278async fn resolve_appended_step_state_tx(
279    tx: &mut DbTx<'_>,
280    step_state: &AppendedStepState,
281    touched_run_ids: &mut BTreeSet<Uuid>,
282) -> Result<()> {
283    if step_state.dependency_count_unsatisfied == 0 {
284        return release_candidate_step_tx(tx, &step_state.candidate).await;
285    }
286
287    let canceled = sqlx::query!(
288        "UPDATE workflow_steps
289             SET status = 'CANCELED',
290                 finished_at = COALESCE(finished_at, now()),
291                 status_reason = 'workflow.dependency_unsatisfied',
292                 last_error_code = 'workflow.dependency_unsatisfied',
293                 last_error_message = 'Step dependency requirements were not satisfied.',
294                 updated_at = now()
295             WHERE id = $1
296               AND status = 'BLOCKED'
297             RETURNING workflow_run_id",
298        step_state.candidate.id,
299    )
300    .fetch_optional(&mut **tx)
301    .await
302    .map_err(|error| {
303        Error::from_query_sqlx_with_context("cancel born-unsatisfied appended workflow step", error)
304    })?;
305
306    if canceled.is_some() {
307        resolve_terminal_step_queue_tx(
308            tx,
309            step_state.candidate.id,
310            WorkflowStepStatus::Canceled,
311            touched_run_ids,
312        )
313        .await?;
314    }
315
316    Ok(())
317}
318
319async fn insert_appended_workflow_step_dependencies_tx(
320    tx: &mut DbTx<'_>,
321    workflow_run_id: Uuid,
322    steps: &[WorkflowStepEnqueue<'_>],
323    step_id_by_key: &WorkflowStepIdsByKey,
324) -> Result<()> {
325    for step in steps {
326        let dependent_step_id = step_id_for_key(
327            step_id_by_key,
328            step.step_key().as_str(),
329            "missing dependent appended workflow step id",
330        )?;
331        for dependency in step.dependencies() {
332            let prerequisite_step_id = step_id_for_key(
333                step_id_by_key,
334                dependency.prerequisite_step_key.as_str(),
335                "missing appended workflow prerequisite step id",
336            )?;
337            let release_mode = dependency
338                .release_mode
339                .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal)
340                .as_db_value();
341            insert_workflow_step_dependency_record_tx(
342                tx,
343                workflow_run_id,
344                prerequisite_step_id,
345                dependent_step_id,
346                release_mode,
347            )
348            .await?;
349        }
350    }
351
352    Ok(())
353}
354
355async fn load_appended_step_states_tx(
356    tx: &mut DbTx<'_>,
357    appended_step_ids: &[Uuid],
358) -> Result<BTreeMap<Uuid, AppendedStepState>> {
359    let rows = sqlx::query!(
360        "SELECT
361            id,
362            workflow_run_id,
363            execution_kind::text AS \"execution_kind!\",
364            job_type,
365            organization_id,
366            payload,
367            priority,
368            max_attempts,
369            timeout_seconds,
370            stage,
371            dependency_count_pending,
372            dependency_count_unsatisfied
373         FROM workflow_steps
374         WHERE id = ANY($1::uuid[])
375         FOR UPDATE",
376        appended_step_ids,
377    )
378    .fetch_all(&mut **tx)
379    .await
380    .map_err(|error| {
381        Error::from_query_sqlx_with_context("load appended workflow step states", error)
382    })?;
383
384    rows.into_iter()
385        .map(|row| {
386            let execution_kind = parse_workflow_step_execution_kind(row.execution_kind)?;
387            let job_type = row.job_type.map(parse_job_type_name).transpose()?;
388            let stage = row.stage.map(parse_job_stage).transpose()?;
389            Ok((
390                row.id,
391                AppendedStepState {
392                    candidate: StepReleaseCandidate {
393                        id: row.id,
394                        workflow_run_id: row.workflow_run_id,
395                        execution_kind,
396                        job_type,
397                        organization_id: row.organization_id,
398                        payload: row.payload,
399                        priority: row.priority,
400                        max_attempts: row.max_attempts,
401                        timeout_seconds: row.timeout_seconds,
402                        stage,
403                    },
404                    dependency_count_pending: row.dependency_count_pending,
405                    dependency_count_unsatisfied: row.dependency_count_unsatisfied,
406                },
407            ))
408        })
409        .collect()
410}
411
412async fn load_workflow_steps_by_keys_tx(
413    tx: &mut DbTx<'_>,
414    workflow_run_id: Uuid,
415    input_steps: &[WorkflowStepEnqueue<'_>],
416    organization_id: Option<Uuid>,
417) -> Result<Vec<WorkflowStepDbRecord>> {
418    let step_keys = input_steps
419        .iter()
420        .map(|step| step.step_key().as_str().to_owned())
421        .collect::<Vec<_>>();
422    let rows = sqlx::query!(
423        "SELECT
424            ws.id,
425            ws.workflow_run_id,
426            ws.step_key,
427            ws.execution_kind::text AS \"execution_kind!\",
428            ws.job_type,
429            ws.organization_id,
430            ws.payload,
431            ws.priority,
432            ws.max_attempts,
433            ws.timeout_seconds,
434            ws.stage,
435            ws.status::text AS \"status!\",
436            ws.job_id,
437            ws.released_at,
438            ws.started_at,
439            ws.finished_at,
440            ws.dependency_count_total,
441            ws.dependency_count_pending,
442            ws.dependency_count_unsatisfied,
443            ws.status_reason,
444            ws.last_error_code,
445            ws.last_error_message,
446            ws.created_at,
447            ws.updated_at
448         FROM workflow_steps ws
449         JOIN workflow_runs wr ON wr.id = ws.workflow_run_id
450         WHERE ws.workflow_run_id = $1
451           AND ws.step_key = ANY($2::text[])
452           AND ($3::uuid IS NULL OR wr.organization_id = $3)",
453        workflow_run_id,
454        &step_keys,
455        organization_id,
456    )
457    .fetch_all(&mut **tx)
458    .await
459    .map_err(|error| {
460        Error::from_query_sqlx_with_context("load appended workflow steps by key", error)
461    })?;
462
463    let steps_by_key = rows
464        .into_iter()
465        .map(|row| {
466            let step_key = parse_step_key_name(row.step_key)?;
467            Ok((
468                step_key.clone(),
469                WorkflowStepDbRecord {
470                    id: row.id,
471                    workflow_run_id: row.workflow_run_id,
472                    step_key,
473                    execution_kind: parse_workflow_step_execution_kind(row.execution_kind)?,
474                    job_type: row.job_type.map(parse_job_type_name).transpose()?,
475                    organization_id: row.organization_id,
476                    payload: row.payload,
477                    priority: row.priority,
478                    max_attempts: row.max_attempts,
479                    timeout_seconds: row.timeout_seconds,
480                    stage: row.stage.map(parse_job_stage).transpose()?,
481                    status: parse_workflow_step_status(row.status)?,
482                    job_id: row.job_id,
483                    released_at: row.released_at,
484                    started_at: row.started_at,
485                    finished_at: row.finished_at,
486                    dependency_count_total: row.dependency_count_total,
487                    dependency_count_pending: row.dependency_count_pending,
488                    dependency_count_unsatisfied: row.dependency_count_unsatisfied,
489                    status_reason: row.status_reason,
490                    last_error_code: row.last_error_code,
491                    last_error_message: row.last_error_message,
492                    created_at: row.created_at,
493                    updated_at: row.updated_at,
494                },
495            ))
496        })
497        .collect::<Result<BTreeMap<_, _>>>()?;
498
499    input_steps
500        .iter()
501        .map(|step| {
502            steps_by_key
503                .get(step.step_key().as_str())
504                .cloned()
505                .ok_or_else(|| {
506                    workflow_internal_state_error(format!(
507                        "workflow append result missing step '{}'",
508                        step.step_key().as_str()
509                    ))
510                })
511        })
512        .collect()
513}
514
515async fn load_append_result_tx(
516    tx: &mut DbTx<'_>,
517    workflow_run_id: Uuid,
518    input_steps: &[WorkflowStepEnqueue<'_>],
519    organization_id: Option<Uuid>,
520    outcome: AppendOutcome,
521) -> Result<AppendResult> {
522    let appended_steps =
523        load_workflow_steps_by_keys_tx(tx, workflow_run_id, input_steps, organization_id).await?;
524
525    Ok(AppendResult {
526        workflow_run: load_workflow_run_by_id_tx(tx, workflow_run_id).await?,
527        appended_steps,
528        outcome,
529    })
530}