1use ff_core::contracts::{
42 AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
43 ApplyDependencyToChildResult, CreateFlowArgs, CreateFlowResult, StageDependencyEdgeArgs,
44 StageDependencyEdgeResult,
45};
46use ff_core::engine_error::{ConflictKind, ContentionKind, EngineError, ValidationKind};
47use ff_core::partition::PartitionConfig;
48use ff_core::types::{ExecutionId, FlowId};
49use serde_json::json;
50use sqlx::{PgPool, Row};
51use uuid::Uuid;
52
53use crate::error::map_sqlx_error;
54
55fn flow_partition_byte(flow_id: &FlowId, pc: &PartitionConfig) -> i16 {
57 ff_core::partition::flow_partition(flow_id, pc).index as i16
58}
59
60fn parse_exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
62 let s = eid.as_str();
63 let Some(colon) = s.rfind("}:") else {
64 return Err(EngineError::Validation {
65 kind: ValidationKind::InvalidInput,
66 detail: format!("execution_id missing '}}:' delimiter: {s}"),
67 });
68 };
69 Uuid::parse_str(&s[colon + 2..]).map_err(|e| EngineError::Validation {
70 kind: ValidationKind::InvalidInput,
71 detail: format!("execution_id uuid suffix: {e}"),
72 })
73}
74
75pub async fn create_flow(
83 pool: &PgPool,
84 pc: &PartitionConfig,
85 args: &CreateFlowArgs,
86) -> Result<CreateFlowResult, EngineError> {
87 let part = flow_partition_byte(&args.flow_id, pc);
88 let flow_uuid: Uuid = args.flow_id.0;
89 let now_ms = args.now.0;
90
91 let raw_fields = json!({
92 "flow_kind": args.flow_kind,
93 "namespace": args.namespace.as_str(),
94 "node_count": 0,
95 "edge_count": 0,
96 "last_mutation_at_ms": now_ms,
97 });
98
99 let inserted = sqlx::query(
103 r#"
104 INSERT INTO ff_flow_core
105 (partition_key, flow_id, graph_revision, public_flow_state,
106 created_at_ms, raw_fields)
107 VALUES ($1, $2, 0, 'open', $3, $4)
108 ON CONFLICT (partition_key, flow_id) DO NOTHING
109 RETURNING flow_id
110 "#,
111 )
112 .bind(part)
113 .bind(flow_uuid)
114 .bind(now_ms)
115 .bind(&raw_fields)
116 .fetch_optional(pool)
117 .await
118 .map_err(map_sqlx_error)?;
119
120 if inserted.is_some() {
121 Ok(CreateFlowResult::Created {
122 flow_id: args.flow_id.clone(),
123 })
124 } else {
125 Ok(CreateFlowResult::AlreadySatisfied {
126 flow_id: args.flow_id.clone(),
127 })
128 }
129}
130
131pub async fn add_execution_to_flow(
142 pool: &PgPool,
143 pc: &PartitionConfig,
144 args: &AddExecutionToFlowArgs,
145) -> Result<AddExecutionToFlowResult, EngineError> {
146 let part = flow_partition_byte(&args.flow_id, pc);
147 let flow_uuid: Uuid = args.flow_id.0;
148 let exec_uuid = parse_exec_uuid(&args.execution_id)?;
149 let now_ms = args.now.0;
150
151 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
152
153 let flow_row = sqlx::query(
155 "SELECT public_flow_state, raw_fields FROM ff_flow_core \
156 WHERE partition_key = $1 AND flow_id = $2 FOR UPDATE",
157 )
158 .bind(part)
159 .bind(flow_uuid)
160 .fetch_optional(&mut *tx)
161 .await
162 .map_err(map_sqlx_error)?;
163
164 let Some(flow_row) = flow_row else {
165 return Err(EngineError::Validation {
166 kind: ValidationKind::InvalidInput,
167 detail: "flow_not_found".to_string(),
168 });
169 };
170 let public_flow_state: String = flow_row.get("public_flow_state");
171 if matches!(
172 public_flow_state.as_str(),
173 "cancelled" | "completed" | "failed"
174 ) {
175 return Err(EngineError::Validation {
176 kind: ValidationKind::InvalidInput,
177 detail: "flow_already_terminal".to_string(),
178 });
179 }
180
181 let exec_row = sqlx::query(
185 "SELECT flow_id FROM ff_exec_core \
186 WHERE partition_key = $1 AND execution_id = $2 FOR UPDATE",
187 )
188 .bind(part)
189 .bind(exec_uuid)
190 .fetch_optional(&mut *tx)
191 .await
192 .map_err(map_sqlx_error)?;
193
194 let Some(exec_row) = exec_row else {
195 return Err(EngineError::Validation {
196 kind: ValidationKind::InvalidInput,
197 detail: "execution_not_found".to_string(),
198 });
199 };
200 let existing_flow_id: Option<Uuid> = exec_row.get("flow_id");
201
202 if existing_flow_id == Some(flow_uuid) {
204 let raw: serde_json::Value = flow_row.get("raw_fields");
205 let nc = raw
206 .get("node_count")
207 .and_then(|v| v.as_u64())
208 .and_then(|n| u32::try_from(n).ok())
209 .unwrap_or(0);
210 tx.commit().await.map_err(map_sqlx_error)?;
211 return Ok(AddExecutionToFlowResult::AlreadyMember {
212 execution_id: args.execution_id.clone(),
213 node_count: nc,
214 });
215 }
216
217 if let Some(other) = existing_flow_id
219 && other != flow_uuid
220 {
221 return Err(EngineError::Validation {
222 kind: ValidationKind::InvalidInput,
223 detail: format!("already_member_of_different_flow:{other}"),
224 });
225 }
226
227 sqlx::query(
229 "UPDATE ff_exec_core SET flow_id = $3 \
230 WHERE partition_key = $1 AND execution_id = $2",
231 )
232 .bind(part)
233 .bind(exec_uuid)
234 .bind(flow_uuid)
235 .execute(&mut *tx)
236 .await
237 .map_err(map_sqlx_error)?;
238
239 let bumped = sqlx::query(
240 r#"
241 UPDATE ff_flow_core
242 SET graph_revision = graph_revision + 1,
243 raw_fields = raw_fields
244 || jsonb_build_object(
245 'node_count',
246 COALESCE((raw_fields->>'node_count')::int, 0) + 1,
247 'last_mutation_at_ms', $3::bigint
248 )
249 WHERE partition_key = $1 AND flow_id = $2
250 RETURNING (raw_fields->>'node_count')::int AS node_count
251 "#,
252 )
253 .bind(part)
254 .bind(flow_uuid)
255 .bind(now_ms)
256 .fetch_one(&mut *tx)
257 .await
258 .map_err(map_sqlx_error)?;
259 let new_nc: i32 = bumped.get("node_count");
260
261 tx.commit().await.map_err(map_sqlx_error)?;
262
263 Ok(AddExecutionToFlowResult::Added {
264 execution_id: args.execution_id.clone(),
265 new_node_count: u32::try_from(new_nc.max(0)).unwrap_or(0),
266 })
267}
268
269pub async fn stage_dependency_edge(
287 pool: &PgPool,
288 pc: &PartitionConfig,
289 args: &StageDependencyEdgeArgs,
290) -> Result<StageDependencyEdgeResult, EngineError> {
291 if args.upstream_execution_id == args.downstream_execution_id {
292 return Err(EngineError::Validation {
293 kind: ValidationKind::InvalidInput,
294 detail: "self_referencing_edge".to_string(),
295 });
296 }
297
298 let part = flow_partition_byte(&args.flow_id, pc);
299 let flow_uuid: Uuid = args.flow_id.0;
300 let edge_uuid: Uuid = args.edge_id.0;
301 let upstream_uuid = parse_exec_uuid(&args.upstream_execution_id)?;
302 let downstream_uuid = parse_exec_uuid(&args.downstream_execution_id)?;
303 let now_ms = args.now.0;
304 let expected_rev = i64::try_from(args.expected_graph_revision).unwrap_or(i64::MAX);
305
306 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
307
308 let bumped = sqlx::query(
312 r#"
313 UPDATE ff_flow_core
314 SET graph_revision = graph_revision + 1,
315 raw_fields = raw_fields
316 || jsonb_build_object(
317 'edge_count',
318 COALESCE((raw_fields->>'edge_count')::int, 0) + 1,
319 'last_mutation_at_ms', $4::bigint
320 )
321 WHERE partition_key = $1
322 AND flow_id = $2
323 AND graph_revision = $3
324 AND public_flow_state = 'open'
325 RETURNING graph_revision
326 "#,
327 )
328 .bind(part)
329 .bind(flow_uuid)
330 .bind(expected_rev)
331 .bind(now_ms)
332 .fetch_optional(&mut *tx)
333 .await
334 .map_err(map_sqlx_error)?;
335
336 let Some(bumped_row) = bumped else {
337 let probe = sqlx::query(
339 "SELECT graph_revision, public_flow_state FROM ff_flow_core \
340 WHERE partition_key = $1 AND flow_id = $2",
341 )
342 .bind(part)
343 .bind(flow_uuid)
344 .fetch_optional(&mut *tx)
345 .await
346 .map_err(map_sqlx_error)?;
347 let _ = tx.rollback().await;
348 return match probe {
349 None => Err(EngineError::Validation {
350 kind: ValidationKind::InvalidInput,
351 detail: "flow_not_found".to_string(),
352 }),
353 Some(r) => {
354 let state: String = r.get("public_flow_state");
355 if matches!(state.as_str(), "cancelled" | "completed" | "failed") {
356 Err(EngineError::Validation {
357 kind: ValidationKind::InvalidInput,
358 detail: "flow_already_terminal".to_string(),
359 })
360 } else {
361 Err(EngineError::Contention(ContentionKind::StaleGraphRevision))
362 }
363 }
364 };
365 };
366 let new_rev: i64 = bumped_row.get("graph_revision");
367
368 let members: Vec<Uuid> = sqlx::query_scalar(
370 "SELECT execution_id FROM ff_exec_core \
371 WHERE partition_key = $1 AND flow_id = $2 \
372 AND execution_id = ANY($3)",
373 )
374 .bind(part)
375 .bind(flow_uuid)
376 .bind(&[upstream_uuid, downstream_uuid][..])
377 .fetch_all(&mut *tx)
378 .await
379 .map_err(map_sqlx_error)?;
380 if !members.contains(&upstream_uuid) || !members.contains(&downstream_uuid) {
381 let _ = tx.rollback().await;
382 return Err(EngineError::Validation {
383 kind: ValidationKind::InvalidInput,
384 detail: "execution_not_in_flow".to_string(),
385 });
386 }
387
388 let policy = json!({
392 "dependency_kind": args.dependency_kind,
393 "satisfaction_condition": "all_required",
394 "data_passing_ref": args.data_passing_ref.clone().unwrap_or_default(),
395 "edge_state": "pending",
396 "created_at_ms": now_ms,
397 "created_by": "engine",
398 "staged_at_ms": now_ms,
399 "applied_at_ms": serde_json::Value::Null,
400 });
401
402 let insert = sqlx::query(
403 r#"
404 INSERT INTO ff_edge
405 (partition_key, flow_id, edge_id, upstream_eid, downstream_eid,
406 policy, policy_version)
407 VALUES ($1, $2, $3, $4, $5, $6, 0)
408 ON CONFLICT (partition_key, flow_id, edge_id) DO NOTHING
409 RETURNING edge_id
410 "#,
411 )
412 .bind(part)
413 .bind(flow_uuid)
414 .bind(edge_uuid)
415 .bind(upstream_uuid)
416 .bind(downstream_uuid)
417 .bind(&policy)
418 .fetch_optional(&mut *tx)
419 .await
420 .map_err(map_sqlx_error)?;
421
422 if insert.is_none() {
423 let _ = tx.rollback().await;
428 let existing = crate::flow::describe_edge(pool, pc, &args.flow_id, &args.edge_id)
430 .await?
431 .ok_or_else(|| EngineError::Validation {
432 kind: ValidationKind::Corruption,
433 detail: "edge vanished between insert and describe".to_string(),
434 })?;
435 return Err(EngineError::Conflict(
436 ConflictKind::DependencyAlreadyExists { existing },
437 ));
438 }
439
440 tx.commit().await.map_err(map_sqlx_error)?;
441
442 Ok(StageDependencyEdgeResult::Staged {
443 edge_id: args.edge_id.clone(),
444 new_graph_revision: u64::try_from(new_rev).unwrap_or(0),
445 })
446}
447
448pub async fn apply_dependency_to_child(
464 pool: &PgPool,
465 pc: &PartitionConfig,
466 args: &ApplyDependencyToChildArgs,
467) -> Result<ApplyDependencyToChildResult, EngineError> {
468 let part = flow_partition_byte(&args.flow_id, pc);
469 let flow_uuid: Uuid = args.flow_id.0;
470 let edge_uuid: Uuid = args.edge_id.0;
471 let downstream_uuid = parse_exec_uuid(&args.downstream_execution_id)?;
472 let now_ms = args.now.0;
473
474 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
475
476 let edge_row = sqlx::query(
478 "SELECT policy FROM ff_edge \
479 WHERE partition_key = $1 AND flow_id = $2 AND edge_id = $3 \
480 FOR UPDATE",
481 )
482 .bind(part)
483 .bind(flow_uuid)
484 .bind(edge_uuid)
485 .fetch_optional(&mut *tx)
486 .await
487 .map_err(map_sqlx_error)?;
488
489 let Some(edge_row) = edge_row else {
490 return Err(EngineError::Validation {
491 kind: ValidationKind::InvalidInput,
492 detail: "edge_not_found".to_string(),
493 });
494 };
495 let mut policy: serde_json::Value = edge_row.get("policy");
496
497 let already_applied = policy
501 .get("applied_at_ms")
502 .and_then(|v| v.as_i64())
503 .is_some();
504 if already_applied {
505 tx.commit().await.map_err(map_sqlx_error)?;
506 return Ok(ApplyDependencyToChildResult::AlreadyApplied);
507 }
508
509 if let Some(obj) = policy.as_object_mut() {
511 obj.insert("applied_at_ms".to_string(), json!(now_ms));
512 obj.insert("edge_state".to_string(), json!("applied"));
513 }
514 sqlx::query(
515 "UPDATE ff_edge SET policy = $4 \
516 WHERE partition_key = $1 AND flow_id = $2 AND edge_id = $3",
517 )
518 .bind(part)
519 .bind(flow_uuid)
520 .bind(edge_uuid)
521 .bind(&policy)
522 .execute(&mut *tx)
523 .await
524 .map_err(map_sqlx_error)?;
525
526 sqlx::query(
531 r#"
532 INSERT INTO ff_edge_group
533 (partition_key, flow_id, downstream_eid, policy, running_count)
534 VALUES ($1, $2, $3, $4, 1)
535 ON CONFLICT (partition_key, flow_id, downstream_eid) DO UPDATE
536 SET running_count = ff_edge_group.running_count + 1
537 "#,
538 )
539 .bind(part)
540 .bind(flow_uuid)
541 .bind(downstream_uuid)
542 .bind(json!({ "kind": "all_of" }))
543 .execute(&mut *tx)
544 .await
545 .map_err(map_sqlx_error)?;
546
547 let unsatisfied: i32 = sqlx::query_scalar(
549 "SELECT running_count FROM ff_edge_group \
550 WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
551 )
552 .bind(part)
553 .bind(flow_uuid)
554 .bind(downstream_uuid)
555 .fetch_one(&mut *tx)
556 .await
557 .map_err(map_sqlx_error)?;
558
559 tx.commit().await.map_err(map_sqlx_error)?;
560
561 Ok(ApplyDependencyToChildResult::Applied {
562 unsatisfied_count: u32::try_from(unsatisfied.max(0)).unwrap_or(0),
563 })
564}