Skip to main content

ff_backend_postgres/
suspend_ops.rs

1//! Suspend / deliver_signal / claim_resumed / observe_signals SQL bodies.
2//!
3//! **Wave 4d follow-up.** Builds on PR #238's primitives:
4//!
5//! * [`crate::signal::hmac_sign`] / [`crate::signal::hmac_verify`]
6//! * [`crate::signal::SERIALIZABLE_RETRY_BUDGET`] +
7//!   [`crate::signal::is_retryable_serialization`]
8//! * [`crate::suspend::evaluate`] — composite-condition evaluator
9//!
10//! HMAC signing, retry looping, and composite evaluation are never
11//! re-implemented here.
12//!
13//! # Isolation
14//!
15//! `suspend` + `deliver_signal` run at SERIALIZABLE with a 3-attempt
16//! retry budget (Q11). On retry exhaustion they surface
17//! `Contention(RetryExhausted)`. `claim_resumed_execution` +
18//! `observe_signals` stay at READ COMMITTED (row-level `FOR UPDATE`
19//! for the former, read-only for the latter).
20
21use std::collections::HashMap;
22use std::time::{SystemTime, UNIX_EPOCH};
23
24use ff_core::backend::{
25    BackendTag, Handle, HandleKind, HandleOpaque, PendingWaitpoint, ResumeSignal, WaitpointHmac,
26};
27use ff_core::contracts::{
28    AdditionalWaitpointBinding, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
29    ClaimedResumedExecution, CompositeBody, DeliverApprovalSignalArgs, DeliverSignalArgs,
30    DeliverSignalResult, ListPendingWaitpointsArgs, ListPendingWaitpointsResult,
31    PendingWaitpointInfo, ResumeCondition, SignalMatcher, SuspendArgs, SuspendOutcome,
32    SuspendOutcomeDetails, WaitpointBinding,
33};
34use ff_core::engine_error::{ContentionKind, EngineError, ValidationKind};
35use ff_core::handle_codec::{encode as encode_opaque, HandlePayload};
36use ff_core::partition::PartitionConfig;
37use ff_core::types::{
38    AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseFence, SignalId, SuspensionId,
39    TimestampMs, WaitpointId, WaitpointToken,
40};
41use serde_json::{json, Value as JsonValue};
42use sqlx::{PgPool, Postgres, Transaction};
43use uuid::Uuid;
44
45use crate::error::map_sqlx_error;
46use crate::lease_event;
47use crate::signal::{hmac_sign, hmac_verify, is_retryable_serialization, SERIALIZABLE_RETRY_BUDGET};
48use crate::signal_event;
49use crate::suspend::evaluate;
50
51// ─── small shared helpers ────────────────────────────────────────────────
52
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock reading earlier than the epoch yields `0` (the
/// `duration_since` error is replaced by a zero duration). The `u128`
/// millisecond count is converted with a checked conversion that
/// saturates at `i64::MAX`, so an out-of-range value clamps instead of
/// wrapping the way an `as` cast would.
fn now_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
59
60fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
61    let s = eid.as_str();
62    let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
63        kind: ValidationKind::InvalidInput,
64        detail: format!("execution_id missing `{{fp:` prefix: {s}"),
65    })?;
66    let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
67        kind: ValidationKind::InvalidInput,
68        detail: format!("execution_id missing `}}:`: {s}"),
69    })?;
70    let part: i16 = rest[..close]
71        .parse()
72        .map_err(|_| EngineError::Validation {
73            kind: ValidationKind::InvalidInput,
74            detail: format!("execution_id partition index not u16: {s}"),
75        })?;
76    let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
77        kind: ValidationKind::InvalidInput,
78        detail: format!("execution_id UUID invalid: {s}"),
79    })?;
80    Ok((part, uuid))
81}
82
83fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
84    if handle.backend != BackendTag::Postgres {
85        return Err(EngineError::Validation {
86            kind: ValidationKind::HandleFromOtherBackend,
87            detail: format!("expected Postgres, got {:?}", handle.backend),
88        });
89    }
90    let decoded = ff_core::handle_codec::decode(&handle.opaque)?;
91    if decoded.tag != BackendTag::Postgres {
92        return Err(EngineError::Validation {
93            kind: ValidationKind::HandleFromOtherBackend,
94            detail: format!("embedded tag {:?}", decoded.tag),
95        });
96    }
97    Ok(decoded.payload)
98}
99
100fn wp_uuid(w: &WaitpointId) -> Result<Uuid, EngineError> {
101    Uuid::parse_str(&w.to_string()).map_err(|e| EngineError::Validation {
102        kind: ValidationKind::InvalidInput,
103        detail: format!("waitpoint_id not a UUID: {e}"),
104    })
105}
106
107fn susp_uuid(s: &SuspensionId) -> Result<Uuid, EngineError> {
108    Uuid::parse_str(&s.to_string()).map_err(|e| EngineError::Validation {
109        kind: ValidationKind::InvalidInput,
110        detail: format!("suspension_id not a UUID: {e}"),
111    })
112}
113
114/// RFC-020 §3.1.1 — derive `required_signal_names` for a single
115/// `waitpoint_key` from the suspend call's `ResumeCondition`. Walks
116/// the condition tree and collects every `SignalMatcher::ByName` that
117/// targets this specific waitpoint. `Wildcard` matchers contribute
118/// nothing (an empty result vec is the wire-level wildcard marker per
119/// `PendingWaitpointInfo::required_signal_names` docs).
120///
121/// `OperatorOnly` / `TimeoutOnly` are rendered with the same sentinel
122/// names that the Valkey wire format emits (`__operator_only__` /
123/// `__timeout_only__`, see `ff-backend-valkey/src/lib.rs:3205-3222`)
124/// so an operator-only or timeout-only waitpoint is distinguishable
125/// from a true wildcard at the `PendingWaitpointInfo` surface. The
126/// `__`-prefix sentinel values are never real signal names.
127///
128/// `Count { matcher: None }` returns empty (any signal on the listed
129/// waitpoints counts). Duplicates are de-duplicated preserving
130/// first-seen order.
131fn derive_required_signal_names(cond: &ResumeCondition, wp_key: &str) -> Vec<String> {
132    const OPERATOR_ONLY_SENTINEL: &str = "__operator_only__";
133    const TIMEOUT_ONLY_SENTINEL: &str = "__timeout_only__";
134
135    let mut out: Vec<String> = Vec::new();
136    let mut push = |name: &str| {
137        if !name.is_empty() && !out.iter().any(|e| e == name) {
138            out.push(name.to_owned());
139        }
140    };
141    fn walk(cond: &ResumeCondition, target: &str, push: &mut dyn FnMut(&str)) {
142        match cond {
143            ResumeCondition::Single {
144                waitpoint_key,
145                matcher,
146            } => {
147                if waitpoint_key == target
148                    && let SignalMatcher::ByName(name) = matcher
149                {
150                    push(name.as_str());
151                }
152            }
153            ResumeCondition::OperatorOnly => push(OPERATOR_ONLY_SENTINEL),
154            ResumeCondition::TimeoutOnly => push(TIMEOUT_ONLY_SENTINEL),
155            ResumeCondition::Composite(body) => walk_body(body, target, push),
156            _ => {}
157        }
158    }
159    fn walk_body(body: &CompositeBody, target: &str, push: &mut dyn FnMut(&str)) {
160        match body {
161            CompositeBody::AllOf { members } => {
162                for m in members {
163                    walk(m, target, push);
164                }
165            }
166            CompositeBody::Count {
167                matcher, waitpoints, ..
168            } => {
169                if waitpoints.iter().any(|w| w == target)
170                    && let Some(SignalMatcher::ByName(name)) = matcher
171                {
172                    push(name.as_str());
173                }
174            }
175            _ => {}
176        }
177    }
178    walk(cond, wp_key, &mut push);
179    out
180}
181
/// RFC-017 §8 / RFC-020 §4.5 — parse the stored `<kid>:<hex>` waitpoint
/// token into a `(token_kid, token_fingerprint)` pair.
///
/// `token_fingerprint` is the first 16 characters of the part after the
/// first `:` (8 bytes of the HMAC digest in hex) — the §8 audit-friendly
/// handle. A shorter digest is returned whole.
///
/// Malformed input collapses to `("", "")` so callers can skip / log
/// without surfacing a typed error. That covers a missing `:`, an empty
/// kid, an empty digest, and a digest whose 16-byte cut would land inside
/// a multi-byte UTF-8 character (a plain `hex[..n]` slice would panic
/// there). Mirrors `ff-backend-valkey::parse_waitpoint_token_kid_fp`.
fn parse_waitpoint_token_kid_fp(raw: &str) -> (String, String) {
    const FINGERPRINT_HEX_CHARS: usize = 16;
    let malformed = || (String::new(), String::new());

    let Some((kid, hex)) = raw.split_once(':') else {
        return malformed();
    };
    if kid.is_empty() || hex.is_empty() {
        return malformed();
    }

    let fp_len = hex.len().min(FINGERPRINT_HEX_CHARS);
    // `get` returns `None` instead of panicking when `fp_len` is not a
    // char boundary; real hex digests are ASCII and never hit that path.
    match hex.get(..fp_len) {
        Some(fingerprint) => (kid.to_owned(), fingerprint.to_owned()),
        None => malformed(),
    }
}
197
198// ─── SERIALIZABLE retry loop ─────────────────────────────────────────────
199
200/// Return true if an `EngineError::Transport` carries a sqlx
201/// serialization-failure SQLSTATE. `map_sqlx_error` stringifies the
202/// underlying code; we match defensively on both the raw codes
203/// (40001/40P01) and the symbolic labels.
204fn is_retryable_engine(err: &EngineError) -> bool {
205    match err {
206        EngineError::Transport { source, .. } => {
207            let s = source.to_string();
208            s.contains("40001")
209                || s.contains("40P01")
210                || s.contains("serialization_failure")
211                || s.contains("deadlock_detected")
212        }
213        // `map_sqlx_error` collapses serialization_failure (40001) +
214        // deadlock_detected (40P01) into `Contention(LeaseConflict)`.
215        // Inside a SERIALIZABLE retry loop those are retryable; the
216        // explicit in-body fence-mismatch case is NOT (a bumped epoch
217        // won't unbump on retry). We can't distinguish them by
218        // discriminant alone, so the retry loop treats LeaseConflict
219        // as retryable — a genuine fence mismatch will still bail
220        // after budget exhaustion as RetryExhausted, which callers
221        // reconcile by re-reading exec_core.
222        EngineError::Contention(ContentionKind::LeaseConflict) => true,
223        _ => false,
224    }
225}
226
227/// Run `op` inside a SERIALIZABLE transaction, retrying up to
228/// [`SERIALIZABLE_RETRY_BUDGET`] times on retryable faults. On
229/// exhaustion returns `Contention(RetryExhausted)`.
230async fn run_serializable<T, F>(pool: &PgPool, mut op: F) -> Result<T, EngineError>
231where
232    T: Send,
233    F: for<'a> FnMut(
234            &'a mut Transaction<'_, Postgres>,
235        ) -> std::pin::Pin<
236            Box<dyn std::future::Future<Output = Result<T, EngineError>> + Send + 'a>,
237        > + Send,
238{
239    for _ in 0..SERIALIZABLE_RETRY_BUDGET {
240        let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
241        sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
242            .execute(&mut *tx)
243            .await
244            .map_err(map_sqlx_error)?;
245        let body_res = op(&mut tx).await;
246        match body_res {
247            Ok(v) => match tx.commit().await {
248                Ok(()) => return Ok(v),
249                Err(e) if is_retryable_serialization(&e) => continue,
250                Err(e) => return Err(map_sqlx_error(e)),
251            },
252            Err(e) if is_retryable_engine(&e) => {
253                let _ = tx.rollback().await;
254                continue;
255            }
256            Err(e) => {
257                let _ = tx.rollback().await;
258                return Err(e);
259            }
260        }
261    }
262    Err(EngineError::Contention(ContentionKind::RetryExhausted))
263}
264
265// ─── dedup outcome (de)serialization ─────────────────────────────────────
266
267fn outcome_to_dedup_json(outcome: &SuspendOutcome) -> JsonValue {
268    let details = outcome.details();
269    let extras: Vec<JsonValue> = details
270        .additional_waitpoints
271        .iter()
272        .map(|e| {
273            json!({
274                "waitpoint_id": e.waitpoint_id.to_string(),
275                "waitpoint_key": e.waitpoint_key,
276                "token": e.waitpoint_token.as_str(),
277            })
278        })
279        .collect();
280    let (variant, handle_opaque) = match outcome {
281        SuspendOutcome::Suspended { handle, .. } => {
282            ("Suspended", Some(hex::encode(handle.opaque.as_bytes())))
283        }
284        SuspendOutcome::AlreadySatisfied { .. } => ("AlreadySatisfied", None),
285        _ => ("Suspended", None),
286    };
287    json!({
288        "variant": variant,
289        "details": {
290            "suspension_id": details.suspension_id.to_string(),
291            "waitpoint_id": details.waitpoint_id.to_string(),
292            "waitpoint_key": details.waitpoint_key,
293            "token": details.waitpoint_token.as_str(),
294            "extras": extras,
295        },
296        "handle_opaque_hex": handle_opaque,
297    })
298}
299
300fn outcome_from_dedup_json(v: &JsonValue) -> Result<SuspendOutcome, EngineError> {
301    let corrupt = |s: String| EngineError::Validation {
302        kind: ValidationKind::Corruption,
303        detail: s,
304    };
305    let det = &v["details"];
306    let suspension_id = SuspensionId::parse(det["suspension_id"].as_str().unwrap_or(""))
307        .map_err(|e| corrupt(format!("dedup suspension_id: {e}")))?;
308    let waitpoint_id = WaitpointId::parse(det["waitpoint_id"].as_str().unwrap_or(""))
309        .map_err(|e| corrupt(format!("dedup waitpoint_id: {e}")))?;
310    let waitpoint_key = det["waitpoint_key"].as_str().unwrap_or("").to_owned();
311    let token = det["token"].as_str().unwrap_or("").to_owned();
312    let mut extras: Vec<AdditionalWaitpointBinding> = Vec::new();
313    if let Some(arr) = det["extras"].as_array() {
314        for e in arr {
315            let wid = WaitpointId::parse(e["waitpoint_id"].as_str().unwrap_or(""))
316                .map_err(|err| corrupt(format!("dedup extra wp_id: {err}")))?;
317            let wkey = e["waitpoint_key"].as_str().unwrap_or("").to_owned();
318            let tok = e["token"].as_str().unwrap_or("").to_owned();
319            extras.push(AdditionalWaitpointBinding::new(
320                wid,
321                wkey,
322                WaitpointHmac::new(tok),
323            ));
324        }
325    }
326    let details = SuspendOutcomeDetails::new(
327        suspension_id,
328        waitpoint_id,
329        waitpoint_key,
330        WaitpointHmac::new(token),
331    )
332    .with_additional_waitpoints(extras);
333
334    match v["variant"].as_str().unwrap_or("Suspended") {
335        "AlreadySatisfied" => Ok(SuspendOutcome::AlreadySatisfied { details }),
336        _ => {
337            let opaque_hex = v["handle_opaque_hex"].as_str().unwrap_or("");
338            let bytes = hex::decode(opaque_hex)
339                .map_err(|e| corrupt(format!("dedup handle hex: {e}")))?;
340            let opaque = HandleOpaque::new(bytes.into_boxed_slice());
341            let handle = Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
342            Ok(SuspendOutcome::Suspended { details, handle })
343        }
344    }
345}
346
347// ─── suspend ─────────────────────────────────────────────────────────────
348
349pub(crate) async fn suspend_impl(
350    pool: &PgPool,
351    _partition_config: &PartitionConfig,
352    handle: &Handle,
353    args: SuspendArgs,
354) -> Result<SuspendOutcome, EngineError> {
355    let payload = decode_handle(handle)?;
356    suspend_core(pool, payload, args).await
357}
358
359/// Cairn #322 — service-layer entry point: suspend when the caller
360/// holds a lease fence triple but no `Handle`. Resolves `attempt_index`
361/// from `ff_attempt` by `(exec_id, attempt_id)` then delegates to the
362/// same transactional body used by [`suspend_impl`].
363pub(crate) async fn suspend_by_triple_impl(
364    pool: &PgPool,
365    _partition_config: &PartitionConfig,
366    exec_id: ExecutionId,
367    triple: LeaseFence,
368    args: SuspendArgs,
369) -> Result<SuspendOutcome, EngineError> {
370    let (part, exec_uuid) = split_exec_id(&exec_id)?;
371    // Postgres-side attempts are keyed by `(execution_id, attempt_index)`;
372    // there is no `attempt_id` column on `ff_attempt`. The triple's
373    // `attempt_id` is therefore advisory on this backend — the
374    // authoritative "which attempt" pointer lives on `ff_exec_core`.
375    // Read it outside the serializable body; `suspend_core` re-fences
376    // against `lease_epoch` (`FOR UPDATE`) inside the txn so a racing
377    // attempt-bump or lease-bump surfaces as `Contention(LeaseConflict)`.
378    let row: Option<(i32,)> = sqlx::query_as(
379        "SELECT attempt_index FROM ff_exec_core \
380         WHERE partition_key = $1 AND execution_id = $2",
381    )
382    .bind(part)
383    .bind(exec_uuid)
384    .fetch_optional(pool)
385    .await
386    .map_err(map_sqlx_error)?;
387    let attempt_index_i = match row {
388        Some((i,)) => i,
389        None => return Err(EngineError::NotFound { entity: "execution" }),
390    };
391    let attempt_index =
392        AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
393
394    // Synthesize a HandlePayload — `suspend_core` only reads
395    // `execution_id`, `attempt_index`, and `lease_epoch`; the rest of
396    // the payload fields are carried through to the replayed-outcome
397    // handle encoding (kind = Suspended).
398    let payload = HandlePayload::new(
399        exec_id,
400        attempt_index,
401        triple.attempt_id,
402        triple.lease_id,
403        triple.lease_epoch,
404        0,
405        ff_core::types::LaneId::new(""),
406        ff_core::types::WorkerInstanceId::new(""),
407    );
408    suspend_core(pool, payload, args).await
409}
410
411async fn suspend_core(
412    pool: &PgPool,
413    payload: HandlePayload,
414    args: SuspendArgs,
415) -> Result<SuspendOutcome, EngineError> {
416    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
417    let attempt_index_i = i32::try_from(payload.attempt_index.0).unwrap_or(0);
418    let expected_epoch = payload.lease_epoch.0;
419    let idem_key = args.idempotency_key.as_ref().map(|k| k.as_str().to_owned());
420
421    run_serializable(pool, move |tx| {
422        let args = args.clone();
423        let idem = idem_key.clone();
424        let payload = payload.clone();
425        Box::pin(async move {
426            // 1. Dedup replay check.
427            if let Some(key) = idem.as_deref() {
428                let row: Option<(JsonValue,)> = sqlx::query_as(
429                    "SELECT outcome_json FROM ff_suspend_dedup \
430                     WHERE partition_key = $1 AND idempotency_key = $2",
431                )
432                .bind(part)
433                .bind(key)
434                .fetch_optional(&mut **tx)
435                .await
436                .map_err(map_sqlx_error)?;
437                if let Some((cached,)) = row {
438                    return outcome_from_dedup_json(&cached);
439                }
440            }
441
442            // 2. Fence check against ff_attempt.
443            let epoch_row: Option<(i64,)> = sqlx::query_as(
444                "SELECT lease_epoch FROM ff_attempt \
445                 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3 \
446                 FOR UPDATE",
447            )
448            .bind(part)
449            .bind(exec_uuid)
450            .bind(attempt_index_i)
451            .fetch_optional(&mut **tx)
452            .await
453            .map_err(map_sqlx_error)?;
454            let observed_epoch: u64 = match epoch_row {
455                Some((e,)) => u64::try_from(e).unwrap_or(0),
456                None => return Err(EngineError::NotFound { entity: "attempt" }),
457            };
458            if observed_epoch != expected_epoch {
459                return Err(EngineError::Contention(ContentionKind::LeaseConflict));
460            }
461
462            // 3. Resolve the active HMAC kid inside the txn.
463            let kid_row: Option<(String, Vec<u8>)> = sqlx::query_as(
464                "SELECT kid, secret FROM ff_waitpoint_hmac \
465                 WHERE active = TRUE \
466                 ORDER BY rotated_at_ms DESC LIMIT 1",
467            )
468            .fetch_optional(&mut **tx)
469            .await
470            .map_err(map_sqlx_error)?;
471            let (kid, secret) = kid_row.ok_or_else(|| EngineError::Validation {
472                kind: ValidationKind::InvalidInput,
473                detail: "ff_waitpoint_hmac empty — rotate a kid before suspend".into(),
474            })?;
475
476            // 4. Sign + insert waitpoint_pending for each binding.
477            let now = args.now.0;
478            let mut signed: Vec<(WaitpointId, String, String)> = Vec::new();
479            for binding in args.waitpoints.iter() {
480                let (wp_id, wp_key) = match binding {
481                    WaitpointBinding::Fresh {
482                        waitpoint_id,
483                        waitpoint_key,
484                    } => (waitpoint_id.clone(), waitpoint_key.clone()),
485                    WaitpointBinding::UsePending { waitpoint_id } => {
486                        let row: Option<(String,)> = sqlx::query_as(
487                            "SELECT waitpoint_key FROM ff_waitpoint_pending \
488                             WHERE partition_key = $1 AND waitpoint_id = $2",
489                        )
490                        .bind(part)
491                        .bind(wp_uuid(waitpoint_id)?)
492                        .fetch_optional(&mut **tx)
493                        .await
494                        .map_err(map_sqlx_error)?;
495                        let wp_key = row.map(|(k,)| k).unwrap_or_default();
496                        (waitpoint_id.clone(), wp_key)
497                    }
498                    _ => {
499                        return Err(EngineError::Validation {
500                            kind: ValidationKind::InvalidInput,
501                            detail: "unsupported WaitpointBinding variant".into(),
502                        });
503                    }
504                };
505                let msg = format!("{}:{}", payload.execution_id, wp_id);
506                let token = hmac_sign(&secret, &kid, msg.as_bytes());
507                // RFC-020 §3.1.1 — populate 0011 columns on insert.
508                // `suspend_core` atomically lands the suspension (exec_core
509                // flips to `suspended` in the same txn as this INSERT), so
510                // from any observer's perspective the waitpoint is already
511                // activated at the moment it becomes visible — there is no
512                // separate pending→active transition on Postgres. Write
513                // `state = 'active'` + `activated_at_ms = now` directly.
514                // `required_signal_names` is derived per-waitpoint from the
515                // resume condition; an empty vec denotes the wildcard case
516                // per the `PendingWaitpointInfo::required_signal_names`
517                // contract.
518                let required_names =
519                    derive_required_signal_names(&args.resume_condition, &wp_key);
520                sqlx::query(
521                    "INSERT INTO ff_waitpoint_pending \
522                       (partition_key, waitpoint_id, execution_id, token_kid, token, \
523                        created_at_ms, expires_at_ms, waitpoint_key, \
524                        state, required_signal_names, activated_at_ms) \
525                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'active', $9, $6) \
526                     ON CONFLICT (partition_key, waitpoint_id) DO UPDATE SET \
527                       token_kid = EXCLUDED.token_kid, token = EXCLUDED.token, \
528                       waitpoint_key = EXCLUDED.waitpoint_key, \
529                       state = EXCLUDED.state, \
530                       required_signal_names = EXCLUDED.required_signal_names, \
531                       activated_at_ms = EXCLUDED.activated_at_ms",
532                )
533                .bind(part)
534                .bind(wp_uuid(&wp_id)?)
535                .bind(exec_uuid)
536                .bind(&kid)
537                .bind(&token)
538                .bind(now)
539                .bind(args.timeout_at.map(|t| t.0))
540                .bind(&wp_key)
541                .bind(&required_names)
542                .execute(&mut **tx)
543                .await
544                .map_err(map_sqlx_error)?;
545                signed.push((wp_id, wp_key, token));
546            }
547
548            // 5. Insert ff_suspension_current.
549            let condition_json =
550                serde_json::to_value(&args.resume_condition).map_err(|e| {
551                    EngineError::Validation {
552                        kind: ValidationKind::Corruption,
553                        detail: format!("resume_condition serialize: {e}"),
554                    }
555                })?;
556            sqlx::query(
557                "INSERT INTO ff_suspension_current \
558                   (partition_key, execution_id, suspension_id, suspended_at_ms, \
559                    timeout_at_ms, reason_code, condition, satisfied_set, member_map, \
560                    timeout_behavior) \
561                 VALUES ($1, $2, $3, $4, $5, $6, $7, '[]'::jsonb, '{}'::jsonb, $8) \
562                 ON CONFLICT (partition_key, execution_id) DO UPDATE SET \
563                   suspension_id = EXCLUDED.suspension_id, \
564                   suspended_at_ms = EXCLUDED.suspended_at_ms, \
565                   timeout_at_ms = EXCLUDED.timeout_at_ms, \
566                   reason_code = EXCLUDED.reason_code, \
567                   condition = EXCLUDED.condition, \
568                   satisfied_set = '[]'::jsonb, \
569                   member_map = '{}'::jsonb, \
570                   timeout_behavior = EXCLUDED.timeout_behavior",
571            )
572            .bind(part)
573            .bind(exec_uuid)
574            .bind(susp_uuid(&args.suspension_id)?)
575            .bind(now)
576            .bind(args.timeout_at.map(|t| t.0))
577            .bind(args.reason_code.as_wire_str())
578            .bind(&condition_json)
579            .bind(args.timeout_behavior.as_wire_str())
580            .execute(&mut **tx)
581            .await
582            .map_err(map_sqlx_error)?;
583
584            // 6. Transition exec_core to suspended.
585            sqlx::query(
586                "UPDATE ff_exec_core \
587                    SET lifecycle_phase = 'suspended', \
588                        ownership_state = 'released', \
589                        eligibility_state = 'not_applicable', \
590                        public_state = 'suspended', \
591                        attempt_state = 'attempt_interrupted' \
592                  WHERE partition_key = $1 AND execution_id = $2",
593            )
594            .bind(part)
595            .bind(exec_uuid)
596            .execute(&mut **tx)
597            .await
598            .map_err(map_sqlx_error)?;
599
600            // 7. Release + bump lease epoch on ff_attempt.
601            sqlx::query(
602                "UPDATE ff_attempt \
603                    SET worker_id = NULL, \
604                        worker_instance_id = NULL, \
605                        lease_expires_at_ms = NULL, \
606                        lease_epoch = lease_epoch + 1, \
607                        outcome = 'attempt_interrupted' \
608                  WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
609            )
610            .bind(part)
611            .bind(exec_uuid)
612            .bind(attempt_index_i)
613            .execute(&mut **tx)
614            .await
615            .map_err(map_sqlx_error)?;
616
617            // RFC-019 Stage B outbox: lease revoked (suspend).
618            lease_event::emit(
619                tx,
620                part,
621                exec_uuid,
622                None,
623                lease_event::EVENT_REVOKED,
624                now,
625            )
626            .await?;
627
628            // 8. Assemble outcome.
629            let (primary_id, primary_key, primary_token) = signed[0].clone();
630            let extras: Vec<AdditionalWaitpointBinding> = signed
631                .iter()
632                .skip(1)
633                .map(|(id, key, tok)| {
634                    AdditionalWaitpointBinding::new(
635                        id.clone(),
636                        key.clone(),
637                        WaitpointHmac::new(tok.clone()),
638                    )
639                })
640                .collect();
641            let details = SuspendOutcomeDetails::new(
642                args.suspension_id.clone(),
643                primary_id,
644                primary_key,
645                WaitpointHmac::new(primary_token),
646            )
647            .with_additional_waitpoints(extras);
648
649            let opaque = encode_opaque(BackendTag::Postgres, &payload);
650            let suspended_handle =
651                Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
652            let outcome = SuspendOutcome::Suspended {
653                details,
654                handle: suspended_handle,
655            };
656
657            // 9. Cache outcome if idempotency key present.
658            if let Some(key) = idem.as_deref() {
659                let cached = outcome_to_dedup_json(&outcome);
660                sqlx::query(
661                    "INSERT INTO ff_suspend_dedup \
662                       (partition_key, idempotency_key, outcome_json, created_at_ms) \
663                     VALUES ($1, $2, $3, $4) \
664                     ON CONFLICT DO NOTHING",
665                )
666                .bind(part)
667                .bind(key)
668                .bind(&cached)
669                .bind(now)
670                .execute(&mut **tx)
671                .await
672                .map_err(map_sqlx_error)?;
673            }
674
675            Ok(outcome)
676        })
677    })
678    .await
679}
680
681// ─── create_waitpoint ────────────────────────────────────────────────────
682
683/// Cairn #435 — caller-initiated waitpoint creation. Mints a fresh
684/// `WaitpointId`, signs a token under the active HMAC kid, and INSERTs
685/// a `pending` row into `ff_waitpoint_pending`. Mirrors
686/// `ff-backend-sqlite`'s `create_waitpoint_impl` + Valkey's
687/// `ff_create_pending_waitpoint` Lua: state stays `'pending'` (no
688/// activation — only `suspend` flips a waitpoint to `'active'`),
689/// `required_signal_names` is empty (wildcard wire marker), and the
690/// token is bound to `"{execution_id}:{waitpoint_id}"` so the
691/// pre-existing `deliver_signal` verify path accepts it unchanged.
692///
693/// Not idempotent by `waitpoint_key` — every call mints a fresh
694/// `WaitpointId`, matching both reference backends. Two calls with
695/// the same key produce two distinct waitpoints.
696pub(crate) async fn create_waitpoint_impl(
697    pool: &PgPool,
698    handle: &Handle,
699    waitpoint_key: &str,
700    expires_in: std::time::Duration,
701) -> Result<PendingWaitpoint, EngineError> {
702    let payload = decode_handle(handle)?;
703    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
704
705    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
706
707    let kid_row: Option<(String, Vec<u8>)> = sqlx::query_as(
708        "SELECT kid, secret FROM ff_waitpoint_hmac \
709         WHERE active = TRUE \
710         ORDER BY rotated_at_ms DESC LIMIT 1",
711    )
712    .fetch_optional(&mut *tx)
713    .await
714    .map_err(map_sqlx_error)?;
715    let (kid, secret) = kid_row.ok_or_else(|| EngineError::Validation {
716        kind: ValidationKind::InvalidInput,
717        detail: "ff_waitpoint_hmac empty — seed a kid before create_waitpoint".into(),
718    })?;
719
720    let wp_id = WaitpointId::new();
721    let wp_u = wp_uuid(&wp_id)?;
722    let now = now_ms();
723    // Clamp oversized `expires_in` to match the other backends:
724    //   * `ff-backend-sqlite::create_waitpoint_impl` uses
725    //     `i64::try_from(...).unwrap_or(i64::MAX)` + `saturating_add`.
726    //   * `ff-backend-valkey` / `ff_create_pending_waitpoint` Lua casts
727    //     with `as i64`.
728    //
729    // A stricter "overflow => Validation" contract would diverge from
730    // both — that tightening needs a one-shot trait-docs + all-three-
731    // backends change, not a PG-only fork. Keep parity until then.
732    let expires_ms = i64::try_from(expires_in.as_millis()).unwrap_or(i64::MAX);
733    let expires_at = now.saturating_add(expires_ms);
734    let msg = format!("{}:{}", payload.execution_id, wp_id);
735    let token = hmac_sign(&secret, &kid, msg.as_bytes());
736
737    // State stays 'pending' — only the suspend path activates. Empty
738    // `required_signal_names` is the wildcard-wire marker per the
739    // `PendingWaitpointInfo` contract.
740    sqlx::query(
741        "INSERT INTO ff_waitpoint_pending \
742           (partition_key, waitpoint_id, execution_id, token_kid, token, \
743            created_at_ms, expires_at_ms, waitpoint_key, state, required_signal_names) \
744         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'pending', '{}')",
745    )
746    .bind(part)
747    .bind(wp_u)
748    .bind(exec_uuid)
749    .bind(&kid)
750    .bind(&token)
751    .bind(now)
752    .bind(expires_at)
753    .bind(waitpoint_key)
754    .execute(&mut *tx)
755    .await
756    .map_err(map_sqlx_error)?;
757
758    tx.commit().await.map_err(map_sqlx_error)?;
759
760    Ok(PendingWaitpoint::new(wp_id, WaitpointHmac::new(token)))
761}
762
763// ─── deliver_signal ──────────────────────────────────────────────────────
764
/// Delivers one signal to a pending waitpoint and, when the suspension's
/// resume condition becomes satisfied, flips the execution to
/// `resumable`.
///
/// The whole body runs inside `run_serializable` (per the module docs:
/// SERIALIZABLE isolation with a bounded retry budget). Steps:
///
/// 1. Load + lock the `ff_waitpoint_pending` row and check it belongs to
///    `args.execution_id`.
/// 2. Verify `args.waitpoint_token` with the secret stored for the row's
///    kid, then require it to equal the stored token.
/// 3. Load + lock the `ff_suspension_current` row.
/// 4. Append the signal to `member_map[waitpoint_key]`.
/// 5. Evaluate the stored `ResumeCondition` against the updated map.
/// 6. Persist `member_map`; if satisfied, also mark `ff_exec_core`
///    resumable, delete every pending waitpoint of the execution, and
///    insert a `'resumable'` row into `ff_completion_event`.
/// 7. Emit the `ff_signal_event` outbox row in the same transaction.
///
/// # Returns
///
/// `DeliverSignalResult::Accepted` whose `effect` is
/// `"resume_condition_satisfied"` or `"appended_to_waitpoint"`.
///
/// # Errors
///
/// * `NotFound { entity: "waitpoint" }` / `NotFound { entity: "suspension" }`
///   when the respective row is absent.
/// * `Validation(InvalidInput)` when the waitpoint belongs to another
///   execution, the kid is missing from the keystore, HMAC verification
///   fails, or the presented token differs from the stored one.
/// * `Validation(Corruption)` when `member_map` / its entry has the wrong
///   JSON shape or the stored condition does not deserialize.
/// * Anything surfaced by `map_sqlx_error`, `signal_event::emit` or
///   `run_serializable` itself.
pub(crate) async fn deliver_signal_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
    let wp_u = wp_uuid(&args.waitpoint_id)?;

    run_serializable(pool, move |tx| {
        // Each invocation of the closure works on its own copy of `args`
        // (presumably because `run_serializable` may call the closure
        // again on a serialization retry — confirm against its signature).
        let args = args.clone();
        Box::pin(async move {
            // 1. Look up pending waitpoint + stored token + kid.
            let row: Option<(String, String, String, Uuid)> = sqlx::query_as(
                "SELECT token_kid, token, waitpoint_key, execution_id \
                   FROM ff_waitpoint_pending \
                  WHERE partition_key = $1 AND waitpoint_id = $2 \
                  FOR UPDATE",
            )
            .bind(part)
            .bind(wp_u)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (kid, stored_token, wp_key, stored_exec) = match row {
                Some(r) => r,
                None => return Err(EngineError::NotFound { entity: "waitpoint" }),
            };
            // The lookup is keyed by waitpoint id only, so ownership by
            // the caller-named execution is checked explicitly.
            if stored_exec != exec_uuid {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "waitpoint belongs to a different execution".into(),
                });
            }

            // 2. HMAC verify. Secret comes from the keystore — allows
            //    post-rotation grace verification when kid is inactive
            //    but still stored.
            let secret_row: Option<(Vec<u8>,)> = sqlx::query_as(
                "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
            )
            .bind(&kid)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (secret,) = secret_row.ok_or_else(|| EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("kid {kid} missing from keystore"),
            })?;
            let presented = args.waitpoint_token.as_str();
            // Signed message shape is "{execution_id}:{waitpoint_id}".
            let msg = format!("{}:{}", args.execution_id, args.waitpoint_id);
            hmac_verify(&secret, &kid, msg.as_bytes(), presented).map_err(|e| {
                EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: format!("waitpoint_token verify: {e}"),
                }
            })?;
            // Beyond being a valid signature, the token must be exactly
            // the one stored on the waitpoint row.
            if presented != stored_token {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "waitpoint_token does not match minted token".into(),
                });
            }

            // 3. Load suspension_current (lock the row so concurrent
            //    signals serialize on this execution).
            let susp_row: Option<(JsonValue, JsonValue)> = sqlx::query_as(
                "SELECT condition, member_map FROM ff_suspension_current \
                  WHERE partition_key = $1 AND execution_id = $2 \
                  FOR UPDATE",
            )
            .bind(part)
            .bind(exec_uuid)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (condition_json, mut member_map) = match susp_row {
                Some(r) => r,
                None => return Err(EngineError::NotFound { entity: "suspension" }),
            };

            // 4. Append signal into member_map[wp_key].
            //    A missing correlation id is stored as "" and the payload
            //    is stored hex-encoded (JSON null when absent).
            let signal_blob = json!({
                "signal_id": args.signal_id.to_string(),
                "signal_name": args.signal_name,
                "signal_category": args.signal_category,
                "source_type": args.source_type,
                "source_identity": args.source_identity,
                "correlation_id": args.correlation_id.clone().unwrap_or_default(),
                "accepted_at": args.now.0,
                "payload_hex": args.payload.as_ref().map(hex::encode),
            });
            let map_obj = member_map.as_object_mut().ok_or_else(|| {
                EngineError::Validation {
                    kind: ValidationKind::Corruption,
                    detail: "member_map not a JSON object".into(),
                }
            })?;
            // First signal for this waitpoint key starts a fresh array.
            let entry = map_obj.entry(wp_key.clone()).or_insert_with(|| json!([]));
            entry
                .as_array_mut()
                .ok_or_else(|| EngineError::Validation {
                    kind: ValidationKind::Corruption,
                    detail: "member_map[wp_key] not a JSON array".into(),
                })?
                .push(signal_blob);

            // 5. Evaluate composite condition against the updated map.
            let condition: ResumeCondition = serde_json::from_value(condition_json)
                .map_err(|e| EngineError::Validation {
                    kind: ValidationKind::Corruption,
                    detail: format!("condition deserialize: {e}"),
                })?;
            // Rebuild typed signals per waitpoint key. Entries that
            // `resume_signal_from_json` cannot parse are skipped, and a
            // non-array value yields an empty list for that key.
            let signals_by_wp: HashMap<String, Vec<ResumeSignal>> = map_obj
                .iter()
                .map(|(k, v)| {
                    let sigs: Vec<ResumeSignal> = v
                        .as_array()
                        .map(|arr| arr.iter().filter_map(resume_signal_from_json).collect())
                        .unwrap_or_default();
                    (k.clone(), sigs)
                })
                .collect();
            // `evaluate` takes a borrowed view keyed by `&str` with slice
            // values, so re-project the owned map.
            let borrowed: HashMap<&str, &[ResumeSignal]> = signals_by_wp
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_slice()))
                .collect();
            let satisfied = evaluate(&condition, &borrowed);

            // 6. Persist member_map (always — both append + satisfy
            //    cases need the updated view for observe_signals).
            sqlx::query(
                "UPDATE ff_suspension_current SET member_map = $1 \
                  WHERE partition_key = $2 AND execution_id = $3",
            )
            .bind(&member_map)
            .bind(part)
            .bind(exec_uuid)
            .execute(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;

            let effect = if satisfied {
                // Make the execution claimable again.
                sqlx::query(
                    "UPDATE ff_exec_core \
                        SET public_state = 'resumable', \
                            lifecycle_phase = 'runnable', \
                            eligibility_state = 'eligible_now' \
                      WHERE partition_key = $1 AND execution_id = $2",
                )
                .bind(part)
                .bind(exec_uuid)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;

                // Drops every pending waitpoint row of this execution,
                // not only the one that received the signal.
                sqlx::query(
                    "DELETE FROM ff_waitpoint_pending \
                      WHERE partition_key = $1 AND execution_id = $2",
                )
                .bind(part)
                .bind(exec_uuid)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;

                // Record the transition, stamped with the caller's `now`.
                sqlx::query(
                    "INSERT INTO ff_completion_event \
                       (partition_key, execution_id, outcome, occurred_at_ms) \
                     VALUES ($1, $2, 'resumable', $3)",
                )
                .bind(part)
                .bind(exec_uuid)
                .bind(args.now.0)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;

                "resume_condition_satisfied"
            } else {
                "appended_to_waitpoint"
            };

            // RFC-019 Stage B — `ff_signal_event` outbox. Same tx as
            // the state writes above so the NOTIFY fires iff the
            // delivery commits.
            let wp_id_str = args.waitpoint_id.to_string();
            signal_event::emit(
                tx,
                part,
                exec_uuid,
                &args.signal_id.to_string(),
                Some(wp_id_str.as_str()),
                Some(args.source_identity.as_str()),
                args.now.0,
            )
            .await?;

            Ok(DeliverSignalResult::Accepted {
                signal_id: args.signal_id.clone(),
                effect: effect.to_owned(),
            })
        })
    })
    .await
}
970
971fn resume_signal_from_json(v: &JsonValue) -> Option<ResumeSignal> {
972    let signal_id = SignalId::parse(v["signal_id"].as_str()?).ok()?;
973    Some(ResumeSignal {
974        signal_id,
975        signal_name: v["signal_name"].as_str()?.to_owned(),
976        signal_category: v["signal_category"].as_str().unwrap_or("").to_owned(),
977        source_type: v["source_type"].as_str().unwrap_or("").to_owned(),
978        source_identity: v["source_identity"].as_str().unwrap_or("").to_owned(),
979        correlation_id: v["correlation_id"].as_str().unwrap_or("").to_owned(),
980        accepted_at: TimestampMs::from_millis(v["accepted_at"].as_i64().unwrap_or(0)),
981        payload: v["payload_hex"].as_str().and_then(|h| hex::decode(h).ok()),
982    })
983}
984
985// ─── claim_resumed_execution ─────────────────────────────────────────────
986
987pub(crate) async fn claim_resumed_execution_impl(
988    pool: &PgPool,
989    _partition_config: &PartitionConfig,
990    args: ClaimResumedExecutionArgs,
991) -> Result<ClaimResumedExecutionResult, EngineError> {
992    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
993    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
994
995    let row: Option<(String, i32)> = sqlx::query_as(
996        "SELECT public_state, attempt_index FROM ff_exec_core \
997          WHERE partition_key = $1 AND execution_id = $2 \
998          FOR UPDATE",
999    )
1000    .bind(part)
1001    .bind(exec_uuid)
1002    .fetch_optional(&mut *tx)
1003    .await
1004    .map_err(map_sqlx_error)?;
1005    let (public_state, attempt_index_i) = match row {
1006        Some(r) => r,
1007        None => {
1008            tx.rollback().await.ok();
1009            return Err(EngineError::NotFound { entity: "execution" });
1010        }
1011    };
1012    if public_state != "resumable" {
1013        tx.rollback().await.ok();
1014        return Err(EngineError::Contention(
1015            ContentionKind::NotAResumedExecution,
1016        ));
1017    }
1018
1019    let now = now_ms();
1020    let lease_ttl = i64::try_from(args.lease_ttl_ms).unwrap_or(0);
1021    let new_expires = now.saturating_add(lease_ttl);
1022
1023    sqlx::query(
1024        "UPDATE ff_attempt \
1025            SET worker_id = $1, worker_instance_id = $2, \
1026                lease_epoch = lease_epoch + 1, \
1027                lease_expires_at_ms = $3, started_at_ms = $4, outcome = NULL \
1028          WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7",
1029    )
1030    .bind(args.worker_id.as_str())
1031    .bind(args.worker_instance_id.as_str())
1032    .bind(new_expires)
1033    .bind(now)
1034    .bind(part)
1035    .bind(exec_uuid)
1036    .bind(attempt_index_i)
1037    .execute(&mut *tx)
1038    .await
1039    .map_err(map_sqlx_error)?;
1040
1041    // #356: started_at_ms is set-once on ff_exec_core; the resume-claim
1042    // path preserves the original first-claim timestamp via COALESCE.
1043    // On a suspended exec that was resumed before 0016 backfill, the
1044    // column may be NULL here — we seed it with `now` as a defensible
1045    // fallback (post-suspension resume is as close to "first observable
1046    // claim" as the column can get in that edge case).
1047    sqlx::query(
1048        "UPDATE ff_exec_core \
1049            SET lifecycle_phase = 'active', ownership_state = 'leased', \
1050                eligibility_state = 'not_applicable', \
1051                public_state = 'running', attempt_state = 'running_attempt', \
1052                started_at_ms = COALESCE(started_at_ms, $3) \
1053          WHERE partition_key = $1 AND execution_id = $2",
1054    )
1055    .bind(part)
1056    .bind(exec_uuid)
1057    .bind(now)
1058    .execute(&mut *tx)
1059    .await
1060    .map_err(map_sqlx_error)?;
1061
1062    let epoch_row: (i64,) = sqlx::query_as(
1063        "SELECT lease_epoch FROM ff_attempt \
1064          WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
1065    )
1066    .bind(part)
1067    .bind(exec_uuid)
1068    .bind(attempt_index_i)
1069    .fetch_one(&mut *tx)
1070    .await
1071    .map_err(map_sqlx_error)?;
1072
1073    // RFC-019 Stage B outbox: lease acquired (claim_resumed_execution).
1074    let lease_id_str = args.lease_id.to_string();
1075    lease_event::emit(
1076        &mut tx,
1077        part,
1078        exec_uuid,
1079        Some(&lease_id_str),
1080        lease_event::EVENT_ACQUIRED,
1081        now,
1082    )
1083    .await?;
1084
1085    tx.commit().await.map_err(map_sqlx_error)?;
1086
1087    let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
1088    let lease_epoch = LeaseEpoch(u64::try_from(epoch_row.0).unwrap_or(0));
1089    let attempt_id = AttemptId::new();
1090
1091    // v0.12 PR-5.5: populate the handle alongside the claim result so
1092    // the SDK's `ClaimedTask::new` no longer synthesises one via
1093    // `ValkeyBackend::encode_handle`. PG encodes a real Postgres-tagged
1094    // handle that ops (`renew`, `progress`, …) can decode on the same
1095    // backend.
1096    let payload = HandlePayload::new(
1097        args.execution_id.clone(),
1098        attempt_index,
1099        attempt_id.clone(),
1100        args.lease_id.clone(),
1101        lease_epoch,
1102        args.lease_ttl_ms,
1103        args.lane_id.clone(),
1104        args.worker_instance_id.clone(),
1105    );
1106    let handle = Handle::new(
1107        BackendTag::Postgres,
1108        HandleKind::Resumed,
1109        encode_opaque(BackendTag::Postgres, &payload),
1110    );
1111
1112    Ok(ClaimResumedExecutionResult::Claimed(
1113        ClaimedResumedExecution::new(
1114            args.execution_id.clone(),
1115            args.lease_id.clone(),
1116            lease_epoch,
1117            attempt_index,
1118            attempt_id,
1119            TimestampMs::from_millis(new_expires),
1120            handle,
1121        ),
1122    ))
1123}
1124
1125// ─── observe_signals ─────────────────────────────────────────────────────
1126
1127pub(crate) async fn observe_signals_impl(
1128    pool: &PgPool,
1129    handle: &Handle,
1130) -> Result<Vec<ResumeSignal>, EngineError> {
1131    let payload = decode_handle(handle)?;
1132    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
1133
1134    let row: Option<(JsonValue,)> = sqlx::query_as(
1135        "SELECT member_map FROM ff_suspension_current \
1136          WHERE partition_key = $1 AND execution_id = $2",
1137    )
1138    .bind(part)
1139    .bind(exec_uuid)
1140    .fetch_optional(pool)
1141    .await
1142    .map_err(map_sqlx_error)?;
1143
1144    let Some((member_map,)) = row else {
1145        return Ok(Vec::new());
1146    };
1147    let mut out: Vec<ResumeSignal> = Vec::new();
1148    if let Some(map) = member_map.as_object() {
1149        for (_wp_key, arr) in map {
1150            if let Some(sigs) = arr.as_array() {
1151                for v in sigs {
1152                    if let Some(s) = resume_signal_from_json(v) {
1153                        out.push(s);
1154                    }
1155                }
1156            }
1157        }
1158    }
1159    Ok(out)
1160}
1161
1162// ─── list_pending_waitpoints ─────────────────────────────────────────────
1163
1164/// RFC-020 §4.5 — read-only projection of pending-or-active waitpoints
1165/// for a given execution. SQL parity with Valkey's SSCAN + 2× HMGET
1166/// shape: single-table scan of `ff_waitpoint_pending` with cursor
1167/// `(waitpoint_id > $after ORDER BY waitpoint_id LIMIT $limit+1)`,
1168/// surfaces the `PendingWaitpointInfo` 10-field contract.
1169/// `token_fingerprint` is computed from the stored `<kid>:<hex>` token
1170/// — the raw token never crosses the trait boundary per RFC-017 Stage
1171/// D1 / §8.
1172///
1173/// Pre-read existence check on `ff_exec_core` mirrors Valkey's
1174/// `EXISTS exec_core` so a non-existent execution surfaces `NotFound`
1175/// rather than an empty page.
1176///
1177/// Rows with `state NOT IN ('pending', 'active')` (e.g. `'closed'`)
1178/// are filtered server-side to match Valkey's client-side keep-filter.
1179pub(crate) async fn list_pending_waitpoints_impl(
1180    pool: &PgPool,
1181    args: ListPendingWaitpointsArgs,
1182) -> Result<ListPendingWaitpointsResult, EngineError> {
1183    const DEFAULT_LIMIT: u32 = 100;
1184    const MAX_LIMIT: u32 = 1000;
1185
1186    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
1187    let limit = args.limit.unwrap_or(DEFAULT_LIMIT).clamp(1, MAX_LIMIT) as i64;
1188    let after_uuid = match args.after.as_ref() {
1189        Some(wp) => Some(wp_uuid(wp)?),
1190        None => None,
1191    };
1192
1193    // Existence probe — a non-existent execution is `NotFound`, not an
1194    // empty page. Matches Valkey's `EXISTS exec_core` pre-check.
1195    let exists: Option<(i16,)> = sqlx::query_as(
1196        "SELECT 1::smallint FROM ff_exec_core \
1197          WHERE partition_key = $1 AND execution_id = $2",
1198    )
1199    .bind(part)
1200    .bind(exec_uuid)
1201    .fetch_optional(pool)
1202    .await
1203    .map_err(map_sqlx_error)?;
1204    if exists.is_none() {
1205        return Err(EngineError::NotFound { entity: "execution" });
1206    }
1207
1208    // Page: request `limit + 1` so we can detect "more to come" without
1209    // a second round-trip. Row tuple order matches the SELECT below:
1210    // (waitpoint_id, waitpoint_key, state, required_signal_names,
1211    //  created_at_ms, activated_at_ms, expires_at_ms, token_kid, token).
1212    // Raw token is fingerprinted client-side — never returned across
1213    // the trait boundary per RFC-017 Stage D1 / §8.
1214    type Row = (
1215        Uuid,
1216        String,
1217        String,
1218        Vec<String>,
1219        i64,
1220        Option<i64>,
1221        Option<i64>,
1222        String,
1223        String,
1224    );
1225    let rows: Vec<Row> = sqlx::query_as(
1226        "SELECT waitpoint_id, waitpoint_key, state, required_signal_names, \
1227                created_at_ms, activated_at_ms, expires_at_ms, token_kid, token \
1228           FROM ff_waitpoint_pending \
1229          WHERE partition_key = $1 \
1230            AND execution_id = $2 \
1231            AND state IN ('pending', 'active') \
1232            AND ($3::uuid IS NULL OR waitpoint_id > $3) \
1233          ORDER BY waitpoint_id \
1234          LIMIT $4",
1235    )
1236    .bind(part)
1237    .bind(exec_uuid)
1238    .bind(after_uuid)
1239    .bind(limit + 1)
1240    .fetch_all(pool)
1241    .await
1242    .map_err(map_sqlx_error)?;
1243
1244    let has_more = rows.len() as i64 > limit;
1245    let take_n = if has_more { limit as usize } else { rows.len() };
1246
1247    let mut entries: Vec<PendingWaitpointInfo> = Vec::with_capacity(take_n);
1248    for (wp_uid, wp_key, state, req_names, created_ms, activated_ms, expires_ms, _kid, token)
1249        in rows.into_iter().take(take_n)
1250    {
1251        let wp_id = WaitpointId::from_uuid(wp_uid);
1252        // Parse stored `<kid>:<hex>` into audit-safe pair. The `token_kid`
1253        // column is redundant with the parsed kid — we prefer the parsed
1254        // one so the surface stays byte-identical to Valkey's.
1255        let (token_kid, token_fingerprint) = parse_waitpoint_token_kid_fp(&token);
1256        let mut info = PendingWaitpointInfo::new(
1257            wp_id,
1258            wp_key,
1259            state,
1260            TimestampMs(created_ms),
1261            args.execution_id.clone(),
1262            token_kid,
1263            token_fingerprint,
1264        );
1265        if !req_names.is_empty() {
1266            info = info.with_required_signal_names(req_names);
1267        }
1268        if let Some(ms) = activated_ms {
1269            info = info.with_activated_at(TimestampMs(ms));
1270        }
1271        if let Some(ms) = expires_ms {
1272            info = info.with_expires_at(TimestampMs(ms));
1273        }
1274        entries.push(info);
1275    }
1276
1277    let next_cursor = if has_more {
1278        entries.last().map(|e| e.waitpoint_id.clone())
1279    } else {
1280        None
1281    };
1282    let mut result = ListPendingWaitpointsResult::new(entries);
1283    if let Some(cursor) = next_cursor {
1284        result = result.with_next_cursor(cursor);
1285    }
1286    Ok(result)
1287}
1288
1289/// Point-read of a waitpoint's HMAC token from `ff_waitpoint_pending`
1290/// keyed by `(partition_key, waitpoint_id)`. Used by cairn's
1291/// signal-bridge to authenticate signal-resume requests on Postgres
1292/// deployments.
1293///
1294/// Missing row → `Ok(None)` (signal may legitimately arrive after
1295/// the waitpoint has been consumed or expired). Empty `token` →
1296/// `Ok(None)` for parity with the Valkey HGET mapping, even though
1297/// the column is `NOT NULL` and Postgres writers always populate it.
1298pub(crate) async fn read_waitpoint_token_impl(
1299    pool: &PgPool,
1300    partition: &ff_core::partition::PartitionKey,
1301    waitpoint_id: &WaitpointId,
1302) -> Result<Option<String>, EngineError> {
1303    let part: i16 = partition
1304        .parse()
1305        .map_err(|e| EngineError::Validation {
1306            kind: ValidationKind::InvalidInput,
1307            detail: format!("partition_key: {e}"),
1308        })?
1309        .index as i16;
1310    let wp_uuid = wp_uuid(waitpoint_id)?;
1311    let row: Option<(String,)> = sqlx::query_as(
1312        "SELECT token FROM ff_waitpoint_pending \
1313         WHERE partition_key = $1 AND waitpoint_id = $2 \
1314         LIMIT 1",
1315    )
1316    .bind(part)
1317    .bind(wp_uuid)
1318    .fetch_optional(pool)
1319    .await
1320    .map_err(map_sqlx_error)?;
1321    Ok(row.map(|(t,)| t).filter(|s| !s.is_empty()))
1322}
1323
1324// ─── deliver_approval_signal ─────────────────────────────────────────────
1325//
1326// Cairn #454 Phase 4b (Postgres parity with the Valkey PR #465 body).
1327//
1328// The operator path never carries the waitpoint HMAC token — the
1329// backend reads it server-side from `ff_waitpoint_pending`, verifies
1330// the caller's `lane_id` matches `ff_exec_core.lane_id`, composes a
1331// `DeliverSignalArgs` (signal_category="approval" / source_type=
1332// "operator" / target_scope="execution" / idempotency_key=
1333// "approval:<signal>:<suffix>"), and dispatches through the shared
1334// `deliver_signal_impl`. The inner body re-verifies the HMAC token
1335// we just read, and its SERIALIZABLE tx enforces dedup / capacity
1336// invariants unchanged.
1337
1338pub(crate) async fn deliver_approval_signal_impl(
1339    pool: &PgPool,
1340    partition_config: &PartitionConfig,
1341    args: DeliverApprovalSignalArgs,
1342) -> Result<DeliverSignalResult, EngineError> {
1343    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
1344
1345    // 1. Authoritative lane_id comes from exec_core, not the caller.
1346    //    Matches the Valkey HGET lane_id guard (PR #465 Copilot review).
1347    let lane_row: Option<(String,)> = sqlx::query_as(
1348        "SELECT lane_id FROM ff_exec_core \
1349         WHERE partition_key = $1 AND execution_id = $2",
1350    )
1351    .bind(part)
1352    .bind(exec_uuid)
1353    .fetch_optional(pool)
1354    .await
1355    .map_err(map_sqlx_error)?;
1356    let authoritative_lane = match lane_row {
1357        Some((s,)) => LaneId::new(s),
1358        None => {
1359            return Err(EngineError::NotFound {
1360                entity: "execution",
1361            });
1362        }
1363    };
1364    if authoritative_lane.as_str() != args.lane_id.as_str() {
1365        return Err(EngineError::Validation {
1366            kind: ValidationKind::InvalidInput,
1367            detail: format!(
1368                "lane_mismatch: args.lane_id={} exec_core.lane_id={}",
1369                args.lane_id.as_str(),
1370                authoritative_lane.as_str()
1371            ),
1372        });
1373    }
1374
1375    // 2. Server-side token lookup. Missing row → NotFound(waitpoint).
1376    let partition = ff_core::partition::Partition {
1377        family: ff_core::partition::PartitionFamily::Execution,
1378        index: part as u16,
1379    };
1380    let pkey = ff_core::partition::PartitionKey::from(&partition);
1381    let token_str = read_waitpoint_token_impl(pool, &pkey, &args.waitpoint_id)
1382        .await?
1383        .ok_or(EngineError::NotFound {
1384            entity: "waitpoint",
1385        })?;
1386
1387    // 3. Compose DeliverSignalArgs and dispatch. Single `now` sample
1388    //    reused for `created_at` + `now` (PR #465 gemini review).
1389    let now = TimestampMs::now();
1390    let idempotency_key =
1391        format!("approval:{}:{}", args.signal_name, args.idempotency_suffix);
1392    let ds = DeliverSignalArgs {
1393        execution_id: args.execution_id.clone(),
1394        waitpoint_id: args.waitpoint_id.clone(),
1395        signal_id: SignalId::new(),
1396        signal_name: args.signal_name.clone(),
1397        signal_category: "approval".to_owned(),
1398        source_type: "operator".to_owned(),
1399        source_identity: String::new(),
1400        payload: None,
1401        payload_encoding: None,
1402        correlation_id: None,
1403        idempotency_key: Some(idempotency_key),
1404        target_scope: "execution".to_owned(),
1405        created_at: Some(now),
1406        dedup_ttl_ms: Some(args.signal_dedup_ttl_ms),
1407        resume_delay_ms: None,
1408        max_signals_per_execution: args.max_signals_per_execution,
1409        signal_maxlen: args.maxlen,
1410        waitpoint_token: WaitpointToken::new(token_str),
1411        now,
1412    };
1413
1414    deliver_signal_impl(pool, partition_config, ds).await
1415}