1use std::collections::BTreeMap;
28use std::time::Duration;
29
30use ff_core::backend::CancelFlowPolicy;
31use ff_core::contracts::{
32 CancelFlowResult, EdgeDependencyPolicy, EdgeDirection, EdgeGroupSnapshot, EdgeGroupState,
33 EdgeSnapshot, FlowSnapshot, FlowStatus, FlowSummary, ListFlowsPage, OnSatisfied,
34 SetEdgeGroupPolicyResult,
35};
36use ff_core::engine_error::{
37 ContentionKind, EngineError, ValidationKind,
38};
39use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
40use ff_core::types::{EdgeId, ExecutionId, FlowId, Namespace, TimestampMs};
41use serde_json::Value as JsonValue;
42use sqlx::postgres::PgRow;
43use sqlx::{PgPool, Row};
44use uuid::Uuid;
45
46use crate::error::map_sqlx_error;
47
48const CANCEL_FLOW_MAX_ATTEMPTS: u32 = 3;
53
54fn partition_index_from_key(key: &PartitionKey) -> Result<i16, EngineError> {
57 let p = key.parse().map_err(|e| EngineError::Validation {
58 kind: ValidationKind::InvalidInput,
59 detail: format!("partition_key: {e}"),
60 })?;
61 Ok(p.index as i16)
62}
63
64fn flow_partition_byte(
68 flow_id: &FlowId,
69 partition_config: &ff_core::partition::PartitionConfig,
70) -> i16 {
71 ff_core::partition::flow_partition(flow_id, partition_config).index as i16
72}
73
74fn raw_str<'a>(raw: &'a JsonValue, key: &str) -> Option<&'a str> {
77 raw.get(key).and_then(|v| v.as_str())
78}
79
80fn raw_u64(raw: &JsonValue, key: &str) -> Option<u64> {
84 raw.get(key).and_then(|v| {
85 v.as_u64()
86 .or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
87 })
88}
89
90fn raw_u32(raw: &JsonValue, key: &str) -> Option<u32> {
91 raw.get(key).and_then(|v| {
92 v.as_u64()
93 .or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
94 .and_then(|n| u32::try_from(n).ok())
95 })
96}
97
98fn extract_tags(raw: &JsonValue) -> BTreeMap<String, String> {
101 let mut tags = BTreeMap::new();
102 let Some(obj) = raw.as_object() else {
103 return tags;
104 };
105 for (k, v) in obj {
106 if !is_namespaced_tag_key(k) {
107 continue;
108 }
109 if let Some(s) = v.as_str() {
110 tags.insert(k.clone(), s.to_owned());
111 }
112 }
113 tags
114}
115
116fn is_namespaced_tag_key(k: &str) -> bool {
117 let mut chars = k.chars();
118 let Some(first) = chars.next() else {
119 return false;
120 };
121 if !first.is_ascii_lowercase() {
122 return false;
123 }
124 let mut saw_dot = false;
125 for c in chars {
126 if c == '.' {
127 saw_dot = true;
128 break;
129 }
130 if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') {
131 return false;
132 }
133 }
134 saw_dot
135}
136
137fn parse_on_satisfied(s: &str) -> OnSatisfied {
138 match s {
139 "let_run" => OnSatisfied::LetRun,
140 _ => OnSatisfied::CancelRemaining,
141 }
142}
143
144fn decode_edge_policy(v: &JsonValue) -> EdgeDependencyPolicy {
148 let kind = v.get("kind").and_then(|k| k.as_str()).unwrap_or("all_of");
149 match kind {
150 "any_of" => {
151 let on = v
152 .get("on_satisfied")
153 .and_then(|x| x.as_str())
154 .map(parse_on_satisfied)
155 .unwrap_or(OnSatisfied::CancelRemaining);
156 EdgeDependencyPolicy::AnyOf { on_satisfied: on }
157 }
158 "quorum" => {
159 let k = v
160 .get("k")
161 .and_then(|x| x.as_u64())
162 .and_then(|n| u32::try_from(n).ok())
163 .unwrap_or(1);
164 let on = v
165 .get("on_satisfied")
166 .and_then(|x| x.as_str())
167 .map(parse_on_satisfied)
168 .unwrap_or(OnSatisfied::CancelRemaining);
169 EdgeDependencyPolicy::Quorum { k, on_satisfied: on }
170 }
171 _ => EdgeDependencyPolicy::AllOf,
172 }
173}
174
175fn encode_edge_policy(p: &EdgeDependencyPolicy) -> JsonValue {
176 match p {
177 EdgeDependencyPolicy::AllOf => serde_json::json!({ "kind": "all_of" }),
178 EdgeDependencyPolicy::AnyOf { on_satisfied } => serde_json::json!({
179 "kind": "any_of",
180 "on_satisfied": on_satisfied.variant_str(),
181 }),
182 EdgeDependencyPolicy::Quorum { k, on_satisfied } => serde_json::json!({
183 "kind": "quorum",
184 "k": k,
185 "on_satisfied": on_satisfied.variant_str(),
186 }),
187 _ => serde_json::json!({ "kind": "all_of" }),
191 }
192}
193
194fn decode_edge_group_row(row: &PgRow) -> Result<EdgeGroupSnapshot, EngineError> {
196 let downstream_uuid: Uuid = row.get("downstream_eid");
197 let policy_raw: JsonValue = row.get("policy");
198 let k_target: i32 = row.get("k_target");
199 let success_count: i32 = row.get("success_count");
200 let fail_count: i32 = row.get("fail_count");
201 let skip_count: i32 = row.get("skip_count");
202 let running_count: i32 = row.get("running_count");
203
204 let part: i16 = row.get("partition_key");
207 let downstream_id = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
208 .map_err(|e| EngineError::Validation {
209 kind: ValidationKind::Corruption,
210 detail: format!("ff_edge_group.downstream_eid: {e}"),
211 })?;
212
213 let policy = decode_edge_policy(&policy_raw);
214 let total: u32 = success_count.max(0) as u32
217 + fail_count.max(0) as u32
218 + skip_count.max(0) as u32
219 + running_count.max(0) as u32;
220
221 let state = if k_target > 0 && success_count >= k_target {
223 EdgeGroupState::Satisfied
224 } else {
225 EdgeGroupState::Pending
226 };
227
228 Ok(EdgeGroupSnapshot::new(
229 downstream_id,
230 policy,
231 total,
232 success_count.max(0) as u32,
233 fail_count.max(0) as u32,
234 skip_count.max(0) as u32,
235 running_count.max(0) as u32,
236 state,
237 ))
238}
239
240fn decode_flow_row(
243 flow_id: FlowId,
244 row: &PgRow,
245 edge_groups: Vec<EdgeGroupSnapshot>,
246) -> Result<FlowSnapshot, EngineError> {
247 let public_flow_state: String = row.get("public_flow_state");
248 let graph_revision_i: i64 = row.get("graph_revision");
249 let created_at_ms: i64 = row.get("created_at_ms");
250 let terminal_at_ms: Option<i64> = row.get("terminal_at_ms");
251 let raw_fields: JsonValue = row.get("raw_fields");
252
253 let flow_kind = raw_str(&raw_fields, "flow_kind")
254 .unwrap_or("")
255 .to_owned();
256 let namespace_str = raw_str(&raw_fields, "namespace").unwrap_or("default");
257 let namespace = Namespace::new(namespace_str.to_owned());
258 let node_count = raw_u32(&raw_fields, "node_count").unwrap_or(0);
259 let edge_count = raw_u32(&raw_fields, "edge_count").unwrap_or(0);
260 let last_mutation_at_ms =
261 raw_u64(&raw_fields, "last_mutation_at_ms").map(|n| n as i64).unwrap_or(created_at_ms);
262
263 let cancelled_at = terminal_at_ms.map(TimestampMs);
264 let cancel_reason = raw_str(&raw_fields, "cancel_reason").map(str::to_owned);
265 let cancellation_policy = raw_str(&raw_fields, "cancellation_policy").map(str::to_owned);
266
267 let tags = extract_tags(&raw_fields);
268
269 let graph_revision = u64::try_from(graph_revision_i).unwrap_or(0);
270
271 Ok(FlowSnapshot::new(
272 flow_id,
273 flow_kind,
274 namespace,
275 public_flow_state,
276 graph_revision,
277 node_count,
278 edge_count,
279 TimestampMs(created_at_ms),
280 TimestampMs(last_mutation_at_ms),
281 cancelled_at,
282 cancel_reason,
283 cancellation_policy,
284 tags,
285 edge_groups,
286 ))
287}
288
289fn decode_edge_row(row: &PgRow, flow_id: &FlowId) -> Result<EdgeSnapshot, EngineError> {
295 let edge_uuid: Uuid = row.get("edge_id");
296 let upstream_uuid: Uuid = row.get("upstream_eid");
297 let downstream_uuid: Uuid = row.get("downstream_eid");
298 let part: i16 = row.get("partition_key");
299 let policy_raw: JsonValue = row.get("policy");
300
301 let upstream = ExecutionId::parse(&format!("{{fp:{part}}}:{upstream_uuid}"))
302 .map_err(|e| EngineError::Validation {
303 kind: ValidationKind::Corruption,
304 detail: format!("ff_edge.upstream_eid: {e}"),
305 })?;
306 let downstream = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
307 .map_err(|e| EngineError::Validation {
308 kind: ValidationKind::Corruption,
309 detail: format!("ff_edge.downstream_eid: {e}"),
310 })?;
311
312 let dependency_kind = raw_str(&policy_raw, "dependency_kind")
313 .unwrap_or("success_only")
314 .to_owned();
315 let satisfaction_condition = raw_str(&policy_raw, "satisfaction_condition")
316 .unwrap_or("all_required")
317 .to_owned();
318 let data_passing_ref = raw_str(&policy_raw, "data_passing_ref")
319 .filter(|s| !s.is_empty())
320 .map(str::to_owned);
321 let edge_state = raw_str(&policy_raw, "edge_state")
322 .unwrap_or("pending")
323 .to_owned();
324 let created_at_ms =
325 raw_u64(&policy_raw, "created_at_ms").map(|n| n as i64).unwrap_or(0);
326 let created_by = raw_str(&policy_raw, "created_by")
327 .unwrap_or("engine")
328 .to_owned();
329
330 Ok(EdgeSnapshot::new(
331 EdgeId::from_uuid(edge_uuid),
332 flow_id.clone(),
333 upstream,
334 downstream,
335 dependency_kind,
336 satisfaction_condition,
337 data_passing_ref,
338 edge_state,
339 TimestampMs(created_at_ms),
340 created_by,
341 ))
342}
343
344pub async fn describe_flow(
346 pool: &PgPool,
347 partition_config: &ff_core::partition::PartitionConfig,
348 id: &FlowId,
349) -> Result<Option<FlowSnapshot>, EngineError> {
350 let part = flow_partition_byte(id, partition_config);
351 let flow_uuid: Uuid = id.0;
352
353 let flow_row_opt = sqlx::query(
354 "SELECT partition_key, flow_id, graph_revision, public_flow_state, \
355 created_at_ms, terminal_at_ms, raw_fields \
356 FROM ff_flow_core \
357 WHERE partition_key = $1 AND flow_id = $2",
358 )
359 .bind(part)
360 .bind(flow_uuid)
361 .fetch_optional(pool)
362 .await
363 .map_err(map_sqlx_error)?;
364
365 let Some(flow_row) = flow_row_opt else {
366 return Ok(None);
367 };
368
369 let eg_rows = sqlx::query(
370 "SELECT partition_key, flow_id, downstream_eid, policy, \
371 k_target, success_count, fail_count, skip_count, running_count \
372 FROM ff_edge_group \
373 WHERE partition_key = $1 AND flow_id = $2 \
374 ORDER BY downstream_eid",
375 )
376 .bind(part)
377 .bind(flow_uuid)
378 .fetch_all(pool)
379 .await
380 .map_err(map_sqlx_error)?;
381
382 let mut edge_groups = Vec::with_capacity(eg_rows.len());
383 for row in &eg_rows {
384 edge_groups.push(decode_edge_group_row(row)?);
385 }
386
387 decode_flow_row(id.clone(), &flow_row, edge_groups).map(Some)
388}
389
390pub async fn list_flows(
392 pool: &PgPool,
393 partition: PartitionKey,
394 cursor: Option<FlowId>,
395 limit: usize,
396) -> Result<ListFlowsPage, EngineError> {
397 if limit == 0 {
398 return Ok(ListFlowsPage::new(Vec::new(), None));
399 }
400 let part = partition_index_from_key(&partition)?;
401 let cursor_uuid: Option<Uuid> = cursor.as_ref().map(|f| f.0);
402 let fetch_limit = (limit + 1) as i64;
404
405 let rows = sqlx::query(
406 "SELECT flow_id, created_at_ms, public_flow_state \
407 FROM ff_flow_core \
408 WHERE partition_key = $1 \
409 AND ($2::uuid IS NULL OR flow_id > $2) \
410 ORDER BY flow_id \
411 LIMIT $3",
412 )
413 .bind(part)
414 .bind(cursor_uuid)
415 .bind(fetch_limit)
416 .fetch_all(pool)
417 .await
418 .map_err(map_sqlx_error)?;
419
420 let mut flows: Vec<FlowSummary> = Vec::with_capacity(rows.len().min(limit));
421 let mut next_cursor: Option<FlowId> = None;
422 for (idx, row) in rows.iter().enumerate() {
423 if idx >= limit {
424 if let Some(last) = flows.last() {
428 next_cursor = Some(last.flow_id.clone());
429 }
430 break;
431 }
432 let flow_uuid: Uuid = row.get("flow_id");
433 let created_at_ms: i64 = row.get("created_at_ms");
434 let public_state: String = row.get("public_flow_state");
435 let status = FlowStatus::from_public_flow_state(&public_state);
436 flows.push(FlowSummary::new(
437 FlowId::from_uuid(flow_uuid),
438 TimestampMs(created_at_ms),
439 status,
440 ));
441 }
442
443 Ok(ListFlowsPage::new(flows, next_cursor))
444}
445
446pub async fn list_edges(
448 pool: &PgPool,
449 partition_config: &ff_core::partition::PartitionConfig,
450 flow_id: &FlowId,
451 direction: EdgeDirection,
452) -> Result<Vec<EdgeSnapshot>, EngineError> {
453 let part = flow_partition_byte(flow_id, partition_config);
454 let flow_uuid: Uuid = flow_id.0;
455 let (column_filter, subject_eid) = match &direction {
456 EdgeDirection::Outgoing { from_node } => ("upstream_eid", from_node),
457 EdgeDirection::Incoming { to_node } => ("downstream_eid", to_node),
458 };
459 let subject_uuid = parse_exec_uuid(subject_eid)?;
461
462 let sql = format!(
463 "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
464 FROM ff_edge \
465 WHERE partition_key = $1 AND flow_id = $2 AND {column_filter} = $3 \
466 ORDER BY edge_id"
467 );
468 let rows = sqlx::query(&sql)
469 .bind(part)
470 .bind(flow_uuid)
471 .bind(subject_uuid)
472 .fetch_all(pool)
473 .await
474 .map_err(map_sqlx_error)?;
475
476 let mut out = Vec::with_capacity(rows.len());
477 for row in &rows {
478 out.push(decode_edge_row(row, flow_id)?);
479 }
480 Ok(out)
481}
482
483pub async fn describe_edge(
485 pool: &PgPool,
486 partition_config: &ff_core::partition::PartitionConfig,
487 flow_id: &FlowId,
488 edge_id: &EdgeId,
489) -> Result<Option<EdgeSnapshot>, EngineError> {
490 let part = flow_partition_byte(flow_id, partition_config);
491 let row_opt = sqlx::query(
492 "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
493 FROM ff_edge \
494 WHERE partition_key = $1 AND flow_id = $2 AND edge_id = $3",
495 )
496 .bind(part)
497 .bind(flow_id.0)
498 .bind(edge_id.0)
499 .fetch_optional(pool)
500 .await
501 .map_err(map_sqlx_error)?;
502
503 match row_opt {
504 Some(row) => decode_edge_row(&row, flow_id).map(Some),
505 None => Ok(None),
506 }
507}
508
509fn parse_exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
511 let s = eid.as_str();
512 let Some(colon) = s.rfind("}:") else {
513 return Err(EngineError::Validation {
514 kind: ValidationKind::InvalidInput,
515 detail: format!("execution_id missing '}}:' delimiter: {s}"),
516 });
517 };
518 let tail = &s[colon + 2..];
519 Uuid::parse_str(tail).map_err(|e| EngineError::Validation {
520 kind: ValidationKind::InvalidInput,
521 detail: format!("execution_id uuid suffix parse: {e}"),
522 })
523}
524
525fn cancel_policy_to_str(p: CancelFlowPolicy) -> &'static str {
526 match p {
527 CancelFlowPolicy::FlowOnly => "cancel_flow_only",
528 CancelFlowPolicy::CancelAll => "cancel_all",
529 CancelFlowPolicy::CancelPending => "cancel_pending",
530 _ => "cancel_all",
533 }
534}
535
536pub async fn cancel_flow(
556 pool: &PgPool,
557 partition_config: &ff_core::partition::PartitionConfig,
558 id: &FlowId,
559 policy: CancelFlowPolicy,
560) -> Result<CancelFlowResult, EngineError> {
561 let part = flow_partition_byte(id, partition_config);
562 let flow_uuid: Uuid = id.0;
563 let policy_str = cancel_policy_to_str(policy);
564
565 let mut last_transport: Option<EngineError> = None;
566 for attempt in 0..CANCEL_FLOW_MAX_ATTEMPTS {
567 match cancel_flow_once(pool, part, flow_uuid, policy, policy_str).await {
568 Ok(result) => return Ok(result),
569 Err(err) => {
570 if is_serialization_conflict(&err) {
571 if attempt + 1 < CANCEL_FLOW_MAX_ATTEMPTS {
573 let ms = 5u64 * (1u64 << attempt).saturating_sub(0);
574 tokio::time::sleep(Duration::from_millis(ms)).await;
575 }
576 last_transport = Some(err);
577 continue;
578 }
579 return Err(err);
580 }
581 }
582 }
583 let _ = last_transport; Err(EngineError::Contention(ContentionKind::RetryExhausted))
586}
587
588fn is_serialization_conflict(err: &EngineError) -> bool {
594 matches!(
595 err,
596 EngineError::Contention(ContentionKind::LeaseConflict)
597 )
598}
599
600async fn cancel_flow_once(
601 pool: &PgPool,
602 part: i16,
603 flow_uuid: Uuid,
604 policy: CancelFlowPolicy,
605 policy_str: &str,
606) -> Result<CancelFlowResult, EngineError> {
607 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
608
609 sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
610 .execute(&mut *tx)
611 .await
612 .map_err(map_sqlx_error)?;
613
614 let flow_found = sqlx::query(
619 "UPDATE ff_flow_core \
620 SET public_flow_state = 'cancelled', \
621 terminal_at_ms = COALESCE(terminal_at_ms, \
622 (extract(epoch from clock_timestamp())*1000)::bigint), \
623 raw_fields = raw_fields \
624 || jsonb_build_object('cancellation_policy', $3::text) \
625 WHERE partition_key = $1 AND flow_id = $2 \
626 RETURNING flow_id",
627 )
628 .bind(part)
629 .bind(flow_uuid)
630 .bind(policy_str)
631 .fetch_optional(&mut *tx)
632 .await
633 .map_err(map_sqlx_error)?;
634
635 if flow_found.is_none() {
636 tx.commit().await.map_err(map_sqlx_error)?;
637 return Ok(CancelFlowResult::Cancelled {
638 cancellation_policy: policy_str.to_owned(),
639 member_execution_ids: Vec::new(),
640 });
641 }
642
643 let member_rows = if matches!(policy, CancelFlowPolicy::FlowOnly) {
645 Vec::new()
646 } else {
647 let state_filter: &str = match policy {
648 CancelFlowPolicy::CancelAll => {
649 "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')"
650 }
651 CancelFlowPolicy::CancelPending => {
652 "lifecycle_phase IN ('pending','blocked','eligible')"
653 }
654 _ => "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')",
655 };
656 let sql = format!(
657 "SELECT execution_id FROM ff_exec_core \
658 WHERE partition_key = $1 AND flow_id = $2 AND {state_filter}"
659 );
660 sqlx::query(&sql)
661 .bind(part)
662 .bind(flow_uuid)
663 .fetch_all(&mut *tx)
664 .await
665 .map_err(map_sqlx_error)?
666 };
667
668 let mut member_execution_ids: Vec<String> = Vec::with_capacity(member_rows.len());
669 for row in &member_rows {
670 let exec_uuid: Uuid = row.get("execution_id");
671 sqlx::query(
673 "UPDATE ff_exec_core \
674 SET lifecycle_phase = 'cancelled', \
675 eligibility_state = 'cancelled', \
676 public_state = 'cancelled', \
677 terminal_at_ms = COALESCE(terminal_at_ms, \
678 (extract(epoch from clock_timestamp())*1000)::bigint), \
679 cancellation_reason = COALESCE(cancellation_reason, 'flow_cancelled'), \
680 cancelled_by = COALESCE(cancelled_by, 'cancel_flow') \
681 WHERE partition_key = $1 AND execution_id = $2",
682 )
683 .bind(part)
684 .bind(exec_uuid)
685 .execute(&mut *tx)
686 .await
687 .map_err(map_sqlx_error)?;
688
689 sqlx::query(
696 "UPDATE ff_attempt \
697 SET outcome = NULL \
698 WHERE partition_key = $1 \
699 AND execution_id = $2 \
700 AND attempt_index = (SELECT attempt_index FROM ff_exec_core \
701 WHERE partition_key = $1 AND execution_id = $2)",
702 )
703 .bind(part)
704 .bind(exec_uuid)
705 .execute(&mut *tx)
706 .await
707 .map_err(map_sqlx_error)?;
708
709 sqlx::query(
711 "INSERT INTO ff_completion_event \
712 (partition_key, execution_id, flow_id, outcome, occurred_at_ms) \
713 VALUES ($1, $2, $3, 'cancelled', \
714 (extract(epoch from clock_timestamp())*1000)::bigint)",
715 )
716 .bind(part)
717 .bind(exec_uuid)
718 .bind(flow_uuid)
719 .execute(&mut *tx)
720 .await
721 .map_err(map_sqlx_error)?;
722
723 sqlx::query(
727 "INSERT INTO ff_lease_event \
728 (execution_id, lease_id, event_type, occurred_at_ms, partition_key) \
729 VALUES ($1, NULL, 'revoked', \
730 (extract(epoch from clock_timestamp())*1000)::bigint, $2)",
731 )
732 .bind(exec_uuid.to_string())
733 .bind(i32::from(part))
734 .execute(&mut *tx)
735 .await
736 .map_err(map_sqlx_error)?;
737
738 member_execution_ids.push(format!("{{fp:{part}}}:{exec_uuid}"));
739 }
740
741 if matches!(policy, CancelFlowPolicy::CancelPending) {
745 sqlx::query(
746 "INSERT INTO ff_pending_cancel_groups \
747 (partition_key, flow_id, downstream_eid, enqueued_at_ms) \
748 SELECT partition_key, flow_id, downstream_eid, \
749 (extract(epoch from clock_timestamp())*1000)::bigint \
750 FROM ff_edge_group \
751 WHERE partition_key = $1 AND flow_id = $2 AND running_count > 0 \
752 ON CONFLICT DO NOTHING",
753 )
754 .bind(part)
755 .bind(flow_uuid)
756 .execute(&mut *tx)
757 .await
758 .map_err(map_sqlx_error)?;
759 }
760
761 tx.commit().await.map_err(map_sqlx_error)?;
762
763 Ok(CancelFlowResult::Cancelled {
764 cancellation_policy: policy_str.to_owned(),
765 member_execution_ids,
766 })
767}
768
769pub async fn set_edge_group_policy(
783 pool: &PgPool,
784 partition_config: &ff_core::partition::PartitionConfig,
785 flow_id: &FlowId,
786 downstream_execution_id: &ExecutionId,
787 policy: EdgeDependencyPolicy,
788) -> Result<SetEdgeGroupPolicyResult, EngineError> {
789 match &policy {
791 EdgeDependencyPolicy::AllOf => {}
792 EdgeDependencyPolicy::AnyOf { .. } => {}
793 EdgeDependencyPolicy::Quorum { k, .. } => {
794 if *k == 0 {
795 return Err(EngineError::Validation {
796 kind: ValidationKind::InvalidInput,
797 detail: "quorum k must be >= 1".to_string(),
798 });
799 }
800 }
801 _ => {
802 return Err(EngineError::Validation {
803 kind: ValidationKind::InvalidInput,
804 detail: "unknown EdgeDependencyPolicy variant".to_string(),
805 });
806 }
807 }
808
809 let part = flow_partition_byte(flow_id, partition_config);
810 let flow_uuid: Uuid = flow_id.0;
811 let downstream_uuid = parse_exec_uuid(downstream_execution_id)?;
812
813 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
814
815 let already_staged: i64 = sqlx::query_scalar(
817 "SELECT COUNT(*) FROM ff_edge \
818 WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
819 )
820 .bind(part)
821 .bind(flow_uuid)
822 .bind(downstream_uuid)
823 .fetch_one(&mut *tx)
824 .await
825 .map_err(map_sqlx_error)?;
826
827 if already_staged > 0 {
828 let _ = tx.rollback().await;
829 return Err(EngineError::Validation {
830 kind: ValidationKind::InvalidInput,
831 detail: "edge_group_policy_already_fixed: dependencies already staged".to_string(),
832 });
833 }
834
835 let existing: Option<JsonValue> = sqlx::query_scalar(
838 "SELECT policy FROM ff_edge_group \
839 WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
840 )
841 .bind(part)
842 .bind(flow_uuid)
843 .bind(downstream_uuid)
844 .fetch_optional(&mut *tx)
845 .await
846 .map_err(map_sqlx_error)?;
847
848 let encoded = encode_edge_policy(&policy);
849 if let Some(existing_policy) = existing
850 && existing_policy == encoded
851 {
852 tx.commit().await.map_err(map_sqlx_error)?;
853 return Ok(SetEdgeGroupPolicyResult::AlreadySet);
854 }
855
856 sqlx::query(
857 "INSERT INTO ff_edge_group \
858 (partition_key, flow_id, downstream_eid, policy) \
859 VALUES ($1, $2, $3, $4) \
860 ON CONFLICT (partition_key, flow_id, downstream_eid) \
861 DO UPDATE SET policy = EXCLUDED.policy",
862 )
863 .bind(part)
864 .bind(flow_uuid)
865 .bind(downstream_uuid)
866 .bind(&encoded)
867 .execute(&mut *tx)
868 .await
869 .map_err(map_sqlx_error)?;
870
871 tx.commit().await.map_err(map_sqlx_error)?;
872 Ok(SetEdgeGroupPolicyResult::Set)
873}
874
875#[allow(dead_code)]
878fn _unused_imports_anchor(_p: Partition, _f: PartitionFamily) {}
879
880pub(super) async fn set_flow_tag_impl(
893 pool: &sqlx::PgPool,
894 partition_config: &ff_core::partition::PartitionConfig,
895 flow_id: &FlowId,
896 key: &str,
897 value: &str,
898) -> Result<(), EngineError> {
899 let part = flow_partition_byte(flow_id, partition_config);
900 let flow_uuid: Uuid = flow_id.0;
901
902 let result = sqlx::query(
903 r#"
904 UPDATE ff_flow_core
905 SET raw_fields = jsonb_set(
906 COALESCE(raw_fields, '{}'::jsonb),
907 ARRAY[$3::text],
908 to_jsonb($4::text),
909 true
910 )
911 WHERE partition_key = $1 AND flow_id = $2
912 "#,
913 )
914 .bind(part)
915 .bind(flow_uuid)
916 .bind(key)
917 .bind(value)
918 .execute(pool)
919 .await
920 .map_err(map_sqlx_error)?;
921
922 if result.rows_affected() == 0 {
923 return Err(EngineError::NotFound { entity: "flow" });
924 }
925 Ok(())
926}
927
928pub(super) async fn get_flow_tag_impl(
932 pool: &sqlx::PgPool,
933 partition_config: &ff_core::partition::PartitionConfig,
934 flow_id: &FlowId,
935 key: &str,
936) -> Result<Option<String>, EngineError> {
937 let part = flow_partition_byte(flow_id, partition_config);
938 let flow_uuid: Uuid = flow_id.0;
939
940 let row: Option<(Option<String>,)> = sqlx::query_as(
941 r#"
942 SELECT raw_fields->>$3
943 FROM ff_flow_core
944 WHERE partition_key = $1 AND flow_id = $2
945 "#,
946 )
947 .bind(part)
948 .bind(flow_uuid)
949 .bind(key)
950 .fetch_optional(pool)
951 .await
952 .map_err(map_sqlx_error)?;
953
954 Ok(row.and_then(|(tag,)| tag))
955}
956
957pub(super) async fn project_flow_summary_impl(
970 pool: &PgPool,
971 partition_key: i16,
972 flow_uuid: Uuid,
973 now_ms: i64,
974) -> Result<bool, EngineError> {
975 let row = sqlx::query_as::<_, (
982 i64, i64, i64, i64, i64, i64, i64, i64, i64, i64, i64, i64,
983 )>(
984 r#"
985 SELECT
986 COUNT(*) FILTER (WHERE public_state = 'completed'),
987 COUNT(*) FILTER (WHERE public_state = 'failed'),
988 COUNT(*) FILTER (WHERE public_state = 'cancelled'),
989 COUNT(*) FILTER (WHERE public_state = 'expired'),
990 COUNT(*) FILTER (WHERE public_state = 'skipped'),
991 COUNT(*) FILTER (WHERE public_state = 'active'),
992 COUNT(*) FILTER (WHERE public_state = 'suspended'),
993 COUNT(*) FILTER (WHERE public_state = 'waiting'),
994 COUNT(*) FILTER (WHERE public_state = 'delayed'),
995 COUNT(*) FILTER (WHERE public_state = 'rate_limited'),
996 COUNT(*) FILTER (WHERE public_state = 'waiting_children'),
997 COUNT(*)
998 FROM ff_exec_core
999 WHERE partition_key = $1 AND flow_id = $2
1000 "#,
1001 )
1002 .bind(partition_key)
1003 .bind(flow_uuid)
1004 .fetch_one(pool)
1005 .await
1006 .map_err(map_sqlx_error)?;
1007
1008 let (completed, failed, cancelled, expired, skipped, active, suspended,
1009 waiting, delayed, rate_limited, waiting_children, total_members) = row;
1010
1011 if total_members == 0 {
1012 return Ok(false);
1013 }
1014
1015 let sampled_i32 = i32::try_from(total_members).unwrap_or(i32::MAX);
1019 let terminal_count = completed + failed + cancelled + expired + skipped;
1020 let all_terminal = terminal_count == total_members;
1021
1022 let flow_state: &str = if all_terminal {
1023 if failed > 0 || cancelled > 0 || expired > 0 {
1024 "failed"
1025 } else {
1026 "completed"
1027 }
1028 } else if active > 0 {
1029 "running"
1030 } else if suspended > 0 || delayed > 0 || rate_limited > 0 || waiting_children > 0 {
1031 "blocked"
1032 } else {
1033 "open"
1034 };
1035
1036 sqlx::query(
1038 r#"
1039 INSERT INTO ff_flow_summary (
1040 partition_key, flow_id,
1041 total_members, sampled_members,
1042 members_completed, members_failed, members_cancelled,
1043 members_expired, members_skipped, members_active,
1044 members_suspended, members_waiting, members_delayed,
1045 members_rate_limited, members_waiting_children,
1046 public_flow_state, last_summary_update_at
1047 ) VALUES (
1048 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
1049 $11, $12, $13, $14, $15, $16, $17
1050 )
1051 ON CONFLICT (partition_key, flow_id) DO UPDATE SET
1052 total_members = EXCLUDED.total_members,
1053 sampled_members = EXCLUDED.sampled_members,
1054 members_completed = EXCLUDED.members_completed,
1055 members_failed = EXCLUDED.members_failed,
1056 members_cancelled = EXCLUDED.members_cancelled,
1057 members_expired = EXCLUDED.members_expired,
1058 members_skipped = EXCLUDED.members_skipped,
1059 members_active = EXCLUDED.members_active,
1060 members_suspended = EXCLUDED.members_suspended,
1061 members_waiting = EXCLUDED.members_waiting,
1062 members_delayed = EXCLUDED.members_delayed,
1063 members_rate_limited = EXCLUDED.members_rate_limited,
1064 members_waiting_children = EXCLUDED.members_waiting_children,
1065 public_flow_state = EXCLUDED.public_flow_state,
1066 last_summary_update_at = EXCLUDED.last_summary_update_at
1067 "#,
1068 )
1069 .bind(partition_key)
1070 .bind(flow_uuid)
1071 .bind(total_members)
1072 .bind(sampled_i32)
1073 .bind(i32::try_from(completed).unwrap_or(i32::MAX))
1074 .bind(i32::try_from(failed).unwrap_or(i32::MAX))
1075 .bind(i32::try_from(cancelled).unwrap_or(i32::MAX))
1076 .bind(i32::try_from(expired).unwrap_or(i32::MAX))
1077 .bind(i32::try_from(skipped).unwrap_or(i32::MAX))
1078 .bind(i32::try_from(active).unwrap_or(i32::MAX))
1079 .bind(i32::try_from(suspended).unwrap_or(i32::MAX))
1080 .bind(i32::try_from(waiting).unwrap_or(i32::MAX))
1081 .bind(i32::try_from(delayed).unwrap_or(i32::MAX))
1082 .bind(i32::try_from(rate_limited).unwrap_or(i32::MAX))
1083 .bind(i32::try_from(waiting_children).unwrap_or(i32::MAX))
1084 .bind(flow_state)
1085 .bind(now_ms)
1086 .execute(pool)
1087 .await
1088 .map_err(map_sqlx_error)?;
1089
1090 Ok(true)
1091}