1use std::collections::HashMap;
36use std::time::{SystemTime, UNIX_EPOCH};
37
38use ff_core::contracts::decode::build_execution_snapshot;
39use ff_core::contracts::{
40 CreateExecutionArgs, ExecutionContext, ExecutionInfo, ExecutionSnapshot, ListExecutionsPage,
41};
42use ff_core::engine_error::{EngineError, ValidationKind};
43use ff_core::partition::{PartitionConfig, PartitionKey};
44use ff_core::state::{
45 AttemptState, BlockingReason, EligibilityState, LifecyclePhase, OwnershipState, PublicState,
46 StateVector, TerminalOutcome,
47};
48use ff_core::types::{ExecutionId, FlowId};
49use serde_json::Value as JsonValue;
50use sqlx::{PgPool, Row};
51use uuid::Uuid;
52
53use crate::error::map_sqlx_error;
54
55pub(crate) fn eid_uuid(eid: &ExecutionId) -> Uuid {
60 let s = eid.as_str();
61 let suffix = s
63 .split_once("}:")
64 .map(|(_, u)| u)
65 .expect("ExecutionId has `}:` separator (invariant)");
66 Uuid::parse_str(suffix).expect("ExecutionId suffix is a valid UUID (invariant)")
67}
68
69fn eid_from_parts(partition: u16, uuid: Uuid) -> Result<ExecutionId, EngineError> {
73 let s = format!("{{fp:{partition}}}:{uuid}");
74 ExecutionId::parse(&s).map_err(|e| EngineError::Validation {
75 kind: ValidationKind::Corruption,
76 detail: format!("exec_core: execution_id: could not reassemble '{s}': {e}"),
77 })
78}
79
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// The millisecond count is a `u128`; it is converted with a checked
/// conversion and saturates at `i64::MAX` instead of silently truncating the
/// way an `as` cast would.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn now_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock is after UNIX_EPOCH");
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
86
87pub(super) async fn create_execution_impl(
96 pool: &PgPool,
97 _partition_config: &PartitionConfig,
98 args: CreateExecutionArgs,
99) -> Result<ExecutionId, EngineError> {
100 let partition_key: i16 = args.execution_id.partition() as i16;
101 let execution_id = eid_uuid(&args.execution_id);
102 let lane_id = args.lane_id.as_str().to_owned();
103 let priority: i32 = args.priority;
104 let created_at_ms: i64 = args.now.0;
105 let deadline_at_ms: Option<i64> = args.execution_deadline_at.map(|t| t.0);
106
107 let mut raw: serde_json::Map<String, JsonValue> = serde_json::Map::new();
112 raw.insert(
113 "namespace".into(),
114 JsonValue::String(args.namespace.as_str().to_owned()),
115 );
116 raw.insert("execution_kind".into(), JsonValue::String(args.execution_kind));
117 raw.insert(
118 "creator_identity".into(),
119 JsonValue::String(args.creator_identity),
120 );
121 if let Some(k) = args.idempotency_key {
122 raw.insert("idempotency_key".into(), JsonValue::String(k));
123 }
124 if let Some(enc) = args.payload_encoding {
125 raw.insert("payload_encoding".into(), JsonValue::String(enc));
126 }
127 raw.insert(
130 "last_mutation_at".into(),
131 JsonValue::String(created_at_ms.to_string()),
132 );
133 raw.insert(
134 "total_attempt_count".into(),
135 JsonValue::String("0".to_owned()),
136 );
137 let tags_json: serde_json::Map<String, JsonValue> = args
139 .tags
140 .into_iter()
141 .map(|(k, v)| (k, JsonValue::String(v)))
142 .collect();
143 raw.insert("tags".into(), JsonValue::Object(tags_json));
144
145 let raw_fields = JsonValue::Object(raw);
146 let policy_json: Option<JsonValue> = match args.policy {
147 Some(p) => Some(serde_json::to_value(&p).map_err(|e| EngineError::Validation {
148 kind: ValidationKind::InvalidInput,
149 detail: format!("create_execution: policy: serialize failed: {e}"),
150 })?),
151 None => None,
152 };
153
154 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
158
159 sqlx::query(
160 r#"
161 INSERT INTO ff_exec_core (
162 partition_key, execution_id, flow_id, lane_id,
163 required_capabilities, attempt_index,
164 lifecycle_phase, ownership_state, eligibility_state,
165 public_state, attempt_state,
166 priority, created_at_ms, deadline_at_ms,
167 payload, policy, raw_fields
168 ) VALUES (
169 $1, $2, NULL, $3,
170 '{}'::text[], 0,
171 'submitted', 'unowned', 'eligible_now',
172 'waiting', 'pending',
173 $4, $5, $6,
174 $7, $8, $9
175 )
176 ON CONFLICT (partition_key, execution_id) DO NOTHING
177 "#,
178 )
179 .bind(partition_key)
180 .bind(execution_id)
181 .bind(&lane_id)
182 .bind(priority)
183 .bind(created_at_ms)
184 .bind(deadline_at_ms)
185 .bind(&args.input_payload)
186 .bind(policy_json)
187 .bind(&raw_fields)
188 .execute(&mut *tx)
189 .await
190 .map_err(map_sqlx_error)?;
191
192 sqlx::query(
194 r#"
195 INSERT INTO ff_lane_registry (lane_id, registered_at_ms, registered_by)
196 VALUES ($1, $2, $3)
197 ON CONFLICT (lane_id) DO NOTHING
198 "#,
199 )
200 .bind(&lane_id)
201 .bind(created_at_ms)
202 .bind("create_execution")
203 .execute(&mut *tx)
204 .await
205 .map_err(map_sqlx_error)?;
206
207 tx.commit().await.map_err(map_sqlx_error)?;
208
209 Ok(args.execution_id)
210}
211
212pub(super) async fn describe_execution_impl(
215 pool: &PgPool,
216 _partition_config: &PartitionConfig,
217 id: &ExecutionId,
218) -> Result<Option<ExecutionSnapshot>, EngineError> {
219 let partition_key: i16 = id.partition() as i16;
220 let execution_id = eid_uuid(id);
221
222 let row = sqlx::query(
223 r#"
224 SELECT flow_id, lane_id, public_state, blocking_reason,
225 created_at_ms, raw_fields
226 FROM ff_exec_core
227 WHERE partition_key = $1 AND execution_id = $2
228 "#,
229 )
230 .bind(partition_key)
231 .bind(execution_id)
232 .fetch_optional(pool)
233 .await
234 .map_err(map_sqlx_error)?;
235
236 let Some(row) = row else {
237 return Ok(None);
238 };
239
240 let flow_id_uuid: Option<Uuid> = row.try_get("flow_id").map_err(map_sqlx_error)?;
241 let lane_id: String = row.try_get("lane_id").map_err(map_sqlx_error)?;
242 let public_state: String = row.try_get("public_state").map_err(map_sqlx_error)?;
243 let blocking_reason: Option<String> =
244 row.try_get("blocking_reason").map_err(map_sqlx_error)?;
245 let created_at_ms: i64 = row.try_get("created_at_ms").map_err(map_sqlx_error)?;
246 let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
247
248 let mut core: HashMap<String, String> = HashMap::new();
252 core.insert("public_state".into(), public_state);
253 core.insert("lane_id".into(), lane_id);
254 if let Some(fid) = flow_id_uuid {
255 core.insert(
258 "flow_id".into(),
259 format!("{{fp:{part}}}:{fid}", part = id.partition()),
260 );
261 }
262 if let Some(r) = blocking_reason {
263 core.insert("blocking_reason".into(), r);
264 }
265 core.insert("created_at".into(), created_at_ms.to_string());
266
267 if let JsonValue::Object(map) = &raw_fields {
271 for key in [
272 "namespace",
273 "last_mutation_at",
274 "total_attempt_count",
275 "current_attempt_id",
276 "current_attempt_index",
277 "current_waitpoint_id",
278 "blocking_detail",
279 ] {
280 if let Some(JsonValue::String(s)) = map.get(key) {
281 core.insert(key.to_owned(), s.clone());
282 }
283 }
284 }
285
286 let tags_raw: HashMap<String, String> = match &raw_fields {
288 JsonValue::Object(map) => match map.get("tags") {
289 Some(JsonValue::Object(tag_map)) => tag_map
290 .iter()
291 .filter_map(|(k, v)| {
292 v.as_str().map(|s| (k.clone(), s.to_owned()))
293 })
294 .collect(),
295 _ => HashMap::new(),
296 },
297 _ => HashMap::new(),
298 };
299
300 build_execution_snapshot(id.clone(), &core, tags_raw)
301}
302
303pub(super) async fn read_execution_context_impl(
317 pool: &PgPool,
318 _partition_config: &PartitionConfig,
319 id: &ExecutionId,
320) -> Result<ExecutionContext, EngineError> {
321 let partition_key: i16 = id.partition() as i16;
322 let execution_id = eid_uuid(id);
323
324 let row = sqlx::query(
325 r#"
326 SELECT payload, raw_fields
327 FROM ff_exec_core
328 WHERE partition_key = $1 AND execution_id = $2
329 "#,
330 )
331 .bind(partition_key)
332 .bind(execution_id)
333 .fetch_optional(pool)
334 .await
335 .map_err(map_sqlx_error)?;
336
337 let Some(row) = row else {
338 return Err(EngineError::Validation {
339 kind: ValidationKind::InvalidInput,
340 detail: format!("read_execution_context: execution not found: {id}"),
341 });
342 };
343
344 let payload: Option<Vec<u8>> = row.try_get("payload").map_err(map_sqlx_error)?;
345 let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
346
347 let input_payload = payload.unwrap_or_default();
348
349 let (execution_kind, tags) = match &raw_fields {
350 JsonValue::Object(map) => {
351 let kind = map
352 .get("execution_kind")
353 .and_then(|v| v.as_str())
354 .map(|s| s.to_owned())
355 .unwrap_or_default();
356 let tags: HashMap<String, String> = match map.get("tags") {
357 Some(JsonValue::Object(tag_map)) => tag_map
358 .iter()
359 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_owned())))
360 .collect(),
361 _ => HashMap::new(),
362 };
363 (kind, tags)
364 }
365 _ => (String::new(), HashMap::new()),
366 };
367
368 Ok(ExecutionContext::new(input_payload, execution_kind, tags))
369}
370
371pub(super) async fn read_current_attempt_index_impl(
378 pool: &PgPool,
379 _partition_config: &PartitionConfig,
380 id: &ExecutionId,
381) -> Result<ff_core::types::AttemptIndex, EngineError> {
382 let partition_key: i16 = id.partition() as i16;
383 let execution_id = eid_uuid(id);
384
385 let row = sqlx::query(
386 r#"
387 SELECT attempt_index
388 FROM ff_exec_core
389 WHERE partition_key = $1 AND execution_id = $2
390 "#,
391 )
392 .bind(partition_key)
393 .bind(execution_id)
394 .fetch_optional(pool)
395 .await
396 .map_err(map_sqlx_error)?;
397
398 let Some(row) = row else {
399 return Err(EngineError::Validation {
400 kind: ValidationKind::InvalidInput,
401 detail: format!(
402 "read_current_attempt_index: execution not found: {id}"
403 ),
404 });
405 };
406 let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
407 let attempt_index =
413 ff_core::types::AttemptIndex::new(attempt_index_i.max(0) as u32);
414 Ok(attempt_index)
415}
416
417pub(super) async fn read_total_attempt_count_impl(
430 pool: &PgPool,
431 _partition_config: &PartitionConfig,
432 id: &ExecutionId,
433) -> Result<ff_core::types::AttemptIndex, EngineError> {
434 let partition_key: i16 = id.partition() as i16;
435 let execution_id = eid_uuid(id);
436
437 let row = sqlx::query(
438 r#"
439 SELECT raw_fields ->> 'total_attempt_count' AS total_attempt_count
440 FROM ff_exec_core
441 WHERE partition_key = $1 AND execution_id = $2
442 "#,
443 )
444 .bind(partition_key)
445 .bind(execution_id)
446 .fetch_optional(pool)
447 .await
448 .map_err(map_sqlx_error)?;
449
450 let Some(row) = row else {
451 return Err(EngineError::Validation {
452 kind: ValidationKind::InvalidInput,
453 detail: format!(
454 "read_total_attempt_count: execution not found: {id}"
455 ),
456 });
457 };
458 let raw: Option<String> = row
459 .try_get("total_attempt_count")
460 .map_err(map_sqlx_error)?;
461 let count = raw
462 .as_deref()
463 .and_then(|s| s.parse::<u32>().ok())
464 .unwrap_or(0);
465 Ok(ff_core::types::AttemptIndex::new(count))
466}
467
468pub(super) async fn list_executions_impl(
471 pool: &PgPool,
472 _partition_config: &PartitionConfig,
473 partition: PartitionKey,
474 cursor: Option<ExecutionId>,
475 limit: usize,
476) -> Result<ListExecutionsPage, EngineError> {
477 if limit == 0 {
478 return Ok(ListExecutionsPage::new(Vec::new(), None));
479 }
480 let parsed = partition.parse().map_err(|e| EngineError::Validation {
482 kind: ValidationKind::InvalidInput,
483 detail: format!("list_executions: partition: '{partition}': {e}"),
484 })?;
485 let partition_key: i16 = parsed.index as i16;
486 let cursor_uuid: Option<Uuid> = cursor.as_ref().map(eid_uuid);
487
488 let effective_limit = limit.min(1000);
490 let fetch_limit: i64 = (effective_limit as i64).saturating_add(1);
491
492 let rows = sqlx::query(
493 r#"
494 SELECT execution_id
495 FROM ff_exec_core
496 WHERE partition_key = $1
497 AND ($2::uuid IS NULL OR execution_id > $2)
498 ORDER BY execution_id ASC
499 LIMIT $3
500 "#,
501 )
502 .bind(partition_key)
503 .bind(cursor_uuid)
504 .bind(fetch_limit)
505 .fetch_all(pool)
506 .await
507 .map_err(map_sqlx_error)?;
508
509 let mut ids: Vec<ExecutionId> = Vec::with_capacity(rows.len());
510 for row in &rows {
511 let u: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
512 ids.push(eid_from_parts(parsed.index, u)?);
513 }
514
515 let has_more = ids.len() > effective_limit;
516 if has_more {
517 ids.truncate(effective_limit);
518 }
519 let next_cursor = if has_more { ids.last().cloned() } else { None };
520 Ok(ListExecutionsPage::new(ids, next_cursor))
521}
522
523pub(super) async fn cancel_impl(
533 pool: &PgPool,
534 _partition_config: &PartitionConfig,
535 execution_id: &ExecutionId,
536 reason: &str,
537) -> Result<(), EngineError> {
538 let partition_key: i16 = execution_id.partition() as i16;
539 let eid_uuid = eid_uuid(execution_id);
540 let now = now_ms();
541
542 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
543
544 let current: Option<(String, String)> = sqlx::query_as(
545 r#"
546 SELECT lifecycle_phase, public_state
547 FROM ff_exec_core
548 WHERE partition_key = $1 AND execution_id = $2
549 FOR UPDATE
550 "#,
551 )
552 .bind(partition_key)
553 .bind(eid_uuid)
554 .fetch_optional(&mut *tx)
555 .await
556 .map_err(map_sqlx_error)?;
557
558 let Some((lifecycle_phase, public_state)) = current else {
559 tx.rollback().await.map_err(map_sqlx_error)?;
562 return Err(EngineError::Validation {
563 kind: ValidationKind::InvalidInput,
564 detail: format!(
565 "cancel: execution_id={execution_id}: row not found on partition_key={partition_key}"
566 ),
567 });
568 };
569
570 if lifecycle_phase == "terminal" {
573 tx.rollback().await.map_err(map_sqlx_error)?;
574 return if public_state == "cancelled" {
578 Ok(())
579 } else {
580 Err(EngineError::Validation {
581 kind: ValidationKind::InvalidInput,
582 detail: format!(
583 "cancel: execution_id={execution_id}: already terminal in state '{public_state}'"
584 ),
585 })
586 };
587 }
588
589 sqlx::query(
590 r#"
591 UPDATE ff_exec_core
592 SET lifecycle_phase = 'terminal',
593 ownership_state = 'unowned',
594 eligibility_state = 'not_applicable',
595 public_state = 'cancelled',
596 attempt_state = 'cancelled',
597 terminal_at_ms = $3,
598 cancellation_reason = $4,
599 cancelled_by = 'worker',
600 raw_fields = jsonb_set(raw_fields, '{last_mutation_at}', to_jsonb($3::text))
601 WHERE partition_key = $1 AND execution_id = $2
602 "#,
603 )
604 .bind(partition_key)
605 .bind(eid_uuid)
606 .bind(now)
607 .bind(reason)
608 .execute(&mut *tx)
609 .await
610 .map_err(map_sqlx_error)?;
611
612 sqlx::query(
618 r#"
619 UPDATE ff_attempt
620 SET outcome = NULL
621 WHERE partition_key = $1
622 AND execution_id = $2
623 AND attempt_index = (SELECT attempt_index FROM ff_exec_core
624 WHERE partition_key = $1 AND execution_id = $2)
625 "#,
626 )
627 .bind(partition_key)
628 .bind(eid_uuid)
629 .execute(&mut *tx)
630 .await
631 .map_err(map_sqlx_error)?;
632
633 tx.commit().await.map_err(map_sqlx_error)?;
634 Ok(())
635}
636
637pub(super) async fn resolve_execution_flow_id_impl(
640 pool: &PgPool,
641 _partition_config: &PartitionConfig,
642 eid: &ExecutionId,
643) -> Result<Option<FlowId>, EngineError> {
644 let partition_key: i16 = eid.partition() as i16;
645 let execution_id = eid_uuid(eid);
646
647 let row: Option<(Option<Uuid>,)> = sqlx::query_as(
648 r#"
649 SELECT flow_id
650 FROM ff_exec_core
651 WHERE partition_key = $1 AND execution_id = $2
652 "#,
653 )
654 .bind(partition_key)
655 .bind(execution_id)
656 .fetch_optional(pool)
657 .await
658 .map_err(map_sqlx_error)?;
659
660 let Some((maybe_fid,)) = row else {
661 return Ok(None);
662 };
663 let Some(fid_uuid) = maybe_fid else {
664 return Ok(None);
665 };
666 let s = fid_uuid.to_string();
667 FlowId::parse(&s)
668 .map(Some)
669 .map_err(|e| EngineError::Validation {
670 kind: ValidationKind::Corruption,
671 detail: format!(
672 "resolve_execution_flow_id: exec_core.flow_id='{s}' is not a valid FlowId: {e}"
673 ),
674 })
675}
676
/// Maps a stored `lifecycle_phase` string onto the spelling passed to the
/// `LifecyclePhase` decoder.
///
/// * `"cancelled"` and `"terminal"` become `"terminal"`.
/// * `"pending"`, `"runnable"`, `"eligible"` and `"blocked"` become `"runnable"`.
/// * Every other value (including `"active"`, `"suspended"` and
///   `"submitted"`) is returned unchanged.
fn normalise_lifecycle_phase(raw: &str) -> &str {
    if matches!(raw, "cancelled" | "terminal") {
        "terminal"
    } else if matches!(raw, "pending" | "runnable" | "eligible" | "blocked") {
        "runnable"
    } else {
        raw
    }
}
760
/// Maps a stored `ownership_state` string onto the spelling passed to the
/// `OwnershipState` decoder.
///
/// `"released"` is folded into `"unowned"`; every other value (including
/// `"unowned"`, `"leased"`, `"lease_expired_reclaimable"` and
/// `"lease_revoked"`) is returned unchanged.
fn normalise_ownership_state(raw: &str) -> &str {
    if raw == "released" {
        "unowned"
    } else {
        raw
    }
}
772
/// Maps a stored `eligibility_state` string onto the spelling passed to the
/// `EligibilityState` decoder.
///
/// `"cancelled"` becomes `"not_applicable"` and `"pending_claim"` becomes
/// `"eligible_now"`; every other value is returned unchanged.
fn normalise_eligibility_state(raw: &str) -> &str {
    if raw == "cancelled" {
        return "not_applicable";
    }
    if raw == "pending_claim" {
        return "eligible_now";
    }
    raw
}
785
/// Maps a stored `attempt_state` string onto the spelling passed to the
/// `AttemptState` decoder.
///
/// * `"pending"` and `"pending_claim"` become `"pending_first_attempt"`.
/// * `"running"` becomes `"running_attempt"`.
/// * `"cancelled"` becomes `"attempt_terminal"`.
/// * Every other value is returned unchanged.
fn normalise_attempt_state(raw: &str) -> &str {
    if matches!(raw, "pending" | "pending_claim") {
        "pending_first_attempt"
    } else if raw == "running" {
        "running_attempt"
    } else if raw == "cancelled" {
        "attempt_terminal"
    } else {
        raw
    }
}
801
/// Maps a stored `public_state` string onto the spelling passed to the
/// `PublicState` decoder.
///
/// `"running"` becomes `"active"`; every other value is returned unchanged.
fn normalise_public_state(raw: &str) -> &str {
    if raw == "running" {
        "active"
    } else {
        raw
    }
}
813
/// Decodes a bare string into the enum type `$ty` using that type's serde
/// representation.
///
/// The string is wrapped in double quotes and parsed with
/// `serde_json::from_str`. The macro evaluates to
/// `Result<$ty, EngineError>`; a value the type does not accept becomes
/// `EngineError::Validation` with `ValidationKind::Corruption`, naming the
/// field (`$field`) and the offending value.
///
/// `$raw` must evaluate to a `&str`. It is bound to a local once so an
/// argument such as a function call is not expanded a second time on the
/// error path.
macro_rules! json_enum {
    ($ty:ty, $field:expr, $raw:expr) => {{
        let raw_value: &str = $raw;
        let quoted = format!("\"{}\"", raw_value);
        serde_json::from_str::<$ty>(&quoted).map_err(|e| EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!(
                "exec_core: {}: '{}' is not a known value: {}",
                $field, raw_value, e
            ),
        })
    }};
}
826
827fn derive_terminal_outcome(
831 phase_norm: &str,
832 phase_raw: &str,
833 attempt_outcome: Option<&str>,
834) -> TerminalOutcome {
835 if phase_norm != "terminal" {
836 return TerminalOutcome::None;
837 }
838 if phase_raw == "cancelled" {
839 return TerminalOutcome::Cancelled;
840 }
841 match attempt_outcome {
842 Some("success") => TerminalOutcome::Success,
843 Some("failed") => TerminalOutcome::Failed,
844 Some("cancelled") => TerminalOutcome::Cancelled,
845 Some("expired") => TerminalOutcome::Expired,
846 Some("skipped") => TerminalOutcome::Skipped,
847 _ => TerminalOutcome::None,
848 }
849}
850
851pub(super) async fn read_execution_state_impl(
854 pool: &PgPool,
855 _partition_config: &PartitionConfig,
856 id: &ExecutionId,
857) -> Result<Option<PublicState>, EngineError> {
858 let partition_key: i16 = id.partition() as i16;
859 let execution_id = eid_uuid(id);
860
861 let row: Option<(String,)> = sqlx::query_as(
862 r#"
863 SELECT public_state
864 FROM ff_exec_core
865 WHERE partition_key = $1 AND execution_id = $2
866 "#,
867 )
868 .bind(partition_key)
869 .bind(execution_id)
870 .fetch_optional(pool)
871 .await
872 .map_err(map_sqlx_error)?;
873
874 let Some((raw,)) = row else {
875 return Ok(None);
876 };
877 let parsed: PublicState =
878 json_enum!(PublicState, "public_state", normalise_public_state(&raw))?;
879 Ok(Some(parsed))
880}
881
882pub(super) async fn read_execution_info_impl(
888 pool: &PgPool,
889 _partition_config: &PartitionConfig,
890 id: &ExecutionId,
891) -> Result<Option<ExecutionInfo>, EngineError> {
892 let partition_key: i16 = id.partition() as i16;
893 let execution_id = eid_uuid(id);
894
895 let row = sqlx::query(
902 r#"
903 SELECT ec.flow_id,
904 ec.lane_id,
905 ec.priority,
906 ec.lifecycle_phase,
907 ec.ownership_state,
908 ec.eligibility_state,
909 ec.public_state,
910 ec.attempt_state,
911 ec.blocking_reason,
912 ec.attempt_index,
913 ec.created_at_ms,
914 ec.terminal_at_ms,
915 ec.started_at_ms,
916 ec.raw_fields,
917 cur.outcome AS attempt_outcome
918 FROM ff_exec_core ec
919 LEFT JOIN LATERAL (
920 SELECT outcome
921 FROM ff_attempt
922 WHERE partition_key = ec.partition_key
923 AND execution_id = ec.execution_id
924 AND attempt_index = ec.attempt_index
925 ) cur ON TRUE
926 WHERE ec.partition_key = $1 AND ec.execution_id = $2
927 "#,
928 )
929 .bind(partition_key)
930 .bind(execution_id)
931 .fetch_optional(pool)
932 .await
933 .map_err(map_sqlx_error)?;
934
935 let Some(row) = row else {
936 return Ok(None);
937 };
938
939 let flow_id_uuid: Option<Uuid> = row.try_get("flow_id").map_err(map_sqlx_error)?;
940 let lane_id: String = row.try_get("lane_id").map_err(map_sqlx_error)?;
941 let priority: i32 = row.try_get("priority").map_err(map_sqlx_error)?;
942 let lifecycle_phase_raw: String =
943 row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
944 let ownership_state_raw: String =
945 row.try_get("ownership_state").map_err(map_sqlx_error)?;
946 let eligibility_state_raw: String =
947 row.try_get("eligibility_state").map_err(map_sqlx_error)?;
948 let public_state_raw: String = row.try_get("public_state").map_err(map_sqlx_error)?;
949 let attempt_state_raw: String = row.try_get("attempt_state").map_err(map_sqlx_error)?;
950 let blocking_reason_opt: Option<String> =
951 row.try_get("blocking_reason").map_err(map_sqlx_error)?;
952 let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
953 let created_at_ms: i64 = row.try_get("created_at_ms").map_err(map_sqlx_error)?;
954 let terminal_at_ms_opt: Option<i64> =
955 row.try_get("terminal_at_ms").map_err(map_sqlx_error)?;
956 let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
957 let attempt_outcome_opt: Option<String> =
958 row.try_get("attempt_outcome").map_err(map_sqlx_error)?;
959 let started_at_ms_opt: Option<i64> =
960 row.try_get("started_at_ms").map_err(map_sqlx_error)?;
961
962 let lifecycle_phase: LifecyclePhase = json_enum!(
963 LifecyclePhase,
964 "lifecycle_phase",
965 normalise_lifecycle_phase(&lifecycle_phase_raw)
966 )?;
967 let ownership_state: OwnershipState = json_enum!(
968 OwnershipState,
969 "ownership_state",
970 normalise_ownership_state(&ownership_state_raw)
971 )?;
972 let eligibility_state: EligibilityState = json_enum!(
973 EligibilityState,
974 "eligibility_state",
975 normalise_eligibility_state(&eligibility_state_raw)
976 )?;
977 let public_state: PublicState = json_enum!(
978 PublicState,
979 "public_state",
980 normalise_public_state(&public_state_raw)
981 )?;
982 let attempt_state: AttemptState = json_enum!(
983 AttemptState,
984 "attempt_state",
985 normalise_attempt_state(&attempt_state_raw)
986 )?;
987 let blocking_reason: BlockingReason = match blocking_reason_opt
988 .as_deref()
989 .filter(|s| !s.is_empty())
990 {
991 None => BlockingReason::None,
992 Some(raw) => json_enum!(BlockingReason, "blocking_reason", raw)?,
993 };
994 let terminal_outcome = derive_terminal_outcome(
995 normalise_lifecycle_phase(&lifecycle_phase_raw),
996 &lifecycle_phase_raw,
997 attempt_outcome_opt.as_deref(),
998 );
999
1000 let state_vector = StateVector {
1001 lifecycle_phase,
1002 ownership_state,
1003 eligibility_state,
1004 blocking_reason,
1005 terminal_outcome,
1006 attempt_state,
1007 public_state,
1008 };
1009
1010 let mut namespace = String::new();
1013 let mut execution_kind = String::new();
1014 let mut blocking_detail = String::new();
1015 if let JsonValue::Object(map) = &raw_fields {
1016 if let Some(JsonValue::String(s)) = map.get("namespace") {
1017 namespace = s.clone();
1018 }
1019 if let Some(JsonValue::String(s)) = map.get("execution_kind") {
1020 execution_kind = s.clone();
1021 }
1022 if let Some(JsonValue::String(s)) = map.get("blocking_detail") {
1023 blocking_detail = s.clone();
1024 }
1025 }
1026
1027 let flow_id = flow_id_uuid.map(|fid| fid.to_string());
1031
1032 Ok(Some(ExecutionInfo {
1033 execution_id: id.clone(),
1034 namespace,
1035 lane_id,
1036 priority,
1037 execution_kind,
1038 state_vector,
1039 public_state,
1040 created_at: created_at_ms.to_string(),
1041 started_at: started_at_ms_opt.map(|v| v.to_string()),
1042 completed_at: terminal_at_ms_opt.map(|v| v.to_string()),
1043 current_attempt_index: attempt_index.max(0) as u32,
1044 flow_id,
1045 blocking_detail,
1046 }))
1047}
1048
1049pub(super) async fn get_execution_result_impl(
1055 pool: &PgPool,
1056 _partition_config: &PartitionConfig,
1057 id: &ExecutionId,
1058) -> Result<Option<Vec<u8>>, EngineError> {
1059 let partition_key: i16 = id.partition() as i16;
1060 let execution_id = eid_uuid(id);
1061
1062 let row: Option<(Option<Vec<u8>>,)> = sqlx::query_as(
1063 r#"
1064 SELECT result
1065 FROM ff_exec_core
1066 WHERE partition_key = $1 AND execution_id = $2
1067 "#,
1068 )
1069 .bind(partition_key)
1070 .bind(execution_id)
1071 .fetch_optional(pool)
1072 .await
1073 .map_err(map_sqlx_error)?;
1074
1075 match row {
1076 None => Ok(None),
1077 Some((payload,)) => Ok(payload),
1078 }
1079}
1080
1081
1082pub(super) async fn set_execution_tag_impl(
1091 pool: &PgPool,
1092 id: &ExecutionId,
1093 key: &str,
1094 value: &str,
1095) -> Result<(), EngineError> {
1096 let partition_key: i16 = id.partition() as i16;
1097 let execution_id = eid_uuid(id);
1098
1099 let result = sqlx::query(
1100 r#"
1101 UPDATE ff_exec_core
1102 SET raw_fields = jsonb_set(
1103 COALESCE(raw_fields, '{}'::jsonb),
1104 ARRAY['tags', $3::text],
1105 to_jsonb($4::text),
1106 true
1107 )
1108 WHERE partition_key = $1 AND execution_id = $2
1109 "#,
1110 )
1111 .bind(partition_key)
1112 .bind(execution_id)
1113 .bind(key)
1114 .bind(value)
1115 .execute(pool)
1116 .await
1117 .map_err(map_sqlx_error)?;
1118
1119 if result.rows_affected() == 0 {
1120 return Err(EngineError::NotFound {
1121 entity: "execution",
1122 });
1123 }
1124 Ok(())
1125}
1126
1127pub(super) async fn get_execution_tag_impl(
1131 pool: &PgPool,
1132 id: &ExecutionId,
1133 key: &str,
1134) -> Result<Option<String>, EngineError> {
1135 let partition_key: i16 = id.partition() as i16;
1136 let execution_id = eid_uuid(id);
1137
1138 let row: Option<(Option<String>,)> = sqlx::query_as(
1139 r#"
1140 SELECT raw_fields->'tags'->>$3
1141 FROM ff_exec_core
1142 WHERE partition_key = $1 AND execution_id = $2
1143 "#,
1144 )
1145 .bind(partition_key)
1146 .bind(execution_id)
1147 .bind(key)
1148 .fetch_optional(pool)
1149 .await
1150 .map_err(map_sqlx_error)?;
1151
1152 Ok(row.and_then(|(tag,)| tag))
1153}
1154
1155pub(super) async fn get_execution_namespace_impl(
1161 pool: &PgPool,
1162 id: &ExecutionId,
1163) -> Result<Option<String>, EngineError> {
1164 let partition_key: i16 = id.partition() as i16;
1165 let execution_id = eid_uuid(id);
1166
1167 let row: Option<(Option<String>,)> = sqlx::query_as(
1168 r#"
1169 SELECT raw_fields->>'namespace'
1170 FROM ff_exec_core
1171 WHERE partition_key = $1 AND execution_id = $2
1172 "#,
1173 )
1174 .bind(partition_key)
1175 .bind(execution_id)
1176 .fetch_optional(pool)
1177 .await
1178 .map_err(map_sqlx_error)?;
1179
1180 Ok(row.and_then(|(ns,)| ns))
1181}
1182
/// Tables purged by `trim_retention_impl` before the `ff_exec_core` rows
/// themselves are deleted.
///
/// For each table listed here the delete statement compares `execution_id`
/// against a `uuid[]` parameter. The names are interpolated into SQL text, so
/// this list must only ever contain trusted, hard-coded identifiers.
const RETENTION_SIBLING_TABLES_UUID: &[&str] = &[
    "ff_attempt",
    "ff_claim_grant",
    "ff_completion_event",
    "ff_stream_frame",
    "ff_stream_meta",
    "ff_stream_summary",
    "ff_suspension_current",
    "ff_waitpoint_pending",
];
1222
/// Tables purged by `trim_retention_impl` whose delete statement compares
/// `execution_id` against a `text[]` parameter (the UUIDs rendered as
/// strings) rather than a `uuid[]`.
///
/// The names are interpolated into SQL text, so this list must only ever
/// contain trusted, hard-coded identifiers.
const RETENTION_SIBLING_TABLES_TEXT: &[&str] = &[
    "ff_cancel_backlog_member",
    "ff_lease_event",
    "ff_operator_event",
    "ff_quota_admitted",
    "ff_quota_window",
    "ff_signal_event",
];
1231
1232pub(super) async fn trim_retention_impl(
1237 pool: &PgPool,
1238 partition_key: i16,
1239 lane_id: &str,
1240 retention_ms: u64,
1241 now_ms: i64,
1242 batch_size: u32,
1243 filter: &ff_core::backend::ScannerFilter,
1244) -> Result<u32, EngineError> {
1245 let retention_i64 = i64::try_from(retention_ms).unwrap_or(i64::MAX);
1246 let cutoff = now_ms.saturating_sub(retention_i64);
1247 let limit_i64 = i64::from(batch_size);
1248
1249 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
1250
1251 let namespace_opt = filter.namespace.as_ref().map(|n| n.as_str().to_owned());
1262 let instance_tag_opt = filter
1263 .instance_tag
1264 .as_ref()
1265 .map(|(k, v)| (k.clone(), v.clone()));
1266
1267 let rows: Vec<(Uuid,)> = sqlx::query_as(
1268 r#"
1269 SELECT execution_id
1270 FROM ff_exec_core
1271 WHERE partition_key = $1
1272 AND lane_id = $2
1273 AND lifecycle_phase = 'terminal'
1274 AND terminal_at_ms IS NOT NULL
1275 AND terminal_at_ms <= $3
1276 AND ($5::text IS NULL OR raw_fields->>'namespace' = $5)
1277 AND ($6::text IS NULL OR raw_fields->'tags'->>$6 = $7)
1278 ORDER BY terminal_at_ms
1279 LIMIT $4
1280 "#,
1281 )
1282 .bind(partition_key)
1283 .bind(lane_id)
1284 .bind(cutoff)
1285 .bind(limit_i64)
1286 .bind(namespace_opt.as_deref())
1287 .bind(instance_tag_opt.as_ref().map(|(k, _)| k.as_str()))
1288 .bind(instance_tag_opt.as_ref().map(|(_, v)| v.as_str()))
1289 .fetch_all(&mut *tx)
1290 .await
1291 .map_err(map_sqlx_error)?;
1292
1293 if rows.is_empty() {
1294 tx.commit().await.map_err(map_sqlx_error)?;
1295 return Ok(0);
1296 }
1297
1298 let eids: Vec<Uuid> = rows.into_iter().map(|(e,)| e).collect();
1299 let eid_texts: Vec<String> = eids.iter().map(Uuid::to_string).collect();
1303
1304 for table in RETENTION_SIBLING_TABLES_UUID {
1310 let sql = format!(
1311 "DELETE FROM {} WHERE partition_key = $1 AND execution_id = ANY($2::uuid[])",
1312 table
1313 );
1314 sqlx::query(&sql)
1315 .bind(partition_key)
1316 .bind(&eids)
1317 .execute(&mut *tx)
1318 .await
1319 .map_err(map_sqlx_error)?;
1320 }
1321
1322 for table in RETENTION_SIBLING_TABLES_TEXT {
1323 let sql = format!(
1324 "DELETE FROM {} WHERE partition_key = $1 AND execution_id = ANY($2::text[])",
1325 table
1326 );
1327 sqlx::query(&sql)
1328 .bind(partition_key)
1329 .bind(&eid_texts)
1330 .execute(&mut *tx)
1331 .await
1332 .map_err(map_sqlx_error)?;
1333 }
1334
1335 let deleted = sqlx::query(
1336 "DELETE FROM ff_exec_core \
1337 WHERE partition_key = $1 AND execution_id = ANY($2::uuid[])",
1338 )
1339 .bind(partition_key)
1340 .bind(&eids)
1341 .execute(&mut *tx)
1342 .await
1343 .map_err(map_sqlx_error)?
1344 .rows_affected();
1345
1346 tx.commit().await.map_err(map_sqlx_error)?;
1347
1348 Ok(u32::try_from(deleted).unwrap_or(u32::MAX))
1349}