1use std::collections::BTreeMap;
28use std::time::Duration;
29
30use ff_core::backend::{CancelFlowPolicy, CancelFlowWait};
31use ff_core::contracts::{
32 CancelFlowResult, EdgeDependencyPolicy, EdgeDirection, EdgeGroupSnapshot, EdgeGroupState,
33 EdgeSnapshot, FlowSnapshot, FlowStatus, FlowSummary, ListFlowsPage, OnSatisfied,
34 SetEdgeGroupPolicyResult,
35};
36use ff_core::engine_error::{
37 ContentionKind, EngineError, ValidationKind,
38};
39use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
40use ff_core::types::{EdgeId, ExecutionId, FlowId, Namespace, TimestampMs};
41use serde_json::Value as JsonValue;
42use sqlx::postgres::PgRow;
43use sqlx::{PgPool, Row};
44use uuid::Uuid;
45
46use crate::error::map_sqlx_error;
47
/// Maximum attempts `cancel_flow` makes before giving up with
/// `ContentionKind::RetryExhausted`.
const CANCEL_FLOW_MAX_ATTEMPTS: u32 = 3;
53
54fn partition_index_from_key(key: &PartitionKey) -> Result<i16, EngineError> {
57 let p = key.parse().map_err(|e| EngineError::Validation {
58 kind: ValidationKind::InvalidInput,
59 detail: format!("partition_key: {e}"),
60 })?;
61 Ok(p.index as i16)
62}
63
64fn flow_partition_byte(
68 flow_id: &FlowId,
69 partition_config: &ff_core::partition::PartitionConfig,
70) -> i16 {
71 ff_core::partition::flow_partition(flow_id, partition_config).index as i16
72}
73
74fn raw_str<'a>(raw: &'a JsonValue, key: &str) -> Option<&'a str> {
77 raw.get(key).and_then(|v| v.as_str())
78}
79
80fn raw_u64(raw: &JsonValue, key: &str) -> Option<u64> {
84 raw.get(key).and_then(|v| {
85 v.as_u64()
86 .or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
87 })
88}
89
90fn raw_u32(raw: &JsonValue, key: &str) -> Option<u32> {
91 raw.get(key).and_then(|v| {
92 v.as_u64()
93 .or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
94 .and_then(|n| u32::try_from(n).ok())
95 })
96}
97
98fn extract_tags(raw: &JsonValue) -> BTreeMap<String, String> {
101 let mut tags = BTreeMap::new();
102 let Some(obj) = raw.as_object() else {
103 return tags;
104 };
105 for (k, v) in obj {
106 if !is_namespaced_tag_key(k) {
107 continue;
108 }
109 if let Some(s) = v.as_str() {
110 tags.insert(k.clone(), s.to_owned());
111 }
112 }
113 tags
114}
115
/// Returns true when `k` looks like a namespaced tag key: a non-empty
/// prefix of `[a-z][a-z0-9_]*` followed by a `.` (anything after the first
/// dot is accepted, including nothing).
fn is_namespaced_tag_key(k: &str) -> bool {
    // Split at the FIRST dot; no dot means the key is not namespaced.
    let Some((prefix, _)) = k.split_once('.') else {
        return false;
    };
    let mut chars = prefix.chars();
    match chars.next() {
        Some(first) if first.is_ascii_lowercase() => {
            chars.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
        }
        _ => false,
    }
}
136
137fn parse_on_satisfied(s: &str) -> OnSatisfied {
138 match s {
139 "let_run" => OnSatisfied::LetRun,
140 _ => OnSatisfied::CancelRemaining,
141 }
142}
143
144fn decode_edge_policy(v: &JsonValue) -> EdgeDependencyPolicy {
148 let kind = v.get("kind").and_then(|k| k.as_str()).unwrap_or("all_of");
149 match kind {
150 "any_of" => {
151 let on = v
152 .get("on_satisfied")
153 .and_then(|x| x.as_str())
154 .map(parse_on_satisfied)
155 .unwrap_or(OnSatisfied::CancelRemaining);
156 EdgeDependencyPolicy::AnyOf { on_satisfied: on }
157 }
158 "quorum" => {
159 let k = v
160 .get("k")
161 .and_then(|x| x.as_u64())
162 .and_then(|n| u32::try_from(n).ok())
163 .unwrap_or(1);
164 let on = v
165 .get("on_satisfied")
166 .and_then(|x| x.as_str())
167 .map(parse_on_satisfied)
168 .unwrap_or(OnSatisfied::CancelRemaining);
169 EdgeDependencyPolicy::Quorum { k, on_satisfied: on }
170 }
171 _ => EdgeDependencyPolicy::AllOf,
172 }
173}
174
/// Serializes an `EdgeDependencyPolicy` into the jsonb shape consumed by
/// `decode_edge_policy` (`"kind"` plus variant-specific fields).
fn encode_edge_policy(p: &EdgeDependencyPolicy) -> JsonValue {
    match p {
        EdgeDependencyPolicy::AllOf => serde_json::json!({ "kind": "all_of" }),
        EdgeDependencyPolicy::AnyOf { on_satisfied } => serde_json::json!({
            "kind": "any_of",
            "on_satisfied": on_satisfied.variant_str(),
        }),
        EdgeDependencyPolicy::Quorum { k, on_satisfied } => serde_json::json!({
            "kind": "quorum",
            "k": k,
            "on_satisfied": on_satisfied.variant_str(),
        }),
        // Catch-all for future variants (enum presumably #[non_exhaustive]
        // upstream — TODO confirm); encodes conservatively as all_of.
        _ => serde_json::json!({ "kind": "all_of" }),
    }
}
193
194fn decode_edge_group_row(row: &PgRow) -> Result<EdgeGroupSnapshot, EngineError> {
196 let downstream_uuid: Uuid = row.get("downstream_eid");
197 let policy_raw: JsonValue = row.get("policy");
198 let k_target: i32 = row.get("k_target");
199 let success_count: i32 = row.get("success_count");
200 let fail_count: i32 = row.get("fail_count");
201 let skip_count: i32 = row.get("skip_count");
202 let running_count: i32 = row.get("running_count");
203
204 let part: i16 = row.get("partition_key");
207 let downstream_id = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
208 .map_err(|e| EngineError::Validation {
209 kind: ValidationKind::Corruption,
210 detail: format!("ff_edge_group.downstream_eid: {e}"),
211 })?;
212
213 let policy = decode_edge_policy(&policy_raw);
214 let total: u32 = success_count.max(0) as u32
217 + fail_count.max(0) as u32
218 + skip_count.max(0) as u32
219 + running_count.max(0) as u32;
220
221 let state = if k_target > 0 && success_count >= k_target {
223 EdgeGroupState::Satisfied
224 } else {
225 EdgeGroupState::Pending
226 };
227
228 Ok(EdgeGroupSnapshot::new(
229 downstream_id,
230 policy,
231 total,
232 success_count.max(0) as u32,
233 fail_count.max(0) as u32,
234 skip_count.max(0) as u32,
235 running_count.max(0) as u32,
236 state,
237 ))
238}
239
/// Decodes one `ff_flow_core` row plus its pre-fetched edge groups into a
/// `FlowSnapshot`.
///
/// Most secondary fields live in the `raw_fields` jsonb column; missing
/// entries fall back to defaults: empty `flow_kind`, namespace
/// `"default"`, zero node/edge counts, and `created_at_ms` as the last
/// mutation time.
fn decode_flow_row(
    flow_id: FlowId,
    row: &PgRow,
    edge_groups: Vec<EdgeGroupSnapshot>,
) -> Result<FlowSnapshot, EngineError> {
    let public_flow_state: String = row.get("public_flow_state");
    let graph_revision_i: i64 = row.get("graph_revision");
    let created_at_ms: i64 = row.get("created_at_ms");
    let terminal_at_ms: Option<i64> = row.get("terminal_at_ms");
    let raw_fields: JsonValue = row.get("raw_fields");

    let flow_kind = raw_str(&raw_fields, "flow_kind")
        .unwrap_or("")
        .to_owned();
    let namespace_str = raw_str(&raw_fields, "namespace").unwrap_or("default");
    let namespace = Namespace::new(namespace_str.to_owned());
    let node_count = raw_u32(&raw_fields, "node_count").unwrap_or(0);
    let edge_count = raw_u32(&raw_fields, "edge_count").unwrap_or(0);
    // NOTE(review): `n as i64` wraps for values above i64::MAX — presumably
    // unreachable for millisecond timestamps, but unchecked here.
    let last_mutation_at_ms =
        raw_u64(&raw_fields, "last_mutation_at_ms").map(|n| n as i64).unwrap_or(created_at_ms);

    // `terminal_at_ms` is surfaced as the cancellation timestamp in the
    // snapshot.
    let cancelled_at = terminal_at_ms.map(TimestampMs);
    let cancel_reason = raw_str(&raw_fields, "cancel_reason").map(str::to_owned);
    let cancellation_policy = raw_str(&raw_fields, "cancellation_policy").map(str::to_owned);

    let tags = extract_tags(&raw_fields);

    // A negative stored revision (corruption) collapses to 0 instead of
    // erroring.
    let graph_revision = u64::try_from(graph_revision_i).unwrap_or(0);

    Ok(FlowSnapshot::new(
        flow_id,
        flow_kind,
        namespace,
        public_flow_state,
        graph_revision,
        node_count,
        edge_count,
        TimestampMs(created_at_ms),
        TimestampMs(last_mutation_at_ms),
        cancelled_at,
        cancel_reason,
        cancellation_policy,
        tags,
        edge_groups,
    ))
}
288
/// Decodes one `ff_edge` row into an `EdgeSnapshot`.
///
/// Upstream/downstream execution ids are stored as bare uuids; the
/// partition prefix is re-attached before parsing. Edge metadata lives in
/// the `policy` jsonb column with per-field fallbacks (`success_only`,
/// `all_required`, `pending`, `engine`, epoch 0).
///
/// # Errors
/// `ValidationKind::Corruption` when either stored endpoint uuid does not
/// round-trip through `ExecutionId::parse`.
fn decode_edge_row(row: &PgRow, flow_id: &FlowId) -> Result<EdgeSnapshot, EngineError> {
    let edge_uuid: Uuid = row.get("edge_id");
    let upstream_uuid: Uuid = row.get("upstream_eid");
    let downstream_uuid: Uuid = row.get("downstream_eid");
    let part: i16 = row.get("partition_key");
    let policy_raw: JsonValue = row.get("policy");

    let upstream = ExecutionId::parse(&format!("{{fp:{part}}}:{upstream_uuid}"))
        .map_err(|e| EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!("ff_edge.upstream_eid: {e}"),
        })?;
    let downstream = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
        .map_err(|e| EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!("ff_edge.downstream_eid: {e}"),
        })?;

    let dependency_kind = raw_str(&policy_raw, "dependency_kind")
        .unwrap_or("success_only")
        .to_owned();
    let satisfaction_condition = raw_str(&policy_raw, "satisfaction_condition")
        .unwrap_or("all_required")
        .to_owned();
    // An empty data_passing_ref is normalized to None.
    let data_passing_ref = raw_str(&policy_raw, "data_passing_ref")
        .filter(|s| !s.is_empty())
        .map(str::to_owned);
    let edge_state = raw_str(&policy_raw, "edge_state")
        .unwrap_or("pending")
        .to_owned();
    // NOTE(review): `n as i64` wraps for values above i64::MAX — presumably
    // unreachable for millisecond timestamps, but unchecked here.
    let created_at_ms =
        raw_u64(&policy_raw, "created_at_ms").map(|n| n as i64).unwrap_or(0);
    let created_by = raw_str(&policy_raw, "created_by")
        .unwrap_or("engine")
        .to_owned();

    Ok(EdgeSnapshot::new(
        EdgeId::from_uuid(edge_uuid),
        flow_id.clone(),
        upstream,
        downstream,
        dependency_kind,
        satisfaction_condition,
        data_passing_ref,
        edge_state,
        TimestampMs(created_at_ms),
        created_by,
    ))
}
343
344pub async fn describe_flow(
346 pool: &PgPool,
347 partition_config: &ff_core::partition::PartitionConfig,
348 id: &FlowId,
349) -> Result<Option<FlowSnapshot>, EngineError> {
350 let part = flow_partition_byte(id, partition_config);
351 let flow_uuid: Uuid = id.0;
352
353 let flow_row_opt = sqlx::query(
354 "SELECT partition_key, flow_id, graph_revision, public_flow_state, \
355 created_at_ms, terminal_at_ms, raw_fields \
356 FROM ff_flow_core \
357 WHERE partition_key = $1 AND flow_id = $2",
358 )
359 .bind(part)
360 .bind(flow_uuid)
361 .fetch_optional(pool)
362 .await
363 .map_err(map_sqlx_error)?;
364
365 let Some(flow_row) = flow_row_opt else {
366 return Ok(None);
367 };
368
369 let eg_rows = sqlx::query(
370 "SELECT partition_key, flow_id, downstream_eid, policy, \
371 k_target, success_count, fail_count, skip_count, running_count \
372 FROM ff_edge_group \
373 WHERE partition_key = $1 AND flow_id = $2 \
374 ORDER BY downstream_eid",
375 )
376 .bind(part)
377 .bind(flow_uuid)
378 .fetch_all(pool)
379 .await
380 .map_err(map_sqlx_error)?;
381
382 let mut edge_groups = Vec::with_capacity(eg_rows.len());
383 for row in &eg_rows {
384 edge_groups.push(decode_edge_group_row(row)?);
385 }
386
387 decode_flow_row(id.clone(), &flow_row, edge_groups).map(Some)
388}
389
390pub async fn list_flows(
392 pool: &PgPool,
393 partition: PartitionKey,
394 cursor: Option<FlowId>,
395 limit: usize,
396) -> Result<ListFlowsPage, EngineError> {
397 if limit == 0 {
398 return Ok(ListFlowsPage::new(Vec::new(), None));
399 }
400 let part = partition_index_from_key(&partition)?;
401 let cursor_uuid: Option<Uuid> = cursor.as_ref().map(|f| f.0);
402 let fetch_limit = (limit + 1) as i64;
404
405 let rows = sqlx::query(
406 "SELECT flow_id, created_at_ms, public_flow_state \
407 FROM ff_flow_core \
408 WHERE partition_key = $1 \
409 AND ($2::uuid IS NULL OR flow_id > $2) \
410 ORDER BY flow_id \
411 LIMIT $3",
412 )
413 .bind(part)
414 .bind(cursor_uuid)
415 .bind(fetch_limit)
416 .fetch_all(pool)
417 .await
418 .map_err(map_sqlx_error)?;
419
420 let mut flows: Vec<FlowSummary> = Vec::with_capacity(rows.len().min(limit));
421 let mut next_cursor: Option<FlowId> = None;
422 for (idx, row) in rows.iter().enumerate() {
423 if idx >= limit {
424 if let Some(last) = flows.last() {
428 next_cursor = Some(last.flow_id.clone());
429 }
430 break;
431 }
432 let flow_uuid: Uuid = row.get("flow_id");
433 let created_at_ms: i64 = row.get("created_at_ms");
434 let public_state: String = row.get("public_flow_state");
435 let status = FlowStatus::from_public_flow_state(&public_state);
436 flows.push(FlowSummary::new(
437 FlowId::from_uuid(flow_uuid),
438 TimestampMs(created_at_ms),
439 status,
440 ));
441 }
442
443 Ok(ListFlowsPage::new(flows, next_cursor))
444}
445
446pub async fn list_edges(
448 pool: &PgPool,
449 partition_config: &ff_core::partition::PartitionConfig,
450 flow_id: &FlowId,
451 direction: EdgeDirection,
452) -> Result<Vec<EdgeSnapshot>, EngineError> {
453 let part = flow_partition_byte(flow_id, partition_config);
454 let flow_uuid: Uuid = flow_id.0;
455 let (column_filter, subject_eid) = match &direction {
456 EdgeDirection::Outgoing { from_node } => ("upstream_eid", from_node),
457 EdgeDirection::Incoming { to_node } => ("downstream_eid", to_node),
458 };
459 let subject_uuid = parse_exec_uuid(subject_eid)?;
461
462 let sql = format!(
463 "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
464 FROM ff_edge \
465 WHERE partition_key = $1 AND flow_id = $2 AND {column_filter} = $3 \
466 ORDER BY edge_id"
467 );
468 let rows = sqlx::query(&sql)
469 .bind(part)
470 .bind(flow_uuid)
471 .bind(subject_uuid)
472 .fetch_all(pool)
473 .await
474 .map_err(map_sqlx_error)?;
475
476 let mut out = Vec::with_capacity(rows.len());
477 for row in &rows {
478 out.push(decode_edge_row(row, flow_id)?);
479 }
480 Ok(out)
481}
482
483pub async fn describe_edge(
485 pool: &PgPool,
486 partition_config: &ff_core::partition::PartitionConfig,
487 flow_id: &FlowId,
488 edge_id: &EdgeId,
489) -> Result<Option<EdgeSnapshot>, EngineError> {
490 let part = flow_partition_byte(flow_id, partition_config);
491 let row_opt = sqlx::query(
492 "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
493 FROM ff_edge \
494 WHERE partition_key = $1 AND flow_id = $2 AND edge_id = $3",
495 )
496 .bind(part)
497 .bind(flow_id.0)
498 .bind(edge_id.0)
499 .fetch_optional(pool)
500 .await
501 .map_err(map_sqlx_error)?;
502
503 match row_opt {
504 Some(row) => decode_edge_row(&row, flow_id).map(Some),
505 None => Ok(None),
506 }
507}
508
509fn parse_exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
511 let s = eid.as_str();
512 let Some(colon) = s.rfind("}:") else {
513 return Err(EngineError::Validation {
514 kind: ValidationKind::InvalidInput,
515 detail: format!("execution_id missing '}}:' delimiter: {s}"),
516 });
517 };
518 let tail = &s[colon + 2..];
519 Uuid::parse_str(tail).map_err(|e| EngineError::Validation {
520 kind: ValidationKind::InvalidInput,
521 detail: format!("execution_id uuid suffix parse: {e}"),
522 })
523}
524
/// Maps a `CancelFlowPolicy` to the string persisted in
/// `raw_fields.cancellation_policy` and echoed back in cancel results.
fn cancel_policy_to_str(p: CancelFlowPolicy) -> &'static str {
    match p {
        CancelFlowPolicy::FlowOnly => "cancel_flow_only",
        CancelFlowPolicy::CancelAll => "cancel_all",
        CancelFlowPolicy::CancelPending => "cancel_pending",
        // Catch-all for future variants (enum presumably #[non_exhaustive]
        // upstream — TODO confirm); defaults to the broadest policy string.
        _ => "cancel_all",
    }
}
535
536pub async fn cancel_flow(
556 pool: &PgPool,
557 partition_config: &ff_core::partition::PartitionConfig,
558 id: &FlowId,
559 policy: CancelFlowPolicy,
560 _wait: CancelFlowWait,
561) -> Result<CancelFlowResult, EngineError> {
562 let part = flow_partition_byte(id, partition_config);
563 let flow_uuid: Uuid = id.0;
564 let policy_str = cancel_policy_to_str(policy);
565
566 let mut last_transport: Option<EngineError> = None;
567 for attempt in 0..CANCEL_FLOW_MAX_ATTEMPTS {
568 match cancel_flow_once(pool, part, flow_uuid, policy, policy_str).await {
569 Ok(result) => return Ok(result),
570 Err(err) => {
571 if is_serialization_conflict(&err) {
572 if attempt + 1 < CANCEL_FLOW_MAX_ATTEMPTS {
574 let ms = 5u64 * (1u64 << attempt).saturating_sub(0);
575 tokio::time::sleep(Duration::from_millis(ms)).await;
576 }
577 last_transport = Some(err);
578 continue;
579 }
580 return Err(err);
581 }
582 }
583 }
584 let _ = last_transport; Err(EngineError::Contention(ContentionKind::RetryExhausted))
587}
588
/// Returns true when `err` should trigger another `cancel_flow` attempt.
///
/// NOTE(review): despite the name, only `ContentionKind::LeaseConflict` is
/// treated as retryable here — confirm that Postgres serialization
/// failures are mapped to this kind by `map_sqlx_error`.
fn is_serialization_conflict(err: &EngineError) -> bool {
    matches!(
        err,
        EngineError::Contention(ContentionKind::LeaseConflict)
    )
}
600
/// One attempt at cancelling a flow inside a single SERIALIZABLE
/// transaction: marks the flow cancelled, cancels member executions
/// according to `policy`, records a completion event per member, and (for
/// `CancelPending`) enqueues edge groups that still have running members.
///
/// An unknown flow id is treated as an idempotent no-op: the transaction
/// commits and a `Cancelled` result with no members is returned.
///
/// # Errors
/// Mapped sqlx errors; under SERIALIZABLE these include conflicts that the
/// caller (`cancel_flow`) retries.
async fn cancel_flow_once(
    pool: &PgPool,
    part: i16,
    flow_uuid: Uuid,
    policy: CancelFlowPolicy,
    policy_str: &str,
) -> Result<CancelFlowResult, EngineError> {
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // Serializable so concurrent cancels/completions either commit in full
    // or surface a conflict for the caller's retry loop.
    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // Mark the flow terminal. COALESCE preserves an earlier terminal
    // timestamp, and the jsonb merge records which policy was applied.
    let flow_found = sqlx::query(
        "UPDATE ff_flow_core \
         SET public_flow_state = 'cancelled', \
         terminal_at_ms = COALESCE(terminal_at_ms, \
         (extract(epoch from clock_timestamp())*1000)::bigint), \
         raw_fields = raw_fields \
         || jsonb_build_object('cancellation_policy', $3::text) \
         WHERE partition_key = $1 AND flow_id = $2 \
         RETURNING flow_id",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(policy_str)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Unknown flow: commit the no-op and report success with no members.
    if flow_found.is_none() {
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(CancelFlowResult::Cancelled {
            cancellation_policy: policy_str.to_owned(),
            member_execution_ids: Vec::new(),
        });
    }

    // Select which member executions to cancel. FlowOnly touches none;
    // the other policies filter by lifecycle_phase (filter text is from a
    // fixed match, never caller input, so the format! is injection-safe).
    let member_rows = if matches!(policy, CancelFlowPolicy::FlowOnly) {
        Vec::new()
    } else {
        let state_filter: &str = match policy {
            CancelFlowPolicy::CancelAll => {
                "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')"
            }
            CancelFlowPolicy::CancelPending => {
                "lifecycle_phase IN ('pending','blocked','eligible')"
            }
            // Future policy variants fall back to the CancelAll filter.
            _ => "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')",
        };
        let sql = format!(
            "SELECT execution_id FROM ff_exec_core \
             WHERE partition_key = $1 AND flow_id = $2 AND {state_filter}"
        );
        sqlx::query(&sql)
            .bind(part)
            .bind(flow_uuid)
            .fetch_all(&mut *tx)
            .await
            .map_err(map_sqlx_error)?
    };

    let mut member_execution_ids: Vec<String> = Vec::with_capacity(member_rows.len());
    for row in &member_rows {
        let exec_uuid: Uuid = row.get("execution_id");
        // Cancel the member; COALESCEs keep any pre-existing terminal
        // timestamp / reason / canceller instead of overwriting them.
        sqlx::query(
            "UPDATE ff_exec_core \
             SET lifecycle_phase = 'cancelled', \
             eligibility_state = 'cancelled', \
             public_state = 'cancelled', \
             terminal_at_ms = COALESCE(terminal_at_ms, \
             (extract(epoch from clock_timestamp())*1000)::bigint), \
             cancellation_reason = COALESCE(cancellation_reason, 'flow_cancelled'), \
             cancelled_by = COALESCE(cancelled_by, 'cancel_flow') \
             WHERE partition_key = $1 AND execution_id = $2",
        )
        .bind(part)
        .bind(exec_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // Record a completion event for downstream consumers.
        sqlx::query(
            "INSERT INTO ff_completion_event \
             (partition_key, execution_id, flow_id, outcome, occurred_at_ms) \
             VALUES ($1, $2, $3, 'cancelled', \
             (extract(epoch from clock_timestamp())*1000)::bigint)",
        )
        .bind(part)
        .bind(exec_uuid)
        .bind(flow_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // Report members in the same prefixed form used by ExecutionId.
        member_execution_ids.push(format!("{{fp:{part}}}:{exec_uuid}"));
    }

    // CancelPending leaves running executions alive; enqueue their edge
    // groups so they can be processed later (idempotent via ON CONFLICT).
    if matches!(policy, CancelFlowPolicy::CancelPending) {
        sqlx::query(
            "INSERT INTO ff_pending_cancel_groups \
             (partition_key, flow_id, downstream_eid, enqueued_at_ms) \
             SELECT partition_key, flow_id, downstream_eid, \
             (extract(epoch from clock_timestamp())*1000)::bigint \
             FROM ff_edge_group \
             WHERE partition_key = $1 AND flow_id = $2 AND running_count > 0 \
             ON CONFLICT DO NOTHING",
        )
        .bind(part)
        .bind(flow_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    }

    tx.commit().await.map_err(map_sqlx_error)?;

    Ok(CancelFlowResult::Cancelled {
        cancellation_policy: policy_str.to_owned(),
        member_execution_ids,
    })
}
734
/// Sets (or idempotently re-sets) the dependency policy for the edge
/// group feeding `downstream_execution_id` within `flow_id`.
///
/// The policy may only change while no dependency edges are staged for
/// that downstream node; once edges exist the call is rejected. Setting a
/// policy identical to the stored one returns `AlreadySet` without
/// writing.
///
/// # Errors
/// `ValidationKind::InvalidInput` for `Quorum { k: 0 }`, an unknown policy
/// variant, a malformed execution id, or an already-staged edge group;
/// otherwise mapped sqlx errors.
pub async fn set_edge_group_policy(
    pool: &PgPool,
    partition_config: &ff_core::partition::PartitionConfig,
    flow_id: &FlowId,
    downstream_execution_id: &ExecutionId,
    policy: EdgeDependencyPolicy,
) -> Result<SetEdgeGroupPolicyResult, EngineError> {
    // Validate the policy before touching the database.
    match &policy {
        EdgeDependencyPolicy::AllOf => {}
        EdgeDependencyPolicy::AnyOf { .. } => {}
        EdgeDependencyPolicy::Quorum { k, .. } => {
            if *k == 0 {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "quorum k must be >= 1".to_string(),
                });
            }
        }
        // Catch-all for future variants (enum presumably #[non_exhaustive]
        // upstream — TODO confirm).
        _ => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "unknown EdgeDependencyPolicy variant".to_string(),
            });
        }
    }

    let part = flow_partition_byte(flow_id, partition_config);
    let flow_uuid: Uuid = flow_id.0;
    let downstream_uuid = parse_exec_uuid(downstream_execution_id)?;

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // The policy is frozen once any dependency edge targets this node.
    let already_staged: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM ff_edge \
         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(downstream_uuid)
    .fetch_one(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    if already_staged > 0 {
        // Best-effort rollback; the validation error is what matters.
        let _ = tx.rollback().await;
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "edge_group_policy_already_fixed: dependencies already staged".to_string(),
        });
    }

    let existing: Option<JsonValue> = sqlx::query_scalar(
        "SELECT policy FROM ff_edge_group \
         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(downstream_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Idempotence: an identical stored policy short-circuits to AlreadySet.
    let encoded = encode_edge_policy(&policy);
    if let Some(existing_policy) = existing
        && existing_policy == encoded
    {
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(SetEdgeGroupPolicyResult::AlreadySet);
    }

    // Upsert: creates the group row or overwrites a differing policy.
    sqlx::query(
        "INSERT INTO ff_edge_group \
         (partition_key, flow_id, downstream_eid, policy) \
         VALUES ($1, $2, $3, $4) \
         ON CONFLICT (partition_key, flow_id, downstream_eid) \
         DO UPDATE SET policy = EXCLUDED.policy",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(downstream_uuid)
    .bind(&encoded)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(SetEdgeGroupPolicyResult::Set)
}
840
#[allow(dead_code)]
// Keeps the otherwise-unused `Partition` / `PartitionFamily` imports
// referenced so the top-of-file `use` block stays warning-free.
fn _unused_imports_anchor(_p: Partition, _f: PartitionFamily) {}