Skip to main content

ff_backend_postgres/
signal.rs

1//! Signal delivery + HMAC secret rotation for the Postgres backend.
2//!
3//! **Wave 4 Agent D (RFC-v0.7 v0.7 migration-master).**
4//!
5//! **Scope note.** The Wave-4 brief asks for an 8-method
6//! suspend+signal family. The actual `EngineBackend` trait today
7//! exposes 6 of those (`suspend`, `observe_signals`,
8//! `list_suspended`, `claim_resumed_execution`, `deliver_signal`,
9//! `rotate_waitpoint_hmac_secret_all`); `try_suspend` and the
10//! single-partition `rotate_waitpoint_hmac_secret` are not on the
11//! trait surface. Extending the trait touches every parallel Wave-4
12//! agent's compile graph and needs owner adjudication, so this
13//! tranche ships the in-trait method that is fully standalone
14//! (`rotate_waitpoint_hmac_secret_all`) plus the primitives the
15//! other 5 methods will consume in a follow-up:
16//!
17//! * [`hmac_sign`] / [`hmac_verify`] — first Rust-side HMAC code in
18//!   the workspace. The Valkey backend signs inside Lua; this module
19//!   owns server-side signing on the Postgres path.
20//! * [`SERIALIZABLE_RETRY_BUDGET`] — the Q11 retry cap used by the
21//!   suspend / deliver_signal SERIALIZABLE sites when those land.
22//! * [`current_active_kid`] / [`fetch_kid`] — keystore helpers.
23//! * [`rotate_waitpoint_hmac_secret_all_impl`] — Q4 single-global-
24//!   row write + active-flag flip. Wired into `EngineBackend` in
25//!   the crate root (`lib.rs`).
26//!
27//! HMAC sign/verify primitives live here (not in `ff-core`) so the
28//! Cargo.toml delta stays scoped to this crate while parallel Wave-4
29//! agents are churning ff-core. A follow-up can hoist the primitive
30//! into `ff_core::waitpoint_hmac` once both backends converge.
31
32use ff_core::engine_error::EngineError;
33use hmac::{Hmac, Mac};
34use sha2::Sha256;
35use sqlx::PgPool;
36
37use crate::error::map_sqlx_error;
38
/// Retry cap for SERIALIZABLE transactions (decision Q11).
///
/// When the budget is exhausted, the suspend / deliver_signal call sites
/// are expected to surface `EngineError::Contention(LeaseConflict)`
/// instead of spinning in-process. The reconciler (or the calling worker)
/// then reconstructs intent.
pub const SERIALIZABLE_RETRY_BUDGET: usize = 3;
44
45/// True iff `err` is a retryable serialization/deadlock fault.
46/// Exposed for callers that run their own SERIALIZABLE-tx retry loop
47/// and need to tell retryable from fatal on sqlx errors.
48pub fn is_retryable_serialization(err: &sqlx::Error) -> bool {
49    if let Some(db) = err.as_database_error()
50        && let Some(code) = db.code()
51    {
52        matches!(code.as_ref(), "40001" | "40P01")
53    } else {
54        false
55    }
56}
57
58/// HMAC-SHA256 signature over `kid || ":" || message`. Returns a
59/// `kid:hex` token. Matches the conceptual shape of the Valkey Lua
60/// signer (`kid:40hex`); SHA256 rather than SHA1 so we use the
61/// stdlib-friendly primitive. The two backends never cross-verify
62/// tokens.
63pub fn hmac_sign(secret: &[u8], kid: &str, message: &[u8]) -> String {
64    let mut mac = <Hmac<Sha256> as Mac>::new_from_slice(secret)
65        .expect("HMAC-SHA256 accepts any key length");
66    mac.update(kid.as_bytes());
67    mac.update(b":");
68    mac.update(message);
69    let out = mac.finalize().into_bytes();
70    format!("{kid}:{}", hex::encode(out))
71}
72
73/// Verify a `kid:hex` token. Returns `Ok(())` iff the digest matches
74/// `secret` over `message`. Constant-time via
75/// [`hmac::Mac::verify_slice`].
76pub fn hmac_verify(
77    secret: &[u8],
78    kid: &str,
79    message: &[u8],
80    token: &str,
81) -> Result<(), HmacVerifyError> {
82    let (tok_kid, tok_hex) =
83        token.split_once(':').ok_or(HmacVerifyError::Malformed)?;
84    if tok_kid != kid {
85        return Err(HmacVerifyError::WrongKid {
86            expected: kid.to_owned(),
87            actual: tok_kid.to_owned(),
88        });
89    }
90    let expected = hex::decode(tok_hex).map_err(|_| HmacVerifyError::Malformed)?;
91    let mut mac = <Hmac<Sha256> as Mac>::new_from_slice(secret)
92        .map_err(|_| HmacVerifyError::Malformed)?;
93    mac.update(kid.as_bytes());
94    mac.update(b":");
95    mac.update(message);
96    mac.verify_slice(&expected)
97        .map_err(|_| HmacVerifyError::SignatureMismatch)
98}
99
100/// Errors from [`hmac_verify`]. Callers map these onto
101/// `EngineError::Validation(InvalidToken)` at the trait boundary.
102#[derive(Debug, thiserror::Error)]
103pub enum HmacVerifyError {
104    #[error("token malformed; expected kid:hex shape")]
105    Malformed,
106    #[error("token kid mismatch; expected {expected}, got {actual}")]
107    WrongKid { expected: String, actual: String },
108    #[error("HMAC signature mismatch")]
109    SignatureMismatch,
110}
111
112/// Resolve the currently-active HMAC secret (kid + bytes) from
113/// `ff_waitpoint_hmac`. Returns `Ok(None)` when the keystore is empty
114/// (bootstrap race); callers treat that as a state error.
115pub async fn current_active_kid(
116    pool: &PgPool,
117) -> Result<Option<(String, Vec<u8>)>, EngineError> {
118    let row: Option<(String, Vec<u8>)> = sqlx::query_as(
119        "SELECT kid, secret FROM ff_waitpoint_hmac \
120         WHERE active = TRUE \
121         ORDER BY rotated_at_ms DESC LIMIT 1",
122    )
123    .fetch_optional(pool)
124    .await
125    .map_err(map_sqlx_error)?;
126    Ok(row)
127}
128
129/// Fetch a specific kid's secret. Returns `Ok(None)` when the kid is
130/// unknown. Includes inactive kids (inside the rotation grace window).
131pub async fn fetch_kid(pool: &PgPool, kid: &str) -> Result<Option<Vec<u8>>, EngineError> {
132    let row: Option<(Vec<u8>,)> = sqlx::query_as(
133        "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
134    )
135    .bind(kid)
136    .fetch_optional(pool)
137    .await
138    .map_err(map_sqlx_error)?;
139    Ok(row.map(|(s,)| s))
140}
141
142/// Implementation of `EngineBackend::rotate_waitpoint_hmac_secret_all`.
143///
144/// Q4 (v0.7 migration-master, adjudicated 2026-04-24): the Postgres
145/// backend stores one global row per kid (no `partition_id` column).
146/// The "all" fan-out collapses to
147///
148/// ```sql
149/// UPDATE ff_waitpoint_hmac SET active = FALSE WHERE active = TRUE;
150/// INSERT INTO ff_waitpoint_hmac(kid, secret, rotated_at_ms, active)
151///     VALUES ($1, $2, $3, TRUE);
152/// ```
153///
154/// Result shape: `entries.len() == 1`, `partition = 0`. This is the
155/// wire parity the adjudication pinned — consumers that walk
156/// `entries` work against both Valkey and Postgres backends.
157///
158/// **Replay contract.** If `new_kid` is already installed with the
159/// same secret bytes, return `Noop { kid }` — operator retries are
160/// safe. Same kid with *different* bytes is a `RotationConflict`:
161/// callers must pick a fresh kid instead of silently overwriting.
162pub async fn rotate_waitpoint_hmac_secret_all_impl(
163    pool: &PgPool,
164    args: ff_core::contracts::RotateWaitpointHmacSecretAllArgs,
165    now_ms: i64,
166) -> Result<ff_core::contracts::RotateWaitpointHmacSecretAllResult, EngineError> {
167    use ff_core::contracts::{
168        RotateWaitpointHmacSecretAllEntry, RotateWaitpointHmacSecretAllResult,
169        RotateWaitpointHmacSecretOutcome,
170    };
171
172    // Decode the hex secret up front so a malformed input fails
173    // Validation rather than smuggling through as Transport.
174    let secret_bytes = hex::decode(&args.new_secret_hex).map_err(|_| {
175        EngineError::Validation {
176            kind: ff_core::engine_error::ValidationKind::InvalidInput,
177            detail: "new_secret_hex is not valid hex".into(),
178        }
179    })?;
180
181    let outcome_res: Result<RotateWaitpointHmacSecretOutcome, EngineError> = async {
182        let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
183
184        // Exact replay check: same kid + same bytes ⇒ Noop.
185        let existing: Option<(Vec<u8>,)> = sqlx::query_as(
186            "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
187        )
188        .bind(&args.new_kid)
189        .fetch_optional(&mut *tx)
190        .await
191        .map_err(map_sqlx_error)?;
192        if let Some((prior,)) = existing {
193            if prior == secret_bytes {
194                tx.commit().await.map_err(map_sqlx_error)?;
195                return Ok(RotateWaitpointHmacSecretOutcome::Noop {
196                    kid: args.new_kid.clone(),
197                });
198            }
199            tx.rollback().await.ok();
200            return Err(EngineError::Conflict(
201                ff_core::engine_error::ConflictKind::RotationConflict(format!(
202                    "kid {} already installed with a different secret",
203                    args.new_kid
204                )),
205            ));
206        }
207
208        // Capture the prior current-kid for the outcome envelope.
209        let prior_active: Option<(String,)> = sqlx::query_as(
210            "SELECT kid FROM ff_waitpoint_hmac \
211             WHERE active = TRUE \
212             ORDER BY rotated_at_ms DESC LIMIT 1",
213        )
214        .fetch_optional(&mut *tx)
215        .await
216        .map_err(map_sqlx_error)?;
217
218        // `grace_ms` is not stored per-row in the Wave-3 schema; the
219        // grace window is enforced by retaining prior `active=false`
220        // rows in the table so [`fetch_kid`] still returns their
221        // secret during verification within the window.
222        let _ = args.grace_ms;
223
224        sqlx::query("UPDATE ff_waitpoint_hmac SET active = FALSE WHERE active = TRUE")
225            .execute(&mut *tx)
226            .await
227            .map_err(map_sqlx_error)?;
228
229        sqlx::query(
230            "INSERT INTO ff_waitpoint_hmac (kid, secret, rotated_at_ms, active) \
231             VALUES ($1, $2, $3, TRUE)",
232        )
233        .bind(&args.new_kid)
234        .bind(&secret_bytes)
235        .bind(now_ms)
236        .execute(&mut *tx)
237        .await
238        .map_err(map_sqlx_error)?;
239
240        tx.commit().await.map_err(map_sqlx_error)?;
241
242        Ok(RotateWaitpointHmacSecretOutcome::Rotated {
243            previous_kid: prior_active.map(|(k,)| k),
244            new_kid: args.new_kid.clone(),
245            gc_count: 0,
246        })
247    }
248    .await;
249
250    Ok(RotateWaitpointHmacSecretAllResult::new(vec![
251        RotateWaitpointHmacSecretAllEntry::new(0, outcome_res),
252    ]))
253}
254
255/// Implementation of `EngineBackend::seed_waitpoint_hmac_secret` (issue #280).
256///
257/// Semantics against the global `ff_waitpoint_hmac` table:
258///
259/// * No active row exists and no row for the supplied kid → INSERT
260///   the new kid as the active signing kid. Returns
261///   `SeedOutcome::Seeded { kid }`.
262/// * Row for the supplied kid exists (regardless of `active`) →
263///   `SeedOutcome::AlreadySeeded { kid, same_secret }` where
264///   `same_secret` reports whether the stored bytes match.
265/// * Different kid is currently `active=TRUE` → `Validation(InvalidInput)`;
266///   operators must rotate rather than re-seed under a conflicting
267///   kid. Mirrors the Valkey backend's behaviour when the per-partition
268///   `current_kid` differs from the supplied one.
269pub async fn seed_waitpoint_hmac_secret_impl(
270    pool: &PgPool,
271    args: ff_core::contracts::SeedWaitpointHmacSecretArgs,
272    now_ms: i64,
273) -> Result<ff_core::contracts::SeedOutcome, EngineError> {
274    use ff_core::contracts::SeedOutcome;
275
276    if args.secret_hex.len() != 64 || !args.secret_hex.chars().all(|c| c.is_ascii_hexdigit()) {
277        return Err(EngineError::Validation {
278            kind: ff_core::engine_error::ValidationKind::InvalidInput,
279            detail: "secret_hex must be 64 hex characters (256-bit secret)".into(),
280        });
281    }
282    if args.kid.is_empty() {
283        return Err(EngineError::Validation {
284            kind: ff_core::engine_error::ValidationKind::InvalidInput,
285            detail: "kid must be non-empty".into(),
286        });
287    }
288    let secret_bytes = hex::decode(&args.secret_hex).map_err(|_| EngineError::Validation {
289        kind: ff_core::engine_error::ValidationKind::InvalidInput,
290        detail: "secret_hex is not valid hex".into(),
291    })?;
292
293    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
294
295    let existing: Option<(Vec<u8>,)> =
296        sqlx::query_as("SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1")
297            .bind(&args.kid)
298            .fetch_optional(&mut *tx)
299            .await
300            .map_err(map_sqlx_error)?;
301    if let Some((prior,)) = existing {
302        tx.commit().await.map_err(map_sqlx_error)?;
303        return Ok(SeedOutcome::AlreadySeeded {
304            kid: args.kid,
305            same_secret: prior == secret_bytes,
306        });
307    }
308
309    let active: Option<(String,)> = sqlx::query_as(
310        "SELECT kid FROM ff_waitpoint_hmac WHERE active = TRUE \
311         ORDER BY rotated_at_ms DESC LIMIT 1",
312    )
313    .fetch_optional(&mut *tx)
314    .await
315    .map_err(map_sqlx_error)?;
316    if let Some((active_kid,)) = active {
317        tx.rollback().await.ok();
318        return Err(EngineError::Validation {
319            kind: ff_core::engine_error::ValidationKind::InvalidInput,
320            detail: format!(
321                "seed_waitpoint_hmac_secret: a different kid {active_kid:?} is already active; \
322                 use rotate_waitpoint_hmac_secret_all to change kid"
323            ),
324        });
325    }
326
327    sqlx::query(
328        "INSERT INTO ff_waitpoint_hmac (kid, secret, rotated_at_ms, active) \
329         VALUES ($1, $2, $3, TRUE)",
330    )
331    .bind(&args.kid)
332    .bind(&secret_bytes)
333    .bind(now_ms)
334    .execute(&mut *tx)
335    .await
336    .map_err(map_sqlx_error)?;
337
338    tx.commit().await.map_err(map_sqlx_error)?;
339    Ok(SeedOutcome::Seeded { kid: args.kid })
340}
341
/// Unit tests for the HMAC sign/verify primitives.
#[cfg(test)]
mod hmac_tests {
    use super::*;

    #[test]
    fn sign_then_verify_round_trip() {
        let secret = b"super-secret-key";
        let tok = hmac_sign(secret, "kid1", b"exec-id:wp-id");
        assert!(tok.starts_with("kid1:"));
        hmac_verify(secret, "kid1", b"exec-id:wp-id", &tok).expect("verify ok");
    }

    #[test]
    fn sign_is_deterministic_and_hex_encoded() {
        let first = hmac_sign(b"k", "kid", b"msg");
        let second = hmac_sign(b"k", "kid", b"msg");
        assert_eq!(first, second);
        let (kid, digest) = first.split_once(':').expect("kid:hex shape");
        assert_eq!(kid, "kid");
        // HMAC-SHA256 yields 32 bytes, i.e. 64 hex characters.
        assert_eq!(digest.len(), 64);
        assert!(digest.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn verify_rejects_wrong_secret() {
        let tok = hmac_sign(b"secret-a", "k", b"msg");
        let err = hmac_verify(b"secret-b", "k", b"msg", &tok).unwrap_err();
        assert!(matches!(err, HmacVerifyError::SignatureMismatch));
    }

    #[test]
    fn verify_rejects_truncated_digest() {
        let tok = hmac_sign(b"s", "k", b"msg");
        // Drop the final digest byte (two hex chars); still valid hex.
        let truncated = &tok[..tok.len() - 2];
        let err = hmac_verify(b"s", "k", b"msg", truncated).unwrap_err();
        assert!(matches!(err, HmacVerifyError::SignatureMismatch));
    }

    #[test]
    fn verify_rejects_tampered_message() {
        let secret = b"s";
        let tok = hmac_sign(secret, "k", b"msg");
        let err = hmac_verify(secret, "k", b"tampered", &tok).unwrap_err();
        assert!(matches!(err, HmacVerifyError::SignatureMismatch));
    }

    #[test]
    fn verify_rejects_wrong_kid() {
        let secret = b"s";
        let tok = hmac_sign(secret, "k1", b"msg");
        let err = hmac_verify(secret, "k2", b"msg", &tok).unwrap_err();
        assert!(matches!(err, HmacVerifyError::WrongKid { .. }));
    }

    #[test]
    fn verify_rejects_malformed() {
        assert!(matches!(
            hmac_verify(b"s", "k", b"msg", "no-colon-token"),
            Err(HmacVerifyError::Malformed)
        ));
        assert!(matches!(
            hmac_verify(b"s", "k", b"msg", "k:not-hex-zzzz"),
            Err(HmacVerifyError::Malformed)
        ));
    }
}