1use std::collections::{BTreeMap, BTreeSet};
2
3use runledger_core::jobs::WorkflowDependencyReleaseMode;
4use sqlx::types::Uuid;
5
6use crate::{DbPool, DbTx, Error, Result};
7
8use super::super::row_decode::{
9 parse_job_stage, parse_job_type_name, parse_workflow_run_status,
10 parse_workflow_step_execution_kind, parse_workflow_type_name,
11};
12use super::super::workflow_types::WorkflowRunDbRecord;
13use super::runtime::{recompute_workflow_run_statuses_tx, release_candidate_step_tx};
14use super::{
15 JobDefinitionDefaults, workflow_dag_validation_error, workflow_definition_not_available_error,
16 workflow_dependency_count_overflow_error, workflow_internal_state_error,
17};
18use runledger_core::jobs::{
19 WorkflowRunEnqueue, WorkflowStepEnqueue, WorkflowStepExecutionKind,
20 validate_workflow_run_enqueue,
21};
22
23type DefaultsByJobType = BTreeMap<String, JobDefinitionDefaults>;
24pub(crate) type WorkflowStepIdsByKey = BTreeMap<String, Uuid>;
25
26pub async fn enqueue_workflow_run(
27 pool: &DbPool,
28 payload: &WorkflowRunEnqueue<'_>,
29) -> Result<WorkflowRunDbRecord> {
30 let mut tx = pool
31 .begin()
32 .await
33 .map_err(|error| Error::ConnectionError(error.to_string()))?;
34 let workflow_run = enqueue_workflow_run_tx(&mut tx, payload).await?;
35 tx.commit()
36 .await
37 .map_err(|error| Error::ConnectionError(error.to_string()))?;
38 Ok(workflow_run)
39}
40
41pub async fn enqueue_workflow_run_tx(
42 tx: &mut DbTx<'_>,
43 payload: &WorkflowRunEnqueue<'_>,
44) -> Result<WorkflowRunDbRecord> {
45 validate_workflow_run_enqueue(payload).map_err(workflow_dag_validation_error)?;
46
47 let defaults_by_job_type = fetch_job_definition_defaults_tx(tx, payload.steps()).await?;
48 let workflow_run = insert_workflow_run_record_tx(tx, payload).await?;
49 let step_id_by_key =
50 insert_workflow_steps_tx(tx, payload, workflow_run.id, &defaults_by_job_type).await?;
51 insert_workflow_step_dependencies_tx(tx, payload, workflow_run.id, &step_id_by_key).await?;
52
53 enqueue_root_steps_tx(tx, workflow_run.id).await?;
54 recompute_workflow_run_statuses_tx(tx, &std::collections::BTreeSet::from([workflow_run.id]))
55 .await?;
56
57 load_workflow_run_by_id_tx(tx, workflow_run.id).await
58}
59
60async fn insert_workflow_run_record_tx(
61 tx: &mut DbTx<'_>,
62 payload: &WorkflowRunEnqueue<'_>,
63) -> Result<WorkflowRunDbRecord> {
64 let run_row = sqlx::query!(
65 "INSERT INTO workflow_runs (
66 workflow_type,
67 organization_id,
68 status,
69 idempotency_key,
70 metadata,
71 started_at
72 )
73 VALUES ($1, $2, 'RUNNING', $3, $4::jsonb, now())
74 RETURNING
75 id,
76 workflow_type,
77 organization_id,
78 status::text AS \"status!\",
79 idempotency_key,
80 metadata,
81 started_at,
82 finished_at,
83 created_at,
84 updated_at",
85 payload.workflow_type() as _,
86 payload.organization_id(),
87 payload.idempotency_key(),
88 payload.metadata(),
89 )
90 .fetch_one(&mut **tx)
91 .await
92 .map_err(|error| Error::from_query_sqlx_with_context("enqueue workflow run", error))?;
93
94 Ok(WorkflowRunDbRecord {
95 id: run_row.id,
96 workflow_type: parse_workflow_type_name(run_row.workflow_type)?,
97 organization_id: run_row.organization_id,
98 status: parse_workflow_run_status(run_row.status)?,
99 idempotency_key: run_row.idempotency_key,
100 metadata: run_row.metadata,
101 started_at: run_row.started_at,
102 finished_at: run_row.finished_at,
103 created_at: run_row.created_at,
104 updated_at: run_row.updated_at,
105 })
106}
107
108pub(crate) fn dependency_count_total(step: &WorkflowStepEnqueue<'_>) -> Result<i32> {
109 i32::try_from(step.dependencies().len())
110 .map_err(|_| workflow_dependency_count_overflow_error(step.step_key().as_str()))
111}
112
113pub(crate) fn workflow_step_effective_organization_id(
114 workflow_organization_id: Option<Uuid>,
115 step: &WorkflowStepEnqueue<'_>,
116) -> Option<Uuid> {
117 step.organization_id().or(workflow_organization_id)
118}
119
120pub(crate) fn workflow_step_defaults<'a>(
121 defaults_by_job_type: &'a DefaultsByJobType,
122 step: &WorkflowStepEnqueue<'_>,
123) -> Result<&'a JobDefinitionDefaults> {
124 let job_type = step
125 .job_type()
126 .ok_or_else(|| workflow_internal_state_error("job workflow step missing job_type"))?;
127
128 defaults_by_job_type
129 .get(job_type.as_str())
130 .ok_or_else(|| workflow_definition_not_available_error(job_type.as_str()))
131}
132
133pub(crate) async fn insert_workflow_step_record_tx(
134 tx: &mut DbTx<'_>,
135 workflow_run_id: Uuid,
136 organization_id: Option<Uuid>,
137 step: &WorkflowStepEnqueue<'_>,
138 defaults: Option<&JobDefinitionDefaults>,
139 dependency_count_pending: i32,
140 dependency_count_unsatisfied: i32,
141) -> Result<Uuid> {
142 let dependency_count_total = dependency_count_total(step)?;
143 let (job_type, priority, max_attempts, timeout_seconds, stage) = match step.execution_kind() {
144 WorkflowStepExecutionKind::Job => {
145 let defaults = defaults.ok_or_else(|| {
146 workflow_internal_state_error("missing job definition defaults for job step")
147 })?;
148 let job_type = step.job_type().ok_or_else(|| {
149 workflow_internal_state_error("missing job_type for job workflow step")
150 })?;
151
152 (
153 Some(job_type.as_str()),
154 Some(step.priority().unwrap_or(defaults.default_priority)),
155 Some(step.max_attempts().unwrap_or(defaults.max_attempts)),
156 Some(
157 step.timeout_seconds()
158 .unwrap_or(defaults.default_timeout_seconds),
159 ),
160 Some(
161 step.stage()
162 .unwrap_or(runledger_core::jobs::JobStage::Queued)
163 .as_db_value(),
164 ),
165 )
166 }
167 WorkflowStepExecutionKind::External => (None, None, None, None, None),
168 };
169 let step_id: Uuid = sqlx::query_scalar!(
170 "INSERT INTO workflow_steps (
171 workflow_run_id,
172 step_key,
173 execution_kind,
174 job_type,
175 organization_id,
176 payload,
177 priority,
178 max_attempts,
179 timeout_seconds,
180 stage,
181 status,
182 dependency_count_total,
183 dependency_count_pending,
184 dependency_count_unsatisfied
185 )
186 VALUES (
187 $1,
188 $2,
189 $3::text::workflow_step_execution_kind,
190 $4,
191 $5,
192 $6::jsonb,
193 $7,
194 $8,
195 $9,
196 $10,
197 'BLOCKED',
198 $11,
199 $12,
200 $13
201 )
202 RETURNING id",
203 workflow_run_id,
204 step.step_key() as _,
205 step.execution_kind().as_db_value(),
206 job_type,
207 organization_id,
208 step.payload(),
209 priority,
210 max_attempts,
211 timeout_seconds,
212 stage,
213 dependency_count_total,
214 dependency_count_pending,
215 dependency_count_unsatisfied,
216 )
217 .fetch_one(&mut **tx)
218 .await
219 .map_err(|error| Error::from_query_sqlx_with_context("insert workflow step", error))?;
220
221 Ok(step_id)
222}
223
224pub(crate) async fn insert_workflow_steps_tx(
225 tx: &mut DbTx<'_>,
226 payload: &WorkflowRunEnqueue<'_>,
227 workflow_run_id: Uuid,
228 defaults_by_job_type: &DefaultsByJobType,
229) -> Result<WorkflowStepIdsByKey> {
230 let mut step_id_by_key = WorkflowStepIdsByKey::new();
231 for step in payload.steps() {
232 let defaults = match step.execution_kind() {
233 WorkflowStepExecutionKind::Job => {
234 Some(workflow_step_defaults(defaults_by_job_type, step)?)
235 }
236 WorkflowStepExecutionKind::External => None,
237 };
238 let step_id = insert_workflow_step_record_tx(
239 tx,
240 workflow_run_id,
241 workflow_step_effective_organization_id(payload.organization_id(), step),
242 step,
243 defaults,
244 dependency_count_total(step)?,
245 0,
246 )
247 .await?;
248 step_id_by_key.insert(step.step_key().as_str().to_owned(), step_id);
249 }
250
251 Ok(step_id_by_key)
252}
253
254pub(crate) fn step_id_for_key(
255 step_id_by_key: &WorkflowStepIdsByKey,
256 step_key: &str,
257 missing_error: &'static str,
258) -> Result<Uuid> {
259 step_id_by_key
260 .get(step_key)
261 .copied()
262 .ok_or_else(|| workflow_internal_state_error(missing_error))
263}
264
265pub(crate) async fn insert_workflow_step_dependency_record_tx(
266 tx: &mut DbTx<'_>,
267 workflow_run_id: Uuid,
268 prerequisite_step_id: Uuid,
269 dependent_step_id: Uuid,
270 release_mode: &str,
271) -> Result<()> {
272 sqlx::query!(
273 "INSERT INTO workflow_step_dependencies (
274 workflow_run_id,
275 prerequisite_step_id,
276 dependent_step_id,
277 release_mode
278 )
279 VALUES ($1, $2, $3, $4::text::workflow_dependency_release_mode)",
280 workflow_run_id,
281 prerequisite_step_id,
282 dependent_step_id,
283 release_mode,
284 )
285 .execute(&mut **tx)
286 .await
287 .map_err(|error| {
288 Error::from_query_sqlx_with_context("insert workflow step dependency", error)
289 })?;
290
291 Ok(())
292}
293
294pub(crate) async fn insert_workflow_step_dependencies_tx(
295 tx: &mut DbTx<'_>,
296 payload: &WorkflowRunEnqueue<'_>,
297 workflow_run_id: Uuid,
298 step_id_by_key: &WorkflowStepIdsByKey,
299) -> Result<()> {
300 for step in payload.steps() {
301 let dependent_step_id = step_id_for_key(
302 step_id_by_key,
303 step.step_key().as_str(),
304 "missing dependent workflow step id",
305 )?;
306 for dependency in step.dependencies() {
307 let prerequisite_step_id = step_id_for_key(
308 step_id_by_key,
309 dependency.prerequisite_step_key.as_str(),
310 "missing prerequisite workflow step id",
311 )?;
312 let release_mode = dependency
313 .release_mode
314 .unwrap_or(WorkflowDependencyReleaseMode::OnTerminal)
315 .as_db_value();
316 insert_workflow_step_dependency_record_tx(
317 tx,
318 workflow_run_id,
319 prerequisite_step_id,
320 dependent_step_id,
321 release_mode,
322 )
323 .await?;
324 }
325 }
326
327 Ok(())
328}
329
330pub(crate) async fn fetch_job_definition_defaults_tx(
331 tx: &mut DbTx<'_>,
332 steps: &[WorkflowStepEnqueue<'_>],
333) -> Result<DefaultsByJobType> {
334 let job_types: Vec<String> = steps
335 .iter()
336 .filter(|step| step.execution_kind() == WorkflowStepExecutionKind::Job)
337 .map(|step| {
338 step.job_type()
339 .map(|job_type| job_type.as_str().to_owned())
340 .ok_or_else(|| workflow_internal_state_error("job workflow step missing job_type"))
341 })
342 .collect::<Result<BTreeSet<_>>>()?
343 .into_iter()
344 .collect();
345
346 let rows = sqlx::query!(
347 "SELECT job_type, default_priority, max_attempts, default_timeout_seconds
348 FROM job_definitions
349 WHERE is_enabled = true
350 AND job_type = ANY($1::text[])",
351 &job_types,
352 )
353 .fetch_all(&mut **tx)
354 .await
355 .map_err(|error| {
356 Error::from_query_sqlx_with_context("lookup workflow step job definition defaults", error)
357 })?;
358
359 let defaults_by_job_type: DefaultsByJobType = rows
360 .into_iter()
361 .map(|row| {
362 (
363 row.job_type,
364 JobDefinitionDefaults {
365 default_priority: row.default_priority,
366 max_attempts: row.max_attempts,
367 default_timeout_seconds: row.default_timeout_seconds,
368 },
369 )
370 })
371 .collect();
372
373 if let Some(step) = steps
374 .iter()
375 .filter(|step| step.execution_kind() == WorkflowStepExecutionKind::Job)
376 .find(|step| {
377 step.job_type()
378 .is_none_or(|job_type| !defaults_by_job_type.contains_key(job_type.as_str()))
379 })
380 {
381 return Err(workflow_definition_not_available_error(
382 step.job_type()
383 .map(|job_type| job_type.as_str())
384 .unwrap_or("<missing-job-type>"),
385 ));
386 }
387
388 Ok(defaults_by_job_type)
389}
390
391pub(crate) async fn enqueue_root_steps_tx(tx: &mut DbTx<'_>, workflow_run_id: Uuid) -> Result<()> {
392 let rows = sqlx::query!(
393 "SELECT
394 id,
395 workflow_run_id,
396 execution_kind::text AS \"execution_kind!\",
397 job_type,
398 organization_id,
399 payload,
400 priority,
401 max_attempts,
402 timeout_seconds,
403 stage
404 FROM workflow_steps
405 WHERE workflow_run_id = $1
406 AND status = 'BLOCKED'
407 AND dependency_count_pending = 0
408 ORDER BY created_at ASC
409 FOR UPDATE",
410 workflow_run_id,
411 )
412 .fetch_all(&mut **tx)
413 .await
414 .map_err(|error| {
415 Error::from_query_sqlx_with_context("lookup root workflow steps for enqueue", error)
416 })?;
417
418 for row in rows {
419 let candidate = super::StepReleaseCandidate {
420 id: row.id,
421 workflow_run_id: row.workflow_run_id,
422 execution_kind: parse_workflow_step_execution_kind(row.execution_kind)?,
423 job_type: row.job_type.map(parse_job_type_name).transpose()?,
424 organization_id: row.organization_id,
425 payload: row.payload,
426 priority: row.priority,
427 max_attempts: row.max_attempts,
428 timeout_seconds: row.timeout_seconds,
429 stage: row.stage.map(parse_job_stage).transpose()?,
430 };
431 release_candidate_step_tx(tx, &candidate).await?;
432 }
433
434 Ok(())
435}
436
437pub(crate) async fn load_workflow_run_by_id_tx(
438 tx: &mut DbTx<'_>,
439 workflow_run_id: Uuid,
440) -> Result<WorkflowRunDbRecord> {
441 let run_row = sqlx::query!(
442 "SELECT
443 id,
444 workflow_type,
445 organization_id,
446 status::text AS \"status!\",
447 idempotency_key,
448 metadata,
449 started_at,
450 finished_at,
451 created_at,
452 updated_at
453 FROM workflow_runs
454 WHERE id = $1",
455 workflow_run_id,
456 )
457 .fetch_one(&mut **tx)
458 .await
459 .map_err(|error| {
460 Error::from_query_sqlx_with_context("load workflow run after enqueue recompute", error)
461 })?;
462
463 Ok(WorkflowRunDbRecord {
464 id: run_row.id,
465 workflow_type: parse_workflow_type_name(run_row.workflow_type)?,
466 organization_id: run_row.organization_id,
467 status: parse_workflow_run_status(run_row.status)?,
468 idempotency_key: run_row.idempotency_key,
469 metadata: run_row.metadata,
470 started_at: run_row.started_at,
471 finished_at: run_row.finished_at,
472 created_at: run_row.created_at,
473 updated_at: run_row.updated_at,
474 })
475}