1use std::collections::{BTreeMap, BTreeSet};
2
3use runledger_core::jobs::{
4 StepKey, WorkflowDependencyReleaseMode, WorkflowRunStatus, WorkflowStepEnqueue,
5 WorkflowStepExecutionKind, WorkflowStepStatus, validate_workflow_step_append,
6};
7use sqlx::types::Uuid;
8
9use crate::jobs::row_decode::{
10 parse_job_stage, parse_job_type_name, parse_step_key_name, parse_workflow_step_execution_kind,
11 parse_workflow_step_status,
12};
13use crate::jobs::transaction_isolation::ensure_read_committed_tx;
14use crate::jobs::workflow_types::{
15 AppendWorkflowStepsInput as AppendWorkflowStepsInputRecord,
16 AppendWorkflowStepsOutcome as AppendOutcome, AppendWorkflowStepsResult as AppendResult,
17 WorkflowStepDbRecord,
18};
19use crate::{DbPool, DbTx, Error, Result};
20
21use super::super::errors::{
22 workflow_append_blank_mutation_key_error, workflow_append_conflicting_retry_error,
23 workflow_append_terminal_run_error, workflow_append_window_missing_error,
24 workflow_append_window_not_external_error, workflow_append_window_not_open_error,
25 workflow_internal_state_error, workflow_release_conflict_error,
26};
27use super::super::locking::{
28 LockedWorkflowStepState, lock_workflow_run_for_update_tx, lock_workflow_steps_for_update_tx,
29 try_lock_workflow_run_release_shared_tx,
30};
31use super::super::read::load_workflow_run_by_id_tx;
32use super::super::release::{
33 StepReleaseCandidate, StepReleaseCandidateInit, release_candidate_step_tx,
34};
35use super::super::runtime::{recompute_workflow_run_statuses_tx, resolve_terminal_step_queue_tx};
36use super::super::steps::{
37 WorkflowStepIdsByKey, dependency_count_total, fetch_job_definition_defaults_tx,
38 insert_workflow_step_dependency_record_tx, insert_workflow_step_record_tx, step_id_for_key,
39 workflow_step_defaults, workflow_step_effective_organization_id,
40};
41use super::super::validation::workflow_dag_validation_error;
42use super::idempotency::{
43 canonical_append_request, deserialize_stored_append_request, insert_workflow_mutation_row_tx,
44 load_existing_mutation_request_tx, stored_append_request_matches_tx,
45};
46
47#[derive(Debug)]
48struct AppendedStepState {
49 candidate: StepReleaseCandidate,
50 dependency_count_pending: i32,
51 dependency_count_unsatisfied: i32,
52}
53
54pub async fn append_workflow_steps(
55 pool: &DbPool,
56 input: &AppendWorkflowStepsInputRecord<'_>,
57) -> Result<AppendResult> {
58 let mut tx = pool
59 .begin()
60 .await
61 .map_err(|error| Error::ConnectionError(error.to_string()))?;
62 let result = append_workflow_steps_tx(&mut tx, input).await?;
63 tx.commit()
64 .await
65 .map_err(|error| Error::ConnectionError(error.to_string()))?;
66 Ok(result)
67}
68
69pub async fn append_workflow_steps_tx(
70 tx: &mut DbTx<'_>,
71 input: &AppendWorkflowStepsInputRecord<'_>,
72) -> Result<AppendResult> {
73 if input.mutation_key.trim().is_empty() {
74 return Err(workflow_append_blank_mutation_key_error());
75 }
76 ensure_read_committed_tx(
77 tx,
78 "workflow append mutation",
79 "workflow.append_unsupported_isolation",
80 "Workflow append mutation requires READ COMMITTED transaction isolation.",
81 )
82 .await?;
83
84 let locked_steps =
85 lock_workflow_steps_for_update_tx(tx, input.workflow_run_id, input.organization_id).await?;
86 let workflow_run =
87 lock_workflow_run_for_update_tx(tx, input.workflow_run_id, input.organization_id).await?;
88 let canonical_request = canonical_append_request(
89 input.append_window_step_key,
90 workflow_run.organization_id,
91 &input.steps,
92 )?;
93 let comparable_request =
94 deserialize_stored_append_request(&canonical_request, workflow_run.organization_id)?;
95
96 if let Some(existing_request) =
97 load_existing_mutation_request_tx(tx, workflow_run.id, input.mutation_key).await?
98 {
99 if !stored_append_request_matches_tx(
100 tx,
101 &existing_request,
102 workflow_run.organization_id,
103 &comparable_request,
104 )
105 .await?
106 {
107 return Err(workflow_append_conflicting_retry_error(input.mutation_key));
108 }
109
110 return load_append_result_tx(
111 tx,
112 workflow_run.id,
113 &input.steps,
114 workflow_run.organization_id,
115 AppendOutcome::AlreadyApplied,
116 )
117 .await;
118 }
119
120 ensure_append_window_is_open(&locked_steps, input.append_window_step_key)?;
121
122 if matches!(
123 workflow_run.status,
124 WorkflowRunStatus::Succeeded
125 | WorkflowRunStatus::CompletedWithErrors
126 | WorkflowRunStatus::Canceled
127 ) {
128 return Err(workflow_append_terminal_run_error(workflow_run.status));
129 }
130
131 let existing_step_keys = locked_steps
132 .iter()
133 .map(|step| step.step_key.clone())
134 .collect::<BTreeSet<_>>();
135 validate_workflow_step_append(&existing_step_keys, &input.steps)
136 .map_err(workflow_dag_validation_error)?;
137
138 if !try_lock_workflow_run_release_shared_tx(tx, workflow_run.id).await? {
139 return Err(workflow_release_conflict_error(workflow_run.id));
140 }
141
142 let defaults_by_job_type = fetch_job_definition_defaults_tx(tx, &input.steps).await?;
143 let existing_statuses_by_key = locked_steps
144 .iter()
145 .map(|step| (step.step_key.as_str().to_owned(), step.status))
146 .collect::<BTreeMap<_, _>>();
147 let new_step_keys = input
148 .steps
149 .iter()
150 .map(|step| step.step_key().as_str().to_owned())
151 .collect::<BTreeSet<_>>();
152
153 let mut step_id_by_key = locked_steps
154 .iter()
155 .map(|step| (step.step_key.as_str().to_owned(), step.id))
156 .collect::<WorkflowStepIdsByKey>();
157 let mut appended_step_ids = Vec::with_capacity(input.steps.len());
158
159 for step in &input.steps {
160 let defaults = match step.execution_kind() {
161 WorkflowStepExecutionKind::Job => {
162 Some(workflow_step_defaults(&defaults_by_job_type, step)?)
163 }
164 WorkflowStepExecutionKind::External => None,
165 };
166 let (dependency_count_pending, dependency_count_unsatisfied) =
167 initial_dependency_counters(&existing_statuses_by_key, &new_step_keys, step)?;
168 let step_id = insert_workflow_step_record_tx(
169 tx,
170 workflow_run.id,
171 workflow_step_effective_organization_id(workflow_run.organization_id, step),
172 step,
173 defaults,
174 dependency_count_pending,
175 dependency_count_unsatisfied,
176 )
177 .await?;
178 step_id_by_key.insert(step.step_key().as_str().to_owned(), step_id);
179 appended_step_ids.push(step_id);
180 }
181
182 insert_appended_workflow_step_dependencies_tx(
183 tx,
184 workflow_run.id,
185 &input.steps,
186 &step_id_by_key,
187 )
188 .await?;
189 insert_workflow_mutation_row_tx(
190 tx,
191 workflow_run.id,
192 input.mutation_key,
193 input.mutation_metadata,
194 &canonical_request,
195 )
196 .await?;
197
198 let appended_step_states = load_appended_step_states_tx(tx, &appended_step_ids).await?;
199 let mut touched_run_ids = BTreeSet::from([workflow_run.id]);
200 for step_id in &appended_step_ids {
201 let Some(step_state) = appended_step_states.get(step_id) else {
202 return Err(workflow_internal_state_error(format!(
203 "missing appended workflow step state for step id {step_id}"
204 )));
205 };
206 if step_state.dependency_count_pending != 0 {
207 continue;
208 }
209
210 resolve_appended_step_state_tx(tx, step_state, &mut touched_run_ids).await?;
211 }
212
213 recompute_workflow_run_statuses_tx(tx, &touched_run_ids).await?;
214
215 load_append_result_tx(
216 tx,
217 workflow_run.id,
218 &input.steps,
219 workflow_run.organization_id,
220 AppendOutcome::Appended,
221 )
222 .await
223}
224
225fn ensure_append_window_is_open(
226 locked_steps: &[LockedWorkflowStepState],
227 append_window_step_key: StepKey<'_>,
228) -> Result<()> {
229 let Some(append_window) = locked_steps
230 .iter()
231 .find(|step| step.step_key.as_str() == append_window_step_key.as_str())
232 else {
233 return Err(workflow_append_window_missing_error(
234 append_window_step_key.as_str(),
235 ));
236 };
237
238 if append_window.execution_kind != WorkflowStepExecutionKind::External {
239 return Err(workflow_append_window_not_external_error(
240 append_window.step_key.as_str(),
241 ));
242 }
243
244 if append_window.status != WorkflowStepStatus::WaitingForExternal {
245 return Err(workflow_append_window_not_open_error(
246 append_window.step_key.as_str(),
247 append_window.status,
248 ));
249 }
250
251 Ok(())
252}
253
254fn initial_dependency_counters(
255 existing_statuses_by_key: &BTreeMap<String, WorkflowStepStatus>,
256 new_step_keys: &BTreeSet<String>,
257 step: &WorkflowStepEnqueue<'_>,
258) -> Result<(i32, i32)> {
259 let _ = dependency_count_total(step)?;
260 let mut dependency_count_pending = 0i32;
261 let mut dependency_count_unsatisfied = 0i32;
262
263 for dependency in step.dependencies() {
264 let release_mode = dependency
265 .release_mode
266 .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal);
267 if let Some(status) =
268 existing_statuses_by_key.get(dependency.prerequisite_step_key.as_str())
269 {
270 if !status.is_terminal() {
271 dependency_count_pending += 1;
272 } else if matches!(release_mode, WorkflowDependencyReleaseMode::OnSuccess)
273 && *status != WorkflowStepStatus::Succeeded
274 {
275 dependency_count_unsatisfied += 1;
276 }
277 continue;
278 }
279
280 if new_step_keys.contains(dependency.prerequisite_step_key.as_str()) {
281 dependency_count_pending += 1;
282 continue;
283 }
284
285 return Err(workflow_internal_state_error(format!(
286 "append dependency '{}' for step '{}' was not present in the existing or new step key set",
287 dependency.prerequisite_step_key.as_str(),
288 step.step_key().as_str()
289 )));
290 }
291
292 Ok((dependency_count_pending, dependency_count_unsatisfied))
293}
294
295async fn resolve_appended_step_state_tx(
296 tx: &mut DbTx<'_>,
297 step_state: &AppendedStepState,
298 touched_run_ids: &mut BTreeSet<Uuid>,
299) -> Result<()> {
300 if step_state.dependency_count_unsatisfied == 0 {
301 return release_candidate_step_tx(tx, &step_state.candidate).await;
302 }
303
304 let canceled = sqlx::query!(
305 "UPDATE workflow_steps
306 SET status = 'CANCELED',
307 finished_at = COALESCE(finished_at, now()),
308 status_reason = 'workflow.dependency_unsatisfied',
309 last_error_code = 'workflow.dependency_unsatisfied',
310 last_error_message = 'Step dependency requirements were not satisfied.',
311 updated_at = now()
312 WHERE id = $1
313 AND status = 'BLOCKED'
314 RETURNING workflow_run_id",
315 step_state.candidate.id(),
316 )
317 .fetch_optional(&mut **tx)
318 .await
319 .map_err(|error| {
320 Error::from_query_sqlx_with_context("cancel born-unsatisfied appended workflow step", error)
321 })?;
322
323 if canceled.is_some() {
324 resolve_terminal_step_queue_tx(
325 tx,
326 step_state.candidate.id(),
327 WorkflowStepStatus::Canceled,
328 touched_run_ids,
329 )
330 .await?;
331 }
332
333 Ok(())
334}
335
336async fn insert_appended_workflow_step_dependencies_tx(
337 tx: &mut DbTx<'_>,
338 workflow_run_id: Uuid,
339 steps: &[WorkflowStepEnqueue<'_>],
340 step_id_by_key: &WorkflowStepIdsByKey,
341) -> Result<()> {
342 for step in steps {
343 let dependent_step_id = step_id_for_key(
344 step_id_by_key,
345 step.step_key().as_str(),
346 "missing dependent appended workflow step id",
347 )?;
348 for dependency in step.dependencies() {
349 let prerequisite_step_id = step_id_for_key(
350 step_id_by_key,
351 dependency.prerequisite_step_key.as_str(),
352 "missing appended workflow prerequisite step id",
353 )?;
354 let release_mode = dependency
355 .release_mode
356 .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal)
357 .as_db_value();
358 insert_workflow_step_dependency_record_tx(
359 tx,
360 workflow_run_id,
361 prerequisite_step_id,
362 dependent_step_id,
363 release_mode,
364 )
365 .await?;
366 }
367 }
368
369 Ok(())
370}
371
372async fn load_appended_step_states_tx(
373 tx: &mut DbTx<'_>,
374 appended_step_ids: &[Uuid],
375) -> Result<BTreeMap<Uuid, AppendedStepState>> {
376 let rows = sqlx::query!(
377 "SELECT
378 id,
379 workflow_run_id,
380 execution_kind::text AS \"execution_kind!\",
381 job_type,
382 organization_id,
383 payload,
384 priority,
385 max_attempts,
386 timeout_seconds,
387 stage,
388 dependency_count_pending,
389 dependency_count_unsatisfied
390 FROM workflow_steps
391 WHERE id = ANY($1::uuid[])
392 FOR UPDATE",
393 appended_step_ids,
394 )
395 .fetch_all(&mut **tx)
396 .await
397 .map_err(|error| {
398 Error::from_query_sqlx_with_context("load appended workflow step states", error)
399 })?;
400
401 rows.into_iter()
402 .map(|row| {
403 let execution_kind = parse_workflow_step_execution_kind(row.execution_kind)?;
404 let job_type = row.job_type.map(parse_job_type_name).transpose()?;
405 let stage = row.stage.map(parse_job_stage).transpose()?;
406 Ok((
407 row.id,
408 AppendedStepState {
409 candidate: StepReleaseCandidate::from_init(StepReleaseCandidateInit {
410 id: row.id,
411 workflow_run_id: row.workflow_run_id,
412 execution_kind,
413 job_type,
414 organization_id: row.organization_id,
415 payload: row.payload,
416 priority: row.priority,
417 max_attempts: row.max_attempts,
418 timeout_seconds: row.timeout_seconds,
419 stage,
420 }),
421 dependency_count_pending: row.dependency_count_pending,
422 dependency_count_unsatisfied: row.dependency_count_unsatisfied,
423 },
424 ))
425 })
426 .collect()
427}
428
429async fn load_workflow_steps_by_keys_tx(
430 tx: &mut DbTx<'_>,
431 workflow_run_id: Uuid,
432 input_steps: &[WorkflowStepEnqueue<'_>],
433 organization_id: Option<Uuid>,
434) -> Result<Vec<WorkflowStepDbRecord>> {
435 let step_keys = input_steps
436 .iter()
437 .map(|step| step.step_key().as_str().to_owned())
438 .collect::<Vec<_>>();
439 let rows = sqlx::query!(
440 "SELECT
441 ws.id,
442 ws.workflow_run_id,
443 ws.step_key,
444 ws.execution_kind::text AS \"execution_kind!\",
445 ws.job_type,
446 ws.organization_id,
447 ws.payload,
448 ws.priority,
449 ws.max_attempts,
450 ws.timeout_seconds,
451 ws.stage,
452 ws.status::text AS \"status!\",
453 ws.job_id,
454 ws.released_at,
455 ws.started_at,
456 ws.finished_at,
457 ws.dependency_count_total,
458 ws.dependency_count_pending,
459 ws.dependency_count_unsatisfied,
460 ws.status_reason,
461 ws.last_error_code,
462 ws.last_error_message,
463 ws.created_at,
464 ws.updated_at
465 FROM workflow_steps ws
466 JOIN workflow_runs wr ON wr.id = ws.workflow_run_id
467 WHERE ws.workflow_run_id = $1
468 AND ws.step_key = ANY($2::text[])
469 AND ($3::uuid IS NULL OR wr.organization_id = $3)",
470 workflow_run_id,
471 &step_keys,
472 organization_id,
473 )
474 .fetch_all(&mut **tx)
475 .await
476 .map_err(|error| {
477 Error::from_query_sqlx_with_context("load appended workflow steps by key", error)
478 })?;
479
480 let steps_by_key = rows
481 .into_iter()
482 .map(|row| {
483 let step_key = parse_step_key_name(row.step_key)?;
484 Ok((
485 step_key.clone(),
486 WorkflowStepDbRecord {
487 id: row.id,
488 workflow_run_id: row.workflow_run_id,
489 step_key,
490 execution_kind: parse_workflow_step_execution_kind(row.execution_kind)?,
491 job_type: row.job_type.map(parse_job_type_name).transpose()?,
492 organization_id: row.organization_id,
493 payload: row.payload,
494 priority: row.priority,
495 max_attempts: row.max_attempts,
496 timeout_seconds: row.timeout_seconds,
497 stage: row.stage.map(parse_job_stage).transpose()?,
498 status: parse_workflow_step_status(row.status)?,
499 job_id: row.job_id,
500 released_at: row.released_at,
501 started_at: row.started_at,
502 finished_at: row.finished_at,
503 dependency_count_total: row.dependency_count_total,
504 dependency_count_pending: row.dependency_count_pending,
505 dependency_count_unsatisfied: row.dependency_count_unsatisfied,
506 status_reason: row.status_reason,
507 last_error_code: row.last_error_code,
508 last_error_message: row.last_error_message,
509 created_at: row.created_at,
510 updated_at: row.updated_at,
511 },
512 ))
513 })
514 .collect::<Result<BTreeMap<_, _>>>()?;
515
516 input_steps
517 .iter()
518 .map(|step| {
519 steps_by_key
520 .get(step.step_key().as_str())
521 .cloned()
522 .ok_or_else(|| {
523 workflow_internal_state_error(format!(
524 "workflow append result missing step '{}'",
525 step.step_key().as_str()
526 ))
527 })
528 })
529 .collect()
530}
531
532async fn load_append_result_tx(
533 tx: &mut DbTx<'_>,
534 workflow_run_id: Uuid,
535 input_steps: &[WorkflowStepEnqueue<'_>],
536 organization_id: Option<Uuid>,
537 outcome: AppendOutcome,
538) -> Result<AppendResult> {
539 let appended_steps =
540 load_workflow_steps_by_keys_tx(tx, workflow_run_id, input_steps, organization_id).await?;
541
542 Ok(AppendResult {
543 workflow_run: load_workflow_run_by_id_tx(
544 tx,
545 workflow_run_id,
546 "load workflow run after append",
547 )
548 .await?,
549 appended_steps,
550 outcome,
551 })
552}