1use std::collections::HashMap;
36use std::time::{SystemTime, UNIX_EPOCH};
37
38use ff_core::contracts::decode::build_execution_snapshot;
39use ff_core::contracts::{CreateExecutionArgs, ExecutionInfo, ExecutionSnapshot, ListExecutionsPage};
40use ff_core::engine_error::{EngineError, ValidationKind};
41use ff_core::partition::{PartitionConfig, PartitionKey};
42use ff_core::state::{
43 AttemptState, BlockingReason, EligibilityState, LifecyclePhase, OwnershipState, PublicState,
44 StateVector, TerminalOutcome,
45};
46use ff_core::types::{ExecutionId, FlowId};
47use serde_json::Value as JsonValue;
48use sqlx::{PgPool, Row};
49use uuid::Uuid;
50
51use crate::error::map_sqlx_error;
52
53fn eid_uuid(eid: &ExecutionId) -> Uuid {
58 let s = eid.as_str();
59 let suffix = s
61 .split_once("}:")
62 .map(|(_, u)| u)
63 .expect("ExecutionId has `}:` separator (invariant)");
64 Uuid::parse_str(suffix).expect("ExecutionId suffix is a valid UUID (invariant)")
65}
66
67fn eid_from_parts(partition: u16, uuid: Uuid) -> Result<ExecutionId, EngineError> {
71 let s = format!("{{fp:{partition}}}:{uuid}");
72 ExecutionId::parse(&s).map_err(|e| EngineError::Validation {
73 kind: ValidationKind::Corruption,
74 detail: format!("exec_core: execution_id: could not reassemble '{s}': {e}"),
75 })
76}
77
78fn now_ms() -> i64 {
79 let d = SystemTime::now()
80 .duration_since(UNIX_EPOCH)
81 .expect("clock is after UNIX_EPOCH");
82 (d.as_millis() as i64).max(0)
83}
84
85pub(super) async fn create_execution_impl(
94 pool: &PgPool,
95 _partition_config: &PartitionConfig,
96 args: CreateExecutionArgs,
97) -> Result<ExecutionId, EngineError> {
98 let partition_key: i16 = args.execution_id.partition() as i16;
99 let execution_id = eid_uuid(&args.execution_id);
100 let lane_id = args.lane_id.as_str().to_owned();
101 let priority: i32 = args.priority;
102 let created_at_ms: i64 = args.now.0;
103 let deadline_at_ms: Option<i64> = args.execution_deadline_at.map(|t| t.0);
104
105 let mut raw: serde_json::Map<String, JsonValue> = serde_json::Map::new();
110 raw.insert(
111 "namespace".into(),
112 JsonValue::String(args.namespace.as_str().to_owned()),
113 );
114 raw.insert("execution_kind".into(), JsonValue::String(args.execution_kind));
115 raw.insert(
116 "creator_identity".into(),
117 JsonValue::String(args.creator_identity),
118 );
119 if let Some(k) = args.idempotency_key {
120 raw.insert("idempotency_key".into(), JsonValue::String(k));
121 }
122 if let Some(enc) = args.payload_encoding {
123 raw.insert("payload_encoding".into(), JsonValue::String(enc));
124 }
125 raw.insert(
128 "last_mutation_at".into(),
129 JsonValue::String(created_at_ms.to_string()),
130 );
131 raw.insert(
132 "total_attempt_count".into(),
133 JsonValue::String("0".to_owned()),
134 );
135 let tags_json: serde_json::Map<String, JsonValue> = args
137 .tags
138 .into_iter()
139 .map(|(k, v)| (k, JsonValue::String(v)))
140 .collect();
141 raw.insert("tags".into(), JsonValue::Object(tags_json));
142
143 let raw_fields = JsonValue::Object(raw);
144 let policy_json: Option<JsonValue> = match args.policy {
145 Some(p) => Some(serde_json::to_value(&p).map_err(|e| EngineError::Validation {
146 kind: ValidationKind::InvalidInput,
147 detail: format!("create_execution: policy: serialize failed: {e}"),
148 })?),
149 None => None,
150 };
151
152 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
156
157 sqlx::query(
158 r#"
159 INSERT INTO ff_exec_core (
160 partition_key, execution_id, flow_id, lane_id,
161 required_capabilities, attempt_index,
162 lifecycle_phase, ownership_state, eligibility_state,
163 public_state, attempt_state,
164 priority, created_at_ms, deadline_at_ms,
165 payload, policy, raw_fields
166 ) VALUES (
167 $1, $2, NULL, $3,
168 '{}'::text[], 0,
169 'submitted', 'unowned', 'eligible_now',
170 'waiting', 'pending',
171 $4, $5, $6,
172 $7, $8, $9
173 )
174 ON CONFLICT (partition_key, execution_id) DO NOTHING
175 "#,
176 )
177 .bind(partition_key)
178 .bind(execution_id)
179 .bind(&lane_id)
180 .bind(priority)
181 .bind(created_at_ms)
182 .bind(deadline_at_ms)
183 .bind(&args.input_payload)
184 .bind(policy_json)
185 .bind(&raw_fields)
186 .execute(&mut *tx)
187 .await
188 .map_err(map_sqlx_error)?;
189
190 sqlx::query(
192 r#"
193 INSERT INTO ff_lane_registry (lane_id, registered_at_ms, registered_by)
194 VALUES ($1, $2, $3)
195 ON CONFLICT (lane_id) DO NOTHING
196 "#,
197 )
198 .bind(&lane_id)
199 .bind(created_at_ms)
200 .bind("create_execution")
201 .execute(&mut *tx)
202 .await
203 .map_err(map_sqlx_error)?;
204
205 tx.commit().await.map_err(map_sqlx_error)?;
206
207 Ok(args.execution_id)
208}
209
210pub(super) async fn describe_execution_impl(
213 pool: &PgPool,
214 _partition_config: &PartitionConfig,
215 id: &ExecutionId,
216) -> Result<Option<ExecutionSnapshot>, EngineError> {
217 let partition_key: i16 = id.partition() as i16;
218 let execution_id = eid_uuid(id);
219
220 let row = sqlx::query(
221 r#"
222 SELECT flow_id, lane_id, public_state, blocking_reason,
223 created_at_ms, raw_fields
224 FROM ff_exec_core
225 WHERE partition_key = $1 AND execution_id = $2
226 "#,
227 )
228 .bind(partition_key)
229 .bind(execution_id)
230 .fetch_optional(pool)
231 .await
232 .map_err(map_sqlx_error)?;
233
234 let Some(row) = row else {
235 return Ok(None);
236 };
237
238 let flow_id_uuid: Option<Uuid> = row.try_get("flow_id").map_err(map_sqlx_error)?;
239 let lane_id: String = row.try_get("lane_id").map_err(map_sqlx_error)?;
240 let public_state: String = row.try_get("public_state").map_err(map_sqlx_error)?;
241 let blocking_reason: Option<String> =
242 row.try_get("blocking_reason").map_err(map_sqlx_error)?;
243 let created_at_ms: i64 = row.try_get("created_at_ms").map_err(map_sqlx_error)?;
244 let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
245
246 let mut core: HashMap<String, String> = HashMap::new();
250 core.insert("public_state".into(), public_state);
251 core.insert("lane_id".into(), lane_id);
252 if let Some(fid) = flow_id_uuid {
253 core.insert(
256 "flow_id".into(),
257 format!("{{fp:{part}}}:{fid}", part = id.partition()),
258 );
259 }
260 if let Some(r) = blocking_reason {
261 core.insert("blocking_reason".into(), r);
262 }
263 core.insert("created_at".into(), created_at_ms.to_string());
264
265 if let JsonValue::Object(map) = &raw_fields {
269 for key in [
270 "namespace",
271 "last_mutation_at",
272 "total_attempt_count",
273 "current_attempt_id",
274 "current_attempt_index",
275 "current_waitpoint_id",
276 "blocking_detail",
277 ] {
278 if let Some(JsonValue::String(s)) = map.get(key) {
279 core.insert(key.to_owned(), s.clone());
280 }
281 }
282 }
283
284 let tags_raw: HashMap<String, String> = match &raw_fields {
286 JsonValue::Object(map) => match map.get("tags") {
287 Some(JsonValue::Object(tag_map)) => tag_map
288 .iter()
289 .filter_map(|(k, v)| {
290 v.as_str().map(|s| (k.clone(), s.to_owned()))
291 })
292 .collect(),
293 _ => HashMap::new(),
294 },
295 _ => HashMap::new(),
296 };
297
298 build_execution_snapshot(id.clone(), &core, tags_raw)
299}
300
301pub(super) async fn list_executions_impl(
304 pool: &PgPool,
305 _partition_config: &PartitionConfig,
306 partition: PartitionKey,
307 cursor: Option<ExecutionId>,
308 limit: usize,
309) -> Result<ListExecutionsPage, EngineError> {
310 if limit == 0 {
311 return Ok(ListExecutionsPage::new(Vec::new(), None));
312 }
313 let parsed = partition.parse().map_err(|e| EngineError::Validation {
315 kind: ValidationKind::InvalidInput,
316 detail: format!("list_executions: partition: '{partition}': {e}"),
317 })?;
318 let partition_key: i16 = parsed.index as i16;
319 let cursor_uuid: Option<Uuid> = cursor.as_ref().map(eid_uuid);
320
321 let effective_limit = limit.min(1000);
323 let fetch_limit: i64 = (effective_limit as i64).saturating_add(1);
324
325 let rows = sqlx::query(
326 r#"
327 SELECT execution_id
328 FROM ff_exec_core
329 WHERE partition_key = $1
330 AND ($2::uuid IS NULL OR execution_id > $2)
331 ORDER BY execution_id ASC
332 LIMIT $3
333 "#,
334 )
335 .bind(partition_key)
336 .bind(cursor_uuid)
337 .bind(fetch_limit)
338 .fetch_all(pool)
339 .await
340 .map_err(map_sqlx_error)?;
341
342 let mut ids: Vec<ExecutionId> = Vec::with_capacity(rows.len());
343 for row in &rows {
344 let u: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
345 ids.push(eid_from_parts(parsed.index, u)?);
346 }
347
348 let has_more = ids.len() > effective_limit;
349 if has_more {
350 ids.truncate(effective_limit);
351 }
352 let next_cursor = if has_more { ids.last().cloned() } else { None };
353 Ok(ListExecutionsPage::new(ids, next_cursor))
354}
355
356pub(super) async fn cancel_impl(
366 pool: &PgPool,
367 _partition_config: &PartitionConfig,
368 execution_id: &ExecutionId,
369 reason: &str,
370) -> Result<(), EngineError> {
371 let partition_key: i16 = execution_id.partition() as i16;
372 let eid_uuid = eid_uuid(execution_id);
373 let now = now_ms();
374
375 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
376
377 let current: Option<(String, String)> = sqlx::query_as(
378 r#"
379 SELECT lifecycle_phase, public_state
380 FROM ff_exec_core
381 WHERE partition_key = $1 AND execution_id = $2
382 FOR UPDATE
383 "#,
384 )
385 .bind(partition_key)
386 .bind(eid_uuid)
387 .fetch_optional(&mut *tx)
388 .await
389 .map_err(map_sqlx_error)?;
390
391 let Some((lifecycle_phase, public_state)) = current else {
392 tx.rollback().await.map_err(map_sqlx_error)?;
395 return Err(EngineError::Validation {
396 kind: ValidationKind::InvalidInput,
397 detail: format!(
398 "cancel: execution_id={execution_id}: row not found on partition_key={partition_key}"
399 ),
400 });
401 };
402
403 if lifecycle_phase == "terminal" {
406 tx.rollback().await.map_err(map_sqlx_error)?;
407 return if public_state == "cancelled" {
411 Ok(())
412 } else {
413 Err(EngineError::Validation {
414 kind: ValidationKind::InvalidInput,
415 detail: format!(
416 "cancel: execution_id={execution_id}: already terminal in state '{public_state}'"
417 ),
418 })
419 };
420 }
421
422 sqlx::query(
423 r#"
424 UPDATE ff_exec_core
425 SET lifecycle_phase = 'terminal',
426 ownership_state = 'unowned',
427 eligibility_state = 'not_applicable',
428 public_state = 'cancelled',
429 attempt_state = 'cancelled',
430 terminal_at_ms = $3,
431 cancellation_reason = $4,
432 cancelled_by = 'worker',
433 raw_fields = jsonb_set(raw_fields, '{last_mutation_at}', to_jsonb($3::text))
434 WHERE partition_key = $1 AND execution_id = $2
435 "#,
436 )
437 .bind(partition_key)
438 .bind(eid_uuid)
439 .bind(now)
440 .bind(reason)
441 .execute(&mut *tx)
442 .await
443 .map_err(map_sqlx_error)?;
444
445 tx.commit().await.map_err(map_sqlx_error)?;
446 Ok(())
447}
448
449pub(super) async fn resolve_execution_flow_id_impl(
452 pool: &PgPool,
453 _partition_config: &PartitionConfig,
454 eid: &ExecutionId,
455) -> Result<Option<FlowId>, EngineError> {
456 let partition_key: i16 = eid.partition() as i16;
457 let execution_id = eid_uuid(eid);
458
459 let row: Option<(Option<Uuid>,)> = sqlx::query_as(
460 r#"
461 SELECT flow_id
462 FROM ff_exec_core
463 WHERE partition_key = $1 AND execution_id = $2
464 "#,
465 )
466 .bind(partition_key)
467 .bind(execution_id)
468 .fetch_optional(pool)
469 .await
470 .map_err(map_sqlx_error)?;
471
472 let Some((maybe_fid,)) = row else {
473 return Ok(None);
474 };
475 let Some(fid_uuid) = maybe_fid else {
476 return Ok(None);
477 };
478 let s = fid_uuid.to_string();
479 FlowId::parse(&s)
480 .map(Some)
481 .map_err(|e| EngineError::Validation {
482 kind: ValidationKind::Corruption,
483 detail: format!(
484 "resolve_execution_flow_id: exec_core.flow_id='{s}' is not a valid FlowId: {e}"
485 ),
486 })
487}
488
489fn normalise_lifecycle_phase(raw: &str) -> &str {
513 match raw {
514 "cancelled" | "terminal" => "terminal",
517 "pending" | "runnable" | "eligible" | "blocked" => "runnable",
522 "active" => "active",
523 "suspended" => "suspended",
524 "submitted" => "submitted",
525 other => other,
526 }
527}
528
529fn normalise_ownership_state(raw: &str) -> &str {
530 match raw {
531 "released" | "unowned" => "unowned",
534 "leased" => "leased",
535 "lease_expired_reclaimable" => "lease_expired_reclaimable",
536 "lease_revoked" => "lease_revoked",
537 other => other,
538 }
539}
540
541fn normalise_eligibility_state(raw: &str) -> &str {
542 match raw {
543 "cancelled" => "not_applicable",
545 "pending_claim" => "eligible_now",
550 other => other,
551 }
552}
553
554fn normalise_attempt_state(raw: &str) -> &str {
555 match raw {
556 "pending" | "pending_claim" => "pending_first_attempt",
560 "running" => "running_attempt",
563 "cancelled" => "attempt_terminal",
566 other => other,
567 }
568}
569
570fn normalise_public_state(raw: &str) -> &str {
573 match raw {
574 "running" => "active",
578 other => other,
579 }
580}
581
582macro_rules! json_enum {
583 ($ty:ty, $field:expr, $raw:expr) => {{
584 let quoted = format!("\"{}\"", $raw);
585 serde_json::from_str::<$ty>("ed).map_err(|e| EngineError::Validation {
586 kind: ValidationKind::Corruption,
587 detail: format!(
588 "exec_core: {}: '{}' is not a known value: {}",
589 $field, $raw, e
590 ),
591 })
592 }};
593}
594
595fn derive_terminal_outcome(
599 phase_norm: &str,
600 phase_raw: &str,
601 attempt_outcome: Option<&str>,
602) -> TerminalOutcome {
603 if phase_norm != "terminal" {
604 return TerminalOutcome::None;
605 }
606 if phase_raw == "cancelled" {
607 return TerminalOutcome::Cancelled;
608 }
609 match attempt_outcome {
610 Some("success") => TerminalOutcome::Success,
611 Some("failed") => TerminalOutcome::Failed,
612 Some("cancelled") => TerminalOutcome::Cancelled,
613 Some("expired") => TerminalOutcome::Expired,
614 Some("skipped") => TerminalOutcome::Skipped,
615 _ => TerminalOutcome::None,
616 }
617}
618
619pub(super) async fn read_execution_state_impl(
622 pool: &PgPool,
623 _partition_config: &PartitionConfig,
624 id: &ExecutionId,
625) -> Result<Option<PublicState>, EngineError> {
626 let partition_key: i16 = id.partition() as i16;
627 let execution_id = eid_uuid(id);
628
629 let row: Option<(String,)> = sqlx::query_as(
630 r#"
631 SELECT public_state
632 FROM ff_exec_core
633 WHERE partition_key = $1 AND execution_id = $2
634 "#,
635 )
636 .bind(partition_key)
637 .bind(execution_id)
638 .fetch_optional(pool)
639 .await
640 .map_err(map_sqlx_error)?;
641
642 let Some((raw,)) = row else {
643 return Ok(None);
644 };
645 let parsed: PublicState =
646 json_enum!(PublicState, "public_state", normalise_public_state(&raw))?;
647 Ok(Some(parsed))
648}
649
650pub(super) async fn read_execution_info_impl(
656 pool: &PgPool,
657 _partition_config: &PartitionConfig,
658 id: &ExecutionId,
659) -> Result<Option<ExecutionInfo>, EngineError> {
660 let partition_key: i16 = id.partition() as i16;
661 let execution_id = eid_uuid(id);
662
663 let row = sqlx::query(
669 r#"
670 SELECT ec.flow_id,
671 ec.lane_id,
672 ec.priority,
673 ec.lifecycle_phase,
674 ec.ownership_state,
675 ec.eligibility_state,
676 ec.public_state,
677 ec.attempt_state,
678 ec.blocking_reason,
679 ec.attempt_index,
680 ec.created_at_ms,
681 ec.terminal_at_ms,
682 ec.raw_fields,
683 cur.outcome AS attempt_outcome,
684 first_att.started_at_ms AS first_started_at_ms
685 FROM ff_exec_core ec
686 LEFT JOIN LATERAL (
687 SELECT outcome
688 FROM ff_attempt
689 WHERE partition_key = ec.partition_key
690 AND execution_id = ec.execution_id
691 AND attempt_index = ec.attempt_index
692 ) cur ON TRUE
693 LEFT JOIN LATERAL (
694 SELECT started_at_ms
695 FROM ff_attempt
696 WHERE partition_key = ec.partition_key
697 AND execution_id = ec.execution_id
698 AND started_at_ms IS NOT NULL
699 ORDER BY attempt_index ASC
700 LIMIT 1
701 ) first_att ON TRUE
702 WHERE ec.partition_key = $1 AND ec.execution_id = $2
703 "#,
704 )
705 .bind(partition_key)
706 .bind(execution_id)
707 .fetch_optional(pool)
708 .await
709 .map_err(map_sqlx_error)?;
710
711 let Some(row) = row else {
712 return Ok(None);
713 };
714
715 let flow_id_uuid: Option<Uuid> = row.try_get("flow_id").map_err(map_sqlx_error)?;
716 let lane_id: String = row.try_get("lane_id").map_err(map_sqlx_error)?;
717 let priority: i32 = row.try_get("priority").map_err(map_sqlx_error)?;
718 let lifecycle_phase_raw: String =
719 row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
720 let ownership_state_raw: String =
721 row.try_get("ownership_state").map_err(map_sqlx_error)?;
722 let eligibility_state_raw: String =
723 row.try_get("eligibility_state").map_err(map_sqlx_error)?;
724 let public_state_raw: String = row.try_get("public_state").map_err(map_sqlx_error)?;
725 let attempt_state_raw: String = row.try_get("attempt_state").map_err(map_sqlx_error)?;
726 let blocking_reason_opt: Option<String> =
727 row.try_get("blocking_reason").map_err(map_sqlx_error)?;
728 let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
729 let created_at_ms: i64 = row.try_get("created_at_ms").map_err(map_sqlx_error)?;
730 let terminal_at_ms_opt: Option<i64> =
731 row.try_get("terminal_at_ms").map_err(map_sqlx_error)?;
732 let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
733 let attempt_outcome_opt: Option<String> =
734 row.try_get("attempt_outcome").map_err(map_sqlx_error)?;
735 let first_started_at_ms_opt: Option<i64> =
736 row.try_get("first_started_at_ms").map_err(map_sqlx_error)?;
737
738 let lifecycle_phase: LifecyclePhase = json_enum!(
739 LifecyclePhase,
740 "lifecycle_phase",
741 normalise_lifecycle_phase(&lifecycle_phase_raw)
742 )?;
743 let ownership_state: OwnershipState = json_enum!(
744 OwnershipState,
745 "ownership_state",
746 normalise_ownership_state(&ownership_state_raw)
747 )?;
748 let eligibility_state: EligibilityState = json_enum!(
749 EligibilityState,
750 "eligibility_state",
751 normalise_eligibility_state(&eligibility_state_raw)
752 )?;
753 let public_state: PublicState = json_enum!(
754 PublicState,
755 "public_state",
756 normalise_public_state(&public_state_raw)
757 )?;
758 let attempt_state: AttemptState = json_enum!(
759 AttemptState,
760 "attempt_state",
761 normalise_attempt_state(&attempt_state_raw)
762 )?;
763 let blocking_reason: BlockingReason = match blocking_reason_opt
764 .as_deref()
765 .filter(|s| !s.is_empty())
766 {
767 None => BlockingReason::None,
768 Some(raw) => json_enum!(BlockingReason, "blocking_reason", raw)?,
769 };
770 let terminal_outcome = derive_terminal_outcome(
771 normalise_lifecycle_phase(&lifecycle_phase_raw),
772 &lifecycle_phase_raw,
773 attempt_outcome_opt.as_deref(),
774 );
775
776 let state_vector = StateVector {
777 lifecycle_phase,
778 ownership_state,
779 eligibility_state,
780 blocking_reason,
781 terminal_outcome,
782 attempt_state,
783 public_state,
784 };
785
786 let mut namespace = String::new();
789 let mut execution_kind = String::new();
790 let mut blocking_detail = String::new();
791 if let JsonValue::Object(map) = &raw_fields {
792 if let Some(JsonValue::String(s)) = map.get("namespace") {
793 namespace = s.clone();
794 }
795 if let Some(JsonValue::String(s)) = map.get("execution_kind") {
796 execution_kind = s.clone();
797 }
798 if let Some(JsonValue::String(s)) = map.get("blocking_detail") {
799 blocking_detail = s.clone();
800 }
801 }
802
803 let flow_id = flow_id_uuid.map(|fid| fid.to_string());
807
808 Ok(Some(ExecutionInfo {
809 execution_id: id.clone(),
810 namespace,
811 lane_id,
812 priority,
813 execution_kind,
814 state_vector,
815 public_state,
816 created_at: created_at_ms.to_string(),
817 started_at: first_started_at_ms_opt.map(|v| v.to_string()),
818 completed_at: terminal_at_ms_opt.map(|v| v.to_string()),
819 current_attempt_index: attempt_index.max(0) as u32,
820 flow_id,
821 blocking_detail,
822 }))
823}
824
825pub(super) async fn get_execution_result_impl(
831 pool: &PgPool,
832 _partition_config: &PartitionConfig,
833 id: &ExecutionId,
834) -> Result<Option<Vec<u8>>, EngineError> {
835 let partition_key: i16 = id.partition() as i16;
836 let execution_id = eid_uuid(id);
837
838 let row: Option<(Option<Vec<u8>>,)> = sqlx::query_as(
839 r#"
840 SELECT result
841 FROM ff_exec_core
842 WHERE partition_key = $1 AND execution_id = $2
843 "#,
844 )
845 .bind(partition_key)
846 .bind(execution_id)
847 .fetch_optional(pool)
848 .await
849 .map_err(map_sqlx_error)?;
850
851 match row {
852 None => Ok(None),
853 Some((payload,)) => Ok(payload),
854 }
855}
856