Skip to main content

ff_backend_postgres/
suspend_ops.rs

//! Suspend / deliver_signal / claim_resumed / observe_signals SQL bodies.
//!
//! **Wave 4d follow-up.** Builds on PR #238's primitives:
//!
//! * [`crate::signal::hmac_sign`] / [`crate::signal::hmac_verify`]
//! * [`crate::signal::SERIALIZABLE_RETRY_BUDGET`] +
//!   [`crate::signal::is_retryable_serialization`]
//! * [`crate::suspend::evaluate`] — composite-condition evaluator
//!
//! HMAC signing, retry looping, and composite evaluation are never
//! re-implemented here.
//!
//! # Isolation
//!
//! `suspend` + `deliver_signal` run at SERIALIZABLE with a 3-attempt
//! retry budget (Q11). When the budget is exhausted they surface
//! `Contention(RetryExhausted)`. `claim_resumed_execution` +
//! `observe_signals` stay at READ COMMITTED (row-level `FOR UPDATE`
//! for the former, read-only for the latter).
20
21use std::collections::HashMap;
22use std::time::{SystemTime, UNIX_EPOCH};
23
24use ff_core::backend::{BackendTag, Handle, HandleKind, HandleOpaque, ResumeSignal, WaitpointHmac};
25use ff_core::contracts::{
26    AdditionalWaitpointBinding, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
27    ClaimedResumedExecution, DeliverSignalArgs, DeliverSignalResult, ResumeCondition, SuspendArgs,
28    SuspendOutcome, SuspendOutcomeDetails, WaitpointBinding,
29};
30use ff_core::engine_error::{ContentionKind, EngineError, ValidationKind};
31use ff_core::handle_codec::{encode as encode_opaque, HandlePayload};
32use ff_core::partition::PartitionConfig;
33use ff_core::types::{
34    AttemptId, AttemptIndex, ExecutionId, LeaseEpoch, SignalId, SuspensionId, TimestampMs,
35    WaitpointId,
36};
37use serde_json::{json, Value as JsonValue};
38use sqlx::{PgPool, Postgres, Transaction};
39use uuid::Uuid;
40
41use crate::error::map_sqlx_error;
42use crate::signal::{hmac_sign, hmac_verify, is_retryable_serialization, SERIALIZABLE_RETRY_BUDGET};
43use crate::suspend::evaluate;
44
45// ─── small shared helpers ────────────────────────────────────────────────
46
/// Wall-clock milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0`. A duration too large for
/// `i64` saturates at `i64::MAX` instead of wrapping through an `as` cast,
/// which could otherwise produce a negative value.
fn now_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
53
54fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
55    let s = eid.as_str();
56    let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
57        kind: ValidationKind::InvalidInput,
58        detail: format!("execution_id missing `{{fp:` prefix: {s}"),
59    })?;
60    let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
61        kind: ValidationKind::InvalidInput,
62        detail: format!("execution_id missing `}}:`: {s}"),
63    })?;
64    let part: i16 = rest[..close]
65        .parse()
66        .map_err(|_| EngineError::Validation {
67            kind: ValidationKind::InvalidInput,
68            detail: format!("execution_id partition index not u16: {s}"),
69        })?;
70    let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
71        kind: ValidationKind::InvalidInput,
72        detail: format!("execution_id UUID invalid: {s}"),
73    })?;
74    Ok((part, uuid))
75}
76
77fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
78    if handle.backend != BackendTag::Postgres {
79        return Err(EngineError::Validation {
80            kind: ValidationKind::HandleFromOtherBackend,
81            detail: format!("expected Postgres, got {:?}", handle.backend),
82        });
83    }
84    let decoded = ff_core::handle_codec::decode(&handle.opaque)?;
85    if decoded.tag != BackendTag::Postgres {
86        return Err(EngineError::Validation {
87            kind: ValidationKind::HandleFromOtherBackend,
88            detail: format!("embedded tag {:?}", decoded.tag),
89        });
90    }
91    Ok(decoded.payload)
92}
93
94fn wp_uuid(w: &WaitpointId) -> Result<Uuid, EngineError> {
95    Uuid::parse_str(&w.to_string()).map_err(|e| EngineError::Validation {
96        kind: ValidationKind::InvalidInput,
97        detail: format!("waitpoint_id not a UUID: {e}"),
98    })
99}
100
101fn susp_uuid(s: &SuspensionId) -> Result<Uuid, EngineError> {
102    Uuid::parse_str(&s.to_string()).map_err(|e| EngineError::Validation {
103        kind: ValidationKind::InvalidInput,
104        detail: format!("suspension_id not a UUID: {e}"),
105    })
106}
107
108// ─── SERIALIZABLE retry loop ─────────────────────────────────────────────
109
110/// Return true if an `EngineError::Transport` carries a sqlx
111/// serialization-failure SQLSTATE. `map_sqlx_error` stringifies the
112/// underlying code; we match defensively on both the raw codes
113/// (40001/40P01) and the symbolic labels.
114fn is_retryable_engine(err: &EngineError) -> bool {
115    match err {
116        EngineError::Transport { source, .. } => {
117            let s = source.to_string();
118            s.contains("40001")
119                || s.contains("40P01")
120                || s.contains("serialization_failure")
121                || s.contains("deadlock_detected")
122        }
123        // `map_sqlx_error` collapses serialization_failure (40001) +
124        // deadlock_detected (40P01) into `Contention(LeaseConflict)`.
125        // Inside a SERIALIZABLE retry loop those are retryable; the
126        // explicit in-body fence-mismatch case is NOT (a bumped epoch
127        // won't unbump on retry). We can't distinguish them by
128        // discriminant alone, so the retry loop treats LeaseConflict
129        // as retryable — a genuine fence mismatch will still bail
130        // after budget exhaustion as RetryExhausted, which callers
131        // reconcile by re-reading exec_core.
132        EngineError::Contention(ContentionKind::LeaseConflict) => true,
133        _ => false,
134    }
135}
136
137/// Run `op` inside a SERIALIZABLE transaction, retrying up to
138/// [`SERIALIZABLE_RETRY_BUDGET`] times on retryable faults. On
139/// exhaustion returns `Contention(RetryExhausted)`.
140async fn run_serializable<T, F>(pool: &PgPool, mut op: F) -> Result<T, EngineError>
141where
142    T: Send,
143    F: for<'a> FnMut(
144            &'a mut Transaction<'_, Postgres>,
145        ) -> std::pin::Pin<
146            Box<dyn std::future::Future<Output = Result<T, EngineError>> + Send + 'a>,
147        > + Send,
148{
149    for _ in 0..SERIALIZABLE_RETRY_BUDGET {
150        let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
151        sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
152            .execute(&mut *tx)
153            .await
154            .map_err(map_sqlx_error)?;
155        let body_res = op(&mut tx).await;
156        match body_res {
157            Ok(v) => match tx.commit().await {
158                Ok(()) => return Ok(v),
159                Err(e) if is_retryable_serialization(&e) => continue,
160                Err(e) => return Err(map_sqlx_error(e)),
161            },
162            Err(e) if is_retryable_engine(&e) => {
163                let _ = tx.rollback().await;
164                continue;
165            }
166            Err(e) => {
167                let _ = tx.rollback().await;
168                return Err(e);
169            }
170        }
171    }
172    Err(EngineError::Contention(ContentionKind::RetryExhausted))
173}
174
175// ─── dedup outcome (de)serialization ─────────────────────────────────────
176
177fn outcome_to_dedup_json(outcome: &SuspendOutcome) -> JsonValue {
178    let details = outcome.details();
179    let extras: Vec<JsonValue> = details
180        .additional_waitpoints
181        .iter()
182        .map(|e| {
183            json!({
184                "waitpoint_id": e.waitpoint_id.to_string(),
185                "waitpoint_key": e.waitpoint_key,
186                "token": e.waitpoint_token.as_str(),
187            })
188        })
189        .collect();
190    let (variant, handle_opaque) = match outcome {
191        SuspendOutcome::Suspended { handle, .. } => {
192            ("Suspended", Some(hex::encode(handle.opaque.as_bytes())))
193        }
194        SuspendOutcome::AlreadySatisfied { .. } => ("AlreadySatisfied", None),
195        _ => ("Suspended", None),
196    };
197    json!({
198        "variant": variant,
199        "details": {
200            "suspension_id": details.suspension_id.to_string(),
201            "waitpoint_id": details.waitpoint_id.to_string(),
202            "waitpoint_key": details.waitpoint_key,
203            "token": details.waitpoint_token.as_str(),
204            "extras": extras,
205        },
206        "handle_opaque_hex": handle_opaque,
207    })
208}
209
210fn outcome_from_dedup_json(v: &JsonValue) -> Result<SuspendOutcome, EngineError> {
211    let corrupt = |s: String| EngineError::Validation {
212        kind: ValidationKind::Corruption,
213        detail: s,
214    };
215    let det = &v["details"];
216    let suspension_id = SuspensionId::parse(det["suspension_id"].as_str().unwrap_or(""))
217        .map_err(|e| corrupt(format!("dedup suspension_id: {e}")))?;
218    let waitpoint_id = WaitpointId::parse(det["waitpoint_id"].as_str().unwrap_or(""))
219        .map_err(|e| corrupt(format!("dedup waitpoint_id: {e}")))?;
220    let waitpoint_key = det["waitpoint_key"].as_str().unwrap_or("").to_owned();
221    let token = det["token"].as_str().unwrap_or("").to_owned();
222    let mut extras: Vec<AdditionalWaitpointBinding> = Vec::new();
223    if let Some(arr) = det["extras"].as_array() {
224        for e in arr {
225            let wid = WaitpointId::parse(e["waitpoint_id"].as_str().unwrap_or(""))
226                .map_err(|err| corrupt(format!("dedup extra wp_id: {err}")))?;
227            let wkey = e["waitpoint_key"].as_str().unwrap_or("").to_owned();
228            let tok = e["token"].as_str().unwrap_or("").to_owned();
229            extras.push(AdditionalWaitpointBinding::new(
230                wid,
231                wkey,
232                WaitpointHmac::new(tok),
233            ));
234        }
235    }
236    let details = SuspendOutcomeDetails::new(
237        suspension_id,
238        waitpoint_id,
239        waitpoint_key,
240        WaitpointHmac::new(token),
241    )
242    .with_additional_waitpoints(extras);
243
244    match v["variant"].as_str().unwrap_or("Suspended") {
245        "AlreadySatisfied" => Ok(SuspendOutcome::AlreadySatisfied { details }),
246        _ => {
247            let opaque_hex = v["handle_opaque_hex"].as_str().unwrap_or("");
248            let bytes = hex::decode(opaque_hex)
249                .map_err(|e| corrupt(format!("dedup handle hex: {e}")))?;
250            let opaque = HandleOpaque::new(bytes.into_boxed_slice());
251            let handle = Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
252            Ok(SuspendOutcome::Suspended { details, handle })
253        }
254    }
255}
256
257// ─── suspend ─────────────────────────────────────────────────────────────
258
259pub(crate) async fn suspend_impl(
260    pool: &PgPool,
261    _partition_config: &PartitionConfig,
262    handle: &Handle,
263    args: SuspendArgs,
264) -> Result<SuspendOutcome, EngineError> {
265    let payload = decode_handle(handle)?;
266    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
267    let attempt_index_i = i32::try_from(payload.attempt_index.0).unwrap_or(0);
268    let expected_epoch = payload.lease_epoch.0;
269    let idem_key = args.idempotency_key.as_ref().map(|k| k.as_str().to_owned());
270
271    run_serializable(pool, move |tx| {
272        let args = args.clone();
273        let idem = idem_key.clone();
274        let payload = payload.clone();
275        Box::pin(async move {
276            // 1. Dedup replay check.
277            if let Some(key) = idem.as_deref() {
278                let row: Option<(JsonValue,)> = sqlx::query_as(
279                    "SELECT outcome_json FROM ff_suspend_dedup \
280                     WHERE partition_key = $1 AND idempotency_key = $2",
281                )
282                .bind(part)
283                .bind(key)
284                .fetch_optional(&mut **tx)
285                .await
286                .map_err(map_sqlx_error)?;
287                if let Some((cached,)) = row {
288                    return outcome_from_dedup_json(&cached);
289                }
290            }
291
292            // 2. Fence check against ff_attempt.
293            let epoch_row: Option<(i64,)> = sqlx::query_as(
294                "SELECT lease_epoch FROM ff_attempt \
295                 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3 \
296                 FOR UPDATE",
297            )
298            .bind(part)
299            .bind(exec_uuid)
300            .bind(attempt_index_i)
301            .fetch_optional(&mut **tx)
302            .await
303            .map_err(map_sqlx_error)?;
304            let observed_epoch: u64 = match epoch_row {
305                Some((e,)) => u64::try_from(e).unwrap_or(0),
306                None => return Err(EngineError::NotFound { entity: "attempt" }),
307            };
308            if observed_epoch != expected_epoch {
309                return Err(EngineError::Contention(ContentionKind::LeaseConflict));
310            }
311
312            // 3. Resolve the active HMAC kid inside the txn.
313            let kid_row: Option<(String, Vec<u8>)> = sqlx::query_as(
314                "SELECT kid, secret FROM ff_waitpoint_hmac \
315                 WHERE active = TRUE \
316                 ORDER BY rotated_at_ms DESC LIMIT 1",
317            )
318            .fetch_optional(&mut **tx)
319            .await
320            .map_err(map_sqlx_error)?;
321            let (kid, secret) = kid_row.ok_or_else(|| EngineError::Validation {
322                kind: ValidationKind::InvalidInput,
323                detail: "ff_waitpoint_hmac empty — rotate a kid before suspend".into(),
324            })?;
325
326            // 4. Sign + insert waitpoint_pending for each binding.
327            let now = args.now.0;
328            let mut signed: Vec<(WaitpointId, String, String)> = Vec::new();
329            for binding in args.waitpoints.iter() {
330                let (wp_id, wp_key) = match binding {
331                    WaitpointBinding::Fresh {
332                        waitpoint_id,
333                        waitpoint_key,
334                    } => (waitpoint_id.clone(), waitpoint_key.clone()),
335                    WaitpointBinding::UsePending { waitpoint_id } => {
336                        let row: Option<(String,)> = sqlx::query_as(
337                            "SELECT waitpoint_key FROM ff_waitpoint_pending \
338                             WHERE partition_key = $1 AND waitpoint_id = $2",
339                        )
340                        .bind(part)
341                        .bind(wp_uuid(waitpoint_id)?)
342                        .fetch_optional(&mut **tx)
343                        .await
344                        .map_err(map_sqlx_error)?;
345                        let wp_key = row.map(|(k,)| k).unwrap_or_default();
346                        (waitpoint_id.clone(), wp_key)
347                    }
348                    _ => {
349                        return Err(EngineError::Validation {
350                            kind: ValidationKind::InvalidInput,
351                            detail: "unsupported WaitpointBinding variant".into(),
352                        });
353                    }
354                };
355                let msg = format!("{}:{}", payload.execution_id, wp_id);
356                let token = hmac_sign(&secret, &kid, msg.as_bytes());
357                sqlx::query(
358                    "INSERT INTO ff_waitpoint_pending \
359                       (partition_key, waitpoint_id, execution_id, token_kid, token, \
360                        created_at_ms, expires_at_ms, waitpoint_key) \
361                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
362                     ON CONFLICT (partition_key, waitpoint_id) DO UPDATE SET \
363                       token_kid = EXCLUDED.token_kid, token = EXCLUDED.token, \
364                       waitpoint_key = EXCLUDED.waitpoint_key",
365                )
366                .bind(part)
367                .bind(wp_uuid(&wp_id)?)
368                .bind(exec_uuid)
369                .bind(&kid)
370                .bind(&token)
371                .bind(now)
372                .bind(args.timeout_at.map(|t| t.0))
373                .bind(&wp_key)
374                .execute(&mut **tx)
375                .await
376                .map_err(map_sqlx_error)?;
377                signed.push((wp_id, wp_key, token));
378            }
379
380            // 5. Insert ff_suspension_current.
381            let condition_json =
382                serde_json::to_value(&args.resume_condition).map_err(|e| {
383                    EngineError::Validation {
384                        kind: ValidationKind::Corruption,
385                        detail: format!("resume_condition serialize: {e}"),
386                    }
387                })?;
388            sqlx::query(
389                "INSERT INTO ff_suspension_current \
390                   (partition_key, execution_id, suspension_id, suspended_at_ms, \
391                    timeout_at_ms, reason_code, condition, satisfied_set, member_map, \
392                    timeout_behavior) \
393                 VALUES ($1, $2, $3, $4, $5, $6, $7, '[]'::jsonb, '{}'::jsonb, $8) \
394                 ON CONFLICT (partition_key, execution_id) DO UPDATE SET \
395                   suspension_id = EXCLUDED.suspension_id, \
396                   suspended_at_ms = EXCLUDED.suspended_at_ms, \
397                   timeout_at_ms = EXCLUDED.timeout_at_ms, \
398                   reason_code = EXCLUDED.reason_code, \
399                   condition = EXCLUDED.condition, \
400                   satisfied_set = '[]'::jsonb, \
401                   member_map = '{}'::jsonb, \
402                   timeout_behavior = EXCLUDED.timeout_behavior",
403            )
404            .bind(part)
405            .bind(exec_uuid)
406            .bind(susp_uuid(&args.suspension_id)?)
407            .bind(now)
408            .bind(args.timeout_at.map(|t| t.0))
409            .bind(args.reason_code.as_wire_str())
410            .bind(&condition_json)
411            .bind(args.timeout_behavior.as_wire_str())
412            .execute(&mut **tx)
413            .await
414            .map_err(map_sqlx_error)?;
415
416            // 6. Transition exec_core to suspended.
417            sqlx::query(
418                "UPDATE ff_exec_core \
419                    SET lifecycle_phase = 'suspended', \
420                        ownership_state = 'released', \
421                        eligibility_state = 'not_applicable', \
422                        public_state = 'suspended', \
423                        attempt_state = 'attempt_interrupted' \
424                  WHERE partition_key = $1 AND execution_id = $2",
425            )
426            .bind(part)
427            .bind(exec_uuid)
428            .execute(&mut **tx)
429            .await
430            .map_err(map_sqlx_error)?;
431
432            // 7. Release + bump lease epoch on ff_attempt.
433            sqlx::query(
434                "UPDATE ff_attempt \
435                    SET worker_id = NULL, \
436                        worker_instance_id = NULL, \
437                        lease_expires_at_ms = NULL, \
438                        lease_epoch = lease_epoch + 1, \
439                        outcome = 'attempt_interrupted' \
440                  WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
441            )
442            .bind(part)
443            .bind(exec_uuid)
444            .bind(attempt_index_i)
445            .execute(&mut **tx)
446            .await
447            .map_err(map_sqlx_error)?;
448
449            // 8. Assemble outcome.
450            let (primary_id, primary_key, primary_token) = signed[0].clone();
451            let extras: Vec<AdditionalWaitpointBinding> = signed
452                .iter()
453                .skip(1)
454                .map(|(id, key, tok)| {
455                    AdditionalWaitpointBinding::new(
456                        id.clone(),
457                        key.clone(),
458                        WaitpointHmac::new(tok.clone()),
459                    )
460                })
461                .collect();
462            let details = SuspendOutcomeDetails::new(
463                args.suspension_id.clone(),
464                primary_id,
465                primary_key,
466                WaitpointHmac::new(primary_token),
467            )
468            .with_additional_waitpoints(extras);
469
470            let opaque = encode_opaque(BackendTag::Postgres, &payload);
471            let suspended_handle =
472                Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
473            let outcome = SuspendOutcome::Suspended {
474                details,
475                handle: suspended_handle,
476            };
477
478            // 9. Cache outcome if idempotency key present.
479            if let Some(key) = idem.as_deref() {
480                let cached = outcome_to_dedup_json(&outcome);
481                sqlx::query(
482                    "INSERT INTO ff_suspend_dedup \
483                       (partition_key, idempotency_key, outcome_json, created_at_ms) \
484                     VALUES ($1, $2, $3, $4) \
485                     ON CONFLICT DO NOTHING",
486                )
487                .bind(part)
488                .bind(key)
489                .bind(&cached)
490                .bind(now)
491                .execute(&mut **tx)
492                .await
493                .map_err(map_sqlx_error)?;
494            }
495
496            Ok(outcome)
497        })
498    })
499    .await
500}
501
502// ─── deliver_signal ──────────────────────────────────────────────────────
503
504pub(crate) async fn deliver_signal_impl(
505    pool: &PgPool,
506    _partition_config: &PartitionConfig,
507    args: DeliverSignalArgs,
508) -> Result<DeliverSignalResult, EngineError> {
509    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
510    let wp_u = wp_uuid(&args.waitpoint_id)?;
511
512    run_serializable(pool, move |tx| {
513        let args = args.clone();
514        Box::pin(async move {
515            // 1. Look up pending waitpoint + stored token + kid.
516            let row: Option<(String, String, String, Uuid)> = sqlx::query_as(
517                "SELECT token_kid, token, waitpoint_key, execution_id \
518                   FROM ff_waitpoint_pending \
519                  WHERE partition_key = $1 AND waitpoint_id = $2 \
520                  FOR UPDATE",
521            )
522            .bind(part)
523            .bind(wp_u)
524            .fetch_optional(&mut **tx)
525            .await
526            .map_err(map_sqlx_error)?;
527            let (kid, stored_token, wp_key, stored_exec) = match row {
528                Some(r) => r,
529                None => return Err(EngineError::NotFound { entity: "waitpoint" }),
530            };
531            if stored_exec != exec_uuid {
532                return Err(EngineError::Validation {
533                    kind: ValidationKind::InvalidInput,
534                    detail: "waitpoint belongs to a different execution".into(),
535                });
536            }
537
538            // 2. HMAC verify. Secret comes from the keystore — allows
539            //    post-rotation grace verification when kid is inactive
540            //    but still stored.
541            let secret_row: Option<(Vec<u8>,)> = sqlx::query_as(
542                "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
543            )
544            .bind(&kid)
545            .fetch_optional(&mut **tx)
546            .await
547            .map_err(map_sqlx_error)?;
548            let (secret,) = secret_row.ok_or_else(|| EngineError::Validation {
549                kind: ValidationKind::InvalidInput,
550                detail: format!("kid {kid} missing from keystore"),
551            })?;
552            let presented = args.waitpoint_token.as_str();
553            let msg = format!("{}:{}", args.execution_id, args.waitpoint_id);
554            hmac_verify(&secret, &kid, msg.as_bytes(), presented).map_err(|e| {
555                EngineError::Validation {
556                    kind: ValidationKind::InvalidInput,
557                    detail: format!("waitpoint_token verify: {e}"),
558                }
559            })?;
560            if presented != stored_token {
561                return Err(EngineError::Validation {
562                    kind: ValidationKind::InvalidInput,
563                    detail: "waitpoint_token does not match minted token".into(),
564                });
565            }
566
567            // 3. Load suspension_current (lock the row so concurrent
568            //    signals serialize on this execution).
569            let susp_row: Option<(JsonValue, JsonValue)> = sqlx::query_as(
570                "SELECT condition, member_map FROM ff_suspension_current \
571                  WHERE partition_key = $1 AND execution_id = $2 \
572                  FOR UPDATE",
573            )
574            .bind(part)
575            .bind(exec_uuid)
576            .fetch_optional(&mut **tx)
577            .await
578            .map_err(map_sqlx_error)?;
579            let (condition_json, mut member_map) = match susp_row {
580                Some(r) => r,
581                None => return Err(EngineError::NotFound { entity: "suspension" }),
582            };
583
584            // 4. Append signal into member_map[wp_key].
585            let signal_blob = json!({
586                "signal_id": args.signal_id.to_string(),
587                "signal_name": args.signal_name,
588                "signal_category": args.signal_category,
589                "source_type": args.source_type,
590                "source_identity": args.source_identity,
591                "correlation_id": args.correlation_id.clone().unwrap_or_default(),
592                "accepted_at": args.now.0,
593                "payload_hex": args.payload.as_ref().map(hex::encode),
594            });
595            let map_obj = member_map.as_object_mut().ok_or_else(|| {
596                EngineError::Validation {
597                    kind: ValidationKind::Corruption,
598                    detail: "member_map not a JSON object".into(),
599                }
600            })?;
601            let entry = map_obj.entry(wp_key.clone()).or_insert_with(|| json!([]));
602            entry
603                .as_array_mut()
604                .ok_or_else(|| EngineError::Validation {
605                    kind: ValidationKind::Corruption,
606                    detail: "member_map[wp_key] not a JSON array".into(),
607                })?
608                .push(signal_blob);
609
610            // 5. Evaluate composite condition against the updated map.
611            let condition: ResumeCondition = serde_json::from_value(condition_json)
612                .map_err(|e| EngineError::Validation {
613                    kind: ValidationKind::Corruption,
614                    detail: format!("condition deserialize: {e}"),
615                })?;
616            let signals_by_wp: HashMap<String, Vec<ResumeSignal>> = map_obj
617                .iter()
618                .map(|(k, v)| {
619                    let sigs: Vec<ResumeSignal> = v
620                        .as_array()
621                        .map(|arr| arr.iter().filter_map(resume_signal_from_json).collect())
622                        .unwrap_or_default();
623                    (k.clone(), sigs)
624                })
625                .collect();
626            let borrowed: HashMap<&str, &[ResumeSignal]> = signals_by_wp
627                .iter()
628                .map(|(k, v)| (k.as_str(), v.as_slice()))
629                .collect();
630            let satisfied = evaluate(&condition, &borrowed);
631
632            // 6. Persist member_map (always — both append + satisfy
633            //    cases need the updated view for observe_signals).
634            sqlx::query(
635                "UPDATE ff_suspension_current SET member_map = $1 \
636                  WHERE partition_key = $2 AND execution_id = $3",
637            )
638            .bind(&member_map)
639            .bind(part)
640            .bind(exec_uuid)
641            .execute(&mut **tx)
642            .await
643            .map_err(map_sqlx_error)?;
644
645            let effect = if satisfied {
646                sqlx::query(
647                    "UPDATE ff_exec_core \
648                        SET public_state = 'resumable', \
649                            lifecycle_phase = 'runnable', \
650                            eligibility_state = 'eligible_now' \
651                      WHERE partition_key = $1 AND execution_id = $2",
652                )
653                .bind(part)
654                .bind(exec_uuid)
655                .execute(&mut **tx)
656                .await
657                .map_err(map_sqlx_error)?;
658
659                sqlx::query(
660                    "DELETE FROM ff_waitpoint_pending \
661                      WHERE partition_key = $1 AND execution_id = $2",
662                )
663                .bind(part)
664                .bind(exec_uuid)
665                .execute(&mut **tx)
666                .await
667                .map_err(map_sqlx_error)?;
668
669                sqlx::query(
670                    "INSERT INTO ff_completion_event \
671                       (partition_key, execution_id, outcome, occurred_at_ms) \
672                     VALUES ($1, $2, 'resumable', $3)",
673                )
674                .bind(part)
675                .bind(exec_uuid)
676                .bind(args.now.0)
677                .execute(&mut **tx)
678                .await
679                .map_err(map_sqlx_error)?;
680
681                "resume_condition_satisfied"
682            } else {
683                "appended_to_waitpoint"
684            };
685
686            Ok(DeliverSignalResult::Accepted {
687                signal_id: args.signal_id.clone(),
688                effect: effect.to_owned(),
689            })
690        })
691    })
692    .await
693}
694
695fn resume_signal_from_json(v: &JsonValue) -> Option<ResumeSignal> {
696    let signal_id = SignalId::parse(v["signal_id"].as_str()?).ok()?;
697    Some(ResumeSignal {
698        signal_id,
699        signal_name: v["signal_name"].as_str()?.to_owned(),
700        signal_category: v["signal_category"].as_str().unwrap_or("").to_owned(),
701        source_type: v["source_type"].as_str().unwrap_or("").to_owned(),
702        source_identity: v["source_identity"].as_str().unwrap_or("").to_owned(),
703        correlation_id: v["correlation_id"].as_str().unwrap_or("").to_owned(),
704        accepted_at: TimestampMs::from_millis(v["accepted_at"].as_i64().unwrap_or(0)),
705        payload: v["payload_hex"].as_str().and_then(|h| hex::decode(h).ok()),
706    })
707}
708
709// ─── claim_resumed_execution ─────────────────────────────────────────────
710
711pub(crate) async fn claim_resumed_execution_impl(
712    pool: &PgPool,
713    _partition_config: &PartitionConfig,
714    args: ClaimResumedExecutionArgs,
715) -> Result<ClaimResumedExecutionResult, EngineError> {
716    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
717    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
718
719    let row: Option<(String, i32)> = sqlx::query_as(
720        "SELECT public_state, attempt_index FROM ff_exec_core \
721          WHERE partition_key = $1 AND execution_id = $2 \
722          FOR UPDATE",
723    )
724    .bind(part)
725    .bind(exec_uuid)
726    .fetch_optional(&mut *tx)
727    .await
728    .map_err(map_sqlx_error)?;
729    let (public_state, attempt_index_i) = match row {
730        Some(r) => r,
731        None => {
732            tx.rollback().await.ok();
733            return Err(EngineError::NotFound { entity: "execution" });
734        }
735    };
736    if public_state != "resumable" {
737        tx.rollback().await.ok();
738        return Err(EngineError::Contention(
739            ContentionKind::NotAResumedExecution,
740        ));
741    }
742
743    let now = now_ms();
744    let lease_ttl = i64::try_from(args.lease_ttl_ms).unwrap_or(0);
745    let new_expires = now.saturating_add(lease_ttl);
746
747    sqlx::query(
748        "UPDATE ff_attempt \
749            SET worker_id = $1, worker_instance_id = $2, \
750                lease_epoch = lease_epoch + 1, \
751                lease_expires_at_ms = $3, started_at_ms = $4, outcome = NULL \
752          WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7",
753    )
754    .bind(args.worker_id.as_str())
755    .bind(args.worker_instance_id.as_str())
756    .bind(new_expires)
757    .bind(now)
758    .bind(part)
759    .bind(exec_uuid)
760    .bind(attempt_index_i)
761    .execute(&mut *tx)
762    .await
763    .map_err(map_sqlx_error)?;
764
765    sqlx::query(
766        "UPDATE ff_exec_core \
767            SET lifecycle_phase = 'active', ownership_state = 'leased', \
768                eligibility_state = 'not_applicable', \
769                public_state = 'running', attempt_state = 'running_attempt' \
770          WHERE partition_key = $1 AND execution_id = $2",
771    )
772    .bind(part)
773    .bind(exec_uuid)
774    .execute(&mut *tx)
775    .await
776    .map_err(map_sqlx_error)?;
777
778    let epoch_row: (i64,) = sqlx::query_as(
779        "SELECT lease_epoch FROM ff_attempt \
780          WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
781    )
782    .bind(part)
783    .bind(exec_uuid)
784    .bind(attempt_index_i)
785    .fetch_one(&mut *tx)
786    .await
787    .map_err(map_sqlx_error)?;
788
789    tx.commit().await.map_err(map_sqlx_error)?;
790
791    let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
792    let lease_epoch = LeaseEpoch(u64::try_from(epoch_row.0).unwrap_or(0));
793    let attempt_id = AttemptId::new();
794
795    Ok(ClaimResumedExecutionResult::Claimed(
796        ClaimedResumedExecution {
797            execution_id: args.execution_id.clone(),
798            lease_id: args.lease_id.clone(),
799            lease_epoch,
800            attempt_index,
801            attempt_id,
802            lease_expires_at: TimestampMs::from_millis(new_expires),
803        },
804    ))
805}
806
807// ─── observe_signals ─────────────────────────────────────────────────────
808
809pub(crate) async fn observe_signals_impl(
810    pool: &PgPool,
811    handle: &Handle,
812) -> Result<Vec<ResumeSignal>, EngineError> {
813    let payload = decode_handle(handle)?;
814    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
815
816    let row: Option<(JsonValue,)> = sqlx::query_as(
817        "SELECT member_map FROM ff_suspension_current \
818          WHERE partition_key = $1 AND execution_id = $2",
819    )
820    .bind(part)
821    .bind(exec_uuid)
822    .fetch_optional(pool)
823    .await
824    .map_err(map_sqlx_error)?;
825
826    let Some((member_map,)) = row else {
827        return Ok(Vec::new());
828    };
829    let mut out: Vec<ResumeSignal> = Vec::new();
830    if let Some(map) = member_map.as_object() {
831        for (_wp_key, arr) in map {
832            if let Some(sigs) = arr.as_array() {
833                for v in sigs {
834                    if let Some(s) = resume_signal_from_json(v) {
835                        out.push(s);
836                    }
837                }
838            }
839        }
840    }
841    Ok(out)
842}