Skip to main content

ff_backend_postgres/
suspend_ops.rs

1//! Suspend / deliver_signal / claim_resumed / observe_signals SQL bodies.
2//!
3//! **Wave 4d follow-up.** Builds on PR #238's primitives:
4//!
5//! * [`crate::signal::hmac_sign`] / [`crate::signal::hmac_verify`]
6//! * [`crate::signal::SERIALIZABLE_RETRY_BUDGET`] +
7//!   [`crate::signal::is_retryable_serialization`]
8//! * [`crate::suspend::evaluate`] — composite-condition evaluator
9//!
10//! HMAC signing, retry looping, and composite evaluation are never
11//! re-implemented here.
12//!
13//! # Isolation
14//!
15//! `suspend` + `deliver_signal` run at SERIALIZABLE with a 3-attempt
16//! retry budget (Q11). On retry exhaustion they surface
17//! `Contention(RetryExhausted)`. `claim_resumed_execution` +
18//! `observe_signals` stay at READ COMMITTED (row-level `FOR UPDATE`
19//! for the former, read-only for the latter).
20
21use std::collections::HashMap;
22use std::time::{SystemTime, UNIX_EPOCH};
23
24use ff_core::backend::{BackendTag, Handle, HandleKind, HandleOpaque, ResumeSignal, WaitpointHmac};
25use ff_core::contracts::{
26    AdditionalWaitpointBinding, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
27    ClaimedResumedExecution, DeliverSignalArgs, DeliverSignalResult, ResumeCondition, SuspendArgs,
28    SuspendOutcome, SuspendOutcomeDetails, WaitpointBinding,
29};
30use ff_core::engine_error::{ContentionKind, EngineError, ValidationKind};
31use ff_core::handle_codec::{encode as encode_opaque, HandlePayload};
32use ff_core::partition::PartitionConfig;
33use ff_core::types::{
34    AttemptId, AttemptIndex, ExecutionId, LeaseEpoch, LeaseFence, SignalId, SuspensionId,
35    TimestampMs, WaitpointId,
36};
37use serde_json::{json, Value as JsonValue};
38use sqlx::{PgPool, Postgres, Transaction};
39use uuid::Uuid;
40
41use crate::error::map_sqlx_error;
42use crate::lease_event;
43use crate::signal::{hmac_sign, hmac_verify, is_retryable_serialization, SERIALIZABLE_RETRY_BUDGET};
44use crate::signal_event;
45use crate::suspend::evaluate;
46
47// ─── small shared helpers ────────────────────────────────────────────────
48
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0` (`duration_since` fails and the
/// zero default duration is used). A millisecond count too large for `i64`
/// saturates to `i64::MAX` rather than wrapping through an `as` cast.
fn now_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
55
56fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
57    let s = eid.as_str();
58    let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
59        kind: ValidationKind::InvalidInput,
60        detail: format!("execution_id missing `{{fp:` prefix: {s}"),
61    })?;
62    let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
63        kind: ValidationKind::InvalidInput,
64        detail: format!("execution_id missing `}}:`: {s}"),
65    })?;
66    let part: i16 = rest[..close]
67        .parse()
68        .map_err(|_| EngineError::Validation {
69            kind: ValidationKind::InvalidInput,
70            detail: format!("execution_id partition index not u16: {s}"),
71        })?;
72    let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
73        kind: ValidationKind::InvalidInput,
74        detail: format!("execution_id UUID invalid: {s}"),
75    })?;
76    Ok((part, uuid))
77}
78
79fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
80    if handle.backend != BackendTag::Postgres {
81        return Err(EngineError::Validation {
82            kind: ValidationKind::HandleFromOtherBackend,
83            detail: format!("expected Postgres, got {:?}", handle.backend),
84        });
85    }
86    let decoded = ff_core::handle_codec::decode(&handle.opaque)?;
87    if decoded.tag != BackendTag::Postgres {
88        return Err(EngineError::Validation {
89            kind: ValidationKind::HandleFromOtherBackend,
90            detail: format!("embedded tag {:?}", decoded.tag),
91        });
92    }
93    Ok(decoded.payload)
94}
95
96fn wp_uuid(w: &WaitpointId) -> Result<Uuid, EngineError> {
97    Uuid::parse_str(&w.to_string()).map_err(|e| EngineError::Validation {
98        kind: ValidationKind::InvalidInput,
99        detail: format!("waitpoint_id not a UUID: {e}"),
100    })
101}
102
103fn susp_uuid(s: &SuspensionId) -> Result<Uuid, EngineError> {
104    Uuid::parse_str(&s.to_string()).map_err(|e| EngineError::Validation {
105        kind: ValidationKind::InvalidInput,
106        detail: format!("suspension_id not a UUID: {e}"),
107    })
108}
109
110// ─── SERIALIZABLE retry loop ─────────────────────────────────────────────
111
112/// Return true if an `EngineError::Transport` carries a sqlx
113/// serialization-failure SQLSTATE. `map_sqlx_error` stringifies the
114/// underlying code; we match defensively on both the raw codes
115/// (40001/40P01) and the symbolic labels.
116fn is_retryable_engine(err: &EngineError) -> bool {
117    match err {
118        EngineError::Transport { source, .. } => {
119            let s = source.to_string();
120            s.contains("40001")
121                || s.contains("40P01")
122                || s.contains("serialization_failure")
123                || s.contains("deadlock_detected")
124        }
125        // `map_sqlx_error` collapses serialization_failure (40001) +
126        // deadlock_detected (40P01) into `Contention(LeaseConflict)`.
127        // Inside a SERIALIZABLE retry loop those are retryable; the
128        // explicit in-body fence-mismatch case is NOT (a bumped epoch
129        // won't unbump on retry). We can't distinguish them by
130        // discriminant alone, so the retry loop treats LeaseConflict
131        // as retryable — a genuine fence mismatch will still bail
132        // after budget exhaustion as RetryExhausted, which callers
133        // reconcile by re-reading exec_core.
134        EngineError::Contention(ContentionKind::LeaseConflict) => true,
135        _ => false,
136    }
137}
138
139/// Run `op` inside a SERIALIZABLE transaction, retrying up to
140/// [`SERIALIZABLE_RETRY_BUDGET`] times on retryable faults. On
141/// exhaustion returns `Contention(RetryExhausted)`.
142async fn run_serializable<T, F>(pool: &PgPool, mut op: F) -> Result<T, EngineError>
143where
144    T: Send,
145    F: for<'a> FnMut(
146            &'a mut Transaction<'_, Postgres>,
147        ) -> std::pin::Pin<
148            Box<dyn std::future::Future<Output = Result<T, EngineError>> + Send + 'a>,
149        > + Send,
150{
151    for _ in 0..SERIALIZABLE_RETRY_BUDGET {
152        let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
153        sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
154            .execute(&mut *tx)
155            .await
156            .map_err(map_sqlx_error)?;
157        let body_res = op(&mut tx).await;
158        match body_res {
159            Ok(v) => match tx.commit().await {
160                Ok(()) => return Ok(v),
161                Err(e) if is_retryable_serialization(&e) => continue,
162                Err(e) => return Err(map_sqlx_error(e)),
163            },
164            Err(e) if is_retryable_engine(&e) => {
165                let _ = tx.rollback().await;
166                continue;
167            }
168            Err(e) => {
169                let _ = tx.rollback().await;
170                return Err(e);
171            }
172        }
173    }
174    Err(EngineError::Contention(ContentionKind::RetryExhausted))
175}
176
177// ─── dedup outcome (de)serialization ─────────────────────────────────────
178
179fn outcome_to_dedup_json(outcome: &SuspendOutcome) -> JsonValue {
180    let details = outcome.details();
181    let extras: Vec<JsonValue> = details
182        .additional_waitpoints
183        .iter()
184        .map(|e| {
185            json!({
186                "waitpoint_id": e.waitpoint_id.to_string(),
187                "waitpoint_key": e.waitpoint_key,
188                "token": e.waitpoint_token.as_str(),
189            })
190        })
191        .collect();
192    let (variant, handle_opaque) = match outcome {
193        SuspendOutcome::Suspended { handle, .. } => {
194            ("Suspended", Some(hex::encode(handle.opaque.as_bytes())))
195        }
196        SuspendOutcome::AlreadySatisfied { .. } => ("AlreadySatisfied", None),
197        _ => ("Suspended", None),
198    };
199    json!({
200        "variant": variant,
201        "details": {
202            "suspension_id": details.suspension_id.to_string(),
203            "waitpoint_id": details.waitpoint_id.to_string(),
204            "waitpoint_key": details.waitpoint_key,
205            "token": details.waitpoint_token.as_str(),
206            "extras": extras,
207        },
208        "handle_opaque_hex": handle_opaque,
209    })
210}
211
212fn outcome_from_dedup_json(v: &JsonValue) -> Result<SuspendOutcome, EngineError> {
213    let corrupt = |s: String| EngineError::Validation {
214        kind: ValidationKind::Corruption,
215        detail: s,
216    };
217    let det = &v["details"];
218    let suspension_id = SuspensionId::parse(det["suspension_id"].as_str().unwrap_or(""))
219        .map_err(|e| corrupt(format!("dedup suspension_id: {e}")))?;
220    let waitpoint_id = WaitpointId::parse(det["waitpoint_id"].as_str().unwrap_or(""))
221        .map_err(|e| corrupt(format!("dedup waitpoint_id: {e}")))?;
222    let waitpoint_key = det["waitpoint_key"].as_str().unwrap_or("").to_owned();
223    let token = det["token"].as_str().unwrap_or("").to_owned();
224    let mut extras: Vec<AdditionalWaitpointBinding> = Vec::new();
225    if let Some(arr) = det["extras"].as_array() {
226        for e in arr {
227            let wid = WaitpointId::parse(e["waitpoint_id"].as_str().unwrap_or(""))
228                .map_err(|err| corrupt(format!("dedup extra wp_id: {err}")))?;
229            let wkey = e["waitpoint_key"].as_str().unwrap_or("").to_owned();
230            let tok = e["token"].as_str().unwrap_or("").to_owned();
231            extras.push(AdditionalWaitpointBinding::new(
232                wid,
233                wkey,
234                WaitpointHmac::new(tok),
235            ));
236        }
237    }
238    let details = SuspendOutcomeDetails::new(
239        suspension_id,
240        waitpoint_id,
241        waitpoint_key,
242        WaitpointHmac::new(token),
243    )
244    .with_additional_waitpoints(extras);
245
246    match v["variant"].as_str().unwrap_or("Suspended") {
247        "AlreadySatisfied" => Ok(SuspendOutcome::AlreadySatisfied { details }),
248        _ => {
249            let opaque_hex = v["handle_opaque_hex"].as_str().unwrap_or("");
250            let bytes = hex::decode(opaque_hex)
251                .map_err(|e| corrupt(format!("dedup handle hex: {e}")))?;
252            let opaque = HandleOpaque::new(bytes.into_boxed_slice());
253            let handle = Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
254            Ok(SuspendOutcome::Suspended { details, handle })
255        }
256    }
257}
258
259// ─── suspend ─────────────────────────────────────────────────────────────
260
261pub(crate) async fn suspend_impl(
262    pool: &PgPool,
263    _partition_config: &PartitionConfig,
264    handle: &Handle,
265    args: SuspendArgs,
266) -> Result<SuspendOutcome, EngineError> {
267    let payload = decode_handle(handle)?;
268    suspend_core(pool, payload, args).await
269}
270
271/// Cairn #322 — service-layer entry point: suspend when the caller
272/// holds a lease fence triple but no `Handle`. Resolves `attempt_index`
273/// from `ff_attempt` by `(exec_id, attempt_id)` then delegates to the
274/// same transactional body used by [`suspend_impl`].
275pub(crate) async fn suspend_by_triple_impl(
276    pool: &PgPool,
277    _partition_config: &PartitionConfig,
278    exec_id: ExecutionId,
279    triple: LeaseFence,
280    args: SuspendArgs,
281) -> Result<SuspendOutcome, EngineError> {
282    let (part, exec_uuid) = split_exec_id(&exec_id)?;
283    // Postgres-side attempts are keyed by `(execution_id, attempt_index)`;
284    // there is no `attempt_id` column on `ff_attempt`. The triple's
285    // `attempt_id` is therefore advisory on this backend — the
286    // authoritative "which attempt" pointer lives on `ff_exec_core`.
287    // Read it outside the serializable body; `suspend_core` re-fences
288    // against `lease_epoch` (`FOR UPDATE`) inside the txn so a racing
289    // attempt-bump or lease-bump surfaces as `Contention(LeaseConflict)`.
290    let row: Option<(i32,)> = sqlx::query_as(
291        "SELECT attempt_index FROM ff_exec_core \
292         WHERE partition_key = $1 AND execution_id = $2",
293    )
294    .bind(part)
295    .bind(exec_uuid)
296    .fetch_optional(pool)
297    .await
298    .map_err(map_sqlx_error)?;
299    let attempt_index_i = match row {
300        Some((i,)) => i,
301        None => return Err(EngineError::NotFound { entity: "execution" }),
302    };
303    let attempt_index =
304        AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
305
306    // Synthesize a HandlePayload — `suspend_core` only reads
307    // `execution_id`, `attempt_index`, and `lease_epoch`; the rest of
308    // the payload fields are carried through to the replayed-outcome
309    // handle encoding (kind = Suspended).
310    let payload = HandlePayload::new(
311        exec_id,
312        attempt_index,
313        triple.attempt_id,
314        triple.lease_id,
315        triple.lease_epoch,
316        0,
317        ff_core::types::LaneId::new(""),
318        ff_core::types::WorkerInstanceId::new(""),
319    );
320    suspend_core(pool, payload, args).await
321}
322
323async fn suspend_core(
324    pool: &PgPool,
325    payload: HandlePayload,
326    args: SuspendArgs,
327) -> Result<SuspendOutcome, EngineError> {
328    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
329    let attempt_index_i = i32::try_from(payload.attempt_index.0).unwrap_or(0);
330    let expected_epoch = payload.lease_epoch.0;
331    let idem_key = args.idempotency_key.as_ref().map(|k| k.as_str().to_owned());
332
333    run_serializable(pool, move |tx| {
334        let args = args.clone();
335        let idem = idem_key.clone();
336        let payload = payload.clone();
337        Box::pin(async move {
338            // 1. Dedup replay check.
339            if let Some(key) = idem.as_deref() {
340                let row: Option<(JsonValue,)> = sqlx::query_as(
341                    "SELECT outcome_json FROM ff_suspend_dedup \
342                     WHERE partition_key = $1 AND idempotency_key = $2",
343                )
344                .bind(part)
345                .bind(key)
346                .fetch_optional(&mut **tx)
347                .await
348                .map_err(map_sqlx_error)?;
349                if let Some((cached,)) = row {
350                    return outcome_from_dedup_json(&cached);
351                }
352            }
353
354            // 2. Fence check against ff_attempt.
355            let epoch_row: Option<(i64,)> = sqlx::query_as(
356                "SELECT lease_epoch FROM ff_attempt \
357                 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3 \
358                 FOR UPDATE",
359            )
360            .bind(part)
361            .bind(exec_uuid)
362            .bind(attempt_index_i)
363            .fetch_optional(&mut **tx)
364            .await
365            .map_err(map_sqlx_error)?;
366            let observed_epoch: u64 = match epoch_row {
367                Some((e,)) => u64::try_from(e).unwrap_or(0),
368                None => return Err(EngineError::NotFound { entity: "attempt" }),
369            };
370            if observed_epoch != expected_epoch {
371                return Err(EngineError::Contention(ContentionKind::LeaseConflict));
372            }
373
374            // 3. Resolve the active HMAC kid inside the txn.
375            let kid_row: Option<(String, Vec<u8>)> = sqlx::query_as(
376                "SELECT kid, secret FROM ff_waitpoint_hmac \
377                 WHERE active = TRUE \
378                 ORDER BY rotated_at_ms DESC LIMIT 1",
379            )
380            .fetch_optional(&mut **tx)
381            .await
382            .map_err(map_sqlx_error)?;
383            let (kid, secret) = kid_row.ok_or_else(|| EngineError::Validation {
384                kind: ValidationKind::InvalidInput,
385                detail: "ff_waitpoint_hmac empty — rotate a kid before suspend".into(),
386            })?;
387
388            // 4. Sign + insert waitpoint_pending for each binding.
389            let now = args.now.0;
390            let mut signed: Vec<(WaitpointId, String, String)> = Vec::new();
391            for binding in args.waitpoints.iter() {
392                let (wp_id, wp_key) = match binding {
393                    WaitpointBinding::Fresh {
394                        waitpoint_id,
395                        waitpoint_key,
396                    } => (waitpoint_id.clone(), waitpoint_key.clone()),
397                    WaitpointBinding::UsePending { waitpoint_id } => {
398                        let row: Option<(String,)> = sqlx::query_as(
399                            "SELECT waitpoint_key FROM ff_waitpoint_pending \
400                             WHERE partition_key = $1 AND waitpoint_id = $2",
401                        )
402                        .bind(part)
403                        .bind(wp_uuid(waitpoint_id)?)
404                        .fetch_optional(&mut **tx)
405                        .await
406                        .map_err(map_sqlx_error)?;
407                        let wp_key = row.map(|(k,)| k).unwrap_or_default();
408                        (waitpoint_id.clone(), wp_key)
409                    }
410                    _ => {
411                        return Err(EngineError::Validation {
412                            kind: ValidationKind::InvalidInput,
413                            detail: "unsupported WaitpointBinding variant".into(),
414                        });
415                    }
416                };
417                let msg = format!("{}:{}", payload.execution_id, wp_id);
418                let token = hmac_sign(&secret, &kid, msg.as_bytes());
419                sqlx::query(
420                    "INSERT INTO ff_waitpoint_pending \
421                       (partition_key, waitpoint_id, execution_id, token_kid, token, \
422                        created_at_ms, expires_at_ms, waitpoint_key) \
423                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
424                     ON CONFLICT (partition_key, waitpoint_id) DO UPDATE SET \
425                       token_kid = EXCLUDED.token_kid, token = EXCLUDED.token, \
426                       waitpoint_key = EXCLUDED.waitpoint_key",
427                )
428                .bind(part)
429                .bind(wp_uuid(&wp_id)?)
430                .bind(exec_uuid)
431                .bind(&kid)
432                .bind(&token)
433                .bind(now)
434                .bind(args.timeout_at.map(|t| t.0))
435                .bind(&wp_key)
436                .execute(&mut **tx)
437                .await
438                .map_err(map_sqlx_error)?;
439                signed.push((wp_id, wp_key, token));
440            }
441
442            // 5. Insert ff_suspension_current.
443            let condition_json =
444                serde_json::to_value(&args.resume_condition).map_err(|e| {
445                    EngineError::Validation {
446                        kind: ValidationKind::Corruption,
447                        detail: format!("resume_condition serialize: {e}"),
448                    }
449                })?;
450            sqlx::query(
451                "INSERT INTO ff_suspension_current \
452                   (partition_key, execution_id, suspension_id, suspended_at_ms, \
453                    timeout_at_ms, reason_code, condition, satisfied_set, member_map, \
454                    timeout_behavior) \
455                 VALUES ($1, $2, $3, $4, $5, $6, $7, '[]'::jsonb, '{}'::jsonb, $8) \
456                 ON CONFLICT (partition_key, execution_id) DO UPDATE SET \
457                   suspension_id = EXCLUDED.suspension_id, \
458                   suspended_at_ms = EXCLUDED.suspended_at_ms, \
459                   timeout_at_ms = EXCLUDED.timeout_at_ms, \
460                   reason_code = EXCLUDED.reason_code, \
461                   condition = EXCLUDED.condition, \
462                   satisfied_set = '[]'::jsonb, \
463                   member_map = '{}'::jsonb, \
464                   timeout_behavior = EXCLUDED.timeout_behavior",
465            )
466            .bind(part)
467            .bind(exec_uuid)
468            .bind(susp_uuid(&args.suspension_id)?)
469            .bind(now)
470            .bind(args.timeout_at.map(|t| t.0))
471            .bind(args.reason_code.as_wire_str())
472            .bind(&condition_json)
473            .bind(args.timeout_behavior.as_wire_str())
474            .execute(&mut **tx)
475            .await
476            .map_err(map_sqlx_error)?;
477
478            // 6. Transition exec_core to suspended.
479            sqlx::query(
480                "UPDATE ff_exec_core \
481                    SET lifecycle_phase = 'suspended', \
482                        ownership_state = 'released', \
483                        eligibility_state = 'not_applicable', \
484                        public_state = 'suspended', \
485                        attempt_state = 'attempt_interrupted' \
486                  WHERE partition_key = $1 AND execution_id = $2",
487            )
488            .bind(part)
489            .bind(exec_uuid)
490            .execute(&mut **tx)
491            .await
492            .map_err(map_sqlx_error)?;
493
494            // 7. Release + bump lease epoch on ff_attempt.
495            sqlx::query(
496                "UPDATE ff_attempt \
497                    SET worker_id = NULL, \
498                        worker_instance_id = NULL, \
499                        lease_expires_at_ms = NULL, \
500                        lease_epoch = lease_epoch + 1, \
501                        outcome = 'attempt_interrupted' \
502                  WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
503            )
504            .bind(part)
505            .bind(exec_uuid)
506            .bind(attempt_index_i)
507            .execute(&mut **tx)
508            .await
509            .map_err(map_sqlx_error)?;
510
511            // RFC-019 Stage B outbox: lease revoked (suspend).
512            lease_event::emit(
513                tx,
514                part,
515                exec_uuid,
516                None,
517                lease_event::EVENT_REVOKED,
518                now,
519            )
520            .await?;
521
522            // 8. Assemble outcome.
523            let (primary_id, primary_key, primary_token) = signed[0].clone();
524            let extras: Vec<AdditionalWaitpointBinding> = signed
525                .iter()
526                .skip(1)
527                .map(|(id, key, tok)| {
528                    AdditionalWaitpointBinding::new(
529                        id.clone(),
530                        key.clone(),
531                        WaitpointHmac::new(tok.clone()),
532                    )
533                })
534                .collect();
535            let details = SuspendOutcomeDetails::new(
536                args.suspension_id.clone(),
537                primary_id,
538                primary_key,
539                WaitpointHmac::new(primary_token),
540            )
541            .with_additional_waitpoints(extras);
542
543            let opaque = encode_opaque(BackendTag::Postgres, &payload);
544            let suspended_handle =
545                Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
546            let outcome = SuspendOutcome::Suspended {
547                details,
548                handle: suspended_handle,
549            };
550
551            // 9. Cache outcome if idempotency key present.
552            if let Some(key) = idem.as_deref() {
553                let cached = outcome_to_dedup_json(&outcome);
554                sqlx::query(
555                    "INSERT INTO ff_suspend_dedup \
556                       (partition_key, idempotency_key, outcome_json, created_at_ms) \
557                     VALUES ($1, $2, $3, $4) \
558                     ON CONFLICT DO NOTHING",
559                )
560                .bind(part)
561                .bind(key)
562                .bind(&cached)
563                .bind(now)
564                .execute(&mut **tx)
565                .await
566                .map_err(map_sqlx_error)?;
567            }
568
569            Ok(outcome)
570        })
571    })
572    .await
573}
574
575// ─── deliver_signal ──────────────────────────────────────────────────────
576
577pub(crate) async fn deliver_signal_impl(
578    pool: &PgPool,
579    _partition_config: &PartitionConfig,
580    args: DeliverSignalArgs,
581) -> Result<DeliverSignalResult, EngineError> {
582    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
583    let wp_u = wp_uuid(&args.waitpoint_id)?;
584
585    run_serializable(pool, move |tx| {
586        let args = args.clone();
587        Box::pin(async move {
588            // 1. Look up pending waitpoint + stored token + kid.
589            let row: Option<(String, String, String, Uuid)> = sqlx::query_as(
590                "SELECT token_kid, token, waitpoint_key, execution_id \
591                   FROM ff_waitpoint_pending \
592                  WHERE partition_key = $1 AND waitpoint_id = $2 \
593                  FOR UPDATE",
594            )
595            .bind(part)
596            .bind(wp_u)
597            .fetch_optional(&mut **tx)
598            .await
599            .map_err(map_sqlx_error)?;
600            let (kid, stored_token, wp_key, stored_exec) = match row {
601                Some(r) => r,
602                None => return Err(EngineError::NotFound { entity: "waitpoint" }),
603            };
604            if stored_exec != exec_uuid {
605                return Err(EngineError::Validation {
606                    kind: ValidationKind::InvalidInput,
607                    detail: "waitpoint belongs to a different execution".into(),
608                });
609            }
610
611            // 2. HMAC verify. Secret comes from the keystore — allows
612            //    post-rotation grace verification when kid is inactive
613            //    but still stored.
614            let secret_row: Option<(Vec<u8>,)> = sqlx::query_as(
615                "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
616            )
617            .bind(&kid)
618            .fetch_optional(&mut **tx)
619            .await
620            .map_err(map_sqlx_error)?;
621            let (secret,) = secret_row.ok_or_else(|| EngineError::Validation {
622                kind: ValidationKind::InvalidInput,
623                detail: format!("kid {kid} missing from keystore"),
624            })?;
625            let presented = args.waitpoint_token.as_str();
626            let msg = format!("{}:{}", args.execution_id, args.waitpoint_id);
627            hmac_verify(&secret, &kid, msg.as_bytes(), presented).map_err(|e| {
628                EngineError::Validation {
629                    kind: ValidationKind::InvalidInput,
630                    detail: format!("waitpoint_token verify: {e}"),
631                }
632            })?;
633            if presented != stored_token {
634                return Err(EngineError::Validation {
635                    kind: ValidationKind::InvalidInput,
636                    detail: "waitpoint_token does not match minted token".into(),
637                });
638            }
639
640            // 3. Load suspension_current (lock the row so concurrent
641            //    signals serialize on this execution).
642            let susp_row: Option<(JsonValue, JsonValue)> = sqlx::query_as(
643                "SELECT condition, member_map FROM ff_suspension_current \
644                  WHERE partition_key = $1 AND execution_id = $2 \
645                  FOR UPDATE",
646            )
647            .bind(part)
648            .bind(exec_uuid)
649            .fetch_optional(&mut **tx)
650            .await
651            .map_err(map_sqlx_error)?;
652            let (condition_json, mut member_map) = match susp_row {
653                Some(r) => r,
654                None => return Err(EngineError::NotFound { entity: "suspension" }),
655            };
656
657            // 4. Append signal into member_map[wp_key].
658            let signal_blob = json!({
659                "signal_id": args.signal_id.to_string(),
660                "signal_name": args.signal_name,
661                "signal_category": args.signal_category,
662                "source_type": args.source_type,
663                "source_identity": args.source_identity,
664                "correlation_id": args.correlation_id.clone().unwrap_or_default(),
665                "accepted_at": args.now.0,
666                "payload_hex": args.payload.as_ref().map(hex::encode),
667            });
668            let map_obj = member_map.as_object_mut().ok_or_else(|| {
669                EngineError::Validation {
670                    kind: ValidationKind::Corruption,
671                    detail: "member_map not a JSON object".into(),
672                }
673            })?;
674            let entry = map_obj.entry(wp_key.clone()).or_insert_with(|| json!([]));
675            entry
676                .as_array_mut()
677                .ok_or_else(|| EngineError::Validation {
678                    kind: ValidationKind::Corruption,
679                    detail: "member_map[wp_key] not a JSON array".into(),
680                })?
681                .push(signal_blob);
682
683            // 5. Evaluate composite condition against the updated map.
684            let condition: ResumeCondition = serde_json::from_value(condition_json)
685                .map_err(|e| EngineError::Validation {
686                    kind: ValidationKind::Corruption,
687                    detail: format!("condition deserialize: {e}"),
688                })?;
689            let signals_by_wp: HashMap<String, Vec<ResumeSignal>> = map_obj
690                .iter()
691                .map(|(k, v)| {
692                    let sigs: Vec<ResumeSignal> = v
693                        .as_array()
694                        .map(|arr| arr.iter().filter_map(resume_signal_from_json).collect())
695                        .unwrap_or_default();
696                    (k.clone(), sigs)
697                })
698                .collect();
699            let borrowed: HashMap<&str, &[ResumeSignal]> = signals_by_wp
700                .iter()
701                .map(|(k, v)| (k.as_str(), v.as_slice()))
702                .collect();
703            let satisfied = evaluate(&condition, &borrowed);
704
705            // 6. Persist member_map (always — both append + satisfy
706            //    cases need the updated view for observe_signals).
707            sqlx::query(
708                "UPDATE ff_suspension_current SET member_map = $1 \
709                  WHERE partition_key = $2 AND execution_id = $3",
710            )
711            .bind(&member_map)
712            .bind(part)
713            .bind(exec_uuid)
714            .execute(&mut **tx)
715            .await
716            .map_err(map_sqlx_error)?;
717
718            let effect = if satisfied {
719                sqlx::query(
720                    "UPDATE ff_exec_core \
721                        SET public_state = 'resumable', \
722                            lifecycle_phase = 'runnable', \
723                            eligibility_state = 'eligible_now' \
724                      WHERE partition_key = $1 AND execution_id = $2",
725                )
726                .bind(part)
727                .bind(exec_uuid)
728                .execute(&mut **tx)
729                .await
730                .map_err(map_sqlx_error)?;
731
732                sqlx::query(
733                    "DELETE FROM ff_waitpoint_pending \
734                      WHERE partition_key = $1 AND execution_id = $2",
735                )
736                .bind(part)
737                .bind(exec_uuid)
738                .execute(&mut **tx)
739                .await
740                .map_err(map_sqlx_error)?;
741
742                sqlx::query(
743                    "INSERT INTO ff_completion_event \
744                       (partition_key, execution_id, outcome, occurred_at_ms) \
745                     VALUES ($1, $2, 'resumable', $3)",
746                )
747                .bind(part)
748                .bind(exec_uuid)
749                .bind(args.now.0)
750                .execute(&mut **tx)
751                .await
752                .map_err(map_sqlx_error)?;
753
754                "resume_condition_satisfied"
755            } else {
756                "appended_to_waitpoint"
757            };
758
759            // RFC-019 Stage B — `ff_signal_event` outbox. Same tx as
760            // the state writes above so the NOTIFY fires iff the
761            // delivery commits.
762            let wp_id_str = args.waitpoint_id.to_string();
763            signal_event::emit(
764                tx,
765                part,
766                exec_uuid,
767                &args.signal_id.to_string(),
768                Some(wp_id_str.as_str()),
769                Some(args.source_identity.as_str()),
770                args.now.0,
771            )
772            .await?;
773
774            Ok(DeliverSignalResult::Accepted {
775                signal_id: args.signal_id.clone(),
776                effect: effect.to_owned(),
777            })
778        })
779    })
780    .await
781}
782
783fn resume_signal_from_json(v: &JsonValue) -> Option<ResumeSignal> {
784    let signal_id = SignalId::parse(v["signal_id"].as_str()?).ok()?;
785    Some(ResumeSignal {
786        signal_id,
787        signal_name: v["signal_name"].as_str()?.to_owned(),
788        signal_category: v["signal_category"].as_str().unwrap_or("").to_owned(),
789        source_type: v["source_type"].as_str().unwrap_or("").to_owned(),
790        source_identity: v["source_identity"].as_str().unwrap_or("").to_owned(),
791        correlation_id: v["correlation_id"].as_str().unwrap_or("").to_owned(),
792        accepted_at: TimestampMs::from_millis(v["accepted_at"].as_i64().unwrap_or(0)),
793        payload: v["payload_hex"].as_str().and_then(|h| hex::decode(h).ok()),
794    })
795}
796
797// ─── claim_resumed_execution ─────────────────────────────────────────────
798
799pub(crate) async fn claim_resumed_execution_impl(
800    pool: &PgPool,
801    _partition_config: &PartitionConfig,
802    args: ClaimResumedExecutionArgs,
803) -> Result<ClaimResumedExecutionResult, EngineError> {
804    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
805    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
806
807    let row: Option<(String, i32)> = sqlx::query_as(
808        "SELECT public_state, attempt_index FROM ff_exec_core \
809          WHERE partition_key = $1 AND execution_id = $2 \
810          FOR UPDATE",
811    )
812    .bind(part)
813    .bind(exec_uuid)
814    .fetch_optional(&mut *tx)
815    .await
816    .map_err(map_sqlx_error)?;
817    let (public_state, attempt_index_i) = match row {
818        Some(r) => r,
819        None => {
820            tx.rollback().await.ok();
821            return Err(EngineError::NotFound { entity: "execution" });
822        }
823    };
824    if public_state != "resumable" {
825        tx.rollback().await.ok();
826        return Err(EngineError::Contention(
827            ContentionKind::NotAResumedExecution,
828        ));
829    }
830
831    let now = now_ms();
832    let lease_ttl = i64::try_from(args.lease_ttl_ms).unwrap_or(0);
833    let new_expires = now.saturating_add(lease_ttl);
834
835    sqlx::query(
836        "UPDATE ff_attempt \
837            SET worker_id = $1, worker_instance_id = $2, \
838                lease_epoch = lease_epoch + 1, \
839                lease_expires_at_ms = $3, started_at_ms = $4, outcome = NULL \
840          WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7",
841    )
842    .bind(args.worker_id.as_str())
843    .bind(args.worker_instance_id.as_str())
844    .bind(new_expires)
845    .bind(now)
846    .bind(part)
847    .bind(exec_uuid)
848    .bind(attempt_index_i)
849    .execute(&mut *tx)
850    .await
851    .map_err(map_sqlx_error)?;
852
853    sqlx::query(
854        "UPDATE ff_exec_core \
855            SET lifecycle_phase = 'active', ownership_state = 'leased', \
856                eligibility_state = 'not_applicable', \
857                public_state = 'running', attempt_state = 'running_attempt' \
858          WHERE partition_key = $1 AND execution_id = $2",
859    )
860    .bind(part)
861    .bind(exec_uuid)
862    .execute(&mut *tx)
863    .await
864    .map_err(map_sqlx_error)?;
865
866    let epoch_row: (i64,) = sqlx::query_as(
867        "SELECT lease_epoch FROM ff_attempt \
868          WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
869    )
870    .bind(part)
871    .bind(exec_uuid)
872    .bind(attempt_index_i)
873    .fetch_one(&mut *tx)
874    .await
875    .map_err(map_sqlx_error)?;
876
877    // RFC-019 Stage B outbox: lease acquired (claim_resumed_execution).
878    let lease_id_str = args.lease_id.to_string();
879    lease_event::emit(
880        &mut tx,
881        part,
882        exec_uuid,
883        Some(&lease_id_str),
884        lease_event::EVENT_ACQUIRED,
885        now,
886    )
887    .await?;
888
889    tx.commit().await.map_err(map_sqlx_error)?;
890
891    let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
892    let lease_epoch = LeaseEpoch(u64::try_from(epoch_row.0).unwrap_or(0));
893    let attempt_id = AttemptId::new();
894
895    Ok(ClaimResumedExecutionResult::Claimed(
896        ClaimedResumedExecution {
897            execution_id: args.execution_id.clone(),
898            lease_id: args.lease_id.clone(),
899            lease_epoch,
900            attempt_index,
901            attempt_id,
902            lease_expires_at: TimestampMs::from_millis(new_expires),
903        },
904    ))
905}
906
907// ─── observe_signals ─────────────────────────────────────────────────────
908
909pub(crate) async fn observe_signals_impl(
910    pool: &PgPool,
911    handle: &Handle,
912) -> Result<Vec<ResumeSignal>, EngineError> {
913    let payload = decode_handle(handle)?;
914    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
915
916    let row: Option<(JsonValue,)> = sqlx::query_as(
917        "SELECT member_map FROM ff_suspension_current \
918          WHERE partition_key = $1 AND execution_id = $2",
919    )
920    .bind(part)
921    .bind(exec_uuid)
922    .fetch_optional(pool)
923    .await
924    .map_err(map_sqlx_error)?;
925
926    let Some((member_map,)) = row else {
927        return Ok(Vec::new());
928    };
929    let mut out: Vec<ResumeSignal> = Vec::new();
930    if let Some(map) = member_map.as_object() {
931        for (_wp_key, arr) in map {
932            if let Some(sigs) = arr.as_array() {
933                for v in sigs {
934                    if let Some(s) = resume_signal_from_json(v) {
935                        out.push(s);
936                    }
937                }
938            }
939        }
940    }
941    Ok(out)
942}