1use runledger_core::jobs::{
2 WorkflowDependencyReleaseMode, WorkflowRunEnqueue, validate_workflow_run_enqueue,
3};
4use serde::Serialize;
5use serde_json::Value as JsonValue;
6use sqlx::types::Uuid;
7
8use crate::jobs::transaction_isolation::ensure_read_committed_tx;
9use crate::{DbPool, DbTx, Error, Result};
10
11use super::super::row_decode::{
12 parse_job_stage, parse_job_type_name, parse_workflow_run_status,
13 parse_workflow_step_execution_kind, parse_workflow_type_name,
14};
15use super::super::workflow_types::WorkflowRunDbRecord;
16use super::errors::{
17 workflow_enqueue_conflicting_retry_error, workflow_internal_state_error,
18 workflow_legacy_idempotency_snapshot_missing_error,
19};
20use super::read::load_workflow_run_by_id_tx;
21use super::release::{StepReleaseCandidate, StepReleaseCandidateInit, release_candidate_step_tx};
22use super::runtime::recompute_workflow_run_statuses_tx;
23use super::steps::{
24 fetch_job_definition_defaults_tx, insert_workflow_step_dependencies_tx,
25 insert_workflow_steps_tx, workflow_step_effective_organization_id,
26 workflow_step_effective_stage,
27};
28use super::validation::workflow_dag_validation_error;
29
30struct WorkflowRunInsertOutcome {
31 record: WorkflowRunDbRecord,
32 inserted: bool,
33}
34
35#[derive(sqlx::FromRow)]
36struct WorkflowRunRow {
37 id: Uuid,
38 workflow_type: String,
39 organization_id: Option<Uuid>,
40 status: String,
41 idempotency_key: Option<String>,
42 metadata: JsonValue,
43 enqueue_request_matches: Option<bool>,
44 started_at: chrono::DateTime<chrono::Utc>,
45 finished_at: Option<chrono::DateTime<chrono::Utc>>,
46 created_at: chrono::DateTime<chrono::Utc>,
47 updated_at: chrono::DateTime<chrono::Utc>,
48}
49
50#[derive(Serialize)]
51struct CanonicalWorkflowRunEnqueueRequest<'a> {
52 metadata: &'a JsonValue,
53 steps: Vec<CanonicalWorkflowStep<'a>>,
54}
55
56#[derive(Serialize)]
57struct CanonicalWorkflowStep<'a> {
58 step_key: &'a str,
59 execution_kind: &'static str,
60 job_type: Option<&'a str>,
61 organization_id: Option<Uuid>,
62 payload: &'a JsonValue,
63 priority: Option<i32>,
64 max_attempts: Option<i32>,
65 timeout_seconds: Option<i32>,
66 stage: Option<&'static str>,
67 dependencies: Vec<CanonicalWorkflowDependency<'a>>,
68}
69
70#[derive(Serialize)]
71struct CanonicalWorkflowDependency<'a> {
72 prerequisite_step_key: &'a str,
73 release_mode: &'static str,
74}
75
76#[doc(alias = "dag")]
88#[doc(alias = "orchestration")]
89#[doc(alias = "dependencies")]
90pub async fn enqueue_workflow_run(
91 pool: &DbPool,
92 payload: &WorkflowRunEnqueue<'_>,
93) -> Result<WorkflowRunDbRecord> {
94 let mut tx = pool
95 .begin()
96 .await
97 .map_err(|error| Error::ConnectionError(error.to_string()))?;
98 let workflow_run = enqueue_workflow_run_tx(&mut tx, payload).await?;
99 tx.commit()
100 .await
101 .map_err(|error| Error::ConnectionError(error.to_string()))?;
102 Ok(workflow_run)
103}
104
105#[doc(alias = "dag")]
121#[doc(alias = "orchestration")]
122#[doc(alias = "dependencies")]
123pub async fn enqueue_workflow_run_tx(
124 tx: &mut DbTx<'_>,
125 payload: &WorkflowRunEnqueue<'_>,
126) -> Result<WorkflowRunDbRecord> {
127 validate_workflow_run_enqueue(payload).map_err(workflow_dag_validation_error)?;
128 if payload.idempotency_key().is_some() {
129 ensure_read_committed_tx(
130 tx,
131 "workflow idempotent enqueue",
132 "workflow.enqueue_idempotency_unsupported_isolation",
133 "Workflow idempotent enqueue requires READ COMMITTED transaction isolation.",
134 )
135 .await?;
136 }
137
138 let workflow_run_insert = insert_workflow_run_record_tx(tx, payload).await?;
139 let workflow_run = workflow_run_insert.record;
140 if !workflow_run_insert.inserted {
141 return Ok(workflow_run);
144 }
145
146 let defaults_by_job_type = fetch_job_definition_defaults_tx(tx, payload.steps()).await?;
147 let step_id_by_key =
148 insert_workflow_steps_tx(tx, payload, workflow_run.id, &defaults_by_job_type).await?;
149 insert_workflow_step_dependencies_tx(tx, payload, workflow_run.id, &step_id_by_key).await?;
150
151 enqueue_root_steps_tx(tx, workflow_run.id).await?;
152 recompute_workflow_run_statuses_tx(tx, &std::collections::BTreeSet::from([workflow_run.id]))
153 .await?;
154
155 load_workflow_run_by_id_tx(
156 tx,
157 workflow_run.id,
158 "load workflow run after enqueue recompute",
159 )
160 .await
161}
162
163async fn insert_workflow_run_record_tx(
164 tx: &mut DbTx<'_>,
165 payload: &WorkflowRunEnqueue<'_>,
166) -> Result<WorkflowRunInsertOutcome> {
167 let enqueue_request = payload
168 .idempotency_key()
169 .map(|_| canonical_workflow_enqueue_request(payload))
170 .transpose()?;
171 let insert_sql = format!(
176 "INSERT INTO workflow_runs (
177 workflow_type,
178 organization_id,
179 status,
180 idempotency_key,
181 metadata,
182 enqueue_request,
183 started_at
184 )
185 VALUES ($1, $2, 'RUNNING', $3, $4::jsonb, $5::jsonb, now())
186 {}
187 RETURNING
188 id,
189 workflow_type,
190 organization_id,
191 status::text AS status,
192 idempotency_key,
193 metadata,
194 NULL::boolean AS enqueue_request_matches,
195 started_at,
196 finished_at,
197 created_at,
198 updated_at",
199 enqueue_workflow_run_idempotency_conflict_clause(payload),
200 );
201 let run_row = sqlx::query_as::<_, WorkflowRunRow>(&insert_sql)
202 .bind(payload.workflow_type())
203 .bind(payload.organization_id())
204 .bind(payload.idempotency_key())
205 .bind(payload.metadata())
206 .bind(enqueue_request.as_ref())
207 .fetch_optional(&mut **tx)
208 .await
209 .map_err(|error| Error::from_query_sqlx_with_context("enqueue workflow run", error))?;
210
211 if let Some(run_row) = run_row {
212 return Ok(WorkflowRunInsertOutcome {
213 record: workflow_run_record_from_row(run_row)?,
214 inserted: true,
215 });
216 }
217
218 let (Some(idempotency_key), Some(enqueue_request)) =
219 (payload.idempotency_key(), enqueue_request.as_ref())
220 else {
221 return Err(workflow_internal_state_error(
222 "workflow run insert returned no row without an idempotency key conflict",
223 ));
224 };
225
226 let existing =
227 load_existing_idempotent_workflow_run_tx(tx, payload, idempotency_key, enqueue_request)
228 .await?;
229 validate_existing_idempotent_workflow_run(&existing)?;
230
231 Ok(WorkflowRunInsertOutcome {
232 record: workflow_run_record_from_row(existing)?,
233 inserted: false,
234 })
235}
236
237fn workflow_run_record_from_row(run_row: WorkflowRunRow) -> Result<WorkflowRunDbRecord> {
238 Ok(WorkflowRunDbRecord {
241 id: run_row.id,
242 workflow_type: parse_workflow_type_name(run_row.workflow_type)?,
243 organization_id: run_row.organization_id,
244 status: parse_workflow_run_status(run_row.status)?,
245 idempotency_key: run_row.idempotency_key,
246 metadata: run_row.metadata,
247 started_at: run_row.started_at,
248 finished_at: run_row.finished_at,
249 created_at: run_row.created_at,
250 updated_at: run_row.updated_at,
251 })
252}
253
254async fn load_existing_idempotent_workflow_run_tx(
255 tx: &mut DbTx<'_>,
256 payload: &WorkflowRunEnqueue<'_>,
257 idempotency_key: &str,
258 enqueue_request: &JsonValue,
259) -> Result<WorkflowRunRow> {
260 let run = if let Some(organization_id) = payload.organization_id() {
264 sqlx::query_as!(
265 WorkflowRunRow,
266 r#"SELECT
267 id,
268 workflow_type,
269 organization_id,
270 status::text AS "status!",
271 idempotency_key,
272 metadata,
273 enqueue_request = $4::jsonb AS "enqueue_request_matches?",
274 started_at,
275 finished_at,
276 created_at,
277 updated_at
278 FROM workflow_runs
279 WHERE workflow_type = $1
280 AND organization_id = $2
281 AND idempotency_key = $3
282 LIMIT 1
283 FOR SHARE"#,
284 payload.workflow_type() as _,
285 organization_id,
286 idempotency_key,
287 enqueue_request,
288 )
289 .fetch_optional(&mut **tx)
290 .await
291 } else {
292 sqlx::query_as!(
293 WorkflowRunRow,
294 r#"SELECT
295 id,
296 workflow_type,
297 organization_id,
298 status::text AS "status!",
299 idempotency_key,
300 metadata,
301 enqueue_request = $3::jsonb AS "enqueue_request_matches?",
302 started_at,
303 finished_at,
304 created_at,
305 updated_at
306 FROM workflow_runs
307 WHERE workflow_type = $1
308 AND organization_id IS NULL
309 AND idempotency_key = $2
310 LIMIT 1
311 FOR SHARE"#,
312 payload.workflow_type() as _,
313 idempotency_key,
314 enqueue_request,
315 )
316 .fetch_optional(&mut **tx)
317 .await
318 };
319
320 run.map_err(|error| {
321 Error::from_query_sqlx_with_context("load idempotent workflow enqueue", error)
322 })?
323 .ok_or_else(|| {
324 workflow_internal_state_error(
325 "workflow run insert conflicted but matching idempotent workflow run was not found",
326 )
327 })
328}
329
330fn enqueue_workflow_run_idempotency_conflict_clause(
331 payload: &WorkflowRunEnqueue<'_>,
332) -> &'static str {
333 match (payload.idempotency_key(), payload.organization_id()) {
336 (Some(_), Some(_)) => {
337 "ON CONFLICT (workflow_type, organization_id, idempotency_key)
338 WHERE idempotency_key IS NOT NULL
339 AND organization_id IS NOT NULL
340 DO NOTHING"
341 }
342 (Some(_), None) => {
343 "ON CONFLICT (workflow_type, idempotency_key)
344 WHERE idempotency_key IS NOT NULL
345 AND organization_id IS NULL
346 DO NOTHING"
347 }
348 (None, _) => "",
349 }
350}
351
352fn validate_existing_idempotent_workflow_run(existing: &WorkflowRunRow) -> Result<()> {
353 match existing.enqueue_request_matches {
354 Some(true) => Ok(()),
355 Some(false) => Err(workflow_enqueue_conflicting_retry_error("request")),
356 None => Err(workflow_legacy_idempotency_snapshot_missing_error(
357 existing.workflow_type.as_str(),
358 existing.id,
359 )),
360 }
361}
362
363fn canonical_workflow_enqueue_request(payload: &WorkflowRunEnqueue<'_>) -> Result<JsonValue> {
364 let mut steps = payload
365 .steps()
366 .iter()
367 .map(|step| {
368 let mut dependencies = step
369 .dependencies()
370 .iter()
371 .map(|dependency| CanonicalWorkflowDependency {
372 prerequisite_step_key: dependency.prerequisite_step_key.as_str(),
373 release_mode: dependency
374 .release_mode
375 .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal)
376 .as_db_value(),
377 })
378 .collect::<Vec<_>>();
379 dependencies.sort_by(|left, right| {
380 left.prerequisite_step_key
381 .cmp(right.prerequisite_step_key)
382 .then(left.release_mode.cmp(right.release_mode))
383 });
384
385 CanonicalWorkflowStep {
386 step_key: step.step_key().as_str(),
387 execution_kind: step.execution_kind().as_db_value(),
388 job_type: step.job_type().map(|job_type| job_type.as_str()),
389 organization_id: workflow_step_effective_organization_id(
390 payload.organization_id(),
391 step,
392 ),
393 payload: step.payload(),
394 priority: step.priority(),
395 max_attempts: step.max_attempts(),
396 timeout_seconds: step.timeout_seconds(),
397 stage: workflow_step_effective_stage(step),
398 dependencies,
399 }
400 })
401 .collect::<Vec<_>>();
402 steps.sort_by(|left, right| left.step_key.cmp(right.step_key));
403
404 serde_json::to_value(CanonicalWorkflowRunEnqueueRequest {
405 metadata: payload.metadata(),
406 steps,
407 })
408 .map_err(|error| {
409 workflow_internal_state_error(format!(
410 "failed to serialize canonical workflow enqueue request: {error}"
411 ))
412 })
413}
414
415pub(crate) async fn enqueue_root_steps_tx(tx: &mut DbTx<'_>, workflow_run_id: Uuid) -> Result<()> {
416 let rows = sqlx::query!(
417 "SELECT
418 id,
419 workflow_run_id,
420 execution_kind::text AS \"execution_kind!\",
421 job_type,
422 organization_id,
423 payload,
424 priority,
425 max_attempts,
426 timeout_seconds,
427 stage
428 FROM workflow_steps
429 WHERE workflow_run_id = $1
430 AND status = 'BLOCKED'
431 AND dependency_count_pending = 0
432 ORDER BY created_at ASC
433 FOR UPDATE",
434 workflow_run_id,
435 )
436 .fetch_all(&mut **tx)
437 .await
438 .map_err(|error| {
439 Error::from_query_sqlx_with_context("lookup root workflow steps for enqueue", error)
440 })?;
441
442 for row in rows {
443 let candidate = StepReleaseCandidate::from_init(StepReleaseCandidateInit {
444 id: row.id,
445 workflow_run_id: row.workflow_run_id,
446 execution_kind: parse_workflow_step_execution_kind(row.execution_kind)?,
447 job_type: row.job_type.map(parse_job_type_name).transpose()?,
448 organization_id: row.organization_id,
449 payload: row.payload,
450 priority: row.priority,
451 max_attempts: row.max_attempts,
452 timeout_seconds: row.timeout_seconds,
453 stage: row.stage.map(parse_job_stage).transpose()?,
454 });
455 release_candidate_step_tx(tx, &candidate).await?;
456 }
457
458 Ok(())
459}
460
461#[cfg(test)]
462mod tests {
463 use runledger_core::jobs::{
464 JobStage, JobType, StepKey, WorkflowRunEnqueueBuilder, WorkflowStepEnqueueBuilder,
465 WorkflowType,
466 };
467 use serde_json::json;
468 use sqlx::types::Uuid;
469
470 use super::canonical_workflow_enqueue_request;
471
472 #[test]
473 fn canonical_workflow_enqueue_request_matches_golden_snapshot() {
474 let run_org = Uuid::now_v7();
475 let step_org = Uuid::now_v7();
476 let metadata = json!({"kind": "golden"});
477 let root_payload = json!({"step": "root"});
478 let child_payload = json!({"step": "child"});
479 let root = WorkflowStepEnqueueBuilder::new_external(StepKey::new("root"), &root_payload)
480 .try_build()
481 .expect("build root step");
482 let child = WorkflowStepEnqueueBuilder::new(
483 StepKey::new("child"),
484 JobType::new("jobs.test.child"),
485 &child_payload,
486 )
487 .organization_id(step_org)
488 .priority(7)
489 .max_attempts(2)
490 .timeout_seconds(45)
491 .stage(JobStage::Scheduled)
492 .depends_on_success(&[StepKey::new("root")])
493 .try_build()
494 .expect("build child step");
495 let workflow =
496 WorkflowRunEnqueueBuilder::new(WorkflowType::new("workflow.test.golden"), &metadata)
497 .organization_id(run_org)
498 .step(child)
499 .step(root)
500 .try_build()
501 .expect("build workflow");
502
503 let canonical =
504 canonical_workflow_enqueue_request(&workflow).expect("canonicalize workflow enqueue");
505
506 assert_eq!(
507 canonical,
508 json!({
509 "metadata": {"kind": "golden"},
510 "steps": [
511 {
512 "step_key": "child",
513 "execution_kind": "JOB",
514 "job_type": "jobs.test.child",
515 "organization_id": step_org,
516 "payload": {"step": "child"},
517 "priority": 7,
518 "max_attempts": 2,
519 "timeout_seconds": 45,
520 "stage": "scheduled",
521 "dependencies": [
522 {
523 "prerequisite_step_key": "root",
524 "release_mode": "ON_SUCCESS"
525 }
526 ]
527 },
528 {
529 "step_key": "root",
530 "execution_kind": "EXTERNAL",
531 "job_type": null,
532 "organization_id": run_org,
533 "payload": {"step": "root"},
534 "priority": null,
535 "max_attempts": null,
536 "timeout_seconds": null,
537 "stage": null,
538 "dependencies": []
539 }
540 ]
541 })
542 );
543 }
544}