1use std::time::Duration;
43
44use ff_core::contracts::{EdgeDependencyPolicy, OnSatisfied};
45use ff_core::engine_error::{ContentionKind, EngineError};
46use serde_json::Value as JsonValue;
47use sqlx::{PgPool, Row};
48use uuid::Uuid;
49
50use crate::error::map_sqlx_error;
51
/// Total number of times `advance_edge_group_with_retry` runs
/// `advance_edge_group` (the first try included) before giving up with
/// `ContentionKind::RetryExhausted`.
const ADVANCE_MAX_ATTEMPTS: u32 = 3;
55
/// Result of [`dispatch_completion`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DispatchOutcome {
    /// This call did not claim the event: either no row has the given
    /// `event_id`, or its `dispatched_at_ms` was already set.
    NoOp,
    /// This call claimed the event. The payload is the number of outgoing
    /// edges whose downstream edge group was advanced. It is 0 when the
    /// execution has no `flow_id` or no outgoing edges.
    Advanced(usize),
}
69
70#[tracing::instrument(name = "pg.dispatch_completion", skip(pool))]
77pub async fn dispatch_completion(
78 pool: &PgPool,
79 event_id: i64,
80) -> Result<DispatchOutcome, EngineError> {
81 let now = now_ms();
84 let row = sqlx::query(
85 r#"
86 UPDATE ff_completion_event
87 SET dispatched_at_ms = $2
88 WHERE event_id = $1
89 AND dispatched_at_ms IS NULL
90 RETURNING partition_key, execution_id, flow_id, outcome
91 "#,
92 )
93 .bind(event_id)
94 .bind(now)
95 .fetch_optional(pool)
96 .await
97 .map_err(map_sqlx_error)?;
98
99 let Some(row) = row else {
100 return Ok(DispatchOutcome::NoOp);
101 };
102
103 let partition_key: i16 = row.get("partition_key");
104 let execution_id: Uuid = row.get("execution_id");
105 let flow_id: Option<Uuid> = row.get("flow_id");
106 let outcome: String = row.get("outcome");
107
108 let Some(flow_id) = flow_id else {
109 return Ok(DispatchOutcome::Advanced(0));
112 };
113
114 let edges = sqlx::query(
118 r#"
119 SELECT edge_id, downstream_eid
120 FROM ff_edge
121 WHERE partition_key = $1 AND flow_id = $2 AND upstream_eid = $3
122 "#,
123 )
124 .bind(partition_key)
125 .bind(flow_id)
126 .bind(execution_id)
127 .fetch_all(pool)
128 .await
129 .map_err(map_sqlx_error)?;
130
131 if edges.is_empty() {
132 return Ok(DispatchOutcome::Advanced(0));
133 }
134
135 let outcome_kind = OutcomeKind::from_str(&outcome);
136
137 let mut advanced: usize = 0;
139 for edge in &edges {
140 let downstream_eid: Uuid = edge.get("downstream_eid");
141 advance_edge_group_with_retry(
142 pool,
143 partition_key,
144 flow_id,
145 execution_id,
146 downstream_eid,
147 outcome_kind,
148 )
149 .await?;
150 advanced += 1;
151 }
152
153 Ok(DispatchOutcome::Advanced(advanced))
154}
155
/// Terminal outcome of an upstream execution, bucketed by the edge-group
/// counter it increments.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OutcomeKind {
    /// Increments `success_count`.
    Success,
    /// Increments `fail_count`. Every outcome string other than `"success"`
    /// and `"skipped"` lands here.
    Fail,
    /// Increments `skip_count`.
    Skip,
}

impl OutcomeKind {
    /// Maps the textual `outcome` of a completion event to its bucket.
    ///
    /// The comparison is exact and case-sensitive. Unrecognized text is
    /// treated as a failure.
    fn from_str(s: &str) -> Self {
        if s == "success" {
            Self::Success
        } else if s == "skipped" {
            Self::Skip
        } else {
            Self::Fail
        }
    }
}
180
181async fn advance_edge_group_with_retry(
184 pool: &PgPool,
185 partition_key: i16,
186 flow_id: Uuid,
187 upstream_eid: Uuid,
188 downstream_eid: Uuid,
189 outcome: OutcomeKind,
190) -> Result<(), EngineError> {
191 for attempt in 0..ADVANCE_MAX_ATTEMPTS {
192 match advance_edge_group(
193 pool,
194 partition_key,
195 flow_id,
196 upstream_eid,
197 downstream_eid,
198 outcome,
199 )
200 .await
201 {
202 Ok(()) => return Ok(()),
203 Err(err) if is_serialization_conflict(&err) => {
204 if attempt + 1 < ADVANCE_MAX_ATTEMPTS {
205 let ms = 5u64 * (1u64 << attempt);
206 tokio::time::sleep(Duration::from_millis(ms)).await;
207 continue;
208 }
209 return Err(EngineError::Contention(ContentionKind::RetryExhausted));
210 }
211 Err(err) => return Err(err),
212 }
213 }
214 Err(EngineError::Contention(ContentionKind::RetryExhausted))
215}
216
217fn is_serialization_conflict(err: &EngineError) -> bool {
218 if matches!(err, EngineError::Contention(ContentionKind::LeaseConflict)) {
225 return true;
226 }
227 if let EngineError::Transport { source, .. } = err
228 && let Some(sqlx_err) = source.downcast_ref::<sqlx::Error>()
229 && let Some(db) = sqlx_err.as_database_error()
230 && let Some(code) = db.code()
231 && code.as_ref() == "55P03"
232 {
233 return true;
236 }
237 false
238}
239
/// Applies one upstream completion to the edge group of `downstream_eid`, then
/// acts on the resulting policy decision. Everything runs inside a single
/// SERIALIZABLE transaction.
///
/// Steps:
/// 1. Lock the `ff_edge_group` row for (`partition_key`, `flow_id`,
///    `downstream_eid`) with `FOR UPDATE`. If no such row exists, the
///    transaction is committed and the call does nothing else.
/// 2. Decrement `running_count` while it is positive, bump the counter that
///    matches `outcome`, and persist the new counts.
/// 3. Evaluate the group's dependency policy over the new counts:
///    - `Pending`: nothing further.
///    - `Satisfied`: set the downstream execution to `eligible_now` and move
///      it from `blocked` to `runnable`, unless it is already terminal or
///      cancelled. If siblings should be cancelled and the group is not yet
///      flagged, it also:
///      - records the still-live sibling upstreams (other than
///        `upstream_eid`) in `cancel_siblings_pending_members`;
///      - enqueues a row in `ff_pending_cancel_groups`;
///      - sets `cancel_siblings_pending_flag`.
///    - `Impossible`: mark the downstream execution as terminal/skipped. If
///      that update changed a row, insert a `skipped` completion event for it.
///
/// # Errors
/// SQL failures are mapped through `map_sqlx_error` and returned early. The
/// uncommitted `tx` is then dropped, and sqlx rolls it back on drop.
/// `advance_edge_group_with_retry` re-runs this function on errors that
/// `is_serialization_conflict` accepts.
#[tracing::instrument(
    name = "pg.advance_edge_group",
    skip(pool),
    fields(
        part = partition_key,
        flow = %flow_id,
        downstream = %downstream_eid,
    )
)]
async fn advance_edge_group(
    pool: &PgPool,
    partition_key: i16,
    flow_id: Uuid,
    upstream_eid: Uuid,
    downstream_eid: Uuid,
    outcome: OutcomeKind,
) -> Result<(), EngineError> {
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // PostgreSQL only accepts SET TRANSACTION before the first query of the
    // transaction, so this must stay the first statement.
    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // Lock the group row so concurrent completions for the same downstream
    // serialize on it.
    let row = sqlx::query(
        r#"
        SELECT policy, success_count, fail_count, skip_count, running_count,
               cancel_siblings_pending_flag
        FROM ff_edge_group
        WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3
        FOR UPDATE
        "#,
    )
    .bind(partition_key)
    .bind(flow_id)
    .bind(downstream_eid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let Some(row) = row else {
        // No edge group for this downstream: nothing to advance.
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(());
    };

    let policy_raw: JsonValue = row.get("policy");
    let mut success: i32 = row.get("success_count");
    let mut fail: i32 = row.get("fail_count");
    let mut skip: i32 = row.get("skip_count");
    let mut running: i32 = row.get("running_count");
    let already_flagged: bool = row.get("cancel_siblings_pending_flag");

    // The completed upstream is no longer running. Only decrement while
    // positive, so this path never pushes the counter below zero.
    if running > 0 {
        running -= 1;
    }
    match outcome {
        OutcomeKind::Success => success += 1,
        OutcomeKind::Fail => fail += 1,
        OutcomeKind::Skip => skip += 1,
    }

    let policy = decode_policy(&policy_raw);
    // `total` counts every upstream the group tracks: the finished ones plus
    // those still running.
    let total = success + fail + skip + running.max(0);
    let decision = evaluate(&policy, success, fail, skip, total);

    // Persist the new counters regardless of the decision.
    sqlx::query(
        r#"
        UPDATE ff_edge_group
        SET success_count = $4,
            fail_count = $5,
            skip_count = $6,
            running_count = $7
        WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3
        "#,
    )
    .bind(partition_key)
    .bind(flow_id)
    .bind(downstream_eid)
    .bind(success)
    .bind(fail)
    .bind(skip)
    .bind(running.max(0))
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let now = now_ms();

    match decision {
        Decision::Pending => {
            // Not decided yet: only the counters above change.
        }
        Decision::Satisfied { cancel_siblings } => {
            // NOTE(review): `evaluate` keeps returning `Satisfied` on later
            // completions for this group (e.g. AnyOf once success >= 1), so
            // this UPDATE can run again after the downstream has left
            // `blocked`. Confirm that re-setting `eligibility_state` on a
            // non-blocked, non-terminal execution is harmless.
            sqlx::query(
                r#"
                UPDATE ff_exec_core
                SET eligibility_state = 'eligible_now',
                    lifecycle_phase = CASE
                        WHEN lifecycle_phase = 'blocked' THEN 'runnable'
                        ELSE lifecycle_phase
                    END
                WHERE partition_key = $1 AND execution_id = $2
                  AND lifecycle_phase NOT IN ('terminal','cancelled')
                "#,
            )
            .bind(partition_key)
            .bind(downstream_eid)
            .execute(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;

            // Enqueue sibling cancellation at most once per group.
            if cancel_siblings && !already_flagged {
                // Sibling upstreams of the same downstream that are still
                // live, excluding the upstream that just completed.
                let sibling_rows = sqlx::query(
                    r#"
                    SELECT ff_exec_core.execution_id
                    FROM ff_exec_core
                    JOIN ff_edge ON ff_edge.upstream_eid = ff_exec_core.execution_id
                    WHERE ff_exec_core.partition_key = $1
                      AND ff_edge.partition_key = $1
                      AND ff_edge.flow_id = $2
                      AND ff_edge.downstream_eid = $3
                      AND ff_exec_core.lifecycle_phase NOT IN ('terminal','cancelled')
                      AND ff_exec_core.public_state <> 'skipped'
                      AND ff_exec_core.execution_id <> $4
                    "#,
                )
                .bind(partition_key)
                .bind(flow_id)
                .bind(downstream_eid)
                .bind(upstream_eid)
                .fetch_all(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;
                // Members are stored as their UUID string form.
                let members: Vec<String> = sibling_rows
                    .iter()
                    .map(|r| {
                        let u: Uuid = r.get("execution_id");
                        u.to_string()
                    })
                    .collect();

                sqlx::query(
                    r#"
                    INSERT INTO ff_pending_cancel_groups
                        (partition_key, flow_id, downstream_eid, enqueued_at_ms)
                    VALUES ($1, $2, $3, $4)
                    ON CONFLICT DO NOTHING
                    "#,
                )
                .bind(partition_key)
                .bind(flow_id)
                .bind(downstream_eid)
                .bind(now)
                .execute(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;

                sqlx::query(
                    r#"
                    UPDATE ff_edge_group
                    SET cancel_siblings_pending_flag = TRUE,
                        cancel_siblings_pending_members = $4
                    WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3
                    "#,
                )
                .bind(partition_key)
                .bind(flow_id)
                .bind(downstream_eid)
                .bind(&members)
                .execute(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;
            }
        }
        Decision::Impossible => {
            // Skip the downstream. RETURNING tells us whether this call made
            // the transition, since it is a no-op when already terminal or
            // cancelled.
            let updated = sqlx::query(
                r#"
                UPDATE ff_exec_core
                SET lifecycle_phase = 'terminal',
                    eligibility_state = 'not_applicable',
                    public_state = 'skipped',
                    attempt_state = 'attempt_terminal',
                    terminal_at_ms = COALESCE(terminal_at_ms, $3)
                WHERE partition_key = $1 AND execution_id = $2
                  AND lifecycle_phase NOT IN ('terminal','cancelled')
                RETURNING execution_id
                "#,
            )
            .bind(partition_key)
            .bind(downstream_eid)
            .bind(now)
            .fetch_optional(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;

            // Emit a `skipped` completion event only on the actual transition.
            // Presumably `dispatch_completion` later picks it up to cascade
            // the skip further downstream; confirm `dispatched_at_ms` defaults
            // to NULL.
            if updated.is_some() {
                sqlx::query(
                    r#"
                    INSERT INTO ff_completion_event
                        (partition_key, execution_id, flow_id, outcome, occurred_at_ms)
                    VALUES ($1, $2, $3, 'skipped', $4)
                    "#,
                )
                .bind(partition_key)
                .bind(downstream_eid)
                .bind(flow_id)
                .bind(now)
                .execute(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;
            }
        }
    }

    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(())
}
481
/// Outcome of evaluating an edge group's dependency policy with `evaluate`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Decision {
    /// Not decided yet. `advance_edge_group` only persists the new counters.
    Pending,
    /// The policy is met, so the downstream is made eligible to run.
    /// `cancel_siblings` requests that the still-live sibling upstreams be
    /// queued for cancellation.
    Satisfied { cancel_siblings: bool },
    /// The policy can no longer be met, so the downstream is marked as skipped.
    Impossible,
}
493
494fn evaluate(
495 policy: &EdgeDependencyPolicy,
496 success: i32,
497 fail: i32,
498 skip: i32,
499 total: i32,
500) -> Decision {
501 let nonsuccess = fail + skip;
502 match policy {
503 EdgeDependencyPolicy::AllOf => {
504 if success == total && total > 0 {
505 Decision::Satisfied { cancel_siblings: false }
506 } else if nonsuccess > 0 {
507 Decision::Impossible
509 } else {
510 Decision::Pending
511 }
512 }
513 EdgeDependencyPolicy::AnyOf { on_satisfied } => {
514 if success >= 1 {
515 Decision::Satisfied {
516 cancel_siblings: matches!(on_satisfied, OnSatisfied::CancelRemaining),
517 }
518 } else if nonsuccess >= total && total > 0 {
519 Decision::Impossible
520 } else {
521 Decision::Pending
522 }
523 }
524 EdgeDependencyPolicy::Quorum { k, on_satisfied } => {
525 let k = *k as i32;
526 if success >= k {
527 Decision::Satisfied {
528 cancel_siblings: matches!(on_satisfied, OnSatisfied::CancelRemaining),
529 }
530 } else if total - nonsuccess < k {
531 Decision::Impossible
533 } else {
534 Decision::Pending
535 }
536 }
537 _ => Decision::Pending,
540 }
541}
542
543fn decode_policy(v: &JsonValue) -> EdgeDependencyPolicy {
544 let kind = v.get("kind").and_then(|k| k.as_str()).unwrap_or("all_of");
545 match kind {
546 "any_of" => EdgeDependencyPolicy::AnyOf {
547 on_satisfied: parse_on_satisfied(v),
548 },
549 "quorum" => {
550 let k = v
551 .get("k")
552 .and_then(|x| x.as_u64())
553 .and_then(|n| u32::try_from(n).ok())
554 .unwrap_or(1);
555 EdgeDependencyPolicy::Quorum {
556 k,
557 on_satisfied: parse_on_satisfied(v),
558 }
559 }
560 _ => EdgeDependencyPolicy::AllOf,
561 }
562}
563
564fn parse_on_satisfied(v: &JsonValue) -> OnSatisfied {
565 match v.get("on_satisfied").and_then(|x| x.as_str()) {
566 Some("let_run") => OnSatisfied::LetRun,
567 _ => OnSatisfied::CancelRemaining,
568 }
569}
570
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch. Returns
/// `i64::MAX` if the millisecond count does not fit in an `i64`.
fn now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
580
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn evaluate_all_of_satisfied() {
        let d = evaluate(&EdgeDependencyPolicy::AllOf, 3, 0, 0, 3);
        assert_eq!(d, Decision::Satisfied { cancel_siblings: false });
    }

    #[test]
    fn evaluate_all_of_impossible_on_fail() {
        let d = evaluate(&EdgeDependencyPolicy::AllOf, 2, 1, 0, 3);
        assert_eq!(d, Decision::Impossible);
    }

    #[test]
    fn evaluate_all_of_impossible_on_skip() {
        // Skips count as non-success, just like failures.
        let d = evaluate(&EdgeDependencyPolicy::AllOf, 2, 0, 1, 3);
        assert_eq!(d, Decision::Impossible);
    }

    #[test]
    fn evaluate_all_of_pending() {
        let d = evaluate(&EdgeDependencyPolicy::AllOf, 1, 0, 0, 3);
        assert_eq!(d, Decision::Pending);
    }

    #[test]
    fn evaluate_any_of_cancels_siblings() {
        let d = evaluate(
            &EdgeDependencyPolicy::AnyOf {
                on_satisfied: OnSatisfied::CancelRemaining,
            },
            1, 0, 0, 3,
        );
        assert_eq!(d, Decision::Satisfied { cancel_siblings: true });
    }

    #[test]
    fn evaluate_any_of_let_run() {
        let d = evaluate(
            &EdgeDependencyPolicy::AnyOf {
                on_satisfied: OnSatisfied::LetRun,
            },
            1, 0, 0, 3,
        );
        assert_eq!(d, Decision::Satisfied { cancel_siblings: false });
    }

    #[test]
    fn evaluate_any_of_pending_while_upstreams_remain() {
        let d = evaluate(
            &EdgeDependencyPolicy::AnyOf {
                on_satisfied: OnSatisfied::CancelRemaining,
            },
            0, 2, 0, 3,
        );
        assert_eq!(d, Decision::Pending);
    }

    #[test]
    fn evaluate_any_of_impossible_when_all_fail() {
        let d = evaluate(
            &EdgeDependencyPolicy::AnyOf {
                on_satisfied: OnSatisfied::CancelRemaining,
            },
            0, 3, 0, 3,
        );
        assert_eq!(d, Decision::Impossible);
    }

    #[test]
    fn evaluate_quorum_satisfied_at_k() {
        let d = evaluate(
            &EdgeDependencyPolicy::Quorum {
                k: 2,
                on_satisfied: OnSatisfied::LetRun,
            },
            2, 0, 1, 3,
        );
        assert_eq!(d, Decision::Satisfied { cancel_siblings: false });
    }

    #[test]
    fn evaluate_quorum_cancel_remaining() {
        let d = evaluate(
            &EdgeDependencyPolicy::Quorum {
                k: 2,
                on_satisfied: OnSatisfied::CancelRemaining,
            },
            2, 0, 0, 4,
        );
        assert_eq!(d, Decision::Satisfied { cancel_siblings: true });
    }

    #[test]
    fn evaluate_quorum_pending_with_headroom() {
        // One success plus two still running can still reach k = 2.
        let d = evaluate(
            &EdgeDependencyPolicy::Quorum {
                k: 2,
                on_satisfied: OnSatisfied::LetRun,
            },
            1, 1, 0, 4,
        );
        assert_eq!(d, Decision::Pending);
    }

    #[test]
    fn evaluate_quorum_impossible_when_headroom_exhausted() {
        let d = evaluate(
            &EdgeDependencyPolicy::Quorum {
                k: 3,
                on_satisfied: OnSatisfied::CancelRemaining,
            },
            0, 3, 0, 5,
        );
        assert_eq!(d, Decision::Impossible);
    }

    #[test]
    fn outcome_kind_mapping() {
        assert_eq!(OutcomeKind::from_str("success"), OutcomeKind::Success);
        assert_eq!(OutcomeKind::from_str("failed"), OutcomeKind::Fail);
        assert_eq!(OutcomeKind::from_str("skipped"), OutcomeKind::Skip);
        assert_eq!(OutcomeKind::from_str("cancelled"), OutcomeKind::Fail);
    }

    #[test]
    fn decode_policy_defaults_to_all_of() {
        assert!(matches!(
            decode_policy(&serde_json::json!({})),
            EdgeDependencyPolicy::AllOf
        ));
        assert!(matches!(
            decode_policy(&serde_json::json!({"kind": "mystery"})),
            EdgeDependencyPolicy::AllOf
        ));
    }

    #[test]
    fn decode_policy_any_of_defaults_to_cancel_remaining() {
        assert!(matches!(
            decode_policy(&serde_json::json!({"kind": "any_of"})),
            EdgeDependencyPolicy::AnyOf {
                on_satisfied: OnSatisfied::CancelRemaining
            }
        ));
    }

    #[test]
    fn decode_policy_quorum_reads_k_and_on_satisfied() {
        assert!(matches!(
            decode_policy(&serde_json::json!({
                "kind": "quorum",
                "k": 3,
                "on_satisfied": "let_run"
            })),
            EdgeDependencyPolicy::Quorum {
                k: 3,
                on_satisfied: OnSatisfied::LetRun
            }
        ));
    }

    #[test]
    fn decode_policy_quorum_k_defaults_to_one() {
        assert!(matches!(
            decode_policy(&serde_json::json!({"kind": "quorum"})),
            EdgeDependencyPolicy::Quorum { k: 1, .. }
        ));
    }

    #[test]
    fn parse_on_satisfied_is_exact_match() {
        assert!(matches!(
            parse_on_satisfied(&serde_json::json!({"on_satisfied": "let_run"})),
            OnSatisfied::LetRun
        ));
        assert!(matches!(
            parse_on_satisfied(&serde_json::json!({"on_satisfied": "LET_RUN"})),
            OnSatisfied::CancelRemaining
        ));
        assert!(matches!(
            parse_on_satisfied(&serde_json::json!({"on_satisfied": 1})),
            OnSatisfied::CancelRemaining
        ));
    }
}