1use std::collections::BTreeMap;
28use std::time::Duration;
29
30use ff_core::backend::CancelFlowPolicy;
31use ff_core::contracts::{
32 CancelFlowResult, EdgeDependencyPolicy, EdgeDirection, EdgeGroupSnapshot, EdgeGroupState,
33 EdgeSnapshot, FlowSnapshot, FlowStatus, FlowSummary, ListFlowsPage, OnSatisfied,
34 SetEdgeGroupPolicyResult,
35};
36use ff_core::engine_error::{
37 ContentionKind, EngineError, ValidationKind,
38};
39use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
40use ff_core::types::{EdgeId, ExecutionId, FlowId, Namespace, TimestampMs};
41use serde_json::Value as JsonValue;
42use sqlx::postgres::PgRow;
43use sqlx::{PgPool, Row};
44use uuid::Uuid;
45
46use crate::error::map_sqlx_error;
47
/// Maximum number of serializable-transaction attempts `cancel_flow` makes
/// before giving up with `ContentionKind::RetryExhausted`.
const CANCEL_FLOW_MAX_ATTEMPTS: u32 = 3;
53
54fn partition_index_from_key(key: &PartitionKey) -> Result<i16, EngineError> {
57 let p = key.parse().map_err(|e| EngineError::Validation {
58 kind: ValidationKind::InvalidInput,
59 detail: format!("partition_key: {e}"),
60 })?;
61 Ok(p.index as i16)
62}
63
64fn flow_partition_byte(
68 flow_id: &FlowId,
69 partition_config: &ff_core::partition::PartitionConfig,
70) -> i16 {
71 ff_core::partition::flow_partition(flow_id, partition_config).index as i16
72}
73
74fn raw_str<'a>(raw: &'a JsonValue, key: &str) -> Option<&'a str> {
77 raw.get(key).and_then(|v| v.as_str())
78}
79
80fn raw_u64(raw: &JsonValue, key: &str) -> Option<u64> {
84 raw.get(key).and_then(|v| {
85 v.as_u64()
86 .or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
87 })
88}
89
90fn raw_u32(raw: &JsonValue, key: &str) -> Option<u32> {
91 raw.get(key).and_then(|v| {
92 v.as_u64()
93 .or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
94 .and_then(|n| u32::try_from(n).ok())
95 })
96}
97
98fn extract_tags(raw: &JsonValue) -> BTreeMap<String, String> {
101 let mut tags = BTreeMap::new();
102 let Some(obj) = raw.as_object() else {
103 return tags;
104 };
105 for (k, v) in obj {
106 if !is_namespaced_tag_key(k) {
107 continue;
108 }
109 if let Some(s) = v.as_str() {
110 tags.insert(k.clone(), s.to_owned());
111 }
112 }
113 tags
114}
115
/// True when `k` looks like a namespaced tag key: an ASCII-lowercase first
/// character, then lowercase/digit/underscore characters up to the first `.`.
/// A dot must be present; everything after the first dot is not inspected.
fn is_namespaced_tag_key(k: &str) -> bool {
    let mut it = k.chars();
    match it.next() {
        Some(first) if first.is_ascii_lowercase() => {}
        _ => return false,
    }
    for c in it {
        match c {
            '.' => return true,
            'a'..='z' | '0'..='9' | '_' => {}
            _ => return false,
        }
    }
    false
}
136
137fn parse_on_satisfied(s: &str) -> OnSatisfied {
138 match s {
139 "let_run" => OnSatisfied::LetRun,
140 _ => OnSatisfied::CancelRemaining,
141 }
142}
143
144fn decode_edge_policy(v: &JsonValue) -> EdgeDependencyPolicy {
148 let kind = v.get("kind").and_then(|k| k.as_str()).unwrap_or("all_of");
149 match kind {
150 "any_of" => {
151 let on = v
152 .get("on_satisfied")
153 .and_then(|x| x.as_str())
154 .map(parse_on_satisfied)
155 .unwrap_or(OnSatisfied::CancelRemaining);
156 EdgeDependencyPolicy::AnyOf { on_satisfied: on }
157 }
158 "quorum" => {
159 let k = v
160 .get("k")
161 .and_then(|x| x.as_u64())
162 .and_then(|n| u32::try_from(n).ok())
163 .unwrap_or(1);
164 let on = v
165 .get("on_satisfied")
166 .and_then(|x| x.as_str())
167 .map(parse_on_satisfied)
168 .unwrap_or(OnSatisfied::CancelRemaining);
169 EdgeDependencyPolicy::Quorum { k, on_satisfied: on }
170 }
171 _ => EdgeDependencyPolicy::AllOf,
172 }
173}
174
175fn encode_edge_policy(p: &EdgeDependencyPolicy) -> JsonValue {
176 match p {
177 EdgeDependencyPolicy::AllOf => serde_json::json!({ "kind": "all_of" }),
178 EdgeDependencyPolicy::AnyOf { on_satisfied } => serde_json::json!({
179 "kind": "any_of",
180 "on_satisfied": on_satisfied.variant_str(),
181 }),
182 EdgeDependencyPolicy::Quorum { k, on_satisfied } => serde_json::json!({
183 "kind": "quorum",
184 "k": k,
185 "on_satisfied": on_satisfied.variant_str(),
186 }),
187 _ => serde_json::json!({ "kind": "all_of" }),
191 }
192}
193
194fn decode_edge_group_row(row: &PgRow) -> Result<EdgeGroupSnapshot, EngineError> {
196 let downstream_uuid: Uuid = row.get("downstream_eid");
197 let policy_raw: JsonValue = row.get("policy");
198 let k_target: i32 = row.get("k_target");
199 let success_count: i32 = row.get("success_count");
200 let fail_count: i32 = row.get("fail_count");
201 let skip_count: i32 = row.get("skip_count");
202 let running_count: i32 = row.get("running_count");
203
204 let part: i16 = row.get("partition_key");
207 let downstream_id = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
208 .map_err(|e| EngineError::Validation {
209 kind: ValidationKind::Corruption,
210 detail: format!("ff_edge_group.downstream_eid: {e}"),
211 })?;
212
213 let policy = decode_edge_policy(&policy_raw);
214 let total: u32 = success_count.max(0) as u32
217 + fail_count.max(0) as u32
218 + skip_count.max(0) as u32
219 + running_count.max(0) as u32;
220
221 let state = if k_target > 0 && success_count >= k_target {
223 EdgeGroupState::Satisfied
224 } else {
225 EdgeGroupState::Pending
226 };
227
228 Ok(EdgeGroupSnapshot::new(
229 downstream_id,
230 policy,
231 total,
232 success_count.max(0) as u32,
233 fail_count.max(0) as u32,
234 skip_count.max(0) as u32,
235 running_count.max(0) as u32,
236 state,
237 ))
238}
239
240fn decode_flow_row(
243 flow_id: FlowId,
244 row: &PgRow,
245 edge_groups: Vec<EdgeGroupSnapshot>,
246) -> Result<FlowSnapshot, EngineError> {
247 let public_flow_state: String = row.get("public_flow_state");
248 let graph_revision_i: i64 = row.get("graph_revision");
249 let created_at_ms: i64 = row.get("created_at_ms");
250 let terminal_at_ms: Option<i64> = row.get("terminal_at_ms");
251 let raw_fields: JsonValue = row.get("raw_fields");
252
253 let flow_kind = raw_str(&raw_fields, "flow_kind")
254 .unwrap_or("")
255 .to_owned();
256 let namespace_str = raw_str(&raw_fields, "namespace").unwrap_or("default");
257 let namespace = Namespace::new(namespace_str.to_owned());
258 let node_count = raw_u32(&raw_fields, "node_count").unwrap_or(0);
259 let edge_count = raw_u32(&raw_fields, "edge_count").unwrap_or(0);
260 let last_mutation_at_ms =
261 raw_u64(&raw_fields, "last_mutation_at_ms").map(|n| n as i64).unwrap_or(created_at_ms);
262
263 let cancelled_at = terminal_at_ms.map(TimestampMs);
264 let cancel_reason = raw_str(&raw_fields, "cancel_reason").map(str::to_owned);
265 let cancellation_policy = raw_str(&raw_fields, "cancellation_policy").map(str::to_owned);
266
267 let tags = extract_tags(&raw_fields);
268
269 let graph_revision = u64::try_from(graph_revision_i).unwrap_or(0);
270
271 Ok(FlowSnapshot::new(
272 flow_id,
273 flow_kind,
274 namespace,
275 public_flow_state,
276 graph_revision,
277 node_count,
278 edge_count,
279 TimestampMs(created_at_ms),
280 TimestampMs(last_mutation_at_ms),
281 cancelled_at,
282 cancel_reason,
283 cancellation_policy,
284 tags,
285 edge_groups,
286 ))
287}
288
289fn decode_edge_row(row: &PgRow, flow_id: &FlowId) -> Result<EdgeSnapshot, EngineError> {
295 let edge_uuid: Uuid = row.get("edge_id");
296 let upstream_uuid: Uuid = row.get("upstream_eid");
297 let downstream_uuid: Uuid = row.get("downstream_eid");
298 let part: i16 = row.get("partition_key");
299 let policy_raw: JsonValue = row.get("policy");
300
301 let upstream = ExecutionId::parse(&format!("{{fp:{part}}}:{upstream_uuid}"))
302 .map_err(|e| EngineError::Validation {
303 kind: ValidationKind::Corruption,
304 detail: format!("ff_edge.upstream_eid: {e}"),
305 })?;
306 let downstream = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
307 .map_err(|e| EngineError::Validation {
308 kind: ValidationKind::Corruption,
309 detail: format!("ff_edge.downstream_eid: {e}"),
310 })?;
311
312 let dependency_kind = raw_str(&policy_raw, "dependency_kind")
313 .unwrap_or("success_only")
314 .to_owned();
315 let satisfaction_condition = raw_str(&policy_raw, "satisfaction_condition")
316 .unwrap_or("all_required")
317 .to_owned();
318 let data_passing_ref = raw_str(&policy_raw, "data_passing_ref")
319 .filter(|s| !s.is_empty())
320 .map(str::to_owned);
321 let edge_state = raw_str(&policy_raw, "edge_state")
322 .unwrap_or("pending")
323 .to_owned();
324 let created_at_ms =
325 raw_u64(&policy_raw, "created_at_ms").map(|n| n as i64).unwrap_or(0);
326 let created_by = raw_str(&policy_raw, "created_by")
327 .unwrap_or("engine")
328 .to_owned();
329
330 Ok(EdgeSnapshot::new(
331 EdgeId::from_uuid(edge_uuid),
332 flow_id.clone(),
333 upstream,
334 downstream,
335 dependency_kind,
336 satisfaction_condition,
337 data_passing_ref,
338 edge_state,
339 TimestampMs(created_at_ms),
340 created_by,
341 ))
342}
343
344pub async fn describe_flow(
346 pool: &PgPool,
347 partition_config: &ff_core::partition::PartitionConfig,
348 id: &FlowId,
349) -> Result<Option<FlowSnapshot>, EngineError> {
350 let part = flow_partition_byte(id, partition_config);
351 let flow_uuid: Uuid = id.0;
352
353 let flow_row_opt = sqlx::query(
354 "SELECT partition_key, flow_id, graph_revision, public_flow_state, \
355 created_at_ms, terminal_at_ms, raw_fields \
356 FROM ff_flow_core \
357 WHERE partition_key = $1 AND flow_id = $2",
358 )
359 .bind(part)
360 .bind(flow_uuid)
361 .fetch_optional(pool)
362 .await
363 .map_err(map_sqlx_error)?;
364
365 let Some(flow_row) = flow_row_opt else {
366 return Ok(None);
367 };
368
369 let eg_rows = sqlx::query(
370 "SELECT partition_key, flow_id, downstream_eid, policy, \
371 k_target, success_count, fail_count, skip_count, running_count \
372 FROM ff_edge_group \
373 WHERE partition_key = $1 AND flow_id = $2 \
374 ORDER BY downstream_eid",
375 )
376 .bind(part)
377 .bind(flow_uuid)
378 .fetch_all(pool)
379 .await
380 .map_err(map_sqlx_error)?;
381
382 let mut edge_groups = Vec::with_capacity(eg_rows.len());
383 for row in &eg_rows {
384 edge_groups.push(decode_edge_group_row(row)?);
385 }
386
387 decode_flow_row(id.clone(), &flow_row, edge_groups).map(Some)
388}
389
390pub async fn list_flows(
392 pool: &PgPool,
393 partition: PartitionKey,
394 cursor: Option<FlowId>,
395 limit: usize,
396) -> Result<ListFlowsPage, EngineError> {
397 if limit == 0 {
398 return Ok(ListFlowsPage::new(Vec::new(), None));
399 }
400 let part = partition_index_from_key(&partition)?;
401 let cursor_uuid: Option<Uuid> = cursor.as_ref().map(|f| f.0);
402 let fetch_limit = (limit + 1) as i64;
404
405 let rows = sqlx::query(
406 "SELECT flow_id, created_at_ms, public_flow_state \
407 FROM ff_flow_core \
408 WHERE partition_key = $1 \
409 AND ($2::uuid IS NULL OR flow_id > $2) \
410 ORDER BY flow_id \
411 LIMIT $3",
412 )
413 .bind(part)
414 .bind(cursor_uuid)
415 .bind(fetch_limit)
416 .fetch_all(pool)
417 .await
418 .map_err(map_sqlx_error)?;
419
420 let mut flows: Vec<FlowSummary> = Vec::with_capacity(rows.len().min(limit));
421 let mut next_cursor: Option<FlowId> = None;
422 for (idx, row) in rows.iter().enumerate() {
423 if idx >= limit {
424 if let Some(last) = flows.last() {
428 next_cursor = Some(last.flow_id.clone());
429 }
430 break;
431 }
432 let flow_uuid: Uuid = row.get("flow_id");
433 let created_at_ms: i64 = row.get("created_at_ms");
434 let public_state: String = row.get("public_flow_state");
435 let status = FlowStatus::from_public_flow_state(&public_state);
436 flows.push(FlowSummary::new(
437 FlowId::from_uuid(flow_uuid),
438 TimestampMs(created_at_ms),
439 status,
440 ));
441 }
442
443 Ok(ListFlowsPage::new(flows, next_cursor))
444}
445
446pub async fn list_edges(
448 pool: &PgPool,
449 partition_config: &ff_core::partition::PartitionConfig,
450 flow_id: &FlowId,
451 direction: EdgeDirection,
452) -> Result<Vec<EdgeSnapshot>, EngineError> {
453 let part = flow_partition_byte(flow_id, partition_config);
454 let flow_uuid: Uuid = flow_id.0;
455 let (column_filter, subject_eid) = match &direction {
456 EdgeDirection::Outgoing { from_node } => ("upstream_eid", from_node),
457 EdgeDirection::Incoming { to_node } => ("downstream_eid", to_node),
458 };
459 let subject_uuid = parse_exec_uuid(subject_eid)?;
461
462 let sql = format!(
463 "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
464 FROM ff_edge \
465 WHERE partition_key = $1 AND flow_id = $2 AND {column_filter} = $3 \
466 ORDER BY edge_id"
467 );
468 let rows = sqlx::query(&sql)
469 .bind(part)
470 .bind(flow_uuid)
471 .bind(subject_uuid)
472 .fetch_all(pool)
473 .await
474 .map_err(map_sqlx_error)?;
475
476 let mut out = Vec::with_capacity(rows.len());
477 for row in &rows {
478 out.push(decode_edge_row(row, flow_id)?);
479 }
480 Ok(out)
481}
482
483pub async fn describe_edge(
485 pool: &PgPool,
486 partition_config: &ff_core::partition::PartitionConfig,
487 flow_id: &FlowId,
488 edge_id: &EdgeId,
489) -> Result<Option<EdgeSnapshot>, EngineError> {
490 let part = flow_partition_byte(flow_id, partition_config);
491 let row_opt = sqlx::query(
492 "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
493 FROM ff_edge \
494 WHERE partition_key = $1 AND flow_id = $2 AND edge_id = $3",
495 )
496 .bind(part)
497 .bind(flow_id.0)
498 .bind(edge_id.0)
499 .fetch_optional(pool)
500 .await
501 .map_err(map_sqlx_error)?;
502
503 match row_opt {
504 Some(row) => decode_edge_row(&row, flow_id).map(Some),
505 None => Ok(None),
506 }
507}
508
509fn parse_exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
511 let s = eid.as_str();
512 let Some(colon) = s.rfind("}:") else {
513 return Err(EngineError::Validation {
514 kind: ValidationKind::InvalidInput,
515 detail: format!("execution_id missing '}}:' delimiter: {s}"),
516 });
517 };
518 let tail = &s[colon + 2..];
519 Uuid::parse_str(tail).map_err(|e| EngineError::Validation {
520 kind: ValidationKind::InvalidInput,
521 detail: format!("execution_id uuid suffix parse: {e}"),
522 })
523}
524
525fn cancel_policy_to_str(p: CancelFlowPolicy) -> &'static str {
526 match p {
527 CancelFlowPolicy::FlowOnly => "cancel_flow_only",
528 CancelFlowPolicy::CancelAll => "cancel_all",
529 CancelFlowPolicy::CancelPending => "cancel_pending",
530 _ => "cancel_all",
533 }
534}
535
536pub async fn cancel_flow(
556 pool: &PgPool,
557 partition_config: &ff_core::partition::PartitionConfig,
558 id: &FlowId,
559 policy: CancelFlowPolicy,
560) -> Result<CancelFlowResult, EngineError> {
561 let part = flow_partition_byte(id, partition_config);
562 let flow_uuid: Uuid = id.0;
563 let policy_str = cancel_policy_to_str(policy);
564
565 let mut last_transport: Option<EngineError> = None;
566 for attempt in 0..CANCEL_FLOW_MAX_ATTEMPTS {
567 match cancel_flow_once(pool, part, flow_uuid, policy, policy_str).await {
568 Ok(result) => return Ok(result),
569 Err(err) => {
570 if is_serialization_conflict(&err) {
571 if attempt + 1 < CANCEL_FLOW_MAX_ATTEMPTS {
573 let ms = 5u64 * (1u64 << attempt).saturating_sub(0);
574 tokio::time::sleep(Duration::from_millis(ms)).await;
575 }
576 last_transport = Some(err);
577 continue;
578 }
579 return Err(err);
580 }
581 }
582 }
583 let _ = last_transport; Err(EngineError::Contention(ContentionKind::RetryExhausted))
586}
587
588fn is_serialization_conflict(err: &EngineError) -> bool {
594 matches!(
595 err,
596 EngineError::Contention(ContentionKind::LeaseConflict)
597 )
598}
599
/// One serializable-transaction attempt at cancelling a flow.
///
/// Steps, in order, all inside a single transaction:
///   1. Mark the flow row cancelled (idempotent: `terminal_at_ms` is only
///      set once via COALESCE) and record the policy in `raw_fields`.
///   2. Select member executions to cancel, according to `policy`.
///   3. For each member: cancel the execution row, insert a completion
///      event, and insert a lease-revocation event.
///   4. For `CancelPending`, enqueue edge groups that still have running
///      members for later handling.
///
/// NOTE(review): when the flow row does not exist, this returns
/// `CancelFlowResult::Cancelled` with no members rather than a not-found
/// error — confirm callers rely on this idempotent behavior.
async fn cancel_flow_once(
    pool: &PgPool,
    part: i16,
    flow_uuid: Uuid,
    policy: CancelFlowPolicy,
    policy_str: &str,
) -> Result<CancelFlowResult, EngineError> {
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;

    // Serializable isolation: concurrent cancels/completions conflict
    // instead of interleaving; the caller retries on conflict.
    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // Step 1: cancel the flow row itself.
    let flow_found = sqlx::query(
        "UPDATE ff_flow_core \
         SET public_flow_state = 'cancelled', \
         terminal_at_ms = COALESCE(terminal_at_ms, \
         (extract(epoch from clock_timestamp())*1000)::bigint), \
         raw_fields = raw_fields \
         || jsonb_build_object('cancellation_policy', $3::text) \
         WHERE partition_key = $1 AND flow_id = $2 \
         RETURNING flow_id",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(policy_str)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Missing flow: commit the empty transaction and report a memberless
    // cancel (see NOTE in the doc comment above).
    if flow_found.is_none() {
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(CancelFlowResult::Cancelled {
            cancellation_policy: policy_str.to_owned(),
            member_execution_ids: Vec::new(),
        });
    }

    // Step 2: pick member executions according to the policy.
    let member_rows = if matches!(policy, CancelFlowPolicy::FlowOnly) {
        Vec::new()
    } else {
        // `state_filter` comes from a fixed set of literals — no user input
        // reaches the format! below.
        let state_filter: &str = match policy {
            CancelFlowPolicy::CancelAll => {
                "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')"
            }
            CancelFlowPolicy::CancelPending => {
                "lifecycle_phase IN ('pending','blocked','eligible')"
            }
            _ => "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')",
        };
        let sql = format!(
            "SELECT execution_id FROM ff_exec_core \
             WHERE partition_key = $1 AND flow_id = $2 AND {state_filter}"
        );
        sqlx::query(&sql)
            .bind(part)
            .bind(flow_uuid)
            .fetch_all(&mut *tx)
            .await
            .map_err(map_sqlx_error)?
    };

    // Step 3: cancel each member and emit its bookkeeping events.
    let mut member_execution_ids: Vec<String> = Vec::with_capacity(member_rows.len());
    for row in &member_rows {
        let exec_uuid: Uuid = row.get("execution_id");
        // Cancel the execution; COALESCE preserves any pre-existing terminal
        // metadata instead of overwriting it.
        sqlx::query(
            "UPDATE ff_exec_core \
             SET lifecycle_phase = 'cancelled', \
             eligibility_state = 'cancelled', \
             public_state = 'cancelled', \
             terminal_at_ms = COALESCE(terminal_at_ms, \
             (extract(epoch from clock_timestamp())*1000)::bigint), \
             cancellation_reason = COALESCE(cancellation_reason, 'flow_cancelled'), \
             cancelled_by = COALESCE(cancelled_by, 'cancel_flow') \
             WHERE partition_key = $1 AND execution_id = $2",
        )
        .bind(part)
        .bind(exec_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // Completion event recording the cancelled outcome.
        sqlx::query(
            "INSERT INTO ff_completion_event \
             (partition_key, execution_id, flow_id, outcome, occurred_at_ms) \
             VALUES ($1, $2, $3, 'cancelled', \
             (extract(epoch from clock_timestamp())*1000)::bigint)",
        )
        .bind(part)
        .bind(exec_uuid)
        .bind(flow_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // Lease revocation event. NOTE(review): execution_id is bound as
        // text and partition_key as i32 here, unlike the uuid/i16 binds
        // above — presumably ff_lease_event has a different schema; confirm.
        sqlx::query(
            "INSERT INTO ff_lease_event \
             (execution_id, lease_id, event_type, occurred_at_ms, partition_key) \
             VALUES ($1, NULL, 'revoked', \
             (extract(epoch from clock_timestamp())*1000)::bigint, $2)",
        )
        .bind(exec_uuid.to_string())
        .bind(i32::from(part))
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        // Report members in the same `{fp:N}:uuid` text form used elsewhere.
        member_execution_ids.push(format!("{{fp:{part}}}:{exec_uuid}"));
    }

    // Step 4: for CancelPending, queue groups that still have running
    // members so they can be handled once those members finish.
    if matches!(policy, CancelFlowPolicy::CancelPending) {
        sqlx::query(
            "INSERT INTO ff_pending_cancel_groups \
             (partition_key, flow_id, downstream_eid, enqueued_at_ms) \
             SELECT partition_key, flow_id, downstream_eid, \
             (extract(epoch from clock_timestamp())*1000)::bigint \
             FROM ff_edge_group \
             WHERE partition_key = $1 AND flow_id = $2 AND running_count > 0 \
             ON CONFLICT DO NOTHING",
        )
        .bind(part)
        .bind(flow_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    }

    tx.commit().await.map_err(map_sqlx_error)?;

    Ok(CancelFlowResult::Cancelled {
        cancellation_policy: policy_str.to_owned(),
        member_execution_ids,
    })
}
748
749pub async fn set_edge_group_policy(
763 pool: &PgPool,
764 partition_config: &ff_core::partition::PartitionConfig,
765 flow_id: &FlowId,
766 downstream_execution_id: &ExecutionId,
767 policy: EdgeDependencyPolicy,
768) -> Result<SetEdgeGroupPolicyResult, EngineError> {
769 match &policy {
771 EdgeDependencyPolicy::AllOf => {}
772 EdgeDependencyPolicy::AnyOf { .. } => {}
773 EdgeDependencyPolicy::Quorum { k, .. } => {
774 if *k == 0 {
775 return Err(EngineError::Validation {
776 kind: ValidationKind::InvalidInput,
777 detail: "quorum k must be >= 1".to_string(),
778 });
779 }
780 }
781 _ => {
782 return Err(EngineError::Validation {
783 kind: ValidationKind::InvalidInput,
784 detail: "unknown EdgeDependencyPolicy variant".to_string(),
785 });
786 }
787 }
788
789 let part = flow_partition_byte(flow_id, partition_config);
790 let flow_uuid: Uuid = flow_id.0;
791 let downstream_uuid = parse_exec_uuid(downstream_execution_id)?;
792
793 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
794
795 let already_staged: i64 = sqlx::query_scalar(
797 "SELECT COUNT(*) FROM ff_edge \
798 WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
799 )
800 .bind(part)
801 .bind(flow_uuid)
802 .bind(downstream_uuid)
803 .fetch_one(&mut *tx)
804 .await
805 .map_err(map_sqlx_error)?;
806
807 if already_staged > 0 {
808 let _ = tx.rollback().await;
809 return Err(EngineError::Validation {
810 kind: ValidationKind::InvalidInput,
811 detail: "edge_group_policy_already_fixed: dependencies already staged".to_string(),
812 });
813 }
814
815 let existing: Option<JsonValue> = sqlx::query_scalar(
818 "SELECT policy FROM ff_edge_group \
819 WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
820 )
821 .bind(part)
822 .bind(flow_uuid)
823 .bind(downstream_uuid)
824 .fetch_optional(&mut *tx)
825 .await
826 .map_err(map_sqlx_error)?;
827
828 let encoded = encode_edge_policy(&policy);
829 if let Some(existing_policy) = existing
830 && existing_policy == encoded
831 {
832 tx.commit().await.map_err(map_sqlx_error)?;
833 return Ok(SetEdgeGroupPolicyResult::AlreadySet);
834 }
835
836 sqlx::query(
837 "INSERT INTO ff_edge_group \
838 (partition_key, flow_id, downstream_eid, policy) \
839 VALUES ($1, $2, $3, $4) \
840 ON CONFLICT (partition_key, flow_id, downstream_eid) \
841 DO UPDATE SET policy = EXCLUDED.policy",
842 )
843 .bind(part)
844 .bind(flow_uuid)
845 .bind(downstream_uuid)
846 .bind(&encoded)
847 .execute(&mut *tx)
848 .await
849 .map_err(map_sqlx_error)?;
850
851 tx.commit().await.map_err(map_sqlx_error)?;
852 Ok(SetEdgeGroupPolicyResult::Set)
853}
854
// Keeps the `Partition` and `PartitionFamily` imports referenced; both types
// are otherwise unused in this module. NOTE(review): presumably retained for
// future use — confirm before removing the anchor or the imports.
#[allow(dead_code)]
fn _unused_imports_anchor(_p: Partition, _f: PartitionFamily) {}