1use runledger_core::jobs::{
2 WorkflowDependencyReleaseMode, WorkflowRunEnqueue, validate_workflow_run_enqueue,
3};
4use serde::Serialize;
5use serde_json::Value as JsonValue;
6use sqlx::types::Uuid;
7
8use crate::jobs::transaction_isolation::ensure_read_committed_tx;
9use crate::{DbPool, DbTx, Error, Result};
10
11use super::super::row_decode::{
12 parse_job_stage, parse_job_type_name, parse_workflow_run_status,
13 parse_workflow_step_execution_kind, parse_workflow_type_name,
14};
15use super::super::workflow_types::WorkflowRunDbRecord;
16use super::errors::{
17 workflow_enqueue_conflicting_retry_error, workflow_internal_state_error,
18 workflow_legacy_idempotency_snapshot_missing_error,
19};
20use super::read::load_workflow_run_by_id_tx;
21use super::release::{StepReleaseCandidate, StepReleaseCandidateInit, release_candidate_step_tx};
22use super::runtime::recompute_workflow_run_statuses_tx;
23use super::steps::{
24 fetch_job_definition_defaults_tx, insert_workflow_step_dependencies_tx,
25 insert_workflow_steps_tx, workflow_step_effective_organization_id,
26 workflow_step_effective_stage,
27};
28use super::validation::workflow_dag_validation_error;
29
30struct WorkflowRunInsertOutcome {
31 record: WorkflowRunDbRecord,
32 inserted: bool,
33}
34
35#[derive(sqlx::FromRow)]
36struct WorkflowRunRow {
37 id: Uuid,
38 workflow_type: String,
39 organization_id: Option<Uuid>,
40 status: String,
41 idempotency_key: Option<String>,
42 metadata: JsonValue,
43 enqueue_request_matches: Option<bool>,
44 started_at: chrono::DateTime<chrono::Utc>,
45 finished_at: Option<chrono::DateTime<chrono::Utc>>,
46 created_at: chrono::DateTime<chrono::Utc>,
47 updated_at: chrono::DateTime<chrono::Utc>,
48}
49
50#[derive(Serialize)]
51struct CanonicalWorkflowRunEnqueueRequest<'a> {
52 metadata: &'a JsonValue,
53 steps: Vec<CanonicalWorkflowStep<'a>>,
54}
55
56#[derive(Serialize)]
57struct CanonicalWorkflowStep<'a> {
58 step_key: &'a str,
59 execution_kind: &'static str,
60 job_type: Option<&'a str>,
61 organization_id: Option<Uuid>,
62 payload: &'a JsonValue,
63 priority: Option<i32>,
64 max_attempts: Option<i32>,
65 timeout_seconds: Option<i32>,
66 stage: Option<&'static str>,
67 dependencies: Vec<CanonicalWorkflowDependency<'a>>,
68}
69
70#[derive(Serialize)]
71struct CanonicalWorkflowDependency<'a> {
72 prerequisite_step_key: &'a str,
73 release_mode: &'static str,
74}
75
76pub async fn enqueue_workflow_run(
83 pool: &DbPool,
84 payload: &WorkflowRunEnqueue<'_>,
85) -> Result<WorkflowRunDbRecord> {
86 let mut tx = pool
87 .begin()
88 .await
89 .map_err(|error| Error::ConnectionError(error.to_string()))?;
90 let workflow_run = enqueue_workflow_run_tx(&mut tx, payload).await?;
91 tx.commit()
92 .await
93 .map_err(|error| Error::ConnectionError(error.to_string()))?;
94 Ok(workflow_run)
95}
96
97pub async fn enqueue_workflow_run_tx(
109 tx: &mut DbTx<'_>,
110 payload: &WorkflowRunEnqueue<'_>,
111) -> Result<WorkflowRunDbRecord> {
112 validate_workflow_run_enqueue(payload).map_err(workflow_dag_validation_error)?;
113 if payload.idempotency_key().is_some() {
114 ensure_read_committed_tx(
115 tx,
116 "workflow idempotent enqueue",
117 "workflow.enqueue_idempotency_unsupported_isolation",
118 "Workflow idempotent enqueue requires READ COMMITTED transaction isolation.",
119 )
120 .await?;
121 }
122
123 let workflow_run_insert = insert_workflow_run_record_tx(tx, payload).await?;
124 let workflow_run = workflow_run_insert.record;
125 if !workflow_run_insert.inserted {
126 return Ok(workflow_run);
129 }
130
131 let defaults_by_job_type = fetch_job_definition_defaults_tx(tx, payload.steps()).await?;
132 let step_id_by_key =
133 insert_workflow_steps_tx(tx, payload, workflow_run.id, &defaults_by_job_type).await?;
134 insert_workflow_step_dependencies_tx(tx, payload, workflow_run.id, &step_id_by_key).await?;
135
136 enqueue_root_steps_tx(tx, workflow_run.id).await?;
137 recompute_workflow_run_statuses_tx(tx, &std::collections::BTreeSet::from([workflow_run.id]))
138 .await?;
139
140 load_workflow_run_by_id_tx(
141 tx,
142 workflow_run.id,
143 "load workflow run after enqueue recompute",
144 )
145 .await
146}
147
148async fn insert_workflow_run_record_tx(
149 tx: &mut DbTx<'_>,
150 payload: &WorkflowRunEnqueue<'_>,
151) -> Result<WorkflowRunInsertOutcome> {
152 let enqueue_request = payload
153 .idempotency_key()
154 .map(|_| canonical_workflow_enqueue_request(payload))
155 .transpose()?;
156 let insert_sql = format!(
161 "INSERT INTO workflow_runs (
162 workflow_type,
163 organization_id,
164 status,
165 idempotency_key,
166 metadata,
167 enqueue_request,
168 started_at
169 )
170 VALUES ($1, $2, 'RUNNING', $3, $4::jsonb, $5::jsonb, now())
171 {}
172 RETURNING
173 id,
174 workflow_type,
175 organization_id,
176 status::text AS status,
177 idempotency_key,
178 metadata,
179 NULL::boolean AS enqueue_request_matches,
180 started_at,
181 finished_at,
182 created_at,
183 updated_at",
184 enqueue_workflow_run_idempotency_conflict_clause(payload),
185 );
186 let run_row = sqlx::query_as::<_, WorkflowRunRow>(&insert_sql)
187 .bind(payload.workflow_type())
188 .bind(payload.organization_id())
189 .bind(payload.idempotency_key())
190 .bind(payload.metadata())
191 .bind(enqueue_request.as_ref())
192 .fetch_optional(&mut **tx)
193 .await
194 .map_err(|error| Error::from_query_sqlx_with_context("enqueue workflow run", error))?;
195
196 if let Some(run_row) = run_row {
197 return Ok(WorkflowRunInsertOutcome {
198 record: workflow_run_record_from_row(run_row)?,
199 inserted: true,
200 });
201 }
202
203 let (Some(idempotency_key), Some(enqueue_request)) =
204 (payload.idempotency_key(), enqueue_request.as_ref())
205 else {
206 return Err(workflow_internal_state_error(
207 "workflow run insert returned no row without an idempotency key conflict",
208 ));
209 };
210
211 let existing =
212 load_existing_idempotent_workflow_run_tx(tx, payload, idempotency_key, enqueue_request)
213 .await?;
214 validate_existing_idempotent_workflow_run(&existing)?;
215
216 Ok(WorkflowRunInsertOutcome {
217 record: workflow_run_record_from_row(existing)?,
218 inserted: false,
219 })
220}
221
222fn workflow_run_record_from_row(run_row: WorkflowRunRow) -> Result<WorkflowRunDbRecord> {
223 Ok(WorkflowRunDbRecord {
226 id: run_row.id,
227 workflow_type: parse_workflow_type_name(run_row.workflow_type)?,
228 organization_id: run_row.organization_id,
229 status: parse_workflow_run_status(run_row.status)?,
230 idempotency_key: run_row.idempotency_key,
231 metadata: run_row.metadata,
232 started_at: run_row.started_at,
233 finished_at: run_row.finished_at,
234 created_at: run_row.created_at,
235 updated_at: run_row.updated_at,
236 })
237}
238
239async fn load_existing_idempotent_workflow_run_tx(
240 tx: &mut DbTx<'_>,
241 payload: &WorkflowRunEnqueue<'_>,
242 idempotency_key: &str,
243 enqueue_request: &JsonValue,
244) -> Result<WorkflowRunRow> {
245 let run = if let Some(organization_id) = payload.organization_id() {
249 sqlx::query_as!(
250 WorkflowRunRow,
251 r#"SELECT
252 id,
253 workflow_type,
254 organization_id,
255 status::text AS "status!",
256 idempotency_key,
257 metadata,
258 enqueue_request = $4::jsonb AS "enqueue_request_matches?",
259 started_at,
260 finished_at,
261 created_at,
262 updated_at
263 FROM workflow_runs
264 WHERE workflow_type = $1
265 AND organization_id = $2
266 AND idempotency_key = $3
267 LIMIT 1
268 FOR SHARE"#,
269 payload.workflow_type() as _,
270 organization_id,
271 idempotency_key,
272 enqueue_request,
273 )
274 .fetch_optional(&mut **tx)
275 .await
276 } else {
277 sqlx::query_as!(
278 WorkflowRunRow,
279 r#"SELECT
280 id,
281 workflow_type,
282 organization_id,
283 status::text AS "status!",
284 idempotency_key,
285 metadata,
286 enqueue_request = $3::jsonb AS "enqueue_request_matches?",
287 started_at,
288 finished_at,
289 created_at,
290 updated_at
291 FROM workflow_runs
292 WHERE workflow_type = $1
293 AND organization_id IS NULL
294 AND idempotency_key = $2
295 LIMIT 1
296 FOR SHARE"#,
297 payload.workflow_type() as _,
298 idempotency_key,
299 enqueue_request,
300 )
301 .fetch_optional(&mut **tx)
302 .await
303 };
304
305 run.map_err(|error| {
306 Error::from_query_sqlx_with_context("load idempotent workflow enqueue", error)
307 })?
308 .ok_or_else(|| {
309 workflow_internal_state_error(
310 "workflow run insert conflicted but matching idempotent workflow run was not found",
311 )
312 })
313}
314
315fn enqueue_workflow_run_idempotency_conflict_clause(
316 payload: &WorkflowRunEnqueue<'_>,
317) -> &'static str {
318 match (payload.idempotency_key(), payload.organization_id()) {
321 (Some(_), Some(_)) => {
322 "ON CONFLICT (workflow_type, organization_id, idempotency_key)
323 WHERE idempotency_key IS NOT NULL
324 AND organization_id IS NOT NULL
325 DO NOTHING"
326 }
327 (Some(_), None) => {
328 "ON CONFLICT (workflow_type, idempotency_key)
329 WHERE idempotency_key IS NOT NULL
330 AND organization_id IS NULL
331 DO NOTHING"
332 }
333 (None, _) => "",
334 }
335}
336
337fn validate_existing_idempotent_workflow_run(existing: &WorkflowRunRow) -> Result<()> {
338 match existing.enqueue_request_matches {
339 Some(true) => Ok(()),
340 Some(false) => Err(workflow_enqueue_conflicting_retry_error("request")),
341 None => Err(workflow_legacy_idempotency_snapshot_missing_error(
342 existing.workflow_type.as_str(),
343 existing.id,
344 )),
345 }
346}
347
348fn canonical_workflow_enqueue_request(payload: &WorkflowRunEnqueue<'_>) -> Result<JsonValue> {
349 let mut steps = payload
350 .steps()
351 .iter()
352 .map(|step| {
353 let mut dependencies = step
354 .dependencies()
355 .iter()
356 .map(|dependency| CanonicalWorkflowDependency {
357 prerequisite_step_key: dependency.prerequisite_step_key.as_str(),
358 release_mode: dependency
359 .release_mode
360 .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal)
361 .as_db_value(),
362 })
363 .collect::<Vec<_>>();
364 dependencies.sort_by(|left, right| {
365 left.prerequisite_step_key
366 .cmp(right.prerequisite_step_key)
367 .then(left.release_mode.cmp(right.release_mode))
368 });
369
370 CanonicalWorkflowStep {
371 step_key: step.step_key().as_str(),
372 execution_kind: step.execution_kind().as_db_value(),
373 job_type: step.job_type().map(|job_type| job_type.as_str()),
374 organization_id: workflow_step_effective_organization_id(
375 payload.organization_id(),
376 step,
377 ),
378 payload: step.payload(),
379 priority: step.priority(),
380 max_attempts: step.max_attempts(),
381 timeout_seconds: step.timeout_seconds(),
382 stage: workflow_step_effective_stage(step),
383 dependencies,
384 }
385 })
386 .collect::<Vec<_>>();
387 steps.sort_by(|left, right| left.step_key.cmp(right.step_key));
388
389 serde_json::to_value(CanonicalWorkflowRunEnqueueRequest {
390 metadata: payload.metadata(),
391 steps,
392 })
393 .map_err(|error| {
394 workflow_internal_state_error(format!(
395 "failed to serialize canonical workflow enqueue request: {error}"
396 ))
397 })
398}
399
400pub(crate) async fn enqueue_root_steps_tx(tx: &mut DbTx<'_>, workflow_run_id: Uuid) -> Result<()> {
401 let rows = sqlx::query!(
402 "SELECT
403 id,
404 workflow_run_id,
405 execution_kind::text AS \"execution_kind!\",
406 job_type,
407 organization_id,
408 payload,
409 priority,
410 max_attempts,
411 timeout_seconds,
412 stage
413 FROM workflow_steps
414 WHERE workflow_run_id = $1
415 AND status = 'BLOCKED'
416 AND dependency_count_pending = 0
417 ORDER BY created_at ASC
418 FOR UPDATE",
419 workflow_run_id,
420 )
421 .fetch_all(&mut **tx)
422 .await
423 .map_err(|error| {
424 Error::from_query_sqlx_with_context("lookup root workflow steps for enqueue", error)
425 })?;
426
427 for row in rows {
428 let candidate = StepReleaseCandidate::from_init(StepReleaseCandidateInit {
429 id: row.id,
430 workflow_run_id: row.workflow_run_id,
431 execution_kind: parse_workflow_step_execution_kind(row.execution_kind)?,
432 job_type: row.job_type.map(parse_job_type_name).transpose()?,
433 organization_id: row.organization_id,
434 payload: row.payload,
435 priority: row.priority,
436 max_attempts: row.max_attempts,
437 timeout_seconds: row.timeout_seconds,
438 stage: row.stage.map(parse_job_stage).transpose()?,
439 });
440 release_candidate_step_tx(tx, &candidate).await?;
441 }
442
443 Ok(())
444}
445
446#[cfg(test)]
447mod tests {
448 use runledger_core::jobs::{
449 JobStage, JobType, StepKey, WorkflowRunEnqueueBuilder, WorkflowStepEnqueueBuilder,
450 WorkflowType,
451 };
452 use serde_json::json;
453 use sqlx::types::Uuid;
454
455 use super::canonical_workflow_enqueue_request;
456
457 #[test]
458 fn canonical_workflow_enqueue_request_matches_golden_snapshot() {
459 let run_org = Uuid::now_v7();
460 let step_org = Uuid::now_v7();
461 let metadata = json!({"kind": "golden"});
462 let root_payload = json!({"step": "root"});
463 let child_payload = json!({"step": "child"});
464 let root = WorkflowStepEnqueueBuilder::new_external(StepKey::new("root"), &root_payload)
465 .try_build()
466 .expect("build root step");
467 let child = WorkflowStepEnqueueBuilder::new(
468 StepKey::new("child"),
469 JobType::new("jobs.test.child"),
470 &child_payload,
471 )
472 .organization_id(step_org)
473 .priority(7)
474 .max_attempts(2)
475 .timeout_seconds(45)
476 .stage(JobStage::Scheduled)
477 .depends_on_success(&[StepKey::new("root")])
478 .try_build()
479 .expect("build child step");
480 let workflow =
481 WorkflowRunEnqueueBuilder::new(WorkflowType::new("workflow.test.golden"), &metadata)
482 .organization_id(run_org)
483 .step(child)
484 .step(root)
485 .try_build()
486 .expect("build workflow");
487
488 let canonical =
489 canonical_workflow_enqueue_request(&workflow).expect("canonicalize workflow enqueue");
490
491 assert_eq!(
492 canonical,
493 json!({
494 "metadata": {"kind": "golden"},
495 "steps": [
496 {
497 "step_key": "child",
498 "execution_kind": "JOB",
499 "job_type": "jobs.test.child",
500 "organization_id": step_org,
501 "payload": {"step": "child"},
502 "priority": 7,
503 "max_attempts": 2,
504 "timeout_seconds": 45,
505 "stage": "scheduled",
506 "dependencies": [
507 {
508 "prerequisite_step_key": "root",
509 "release_mode": "ON_SUCCESS"
510 }
511 ]
512 },
513 {
514 "step_key": "root",
515 "execution_kind": "EXTERNAL",
516 "job_type": null,
517 "organization_id": run_org,
518 "payload": {"step": "root"},
519 "priority": null,
520 "max_attempts": null,
521 "timeout_seconds": null,
522 "stage": null,
523 "dependencies": []
524 }
525 ]
526 })
527 );
528 }
529}