Skip to main content

ff_backend_postgres/
suspend_ops.rs

1//! Suspend / deliver_signal / claim_resumed / observe_signals SQL bodies.
2//!
3//! **Wave 4d follow-up.** Builds on PR #238's primitives:
4//!
5//! * [`crate::signal::hmac_sign`] / [`crate::signal::hmac_verify`]
6//! * [`crate::signal::SERIALIZABLE_RETRY_BUDGET`] +
7//!   [`crate::signal::is_retryable_serialization`]
8//! * [`crate::suspend::evaluate`] — composite-condition evaluator
9//!
10//! HMAC signing, retry looping, and composite evaluation are never
11//! re-implemented here.
12//!
13//! # Isolation
14//!
15//! `suspend` + `deliver_signal` run at SERIALIZABLE with a 3-attempt
16//! retry budget (Q11). On retry exhaustion they surface
17//! `Contention(RetryExhausted)`. `claim_resumed_execution` +
18//! `observe_signals` stay at READ COMMITTED (row-level `FOR UPDATE`
19//! for the former, read-only for the latter).
20
21use std::collections::HashMap;
22use std::time::{SystemTime, UNIX_EPOCH};
23
24use ff_core::backend::{BackendTag, Handle, HandleKind, HandleOpaque, ResumeSignal, WaitpointHmac};
25use ff_core::contracts::{
26    AdditionalWaitpointBinding, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
27    ClaimedResumedExecution, CompositeBody, DeliverSignalArgs, DeliverSignalResult,
28    ListPendingWaitpointsArgs, ListPendingWaitpointsResult, PendingWaitpointInfo, ResumeCondition,
29    SignalMatcher, SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, WaitpointBinding,
30};
31use ff_core::engine_error::{ContentionKind, EngineError, ValidationKind};
32use ff_core::handle_codec::{encode as encode_opaque, HandlePayload};
33use ff_core::partition::PartitionConfig;
34use ff_core::types::{
35    AttemptId, AttemptIndex, ExecutionId, LeaseEpoch, LeaseFence, SignalId, SuspensionId,
36    TimestampMs, WaitpointId,
37};
38use serde_json::{json, Value as JsonValue};
39use sqlx::{PgPool, Postgres, Transaction};
40use uuid::Uuid;
41
42use crate::error::map_sqlx_error;
43use crate::lease_event;
44use crate::signal::{hmac_sign, hmac_verify, is_retryable_serialization, SERIALIZABLE_RETRY_BUDGET};
45use crate::signal_event;
46use crate::suspend::evaluate;
47
48// ─── small shared helpers ────────────────────────────────────────────────
49
/// Wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0`. A millisecond count that does
/// not fit in `i64` saturates to `i64::MAX` instead of wrapping through an
/// `as` cast.
fn now_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
56
57fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
58    let s = eid.as_str();
59    let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
60        kind: ValidationKind::InvalidInput,
61        detail: format!("execution_id missing `{{fp:` prefix: {s}"),
62    })?;
63    let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
64        kind: ValidationKind::InvalidInput,
65        detail: format!("execution_id missing `}}:`: {s}"),
66    })?;
67    let part: i16 = rest[..close]
68        .parse()
69        .map_err(|_| EngineError::Validation {
70            kind: ValidationKind::InvalidInput,
71            detail: format!("execution_id partition index not u16: {s}"),
72        })?;
73    let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
74        kind: ValidationKind::InvalidInput,
75        detail: format!("execution_id UUID invalid: {s}"),
76    })?;
77    Ok((part, uuid))
78}
79
80fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
81    if handle.backend != BackendTag::Postgres {
82        return Err(EngineError::Validation {
83            kind: ValidationKind::HandleFromOtherBackend,
84            detail: format!("expected Postgres, got {:?}", handle.backend),
85        });
86    }
87    let decoded = ff_core::handle_codec::decode(&handle.opaque)?;
88    if decoded.tag != BackendTag::Postgres {
89        return Err(EngineError::Validation {
90            kind: ValidationKind::HandleFromOtherBackend,
91            detail: format!("embedded tag {:?}", decoded.tag),
92        });
93    }
94    Ok(decoded.payload)
95}
96
97fn wp_uuid(w: &WaitpointId) -> Result<Uuid, EngineError> {
98    Uuid::parse_str(&w.to_string()).map_err(|e| EngineError::Validation {
99        kind: ValidationKind::InvalidInput,
100        detail: format!("waitpoint_id not a UUID: {e}"),
101    })
102}
103
104fn susp_uuid(s: &SuspensionId) -> Result<Uuid, EngineError> {
105    Uuid::parse_str(&s.to_string()).map_err(|e| EngineError::Validation {
106        kind: ValidationKind::InvalidInput,
107        detail: format!("suspension_id not a UUID: {e}"),
108    })
109}
110
111/// RFC-020 §3.1.1 — derive `required_signal_names` for a single
112/// `waitpoint_key` from the suspend call's `ResumeCondition`. Walks
113/// the condition tree and collects every `SignalMatcher::ByName` that
114/// targets this specific waitpoint. `Wildcard` matchers contribute
115/// nothing (an empty result vec is the wire-level wildcard marker per
116/// `PendingWaitpointInfo::required_signal_names` docs).
117///
118/// `OperatorOnly` / `TimeoutOnly` are rendered with the same sentinel
119/// names that the Valkey wire format emits (`__operator_only__` /
120/// `__timeout_only__`, see `ff-backend-valkey/src/lib.rs:3205-3222`)
121/// so an operator-only or timeout-only waitpoint is distinguishable
122/// from a true wildcard at the `PendingWaitpointInfo` surface. The
123/// `__`-prefix sentinel values are never real signal names.
124///
125/// `Count { matcher: None }` returns empty (any signal on the listed
126/// waitpoints counts). Duplicates are de-duplicated preserving
127/// first-seen order.
128fn derive_required_signal_names(cond: &ResumeCondition, wp_key: &str) -> Vec<String> {
129    const OPERATOR_ONLY_SENTINEL: &str = "__operator_only__";
130    const TIMEOUT_ONLY_SENTINEL: &str = "__timeout_only__";
131
132    let mut out: Vec<String> = Vec::new();
133    let mut push = |name: &str| {
134        if !name.is_empty() && !out.iter().any(|e| e == name) {
135            out.push(name.to_owned());
136        }
137    };
138    fn walk(cond: &ResumeCondition, target: &str, push: &mut dyn FnMut(&str)) {
139        match cond {
140            ResumeCondition::Single {
141                waitpoint_key,
142                matcher,
143            } => {
144                if waitpoint_key == target
145                    && let SignalMatcher::ByName(name) = matcher
146                {
147                    push(name.as_str());
148                }
149            }
150            ResumeCondition::OperatorOnly => push(OPERATOR_ONLY_SENTINEL),
151            ResumeCondition::TimeoutOnly => push(TIMEOUT_ONLY_SENTINEL),
152            ResumeCondition::Composite(body) => walk_body(body, target, push),
153            _ => {}
154        }
155    }
156    fn walk_body(body: &CompositeBody, target: &str, push: &mut dyn FnMut(&str)) {
157        match body {
158            CompositeBody::AllOf { members } => {
159                for m in members {
160                    walk(m, target, push);
161                }
162            }
163            CompositeBody::Count {
164                matcher, waitpoints, ..
165            } => {
166                if waitpoints.iter().any(|w| w == target)
167                    && let Some(SignalMatcher::ByName(name)) = matcher
168                {
169                    push(name.as_str());
170                }
171            }
172            _ => {}
173        }
174    }
175    walk(cond, wp_key, &mut push);
176    out
177}
178
/// RFC-017 §8 / RFC-020 §4.5 — parse the stored `<kid>:<hex>` waitpoint
/// token into a `(token_kid, token_fingerprint)` pair.
///
/// `token_fingerprint` is the first 16 characters of the digest part. For a
/// well-formed hex digest that is 8 bytes of the HMAC, the §8
/// audit-friendly handle. Shorter digests are returned whole. The split is
/// on the first `:`.
///
/// Malformed input (no `:`, or an empty kid or digest) collapses to
/// `("", "")`, so callers can skip or log without surfacing a typed error.
/// Mirrors `ff-backend-valkey::parse_waitpoint_token_kid_fp`.
fn parse_waitpoint_token_kid_fp(raw: &str) -> (String, String) {
    const FINGERPRINT_CHARS: usize = 16;
    match raw.split_once(':') {
        Some((kid, digest)) if !kid.is_empty() && !digest.is_empty() => {
            // Cut on a char boundary. A byte-offset slice (`digest[..16]`)
            // panics when a malformed token has a multi-byte character
            // straddling offset 16.
            let end = digest
                .char_indices()
                .nth(FINGERPRINT_CHARS)
                .map_or(digest.len(), |(idx, _)| idx);
            (kid.to_owned(), digest[..end].to_owned())
        }
        _ => (String::new(), String::new()),
    }
}
194
195// ─── SERIALIZABLE retry loop ─────────────────────────────────────────────
196
197/// Return true if an `EngineError::Transport` carries a sqlx
198/// serialization-failure SQLSTATE. `map_sqlx_error` stringifies the
199/// underlying code; we match defensively on both the raw codes
200/// (40001/40P01) and the symbolic labels.
201fn is_retryable_engine(err: &EngineError) -> bool {
202    match err {
203        EngineError::Transport { source, .. } => {
204            let s = source.to_string();
205            s.contains("40001")
206                || s.contains("40P01")
207                || s.contains("serialization_failure")
208                || s.contains("deadlock_detected")
209        }
210        // `map_sqlx_error` collapses serialization_failure (40001) +
211        // deadlock_detected (40P01) into `Contention(LeaseConflict)`.
212        // Inside a SERIALIZABLE retry loop those are retryable; the
213        // explicit in-body fence-mismatch case is NOT (a bumped epoch
214        // won't unbump on retry). We can't distinguish them by
215        // discriminant alone, so the retry loop treats LeaseConflict
216        // as retryable — a genuine fence mismatch will still bail
217        // after budget exhaustion as RetryExhausted, which callers
218        // reconcile by re-reading exec_core.
219        EngineError::Contention(ContentionKind::LeaseConflict) => true,
220        _ => false,
221    }
222}
223
224/// Run `op` inside a SERIALIZABLE transaction, retrying up to
225/// [`SERIALIZABLE_RETRY_BUDGET`] times on retryable faults. On
226/// exhaustion returns `Contention(RetryExhausted)`.
227async fn run_serializable<T, F>(pool: &PgPool, mut op: F) -> Result<T, EngineError>
228where
229    T: Send,
230    F: for<'a> FnMut(
231            &'a mut Transaction<'_, Postgres>,
232        ) -> std::pin::Pin<
233            Box<dyn std::future::Future<Output = Result<T, EngineError>> + Send + 'a>,
234        > + Send,
235{
236    for _ in 0..SERIALIZABLE_RETRY_BUDGET {
237        let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
238        sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
239            .execute(&mut *tx)
240            .await
241            .map_err(map_sqlx_error)?;
242        let body_res = op(&mut tx).await;
243        match body_res {
244            Ok(v) => match tx.commit().await {
245                Ok(()) => return Ok(v),
246                Err(e) if is_retryable_serialization(&e) => continue,
247                Err(e) => return Err(map_sqlx_error(e)),
248            },
249            Err(e) if is_retryable_engine(&e) => {
250                let _ = tx.rollback().await;
251                continue;
252            }
253            Err(e) => {
254                let _ = tx.rollback().await;
255                return Err(e);
256            }
257        }
258    }
259    Err(EngineError::Contention(ContentionKind::RetryExhausted))
260}
261
262// ─── dedup outcome (de)serialization ─────────────────────────────────────
263
264fn outcome_to_dedup_json(outcome: &SuspendOutcome) -> JsonValue {
265    let details = outcome.details();
266    let extras: Vec<JsonValue> = details
267        .additional_waitpoints
268        .iter()
269        .map(|e| {
270            json!({
271                "waitpoint_id": e.waitpoint_id.to_string(),
272                "waitpoint_key": e.waitpoint_key,
273                "token": e.waitpoint_token.as_str(),
274            })
275        })
276        .collect();
277    let (variant, handle_opaque) = match outcome {
278        SuspendOutcome::Suspended { handle, .. } => {
279            ("Suspended", Some(hex::encode(handle.opaque.as_bytes())))
280        }
281        SuspendOutcome::AlreadySatisfied { .. } => ("AlreadySatisfied", None),
282        _ => ("Suspended", None),
283    };
284    json!({
285        "variant": variant,
286        "details": {
287            "suspension_id": details.suspension_id.to_string(),
288            "waitpoint_id": details.waitpoint_id.to_string(),
289            "waitpoint_key": details.waitpoint_key,
290            "token": details.waitpoint_token.as_str(),
291            "extras": extras,
292        },
293        "handle_opaque_hex": handle_opaque,
294    })
295}
296
297fn outcome_from_dedup_json(v: &JsonValue) -> Result<SuspendOutcome, EngineError> {
298    let corrupt = |s: String| EngineError::Validation {
299        kind: ValidationKind::Corruption,
300        detail: s,
301    };
302    let det = &v["details"];
303    let suspension_id = SuspensionId::parse(det["suspension_id"].as_str().unwrap_or(""))
304        .map_err(|e| corrupt(format!("dedup suspension_id: {e}")))?;
305    let waitpoint_id = WaitpointId::parse(det["waitpoint_id"].as_str().unwrap_or(""))
306        .map_err(|e| corrupt(format!("dedup waitpoint_id: {e}")))?;
307    let waitpoint_key = det["waitpoint_key"].as_str().unwrap_or("").to_owned();
308    let token = det["token"].as_str().unwrap_or("").to_owned();
309    let mut extras: Vec<AdditionalWaitpointBinding> = Vec::new();
310    if let Some(arr) = det["extras"].as_array() {
311        for e in arr {
312            let wid = WaitpointId::parse(e["waitpoint_id"].as_str().unwrap_or(""))
313                .map_err(|err| corrupt(format!("dedup extra wp_id: {err}")))?;
314            let wkey = e["waitpoint_key"].as_str().unwrap_or("").to_owned();
315            let tok = e["token"].as_str().unwrap_or("").to_owned();
316            extras.push(AdditionalWaitpointBinding::new(
317                wid,
318                wkey,
319                WaitpointHmac::new(tok),
320            ));
321        }
322    }
323    let details = SuspendOutcomeDetails::new(
324        suspension_id,
325        waitpoint_id,
326        waitpoint_key,
327        WaitpointHmac::new(token),
328    )
329    .with_additional_waitpoints(extras);
330
331    match v["variant"].as_str().unwrap_or("Suspended") {
332        "AlreadySatisfied" => Ok(SuspendOutcome::AlreadySatisfied { details }),
333        _ => {
334            let opaque_hex = v["handle_opaque_hex"].as_str().unwrap_or("");
335            let bytes = hex::decode(opaque_hex)
336                .map_err(|e| corrupt(format!("dedup handle hex: {e}")))?;
337            let opaque = HandleOpaque::new(bytes.into_boxed_slice());
338            let handle = Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
339            Ok(SuspendOutcome::Suspended { details, handle })
340        }
341    }
342}
343
344// ─── suspend ─────────────────────────────────────────────────────────────
345
346pub(crate) async fn suspend_impl(
347    pool: &PgPool,
348    _partition_config: &PartitionConfig,
349    handle: &Handle,
350    args: SuspendArgs,
351) -> Result<SuspendOutcome, EngineError> {
352    let payload = decode_handle(handle)?;
353    suspend_core(pool, payload, args).await
354}
355
356/// Cairn #322 — service-layer entry point: suspend when the caller
357/// holds a lease fence triple but no `Handle`. Resolves `attempt_index`
358/// from `ff_attempt` by `(exec_id, attempt_id)` then delegates to the
359/// same transactional body used by [`suspend_impl`].
360pub(crate) async fn suspend_by_triple_impl(
361    pool: &PgPool,
362    _partition_config: &PartitionConfig,
363    exec_id: ExecutionId,
364    triple: LeaseFence,
365    args: SuspendArgs,
366) -> Result<SuspendOutcome, EngineError> {
367    let (part, exec_uuid) = split_exec_id(&exec_id)?;
368    // Postgres-side attempts are keyed by `(execution_id, attempt_index)`;
369    // there is no `attempt_id` column on `ff_attempt`. The triple's
370    // `attempt_id` is therefore advisory on this backend — the
371    // authoritative "which attempt" pointer lives on `ff_exec_core`.
372    // Read it outside the serializable body; `suspend_core` re-fences
373    // against `lease_epoch` (`FOR UPDATE`) inside the txn so a racing
374    // attempt-bump or lease-bump surfaces as `Contention(LeaseConflict)`.
375    let row: Option<(i32,)> = sqlx::query_as(
376        "SELECT attempt_index FROM ff_exec_core \
377         WHERE partition_key = $1 AND execution_id = $2",
378    )
379    .bind(part)
380    .bind(exec_uuid)
381    .fetch_optional(pool)
382    .await
383    .map_err(map_sqlx_error)?;
384    let attempt_index_i = match row {
385        Some((i,)) => i,
386        None => return Err(EngineError::NotFound { entity: "execution" }),
387    };
388    let attempt_index =
389        AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
390
391    // Synthesize a HandlePayload — `suspend_core` only reads
392    // `execution_id`, `attempt_index`, and `lease_epoch`; the rest of
393    // the payload fields are carried through to the replayed-outcome
394    // handle encoding (kind = Suspended).
395    let payload = HandlePayload::new(
396        exec_id,
397        attempt_index,
398        triple.attempt_id,
399        triple.lease_id,
400        triple.lease_epoch,
401        0,
402        ff_core::types::LaneId::new(""),
403        ff_core::types::WorkerInstanceId::new(""),
404    );
405    suspend_core(pool, payload, args).await
406}
407
408async fn suspend_core(
409    pool: &PgPool,
410    payload: HandlePayload,
411    args: SuspendArgs,
412) -> Result<SuspendOutcome, EngineError> {
413    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
414    let attempt_index_i = i32::try_from(payload.attempt_index.0).unwrap_or(0);
415    let expected_epoch = payload.lease_epoch.0;
416    let idem_key = args.idempotency_key.as_ref().map(|k| k.as_str().to_owned());
417
418    run_serializable(pool, move |tx| {
419        let args = args.clone();
420        let idem = idem_key.clone();
421        let payload = payload.clone();
422        Box::pin(async move {
423            // 1. Dedup replay check.
424            if let Some(key) = idem.as_deref() {
425                let row: Option<(JsonValue,)> = sqlx::query_as(
426                    "SELECT outcome_json FROM ff_suspend_dedup \
427                     WHERE partition_key = $1 AND idempotency_key = $2",
428                )
429                .bind(part)
430                .bind(key)
431                .fetch_optional(&mut **tx)
432                .await
433                .map_err(map_sqlx_error)?;
434                if let Some((cached,)) = row {
435                    return outcome_from_dedup_json(&cached);
436                }
437            }
438
439            // 2. Fence check against ff_attempt.
440            let epoch_row: Option<(i64,)> = sqlx::query_as(
441                "SELECT lease_epoch FROM ff_attempt \
442                 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3 \
443                 FOR UPDATE",
444            )
445            .bind(part)
446            .bind(exec_uuid)
447            .bind(attempt_index_i)
448            .fetch_optional(&mut **tx)
449            .await
450            .map_err(map_sqlx_error)?;
451            let observed_epoch: u64 = match epoch_row {
452                Some((e,)) => u64::try_from(e).unwrap_or(0),
453                None => return Err(EngineError::NotFound { entity: "attempt" }),
454            };
455            if observed_epoch != expected_epoch {
456                return Err(EngineError::Contention(ContentionKind::LeaseConflict));
457            }
458
459            // 3. Resolve the active HMAC kid inside the txn.
460            let kid_row: Option<(String, Vec<u8>)> = sqlx::query_as(
461                "SELECT kid, secret FROM ff_waitpoint_hmac \
462                 WHERE active = TRUE \
463                 ORDER BY rotated_at_ms DESC LIMIT 1",
464            )
465            .fetch_optional(&mut **tx)
466            .await
467            .map_err(map_sqlx_error)?;
468            let (kid, secret) = kid_row.ok_or_else(|| EngineError::Validation {
469                kind: ValidationKind::InvalidInput,
470                detail: "ff_waitpoint_hmac empty — rotate a kid before suspend".into(),
471            })?;
472
473            // 4. Sign + insert waitpoint_pending for each binding.
474            let now = args.now.0;
475            let mut signed: Vec<(WaitpointId, String, String)> = Vec::new();
476            for binding in args.waitpoints.iter() {
477                let (wp_id, wp_key) = match binding {
478                    WaitpointBinding::Fresh {
479                        waitpoint_id,
480                        waitpoint_key,
481                    } => (waitpoint_id.clone(), waitpoint_key.clone()),
482                    WaitpointBinding::UsePending { waitpoint_id } => {
483                        let row: Option<(String,)> = sqlx::query_as(
484                            "SELECT waitpoint_key FROM ff_waitpoint_pending \
485                             WHERE partition_key = $1 AND waitpoint_id = $2",
486                        )
487                        .bind(part)
488                        .bind(wp_uuid(waitpoint_id)?)
489                        .fetch_optional(&mut **tx)
490                        .await
491                        .map_err(map_sqlx_error)?;
492                        let wp_key = row.map(|(k,)| k).unwrap_or_default();
493                        (waitpoint_id.clone(), wp_key)
494                    }
495                    _ => {
496                        return Err(EngineError::Validation {
497                            kind: ValidationKind::InvalidInput,
498                            detail: "unsupported WaitpointBinding variant".into(),
499                        });
500                    }
501                };
502                let msg = format!("{}:{}", payload.execution_id, wp_id);
503                let token = hmac_sign(&secret, &kid, msg.as_bytes());
504                // RFC-020 §3.1.1 — populate 0011 columns on insert.
505                // `suspend_core` atomically lands the suspension (exec_core
506                // flips to `suspended` in the same txn as this INSERT), so
507                // from any observer's perspective the waitpoint is already
508                // activated at the moment it becomes visible — there is no
509                // separate pending→active transition on Postgres. Write
510                // `state = 'active'` + `activated_at_ms = now` directly.
511                // `required_signal_names` is derived per-waitpoint from the
512                // resume condition; an empty vec denotes the wildcard case
513                // per the `PendingWaitpointInfo::required_signal_names`
514                // contract.
515                let required_names =
516                    derive_required_signal_names(&args.resume_condition, &wp_key);
517                sqlx::query(
518                    "INSERT INTO ff_waitpoint_pending \
519                       (partition_key, waitpoint_id, execution_id, token_kid, token, \
520                        created_at_ms, expires_at_ms, waitpoint_key, \
521                        state, required_signal_names, activated_at_ms) \
522                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'active', $9, $6) \
523                     ON CONFLICT (partition_key, waitpoint_id) DO UPDATE SET \
524                       token_kid = EXCLUDED.token_kid, token = EXCLUDED.token, \
525                       waitpoint_key = EXCLUDED.waitpoint_key, \
526                       state = EXCLUDED.state, \
527                       required_signal_names = EXCLUDED.required_signal_names, \
528                       activated_at_ms = EXCLUDED.activated_at_ms",
529                )
530                .bind(part)
531                .bind(wp_uuid(&wp_id)?)
532                .bind(exec_uuid)
533                .bind(&kid)
534                .bind(&token)
535                .bind(now)
536                .bind(args.timeout_at.map(|t| t.0))
537                .bind(&wp_key)
538                .bind(&required_names)
539                .execute(&mut **tx)
540                .await
541                .map_err(map_sqlx_error)?;
542                signed.push((wp_id, wp_key, token));
543            }
544
545            // 5. Insert ff_suspension_current.
546            let condition_json =
547                serde_json::to_value(&args.resume_condition).map_err(|e| {
548                    EngineError::Validation {
549                        kind: ValidationKind::Corruption,
550                        detail: format!("resume_condition serialize: {e}"),
551                    }
552                })?;
553            sqlx::query(
554                "INSERT INTO ff_suspension_current \
555                   (partition_key, execution_id, suspension_id, suspended_at_ms, \
556                    timeout_at_ms, reason_code, condition, satisfied_set, member_map, \
557                    timeout_behavior) \
558                 VALUES ($1, $2, $3, $4, $5, $6, $7, '[]'::jsonb, '{}'::jsonb, $8) \
559                 ON CONFLICT (partition_key, execution_id) DO UPDATE SET \
560                   suspension_id = EXCLUDED.suspension_id, \
561                   suspended_at_ms = EXCLUDED.suspended_at_ms, \
562                   timeout_at_ms = EXCLUDED.timeout_at_ms, \
563                   reason_code = EXCLUDED.reason_code, \
564                   condition = EXCLUDED.condition, \
565                   satisfied_set = '[]'::jsonb, \
566                   member_map = '{}'::jsonb, \
567                   timeout_behavior = EXCLUDED.timeout_behavior",
568            )
569            .bind(part)
570            .bind(exec_uuid)
571            .bind(susp_uuid(&args.suspension_id)?)
572            .bind(now)
573            .bind(args.timeout_at.map(|t| t.0))
574            .bind(args.reason_code.as_wire_str())
575            .bind(&condition_json)
576            .bind(args.timeout_behavior.as_wire_str())
577            .execute(&mut **tx)
578            .await
579            .map_err(map_sqlx_error)?;
580
581            // 6. Transition exec_core to suspended.
582            sqlx::query(
583                "UPDATE ff_exec_core \
584                    SET lifecycle_phase = 'suspended', \
585                        ownership_state = 'released', \
586                        eligibility_state = 'not_applicable', \
587                        public_state = 'suspended', \
588                        attempt_state = 'attempt_interrupted' \
589                  WHERE partition_key = $1 AND execution_id = $2",
590            )
591            .bind(part)
592            .bind(exec_uuid)
593            .execute(&mut **tx)
594            .await
595            .map_err(map_sqlx_error)?;
596
597            // 7. Release + bump lease epoch on ff_attempt.
598            sqlx::query(
599                "UPDATE ff_attempt \
600                    SET worker_id = NULL, \
601                        worker_instance_id = NULL, \
602                        lease_expires_at_ms = NULL, \
603                        lease_epoch = lease_epoch + 1, \
604                        outcome = 'attempt_interrupted' \
605                  WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
606            )
607            .bind(part)
608            .bind(exec_uuid)
609            .bind(attempt_index_i)
610            .execute(&mut **tx)
611            .await
612            .map_err(map_sqlx_error)?;
613
614            // RFC-019 Stage B outbox: lease revoked (suspend).
615            lease_event::emit(
616                tx,
617                part,
618                exec_uuid,
619                None,
620                lease_event::EVENT_REVOKED,
621                now,
622            )
623            .await?;
624
625            // 8. Assemble outcome.
626            let (primary_id, primary_key, primary_token) = signed[0].clone();
627            let extras: Vec<AdditionalWaitpointBinding> = signed
628                .iter()
629                .skip(1)
630                .map(|(id, key, tok)| {
631                    AdditionalWaitpointBinding::new(
632                        id.clone(),
633                        key.clone(),
634                        WaitpointHmac::new(tok.clone()),
635                    )
636                })
637                .collect();
638            let details = SuspendOutcomeDetails::new(
639                args.suspension_id.clone(),
640                primary_id,
641                primary_key,
642                WaitpointHmac::new(primary_token),
643            )
644            .with_additional_waitpoints(extras);
645
646            let opaque = encode_opaque(BackendTag::Postgres, &payload);
647            let suspended_handle =
648                Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
649            let outcome = SuspendOutcome::Suspended {
650                details,
651                handle: suspended_handle,
652            };
653
654            // 9. Cache outcome if idempotency key present.
655            if let Some(key) = idem.as_deref() {
656                let cached = outcome_to_dedup_json(&outcome);
657                sqlx::query(
658                    "INSERT INTO ff_suspend_dedup \
659                       (partition_key, idempotency_key, outcome_json, created_at_ms) \
660                     VALUES ($1, $2, $3, $4) \
661                     ON CONFLICT DO NOTHING",
662                )
663                .bind(part)
664                .bind(key)
665                .bind(&cached)
666                .bind(now)
667                .execute(&mut **tx)
668                .await
669                .map_err(map_sqlx_error)?;
670            }
671
672            Ok(outcome)
673        })
674    })
675    .await
676}
677
678// ─── deliver_signal ──────────────────────────────────────────────────────
679
680pub(crate) async fn deliver_signal_impl(
681    pool: &PgPool,
682    _partition_config: &PartitionConfig,
683    args: DeliverSignalArgs,
684) -> Result<DeliverSignalResult, EngineError> {
685    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
686    let wp_u = wp_uuid(&args.waitpoint_id)?;
687
688    run_serializable(pool, move |tx| {
689        let args = args.clone();
690        Box::pin(async move {
691            // 1. Look up pending waitpoint + stored token + kid.
692            let row: Option<(String, String, String, Uuid)> = sqlx::query_as(
693                "SELECT token_kid, token, waitpoint_key, execution_id \
694                   FROM ff_waitpoint_pending \
695                  WHERE partition_key = $1 AND waitpoint_id = $2 \
696                  FOR UPDATE",
697            )
698            .bind(part)
699            .bind(wp_u)
700            .fetch_optional(&mut **tx)
701            .await
702            .map_err(map_sqlx_error)?;
703            let (kid, stored_token, wp_key, stored_exec) = match row {
704                Some(r) => r,
705                None => return Err(EngineError::NotFound { entity: "waitpoint" }),
706            };
707            if stored_exec != exec_uuid {
708                return Err(EngineError::Validation {
709                    kind: ValidationKind::InvalidInput,
710                    detail: "waitpoint belongs to a different execution".into(),
711                });
712            }
713
714            // 2. HMAC verify. Secret comes from the keystore — allows
715            //    post-rotation grace verification when kid is inactive
716            //    but still stored.
717            let secret_row: Option<(Vec<u8>,)> = sqlx::query_as(
718                "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
719            )
720            .bind(&kid)
721            .fetch_optional(&mut **tx)
722            .await
723            .map_err(map_sqlx_error)?;
724            let (secret,) = secret_row.ok_or_else(|| EngineError::Validation {
725                kind: ValidationKind::InvalidInput,
726                detail: format!("kid {kid} missing from keystore"),
727            })?;
728            let presented = args.waitpoint_token.as_str();
729            let msg = format!("{}:{}", args.execution_id, args.waitpoint_id);
730            hmac_verify(&secret, &kid, msg.as_bytes(), presented).map_err(|e| {
731                EngineError::Validation {
732                    kind: ValidationKind::InvalidInput,
733                    detail: format!("waitpoint_token verify: {e}"),
734                }
735            })?;
736            if presented != stored_token {
737                return Err(EngineError::Validation {
738                    kind: ValidationKind::InvalidInput,
739                    detail: "waitpoint_token does not match minted token".into(),
740                });
741            }
742
743            // 3. Load suspension_current (lock the row so concurrent
744            //    signals serialize on this execution).
745            let susp_row: Option<(JsonValue, JsonValue)> = sqlx::query_as(
746                "SELECT condition, member_map FROM ff_suspension_current \
747                  WHERE partition_key = $1 AND execution_id = $2 \
748                  FOR UPDATE",
749            )
750            .bind(part)
751            .bind(exec_uuid)
752            .fetch_optional(&mut **tx)
753            .await
754            .map_err(map_sqlx_error)?;
755            let (condition_json, mut member_map) = match susp_row {
756                Some(r) => r,
757                None => return Err(EngineError::NotFound { entity: "suspension" }),
758            };
759
760            // 4. Append signal into member_map[wp_key].
761            let signal_blob = json!({
762                "signal_id": args.signal_id.to_string(),
763                "signal_name": args.signal_name,
764                "signal_category": args.signal_category,
765                "source_type": args.source_type,
766                "source_identity": args.source_identity,
767                "correlation_id": args.correlation_id.clone().unwrap_or_default(),
768                "accepted_at": args.now.0,
769                "payload_hex": args.payload.as_ref().map(hex::encode),
770            });
771            let map_obj = member_map.as_object_mut().ok_or_else(|| {
772                EngineError::Validation {
773                    kind: ValidationKind::Corruption,
774                    detail: "member_map not a JSON object".into(),
775                }
776            })?;
777            let entry = map_obj.entry(wp_key.clone()).or_insert_with(|| json!([]));
778            entry
779                .as_array_mut()
780                .ok_or_else(|| EngineError::Validation {
781                    kind: ValidationKind::Corruption,
782                    detail: "member_map[wp_key] not a JSON array".into(),
783                })?
784                .push(signal_blob);
785
786            // 5. Evaluate composite condition against the updated map.
787            let condition: ResumeCondition = serde_json::from_value(condition_json)
788                .map_err(|e| EngineError::Validation {
789                    kind: ValidationKind::Corruption,
790                    detail: format!("condition deserialize: {e}"),
791                })?;
792            let signals_by_wp: HashMap<String, Vec<ResumeSignal>> = map_obj
793                .iter()
794                .map(|(k, v)| {
795                    let sigs: Vec<ResumeSignal> = v
796                        .as_array()
797                        .map(|arr| arr.iter().filter_map(resume_signal_from_json).collect())
798                        .unwrap_or_default();
799                    (k.clone(), sigs)
800                })
801                .collect();
802            let borrowed: HashMap<&str, &[ResumeSignal]> = signals_by_wp
803                .iter()
804                .map(|(k, v)| (k.as_str(), v.as_slice()))
805                .collect();
806            let satisfied = evaluate(&condition, &borrowed);
807
808            // 6. Persist member_map (always — both append + satisfy
809            //    cases need the updated view for observe_signals).
810            sqlx::query(
811                "UPDATE ff_suspension_current SET member_map = $1 \
812                  WHERE partition_key = $2 AND execution_id = $3",
813            )
814            .bind(&member_map)
815            .bind(part)
816            .bind(exec_uuid)
817            .execute(&mut **tx)
818            .await
819            .map_err(map_sqlx_error)?;
820
821            let effect = if satisfied {
822                sqlx::query(
823                    "UPDATE ff_exec_core \
824                        SET public_state = 'resumable', \
825                            lifecycle_phase = 'runnable', \
826                            eligibility_state = 'eligible_now' \
827                      WHERE partition_key = $1 AND execution_id = $2",
828                )
829                .bind(part)
830                .bind(exec_uuid)
831                .execute(&mut **tx)
832                .await
833                .map_err(map_sqlx_error)?;
834
835                sqlx::query(
836                    "DELETE FROM ff_waitpoint_pending \
837                      WHERE partition_key = $1 AND execution_id = $2",
838                )
839                .bind(part)
840                .bind(exec_uuid)
841                .execute(&mut **tx)
842                .await
843                .map_err(map_sqlx_error)?;
844
845                sqlx::query(
846                    "INSERT INTO ff_completion_event \
847                       (partition_key, execution_id, outcome, occurred_at_ms) \
848                     VALUES ($1, $2, 'resumable', $3)",
849                )
850                .bind(part)
851                .bind(exec_uuid)
852                .bind(args.now.0)
853                .execute(&mut **tx)
854                .await
855                .map_err(map_sqlx_error)?;
856
857                "resume_condition_satisfied"
858            } else {
859                "appended_to_waitpoint"
860            };
861
862            // RFC-019 Stage B — `ff_signal_event` outbox. Same tx as
863            // the state writes above so the NOTIFY fires iff the
864            // delivery commits.
865            let wp_id_str = args.waitpoint_id.to_string();
866            signal_event::emit(
867                tx,
868                part,
869                exec_uuid,
870                &args.signal_id.to_string(),
871                Some(wp_id_str.as_str()),
872                Some(args.source_identity.as_str()),
873                args.now.0,
874            )
875            .await?;
876
877            Ok(DeliverSignalResult::Accepted {
878                signal_id: args.signal_id.clone(),
879                effect: effect.to_owned(),
880            })
881        })
882    })
883    .await
884}
885
886fn resume_signal_from_json(v: &JsonValue) -> Option<ResumeSignal> {
887    let signal_id = SignalId::parse(v["signal_id"].as_str()?).ok()?;
888    Some(ResumeSignal {
889        signal_id,
890        signal_name: v["signal_name"].as_str()?.to_owned(),
891        signal_category: v["signal_category"].as_str().unwrap_or("").to_owned(),
892        source_type: v["source_type"].as_str().unwrap_or("").to_owned(),
893        source_identity: v["source_identity"].as_str().unwrap_or("").to_owned(),
894        correlation_id: v["correlation_id"].as_str().unwrap_or("").to_owned(),
895        accepted_at: TimestampMs::from_millis(v["accepted_at"].as_i64().unwrap_or(0)),
896        payload: v["payload_hex"].as_str().and_then(|h| hex::decode(h).ok()),
897    })
898}
899
900// ─── claim_resumed_execution ─────────────────────────────────────────────
901
902pub(crate) async fn claim_resumed_execution_impl(
903    pool: &PgPool,
904    _partition_config: &PartitionConfig,
905    args: ClaimResumedExecutionArgs,
906) -> Result<ClaimResumedExecutionResult, EngineError> {
907    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
908    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
909
910    let row: Option<(String, i32)> = sqlx::query_as(
911        "SELECT public_state, attempt_index FROM ff_exec_core \
912          WHERE partition_key = $1 AND execution_id = $2 \
913          FOR UPDATE",
914    )
915    .bind(part)
916    .bind(exec_uuid)
917    .fetch_optional(&mut *tx)
918    .await
919    .map_err(map_sqlx_error)?;
920    let (public_state, attempt_index_i) = match row {
921        Some(r) => r,
922        None => {
923            tx.rollback().await.ok();
924            return Err(EngineError::NotFound { entity: "execution" });
925        }
926    };
927    if public_state != "resumable" {
928        tx.rollback().await.ok();
929        return Err(EngineError::Contention(
930            ContentionKind::NotAResumedExecution,
931        ));
932    }
933
934    let now = now_ms();
935    let lease_ttl = i64::try_from(args.lease_ttl_ms).unwrap_or(0);
936    let new_expires = now.saturating_add(lease_ttl);
937
938    sqlx::query(
939        "UPDATE ff_attempt \
940            SET worker_id = $1, worker_instance_id = $2, \
941                lease_epoch = lease_epoch + 1, \
942                lease_expires_at_ms = $3, started_at_ms = $4, outcome = NULL \
943          WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7",
944    )
945    .bind(args.worker_id.as_str())
946    .bind(args.worker_instance_id.as_str())
947    .bind(new_expires)
948    .bind(now)
949    .bind(part)
950    .bind(exec_uuid)
951    .bind(attempt_index_i)
952    .execute(&mut *tx)
953    .await
954    .map_err(map_sqlx_error)?;
955
956    sqlx::query(
957        "UPDATE ff_exec_core \
958            SET lifecycle_phase = 'active', ownership_state = 'leased', \
959                eligibility_state = 'not_applicable', \
960                public_state = 'running', attempt_state = 'running_attempt' \
961          WHERE partition_key = $1 AND execution_id = $2",
962    )
963    .bind(part)
964    .bind(exec_uuid)
965    .execute(&mut *tx)
966    .await
967    .map_err(map_sqlx_error)?;
968
969    let epoch_row: (i64,) = sqlx::query_as(
970        "SELECT lease_epoch FROM ff_attempt \
971          WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
972    )
973    .bind(part)
974    .bind(exec_uuid)
975    .bind(attempt_index_i)
976    .fetch_one(&mut *tx)
977    .await
978    .map_err(map_sqlx_error)?;
979
980    // RFC-019 Stage B outbox: lease acquired (claim_resumed_execution).
981    let lease_id_str = args.lease_id.to_string();
982    lease_event::emit(
983        &mut tx,
984        part,
985        exec_uuid,
986        Some(&lease_id_str),
987        lease_event::EVENT_ACQUIRED,
988        now,
989    )
990    .await?;
991
992    tx.commit().await.map_err(map_sqlx_error)?;
993
994    let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
995    let lease_epoch = LeaseEpoch(u64::try_from(epoch_row.0).unwrap_or(0));
996    let attempt_id = AttemptId::new();
997
998    Ok(ClaimResumedExecutionResult::Claimed(
999        ClaimedResumedExecution {
1000            execution_id: args.execution_id.clone(),
1001            lease_id: args.lease_id.clone(),
1002            lease_epoch,
1003            attempt_index,
1004            attempt_id,
1005            lease_expires_at: TimestampMs::from_millis(new_expires),
1006        },
1007    ))
1008}
1009
1010// ─── observe_signals ─────────────────────────────────────────────────────
1011
1012pub(crate) async fn observe_signals_impl(
1013    pool: &PgPool,
1014    handle: &Handle,
1015) -> Result<Vec<ResumeSignal>, EngineError> {
1016    let payload = decode_handle(handle)?;
1017    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
1018
1019    let row: Option<(JsonValue,)> = sqlx::query_as(
1020        "SELECT member_map FROM ff_suspension_current \
1021          WHERE partition_key = $1 AND execution_id = $2",
1022    )
1023    .bind(part)
1024    .bind(exec_uuid)
1025    .fetch_optional(pool)
1026    .await
1027    .map_err(map_sqlx_error)?;
1028
1029    let Some((member_map,)) = row else {
1030        return Ok(Vec::new());
1031    };
1032    let mut out: Vec<ResumeSignal> = Vec::new();
1033    if let Some(map) = member_map.as_object() {
1034        for (_wp_key, arr) in map {
1035            if let Some(sigs) = arr.as_array() {
1036                for v in sigs {
1037                    if let Some(s) = resume_signal_from_json(v) {
1038                        out.push(s);
1039                    }
1040                }
1041            }
1042        }
1043    }
1044    Ok(out)
1045}
1046
1047// ─── list_pending_waitpoints ─────────────────────────────────────────────
1048
1049/// RFC-020 §4.5 — read-only projection of pending-or-active waitpoints
1050/// for a given execution. SQL parity with Valkey's SSCAN + 2× HMGET
1051/// shape: single-table scan of `ff_waitpoint_pending` with cursor
1052/// `(waitpoint_id > $after ORDER BY waitpoint_id LIMIT $limit+1)`,
1053/// surfaces the `PendingWaitpointInfo` 10-field contract.
1054/// `token_fingerprint` is computed from the stored `<kid>:<hex>` token
1055/// — the raw token never crosses the trait boundary per RFC-017 Stage
1056/// D1 / §8.
1057///
1058/// Pre-read existence check on `ff_exec_core` mirrors Valkey's
1059/// `EXISTS exec_core` so a non-existent execution surfaces `NotFound`
1060/// rather than an empty page.
1061///
1062/// Rows with `state NOT IN ('pending', 'active')` (e.g. `'closed'`)
1063/// are filtered server-side to match Valkey's client-side keep-filter.
1064pub(crate) async fn list_pending_waitpoints_impl(
1065    pool: &PgPool,
1066    args: ListPendingWaitpointsArgs,
1067) -> Result<ListPendingWaitpointsResult, EngineError> {
1068    const DEFAULT_LIMIT: u32 = 100;
1069    const MAX_LIMIT: u32 = 1000;
1070
1071    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
1072    let limit = args.limit.unwrap_or(DEFAULT_LIMIT).clamp(1, MAX_LIMIT) as i64;
1073    let after_uuid = match args.after.as_ref() {
1074        Some(wp) => Some(wp_uuid(wp)?),
1075        None => None,
1076    };
1077
1078    // Existence probe — a non-existent execution is `NotFound`, not an
1079    // empty page. Matches Valkey's `EXISTS exec_core` pre-check.
1080    let exists: Option<(i16,)> = sqlx::query_as(
1081        "SELECT 1::smallint FROM ff_exec_core \
1082          WHERE partition_key = $1 AND execution_id = $2",
1083    )
1084    .bind(part)
1085    .bind(exec_uuid)
1086    .fetch_optional(pool)
1087    .await
1088    .map_err(map_sqlx_error)?;
1089    if exists.is_none() {
1090        return Err(EngineError::NotFound { entity: "execution" });
1091    }
1092
1093    // Page: request `limit + 1` so we can detect "more to come" without
1094    // a second round-trip. Row tuple order matches the SELECT below:
1095    // (waitpoint_id, waitpoint_key, state, required_signal_names,
1096    //  created_at_ms, activated_at_ms, expires_at_ms, token_kid, token).
1097    // Raw token is fingerprinted client-side — never returned across
1098    // the trait boundary per RFC-017 Stage D1 / §8.
1099    type Row = (
1100        Uuid,
1101        String,
1102        String,
1103        Vec<String>,
1104        i64,
1105        Option<i64>,
1106        Option<i64>,
1107        String,
1108        String,
1109    );
1110    let rows: Vec<Row> = sqlx::query_as(
1111        "SELECT waitpoint_id, waitpoint_key, state, required_signal_names, \
1112                created_at_ms, activated_at_ms, expires_at_ms, token_kid, token \
1113           FROM ff_waitpoint_pending \
1114          WHERE partition_key = $1 \
1115            AND execution_id = $2 \
1116            AND state IN ('pending', 'active') \
1117            AND ($3::uuid IS NULL OR waitpoint_id > $3) \
1118          ORDER BY waitpoint_id \
1119          LIMIT $4",
1120    )
1121    .bind(part)
1122    .bind(exec_uuid)
1123    .bind(after_uuid)
1124    .bind(limit + 1)
1125    .fetch_all(pool)
1126    .await
1127    .map_err(map_sqlx_error)?;
1128
1129    let has_more = rows.len() as i64 > limit;
1130    let take_n = if has_more { limit as usize } else { rows.len() };
1131
1132    let mut entries: Vec<PendingWaitpointInfo> = Vec::with_capacity(take_n);
1133    for (wp_uid, wp_key, state, req_names, created_ms, activated_ms, expires_ms, _kid, token)
1134        in rows.into_iter().take(take_n)
1135    {
1136        let wp_id = WaitpointId::from_uuid(wp_uid);
1137        // Parse stored `<kid>:<hex>` into audit-safe pair. The `token_kid`
1138        // column is redundant with the parsed kid — we prefer the parsed
1139        // one so the surface stays byte-identical to Valkey's.
1140        let (token_kid, token_fingerprint) = parse_waitpoint_token_kid_fp(&token);
1141        let mut info = PendingWaitpointInfo::new(
1142            wp_id,
1143            wp_key,
1144            state,
1145            TimestampMs(created_ms),
1146            args.execution_id.clone(),
1147            token_kid,
1148            token_fingerprint,
1149        );
1150        if !req_names.is_empty() {
1151            info = info.with_required_signal_names(req_names);
1152        }
1153        if let Some(ms) = activated_ms {
1154            info = info.with_activated_at(TimestampMs(ms));
1155        }
1156        if let Some(ms) = expires_ms {
1157            info = info.with_expires_at(TimestampMs(ms));
1158        }
1159        entries.push(info);
1160    }
1161
1162    let next_cursor = if has_more {
1163        entries.last().map(|e| e.waitpoint_id.clone())
1164    } else {
1165        None
1166    };
1167    let mut result = ListPendingWaitpointsResult::new(entries);
1168    if let Some(cursor) = next_cursor {
1169        result = result.with_next_cursor(cursor);
1170    }
1171    Ok(result)
1172}