1use std::collections::{BTreeMap, BTreeSet};
2
3use runledger_core::jobs::{
4 StepKey, WorkflowDependencyReleaseMode, WorkflowRunStatus, WorkflowStepEnqueue,
5 WorkflowStepExecutionKind, WorkflowStepStatus, validate_workflow_step_append,
6};
7use sqlx::types::Uuid;
8
9use crate::jobs::row_decode::{
10 parse_job_stage, parse_job_type_name, parse_step_key_name, parse_workflow_step_execution_kind,
11 parse_workflow_step_status,
12};
13use crate::jobs::workflow_types::{
14 AppendWorkflowStepsInput as AppendWorkflowStepsInputRecord,
15 AppendWorkflowStepsOutcome as AppendOutcome, AppendWorkflowStepsResult as AppendResult,
16 WorkflowStepDbRecord,
17};
18use crate::{DbPool, DbTx, Error, Result};
19
20use super::super::enqueue::{
21 WorkflowStepIdsByKey, dependency_count_total, fetch_job_definition_defaults_tx,
22 insert_workflow_step_dependency_record_tx, insert_workflow_step_record_tx,
23 load_workflow_run_by_id_tx, step_id_for_key, workflow_step_defaults,
24 workflow_step_effective_organization_id,
25};
26use super::super::runtime::{
27 recompute_workflow_run_statuses_tx, release_candidate_step_tx, resolve_terminal_step_queue_tx,
28};
29use super::super::{
30 StepReleaseCandidate, workflow_append_blank_mutation_key_error,
31 workflow_append_conflicting_retry_error, workflow_append_terminal_run_error,
32 workflow_append_window_missing_error, workflow_append_window_not_external_error,
33 workflow_append_window_not_open_error, workflow_dag_validation_error,
34 workflow_internal_state_error,
35};
36use super::idempotency::{
37 canonical_append_request, deserialize_stored_append_request, insert_workflow_mutation_row_tx,
38 load_existing_mutation_request_tx, stored_append_request_matches,
39};
40use super::{
41 LockedWorkflowStepState, lock_workflow_run_for_update_tx, lock_workflow_steps_for_update_tx,
42};
43
44#[derive(Debug)]
45struct AppendedStepState {
46 candidate: StepReleaseCandidate,
47 dependency_count_pending: i32,
48 dependency_count_unsatisfied: i32,
49}
50
51pub async fn append_workflow_steps(
52 pool: &DbPool,
53 input: &AppendWorkflowStepsInputRecord<'_>,
54) -> Result<AppendResult> {
55 let mut tx = pool
56 .begin()
57 .await
58 .map_err(|error| Error::ConnectionError(error.to_string()))?;
59 let result = append_workflow_steps_tx(&mut tx, input).await?;
60 tx.commit()
61 .await
62 .map_err(|error| Error::ConnectionError(error.to_string()))?;
63 Ok(result)
64}
65
66pub async fn append_workflow_steps_tx(
67 tx: &mut DbTx<'_>,
68 input: &AppendWorkflowStepsInputRecord<'_>,
69) -> Result<AppendResult> {
70 if input.mutation_key.trim().is_empty() {
71 return Err(workflow_append_blank_mutation_key_error());
72 }
73
74 let locked_steps =
75 lock_workflow_steps_for_update_tx(tx, input.workflow_run_id, input.organization_id).await?;
76 let workflow_run =
77 lock_workflow_run_for_update_tx(tx, input.workflow_run_id, input.organization_id).await?;
78 let canonical_request = canonical_append_request(
79 input.append_window_step_key,
80 workflow_run.organization_id,
81 &input.steps,
82 )?;
83 let comparable_request =
84 deserialize_stored_append_request(&canonical_request, workflow_run.organization_id)?;
85
86 if let Some(existing_request) =
87 load_existing_mutation_request_tx(tx, workflow_run.id, input.mutation_key).await?
88 {
89 if !stored_append_request_matches(
90 &existing_request,
91 workflow_run.organization_id,
92 &comparable_request,
93 )? {
94 return Err(workflow_append_conflicting_retry_error(input.mutation_key));
95 }
96
97 return load_append_result_tx(
98 tx,
99 workflow_run.id,
100 &input.steps,
101 workflow_run.organization_id,
102 AppendOutcome::AlreadyApplied,
103 )
104 .await;
105 }
106
107 ensure_append_window_is_open(&locked_steps, input.append_window_step_key)?;
108
109 if matches!(
110 workflow_run.status,
111 WorkflowRunStatus::Succeeded
112 | WorkflowRunStatus::CompletedWithErrors
113 | WorkflowRunStatus::Canceled
114 ) {
115 return Err(workflow_append_terminal_run_error(workflow_run.status));
116 }
117
118 let existing_step_keys = locked_steps
119 .iter()
120 .map(|step| step.step_key.clone())
121 .collect::<BTreeSet<_>>();
122 validate_workflow_step_append(&existing_step_keys, &input.steps)
123 .map_err(workflow_dag_validation_error)?;
124
125 let defaults_by_job_type = fetch_job_definition_defaults_tx(tx, &input.steps).await?;
126 let existing_statuses_by_key = locked_steps
127 .iter()
128 .map(|step| (step.step_key.as_str().to_owned(), step.status))
129 .collect::<BTreeMap<_, _>>();
130 let new_step_keys = input
131 .steps
132 .iter()
133 .map(|step| step.step_key().as_str().to_owned())
134 .collect::<BTreeSet<_>>();
135
136 let mut step_id_by_key = locked_steps
137 .iter()
138 .map(|step| (step.step_key.as_str().to_owned(), step.id))
139 .collect::<WorkflowStepIdsByKey>();
140 let mut appended_step_ids = Vec::with_capacity(input.steps.len());
141
142 for step in &input.steps {
143 let defaults = match step.execution_kind() {
144 WorkflowStepExecutionKind::Job => {
145 Some(workflow_step_defaults(&defaults_by_job_type, step)?)
146 }
147 WorkflowStepExecutionKind::External => None,
148 };
149 let (dependency_count_pending, dependency_count_unsatisfied) =
150 initial_dependency_counters(&existing_statuses_by_key, &new_step_keys, step)?;
151 let step_id = insert_workflow_step_record_tx(
152 tx,
153 workflow_run.id,
154 workflow_step_effective_organization_id(workflow_run.organization_id, step),
155 step,
156 defaults,
157 dependency_count_pending,
158 dependency_count_unsatisfied,
159 )
160 .await?;
161 step_id_by_key.insert(step.step_key().as_str().to_owned(), step_id);
162 appended_step_ids.push(step_id);
163 }
164
165 insert_appended_workflow_step_dependencies_tx(
166 tx,
167 workflow_run.id,
168 &input.steps,
169 &step_id_by_key,
170 )
171 .await?;
172 insert_workflow_mutation_row_tx(
173 tx,
174 workflow_run.id,
175 input.mutation_key,
176 input.mutation_metadata,
177 &canonical_request,
178 )
179 .await?;
180
181 let appended_step_states = load_appended_step_states_tx(tx, &appended_step_ids).await?;
182 let mut touched_run_ids = BTreeSet::from([workflow_run.id]);
183 for step_id in &appended_step_ids {
184 let Some(step_state) = appended_step_states.get(step_id) else {
185 return Err(workflow_internal_state_error(format!(
186 "missing appended workflow step state for step id {step_id}"
187 )));
188 };
189 if step_state.dependency_count_pending != 0 {
190 continue;
191 }
192
193 resolve_appended_step_state_tx(tx, step_state, &mut touched_run_ids).await?;
194 }
195
196 recompute_workflow_run_statuses_tx(tx, &touched_run_ids).await?;
197
198 load_append_result_tx(
199 tx,
200 workflow_run.id,
201 &input.steps,
202 workflow_run.organization_id,
203 AppendOutcome::Appended,
204 )
205 .await
206}
207
208fn ensure_append_window_is_open(
209 locked_steps: &[LockedWorkflowStepState],
210 append_window_step_key: StepKey<'_>,
211) -> Result<()> {
212 let Some(append_window) = locked_steps
213 .iter()
214 .find(|step| step.step_key.as_str() == append_window_step_key.as_str())
215 else {
216 return Err(workflow_append_window_missing_error(
217 append_window_step_key.as_str(),
218 ));
219 };
220
221 if append_window.execution_kind != WorkflowStepExecutionKind::External {
222 return Err(workflow_append_window_not_external_error(
223 append_window.step_key.as_str(),
224 ));
225 }
226
227 if append_window.status != WorkflowStepStatus::WaitingForExternal {
228 return Err(workflow_append_window_not_open_error(
229 append_window.step_key.as_str(),
230 append_window.status,
231 ));
232 }
233
234 Ok(())
235}
236
237fn initial_dependency_counters(
238 existing_statuses_by_key: &BTreeMap<String, WorkflowStepStatus>,
239 new_step_keys: &BTreeSet<String>,
240 step: &WorkflowStepEnqueue<'_>,
241) -> Result<(i32, i32)> {
242 let _ = dependency_count_total(step)?;
243 let mut dependency_count_pending = 0i32;
244 let mut dependency_count_unsatisfied = 0i32;
245
246 for dependency in step.dependencies() {
247 let release_mode = dependency
248 .release_mode
249 .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal);
250 if let Some(status) =
251 existing_statuses_by_key.get(dependency.prerequisite_step_key.as_str())
252 {
253 if !status.is_terminal() {
254 dependency_count_pending += 1;
255 } else if matches!(release_mode, WorkflowDependencyReleaseMode::OnSuccess)
256 && *status != WorkflowStepStatus::Succeeded
257 {
258 dependency_count_unsatisfied += 1;
259 }
260 continue;
261 }
262
263 if new_step_keys.contains(dependency.prerequisite_step_key.as_str()) {
264 dependency_count_pending += 1;
265 continue;
266 }
267
268 return Err(workflow_internal_state_error(format!(
269 "append dependency '{}' for step '{}' was not present in the existing or new step key set",
270 dependency.prerequisite_step_key.as_str(),
271 step.step_key().as_str()
272 )));
273 }
274
275 Ok((dependency_count_pending, dependency_count_unsatisfied))
276}
277
278async fn resolve_appended_step_state_tx(
279 tx: &mut DbTx<'_>,
280 step_state: &AppendedStepState,
281 touched_run_ids: &mut BTreeSet<Uuid>,
282) -> Result<()> {
283 if step_state.dependency_count_unsatisfied == 0 {
284 return release_candidate_step_tx(tx, &step_state.candidate).await;
285 }
286
287 let canceled = sqlx::query!(
288 "UPDATE workflow_steps
289 SET status = 'CANCELED',
290 finished_at = COALESCE(finished_at, now()),
291 status_reason = 'workflow.dependency_unsatisfied',
292 last_error_code = 'workflow.dependency_unsatisfied',
293 last_error_message = 'Step dependency requirements were not satisfied.',
294 updated_at = now()
295 WHERE id = $1
296 AND status = 'BLOCKED'
297 RETURNING workflow_run_id",
298 step_state.candidate.id,
299 )
300 .fetch_optional(&mut **tx)
301 .await
302 .map_err(|error| {
303 Error::from_query_sqlx_with_context("cancel born-unsatisfied appended workflow step", error)
304 })?;
305
306 if canceled.is_some() {
307 resolve_terminal_step_queue_tx(
308 tx,
309 step_state.candidate.id,
310 WorkflowStepStatus::Canceled,
311 touched_run_ids,
312 )
313 .await?;
314 }
315
316 Ok(())
317}
318
319async fn insert_appended_workflow_step_dependencies_tx(
320 tx: &mut DbTx<'_>,
321 workflow_run_id: Uuid,
322 steps: &[WorkflowStepEnqueue<'_>],
323 step_id_by_key: &WorkflowStepIdsByKey,
324) -> Result<()> {
325 for step in steps {
326 let dependent_step_id = step_id_for_key(
327 step_id_by_key,
328 step.step_key().as_str(),
329 "missing dependent appended workflow step id",
330 )?;
331 for dependency in step.dependencies() {
332 let prerequisite_step_id = step_id_for_key(
333 step_id_by_key,
334 dependency.prerequisite_step_key.as_str(),
335 "missing appended workflow prerequisite step id",
336 )?;
337 let release_mode = dependency
338 .release_mode
339 .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal)
340 .as_db_value();
341 insert_workflow_step_dependency_record_tx(
342 tx,
343 workflow_run_id,
344 prerequisite_step_id,
345 dependent_step_id,
346 release_mode,
347 )
348 .await?;
349 }
350 }
351
352 Ok(())
353}
354
355async fn load_appended_step_states_tx(
356 tx: &mut DbTx<'_>,
357 appended_step_ids: &[Uuid],
358) -> Result<BTreeMap<Uuid, AppendedStepState>> {
359 let rows = sqlx::query!(
360 "SELECT
361 id,
362 workflow_run_id,
363 execution_kind::text AS \"execution_kind!\",
364 job_type,
365 organization_id,
366 payload,
367 priority,
368 max_attempts,
369 timeout_seconds,
370 stage,
371 dependency_count_pending,
372 dependency_count_unsatisfied
373 FROM workflow_steps
374 WHERE id = ANY($1::uuid[])
375 FOR UPDATE",
376 appended_step_ids,
377 )
378 .fetch_all(&mut **tx)
379 .await
380 .map_err(|error| {
381 Error::from_query_sqlx_with_context("load appended workflow step states", error)
382 })?;
383
384 rows.into_iter()
385 .map(|row| {
386 let execution_kind = parse_workflow_step_execution_kind(row.execution_kind)?;
387 let job_type = row.job_type.map(parse_job_type_name).transpose()?;
388 let stage = row.stage.map(parse_job_stage).transpose()?;
389 Ok((
390 row.id,
391 AppendedStepState {
392 candidate: StepReleaseCandidate {
393 id: row.id,
394 workflow_run_id: row.workflow_run_id,
395 execution_kind,
396 job_type,
397 organization_id: row.organization_id,
398 payload: row.payload,
399 priority: row.priority,
400 max_attempts: row.max_attempts,
401 timeout_seconds: row.timeout_seconds,
402 stage,
403 },
404 dependency_count_pending: row.dependency_count_pending,
405 dependency_count_unsatisfied: row.dependency_count_unsatisfied,
406 },
407 ))
408 })
409 .collect()
410}
411
412async fn load_workflow_steps_by_keys_tx(
413 tx: &mut DbTx<'_>,
414 workflow_run_id: Uuid,
415 input_steps: &[WorkflowStepEnqueue<'_>],
416 organization_id: Option<Uuid>,
417) -> Result<Vec<WorkflowStepDbRecord>> {
418 let step_keys = input_steps
419 .iter()
420 .map(|step| step.step_key().as_str().to_owned())
421 .collect::<Vec<_>>();
422 let rows = sqlx::query!(
423 "SELECT
424 ws.id,
425 ws.workflow_run_id,
426 ws.step_key,
427 ws.execution_kind::text AS \"execution_kind!\",
428 ws.job_type,
429 ws.organization_id,
430 ws.payload,
431 ws.priority,
432 ws.max_attempts,
433 ws.timeout_seconds,
434 ws.stage,
435 ws.status::text AS \"status!\",
436 ws.job_id,
437 ws.released_at,
438 ws.started_at,
439 ws.finished_at,
440 ws.dependency_count_total,
441 ws.dependency_count_pending,
442 ws.dependency_count_unsatisfied,
443 ws.status_reason,
444 ws.last_error_code,
445 ws.last_error_message,
446 ws.created_at,
447 ws.updated_at
448 FROM workflow_steps ws
449 JOIN workflow_runs wr ON wr.id = ws.workflow_run_id
450 WHERE ws.workflow_run_id = $1
451 AND ws.step_key = ANY($2::text[])
452 AND ($3::uuid IS NULL OR wr.organization_id = $3)",
453 workflow_run_id,
454 &step_keys,
455 organization_id,
456 )
457 .fetch_all(&mut **tx)
458 .await
459 .map_err(|error| {
460 Error::from_query_sqlx_with_context("load appended workflow steps by key", error)
461 })?;
462
463 let steps_by_key = rows
464 .into_iter()
465 .map(|row| {
466 let step_key = parse_step_key_name(row.step_key)?;
467 Ok((
468 step_key.clone(),
469 WorkflowStepDbRecord {
470 id: row.id,
471 workflow_run_id: row.workflow_run_id,
472 step_key,
473 execution_kind: parse_workflow_step_execution_kind(row.execution_kind)?,
474 job_type: row.job_type.map(parse_job_type_name).transpose()?,
475 organization_id: row.organization_id,
476 payload: row.payload,
477 priority: row.priority,
478 max_attempts: row.max_attempts,
479 timeout_seconds: row.timeout_seconds,
480 stage: row.stage.map(parse_job_stage).transpose()?,
481 status: parse_workflow_step_status(row.status)?,
482 job_id: row.job_id,
483 released_at: row.released_at,
484 started_at: row.started_at,
485 finished_at: row.finished_at,
486 dependency_count_total: row.dependency_count_total,
487 dependency_count_pending: row.dependency_count_pending,
488 dependency_count_unsatisfied: row.dependency_count_unsatisfied,
489 status_reason: row.status_reason,
490 last_error_code: row.last_error_code,
491 last_error_message: row.last_error_message,
492 created_at: row.created_at,
493 updated_at: row.updated_at,
494 },
495 ))
496 })
497 .collect::<Result<BTreeMap<_, _>>>()?;
498
499 input_steps
500 .iter()
501 .map(|step| {
502 steps_by_key
503 .get(step.step_key().as_str())
504 .cloned()
505 .ok_or_else(|| {
506 workflow_internal_state_error(format!(
507 "workflow append result missing step '{}'",
508 step.step_key().as_str()
509 ))
510 })
511 })
512 .collect()
513}
514
515async fn load_append_result_tx(
516 tx: &mut DbTx<'_>,
517 workflow_run_id: Uuid,
518 input_steps: &[WorkflowStepEnqueue<'_>],
519 organization_id: Option<Uuid>,
520 outcome: AppendOutcome,
521) -> Result<AppendResult> {
522 let appended_steps =
523 load_workflow_steps_by_keys_tx(tx, workflow_run_id, input_steps, organization_id).await?;
524
525 Ok(AppendResult {
526 workflow_run: load_workflow_run_by_id_tx(tx, workflow_run_id).await?,
527 appended_steps,
528 outcome,
529 })
530}