Skip to main content

ff_backend_postgres/
signal.rs

1//! Signal delivery + HMAC secret rotation for the Postgres backend.
2//!
3//! **Wave 4 Agent D (RFC-v0.7 v0.7 migration-master).**
4//!
5//! **Scope note.** The Wave-4 brief asks for an 8-method
6//! suspend+signal family. The actual `EngineBackend` trait today
7//! exposes 6 of those (`suspend`, `observe_signals`,
8//! `list_suspended`, `claim_resumed_execution`, `deliver_signal`,
9//! `rotate_waitpoint_hmac_secret_all`); `try_suspend` and the
10//! single-partition `rotate_waitpoint_hmac_secret` are not on the
11//! trait surface. Extending the trait touches every parallel Wave-4
12//! agent's compile graph and needs owner adjudication, so this
13//! tranche ships the in-trait method that is fully standalone
14//! (`rotate_waitpoint_hmac_secret_all`) plus the primitives the
15//! other 5 methods will consume in a follow-up:
16//!
17//! * [`hmac_sign`] / [`hmac_verify`] — first Rust-side HMAC code in
18//!   the workspace. The Valkey backend signs inside Lua; this module
19//!   owns server-side signing on the Postgres path.
20//! * [`SERIALIZABLE_RETRY_BUDGET`] — the Q11 retry cap used by the
21//!   suspend / deliver_signal SERIALIZABLE sites when those land.
22//! * [`current_active_kid`] / [`fetch_kid`] — keystore helpers.
23//! * [`rotate_waitpoint_hmac_secret_all_impl`] — Q4 single-global-
24//!   row write + active-flag flip. Wired into `EngineBackend` in
//!   the crate root (`lib.rs`).
26//!
27//! HMAC sign/verify primitives hoisted to `ff_core::crypto::hmac` in
28//! RFC-023 Phase 2b.2.1 so the Postgres + SQLite backends share one
29//! Rust implementation. The Valkey backend still signs inside Lua, so
30//! no third consumer is pending. This module re-exports for backward
31//! compatibility of internal call sites.
32
33use ff_core::engine_error::EngineError;
34use sqlx::PgPool;
35
36use crate::error::map_sqlx_error;
37
38pub use ff_core::crypto::hmac::{hmac_sign, hmac_verify, HmacVerifyError};
39
/// Q11 retry budget for SERIALIZABLE transactions.
///
/// On retry exhaustion the suspend / deliver_signal call sites are
/// expected to return `EngineError::Contention(LeaseConflict)` so the
/// reconciler (or calling worker) reconstructs intent rather than
/// spinning in-process.
pub const SERIALIZABLE_RETRY_BUDGET: usize = 3;
45
46/// True iff `err` is a retryable serialization/deadlock fault.
47/// Exposed for callers that run their own SERIALIZABLE-tx retry loop
48/// and need to tell retryable from fatal on sqlx errors.
49pub fn is_retryable_serialization(err: &sqlx::Error) -> bool {
50    if let Some(db) = err.as_database_error()
51        && let Some(code) = db.code()
52    {
53        matches!(code.as_ref(), "40001" | "40P01")
54    } else {
55        false
56    }
57}
58
59/// Resolve the currently-active HMAC secret (kid + bytes) from
60/// `ff_waitpoint_hmac`. Returns `Ok(None)` when the keystore is empty
61/// (bootstrap race); callers treat that as a state error.
62pub async fn current_active_kid(
63    pool: &PgPool,
64) -> Result<Option<(String, Vec<u8>)>, EngineError> {
65    let row: Option<(String, Vec<u8>)> = sqlx::query_as(
66        "SELECT kid, secret FROM ff_waitpoint_hmac \
67         WHERE active = TRUE \
68         ORDER BY rotated_at_ms DESC LIMIT 1",
69    )
70    .fetch_optional(pool)
71    .await
72    .map_err(map_sqlx_error)?;
73    Ok(row)
74}
75
76/// Fetch a specific kid's secret. Returns `Ok(None)` when the kid is
77/// unknown. Includes inactive kids (inside the rotation grace window).
78pub async fn fetch_kid(pool: &PgPool, kid: &str) -> Result<Option<Vec<u8>>, EngineError> {
79    let row: Option<(Vec<u8>,)> = sqlx::query_as(
80        "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
81    )
82    .bind(kid)
83    .fetch_optional(pool)
84    .await
85    .map_err(map_sqlx_error)?;
86    Ok(row.map(|(s,)| s))
87}
88
89/// Implementation of `EngineBackend::rotate_waitpoint_hmac_secret_all`.
90///
91/// Q4 (v0.7 migration-master, adjudicated 2026-04-24): the Postgres
92/// backend stores one global row per kid (no `partition_id` column).
93/// The "all" fan-out collapses to
94///
95/// ```sql
96/// UPDATE ff_waitpoint_hmac SET active = FALSE WHERE active = TRUE;
97/// INSERT INTO ff_waitpoint_hmac(kid, secret, rotated_at_ms, active)
98///     VALUES ($1, $2, $3, TRUE);
99/// ```
100///
101/// Result shape: `entries.len() == 1`, `partition = 0`. This is the
102/// wire parity the adjudication pinned — consumers that walk
103/// `entries` work against both Valkey and Postgres backends.
104///
105/// **Replay contract.** If `new_kid` is already installed with the
106/// same secret bytes, return `Noop { kid }` — operator retries are
107/// safe. Same kid with *different* bytes is a `RotationConflict`:
108/// callers must pick a fresh kid instead of silently overwriting.
109pub async fn rotate_waitpoint_hmac_secret_all_impl(
110    pool: &PgPool,
111    args: ff_core::contracts::RotateWaitpointHmacSecretAllArgs,
112    now_ms: i64,
113) -> Result<ff_core::contracts::RotateWaitpointHmacSecretAllResult, EngineError> {
114    use ff_core::contracts::{
115        RotateWaitpointHmacSecretAllEntry, RotateWaitpointHmacSecretAllResult,
116        RotateWaitpointHmacSecretOutcome,
117    };
118
119    // Decode the hex secret up front so a malformed input fails
120    // Validation rather than smuggling through as Transport.
121    let secret_bytes = hex::decode(&args.new_secret_hex).map_err(|_| {
122        EngineError::Validation {
123            kind: ff_core::engine_error::ValidationKind::InvalidInput,
124            detail: "new_secret_hex is not valid hex".into(),
125        }
126    })?;
127
128    let outcome_res: Result<RotateWaitpointHmacSecretOutcome, EngineError> = async {
129        let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
130
131        // Exact replay check: same kid + same bytes ⇒ Noop.
132        let existing: Option<(Vec<u8>,)> = sqlx::query_as(
133            "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
134        )
135        .bind(&args.new_kid)
136        .fetch_optional(&mut *tx)
137        .await
138        .map_err(map_sqlx_error)?;
139        if let Some((prior,)) = existing {
140            if prior == secret_bytes {
141                tx.commit().await.map_err(map_sqlx_error)?;
142                return Ok(RotateWaitpointHmacSecretOutcome::Noop {
143                    kid: args.new_kid.clone(),
144                });
145            }
146            tx.rollback().await.ok();
147            return Err(EngineError::Conflict(
148                ff_core::engine_error::ConflictKind::RotationConflict(format!(
149                    "kid {} already installed with a different secret",
150                    args.new_kid
151                )),
152            ));
153        }
154
155        // Capture the prior current-kid for the outcome envelope.
156        let prior_active: Option<(String,)> = sqlx::query_as(
157            "SELECT kid FROM ff_waitpoint_hmac \
158             WHERE active = TRUE \
159             ORDER BY rotated_at_ms DESC LIMIT 1",
160        )
161        .fetch_optional(&mut *tx)
162        .await
163        .map_err(map_sqlx_error)?;
164
165        // `grace_ms` is not stored per-row in the Wave-3 schema; the
166        // grace window is enforced by retaining prior `active=false`
167        // rows in the table so [`fetch_kid`] still returns their
168        // secret during verification within the window.
169        let _ = args.grace_ms;
170
171        sqlx::query("UPDATE ff_waitpoint_hmac SET active = FALSE WHERE active = TRUE")
172            .execute(&mut *tx)
173            .await
174            .map_err(map_sqlx_error)?;
175
176        sqlx::query(
177            "INSERT INTO ff_waitpoint_hmac (kid, secret, rotated_at_ms, active) \
178             VALUES ($1, $2, $3, TRUE)",
179        )
180        .bind(&args.new_kid)
181        .bind(&secret_bytes)
182        .bind(now_ms)
183        .execute(&mut *tx)
184        .await
185        .map_err(map_sqlx_error)?;
186
187        tx.commit().await.map_err(map_sqlx_error)?;
188
189        Ok(RotateWaitpointHmacSecretOutcome::Rotated {
190            previous_kid: prior_active.map(|(k,)| k),
191            new_kid: args.new_kid.clone(),
192            gc_count: 0,
193        })
194    }
195    .await;
196
197    Ok(RotateWaitpointHmacSecretAllResult::new(vec![
198        RotateWaitpointHmacSecretAllEntry::new(0, outcome_res),
199    ]))
200}
201
202/// Implementation of `EngineBackend::seed_waitpoint_hmac_secret` (issue #280).
203///
204/// Semantics against the global `ff_waitpoint_hmac` table:
205///
206/// * No active row exists and no row for the supplied kid → INSERT
207///   the new kid as the active signing kid. Returns
208///   `SeedOutcome::Seeded { kid }`.
209/// * Row for the supplied kid exists (regardless of `active`) →
210///   `SeedOutcome::AlreadySeeded { kid, same_secret }` where
211///   `same_secret` reports whether the stored bytes match.
212/// * Different kid is currently `active=TRUE` → `Validation(InvalidInput)`;
213///   operators must rotate rather than re-seed under a conflicting
214///   kid. Mirrors the Valkey backend's behaviour when the per-partition
215///   `current_kid` differs from the supplied one.
216pub async fn seed_waitpoint_hmac_secret_impl(
217    pool: &PgPool,
218    args: ff_core::contracts::SeedWaitpointHmacSecretArgs,
219    now_ms: i64,
220) -> Result<ff_core::contracts::SeedOutcome, EngineError> {
221    use ff_core::contracts::SeedOutcome;
222
223    if args.secret_hex.len() != 64 || !args.secret_hex.chars().all(|c| c.is_ascii_hexdigit()) {
224        return Err(EngineError::Validation {
225            kind: ff_core::engine_error::ValidationKind::InvalidInput,
226            detail: "secret_hex must be 64 hex characters (256-bit secret)".into(),
227        });
228    }
229    if args.kid.is_empty() {
230        return Err(EngineError::Validation {
231            kind: ff_core::engine_error::ValidationKind::InvalidInput,
232            detail: "kid must be non-empty".into(),
233        });
234    }
235    let secret_bytes = hex::decode(&args.secret_hex).map_err(|_| EngineError::Validation {
236        kind: ff_core::engine_error::ValidationKind::InvalidInput,
237        detail: "secret_hex is not valid hex".into(),
238    })?;
239
240    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
241
242    let existing: Option<(Vec<u8>,)> =
243        sqlx::query_as("SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1")
244            .bind(&args.kid)
245            .fetch_optional(&mut *tx)
246            .await
247            .map_err(map_sqlx_error)?;
248    if let Some((prior,)) = existing {
249        tx.commit().await.map_err(map_sqlx_error)?;
250        return Ok(SeedOutcome::AlreadySeeded {
251            kid: args.kid,
252            same_secret: prior == secret_bytes,
253        });
254    }
255
256    let active: Option<(String,)> = sqlx::query_as(
257        "SELECT kid FROM ff_waitpoint_hmac WHERE active = TRUE \
258         ORDER BY rotated_at_ms DESC LIMIT 1",
259    )
260    .fetch_optional(&mut *tx)
261    .await
262    .map_err(map_sqlx_error)?;
263    if let Some((active_kid,)) = active {
264        tx.rollback().await.ok();
265        return Err(EngineError::Validation {
266            kind: ff_core::engine_error::ValidationKind::InvalidInput,
267            detail: format!(
268                "seed_waitpoint_hmac_secret: a different kid {active_kid:?} is already active; \
269                 use rotate_waitpoint_hmac_secret_all to change kid"
270            ),
271        });
272    }
273
274    sqlx::query(
275        "INSERT INTO ff_waitpoint_hmac (kid, secret, rotated_at_ms, active) \
276         VALUES ($1, $2, $3, TRUE)",
277    )
278    .bind(&args.kid)
279    .bind(&secret_bytes)
280    .bind(now_ms)
281    .execute(&mut *tx)
282    .await
283    .map_err(map_sqlx_error)?;
284
285    tx.commit().await.map_err(map_sqlx_error)?;
286    Ok(SeedOutcome::Seeded { kid: args.kid })
287}
288
289// HMAC round-trip + tamper tests live in `ff_core::crypto::hmac`
290// alongside the extracted primitive.