ff_backend_postgres/signal.rs
1//! Signal delivery + HMAC secret rotation for the Postgres backend.
2//!
3//! **Wave 4 Agent D (RFC-v0.7 v0.7 migration-master).**
4//!
5//! **Scope note.** The Wave-4 brief asks for an 8-method
6//! suspend+signal family. The actual `EngineBackend` trait today
7//! exposes 6 of those (`suspend`, `observe_signals`,
8//! `list_suspended`, `claim_resumed_execution`, `deliver_signal`,
9//! `rotate_waitpoint_hmac_secret_all`); `try_suspend` and the
10//! single-partition `rotate_waitpoint_hmac_secret` are not on the
11//! trait surface. Extending the trait touches every parallel Wave-4
12//! agent's compile graph and needs owner adjudication, so this
13//! tranche ships the in-trait method that is fully standalone
14//! (`rotate_waitpoint_hmac_secret_all`) plus the primitives the
15//! other 5 methods will consume in a follow-up:
16//!
17//! * [`hmac_sign`] / [`hmac_verify`] — first Rust-side HMAC code in
18//! the workspace. The Valkey backend signs inside Lua; this module
19//! owns server-side signing on the Postgres path.
20//! * [`SERIALIZABLE_RETRY_BUDGET`] — the Q11 retry cap used by the
21//! suspend / deliver_signal SERIALIZABLE sites when those land.
22//! * [`current_active_kid`] / [`fetch_kid`] — keystore helpers.
//! * [`rotate_waitpoint_hmac_secret_all_impl`] — Q4 single-global-
//!   row write + active-flag flip. Wired into `EngineBackend` in
//!   the crate root (`lib.rs`).
26//!
27//! HMAC sign/verify primitives hoisted to `ff_core::crypto::hmac` in
28//! RFC-023 Phase 2b.2.1 so the Postgres + SQLite backends share one
29//! Rust implementation. The Valkey backend still signs inside Lua, so
30//! no third consumer is pending. This module re-exports for backward
31//! compatibility of internal call sites.
32
33use ff_core::engine_error::EngineError;
34use sqlx::PgPool;
35
36use crate::error::map_sqlx_error;
37
38pub use ff_core::crypto::hmac::{hmac_sign, hmac_verify, HmacVerifyError};
39
/// Retry budget (Q11) for transactions run at SERIALIZABLE isolation.
///
/// Once the budget is spent, the suspend / deliver_signal call sites
/// are expected to give up with
/// `EngineError::Contention(LeaseConflict)`, leaving it to the
/// reconciler (or the calling worker) to reconstruct intent instead
/// of spinning in-process.
pub const SERIALIZABLE_RETRY_BUDGET: usize = 3;
45
46/// True iff `err` is a retryable serialization/deadlock fault.
47/// Exposed for callers that run their own SERIALIZABLE-tx retry loop
48/// and need to tell retryable from fatal on sqlx errors.
49pub fn is_retryable_serialization(err: &sqlx::Error) -> bool {
50 if let Some(db) = err.as_database_error()
51 && let Some(code) = db.code()
52 {
53 matches!(code.as_ref(), "40001" | "40P01")
54 } else {
55 false
56 }
57}
58
59/// Resolve the currently-active HMAC secret (kid + bytes) from
60/// `ff_waitpoint_hmac`. Returns `Ok(None)` when the keystore is empty
61/// (bootstrap race); callers treat that as a state error.
62pub async fn current_active_kid(
63 pool: &PgPool,
64) -> Result<Option<(String, Vec<u8>)>, EngineError> {
65 let row: Option<(String, Vec<u8>)> = sqlx::query_as(
66 "SELECT kid, secret FROM ff_waitpoint_hmac \
67 WHERE active = TRUE \
68 ORDER BY rotated_at_ms DESC LIMIT 1",
69 )
70 .fetch_optional(pool)
71 .await
72 .map_err(map_sqlx_error)?;
73 Ok(row)
74}
75
76/// Fetch a specific kid's secret. Returns `Ok(None)` when the kid is
77/// unknown. Includes inactive kids (inside the rotation grace window).
78pub async fn fetch_kid(pool: &PgPool, kid: &str) -> Result<Option<Vec<u8>>, EngineError> {
79 let row: Option<(Vec<u8>,)> = sqlx::query_as(
80 "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
81 )
82 .bind(kid)
83 .fetch_optional(pool)
84 .await
85 .map_err(map_sqlx_error)?;
86 Ok(row.map(|(s,)| s))
87}
88
89/// Implementation of `EngineBackend::rotate_waitpoint_hmac_secret_all`.
90///
91/// Q4 (v0.7 migration-master, adjudicated 2026-04-24): the Postgres
92/// backend stores one global row per kid (no `partition_id` column).
93/// The "all" fan-out collapses to
94///
95/// ```sql
96/// UPDATE ff_waitpoint_hmac SET active = FALSE WHERE active = TRUE;
97/// INSERT INTO ff_waitpoint_hmac(kid, secret, rotated_at_ms, active)
98/// VALUES ($1, $2, $3, TRUE);
99/// ```
100///
101/// Result shape: `entries.len() == 1`, `partition = 0`. This is the
102/// wire parity the adjudication pinned — consumers that walk
103/// `entries` work against both Valkey and Postgres backends.
104///
105/// **Replay contract.** If `new_kid` is already installed with the
106/// same secret bytes, return `Noop { kid }` — operator retries are
107/// safe. Same kid with *different* bytes is a `RotationConflict`:
108/// callers must pick a fresh kid instead of silently overwriting.
109pub async fn rotate_waitpoint_hmac_secret_all_impl(
110 pool: &PgPool,
111 args: ff_core::contracts::RotateWaitpointHmacSecretAllArgs,
112 now_ms: i64,
113) -> Result<ff_core::contracts::RotateWaitpointHmacSecretAllResult, EngineError> {
114 use ff_core::contracts::{
115 RotateWaitpointHmacSecretAllEntry, RotateWaitpointHmacSecretAllResult,
116 RotateWaitpointHmacSecretOutcome,
117 };
118
119 // Decode the hex secret up front so a malformed input fails
120 // Validation rather than smuggling through as Transport.
121 let secret_bytes = hex::decode(&args.new_secret_hex).map_err(|_| {
122 EngineError::Validation {
123 kind: ff_core::engine_error::ValidationKind::InvalidInput,
124 detail: "new_secret_hex is not valid hex".into(),
125 }
126 })?;
127
128 let outcome_res: Result<RotateWaitpointHmacSecretOutcome, EngineError> = async {
129 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
130
131 // Exact replay check: same kid + same bytes ⇒ Noop.
132 let existing: Option<(Vec<u8>,)> = sqlx::query_as(
133 "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
134 )
135 .bind(&args.new_kid)
136 .fetch_optional(&mut *tx)
137 .await
138 .map_err(map_sqlx_error)?;
139 if let Some((prior,)) = existing {
140 if prior == secret_bytes {
141 tx.commit().await.map_err(map_sqlx_error)?;
142 return Ok(RotateWaitpointHmacSecretOutcome::Noop {
143 kid: args.new_kid.clone(),
144 });
145 }
146 tx.rollback().await.ok();
147 return Err(EngineError::Conflict(
148 ff_core::engine_error::ConflictKind::RotationConflict(format!(
149 "kid {} already installed with a different secret",
150 args.new_kid
151 )),
152 ));
153 }
154
155 // Capture the prior current-kid for the outcome envelope.
156 let prior_active: Option<(String,)> = sqlx::query_as(
157 "SELECT kid FROM ff_waitpoint_hmac \
158 WHERE active = TRUE \
159 ORDER BY rotated_at_ms DESC LIMIT 1",
160 )
161 .fetch_optional(&mut *tx)
162 .await
163 .map_err(map_sqlx_error)?;
164
165 // `grace_ms` is not stored per-row in the Wave-3 schema; the
166 // grace window is enforced by retaining prior `active=false`
167 // rows in the table so [`fetch_kid`] still returns their
168 // secret during verification within the window.
169 let _ = args.grace_ms;
170
171 sqlx::query("UPDATE ff_waitpoint_hmac SET active = FALSE WHERE active = TRUE")
172 .execute(&mut *tx)
173 .await
174 .map_err(map_sqlx_error)?;
175
176 sqlx::query(
177 "INSERT INTO ff_waitpoint_hmac (kid, secret, rotated_at_ms, active) \
178 VALUES ($1, $2, $3, TRUE)",
179 )
180 .bind(&args.new_kid)
181 .bind(&secret_bytes)
182 .bind(now_ms)
183 .execute(&mut *tx)
184 .await
185 .map_err(map_sqlx_error)?;
186
187 tx.commit().await.map_err(map_sqlx_error)?;
188
189 Ok(RotateWaitpointHmacSecretOutcome::Rotated {
190 previous_kid: prior_active.map(|(k,)| k),
191 new_kid: args.new_kid.clone(),
192 gc_count: 0,
193 })
194 }
195 .await;
196
197 Ok(RotateWaitpointHmacSecretAllResult::new(vec![
198 RotateWaitpointHmacSecretAllEntry::new(0, outcome_res),
199 ]))
200}
201
202/// Implementation of `EngineBackend::seed_waitpoint_hmac_secret` (issue #280).
203///
204/// Semantics against the global `ff_waitpoint_hmac` table:
205///
206/// * No active row exists and no row for the supplied kid → INSERT
207/// the new kid as the active signing kid. Returns
208/// `SeedOutcome::Seeded { kid }`.
209/// * Row for the supplied kid exists (regardless of `active`) →
210/// `SeedOutcome::AlreadySeeded { kid, same_secret }` where
211/// `same_secret` reports whether the stored bytes match.
212/// * Different kid is currently `active=TRUE` → `Validation(InvalidInput)`;
213/// operators must rotate rather than re-seed under a conflicting
214/// kid. Mirrors the Valkey backend's behaviour when the per-partition
215/// `current_kid` differs from the supplied one.
216pub async fn seed_waitpoint_hmac_secret_impl(
217 pool: &PgPool,
218 args: ff_core::contracts::SeedWaitpointHmacSecretArgs,
219 now_ms: i64,
220) -> Result<ff_core::contracts::SeedOutcome, EngineError> {
221 use ff_core::contracts::SeedOutcome;
222
223 if args.secret_hex.len() != 64 || !args.secret_hex.chars().all(|c| c.is_ascii_hexdigit()) {
224 return Err(EngineError::Validation {
225 kind: ff_core::engine_error::ValidationKind::InvalidInput,
226 detail: "secret_hex must be 64 hex characters (256-bit secret)".into(),
227 });
228 }
229 if args.kid.is_empty() {
230 return Err(EngineError::Validation {
231 kind: ff_core::engine_error::ValidationKind::InvalidInput,
232 detail: "kid must be non-empty".into(),
233 });
234 }
235 let secret_bytes = hex::decode(&args.secret_hex).map_err(|_| EngineError::Validation {
236 kind: ff_core::engine_error::ValidationKind::InvalidInput,
237 detail: "secret_hex is not valid hex".into(),
238 })?;
239
240 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
241
242 let existing: Option<(Vec<u8>,)> =
243 sqlx::query_as("SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1")
244 .bind(&args.kid)
245 .fetch_optional(&mut *tx)
246 .await
247 .map_err(map_sqlx_error)?;
248 if let Some((prior,)) = existing {
249 tx.commit().await.map_err(map_sqlx_error)?;
250 return Ok(SeedOutcome::AlreadySeeded {
251 kid: args.kid,
252 same_secret: prior == secret_bytes,
253 });
254 }
255
256 let active: Option<(String,)> = sqlx::query_as(
257 "SELECT kid FROM ff_waitpoint_hmac WHERE active = TRUE \
258 ORDER BY rotated_at_ms DESC LIMIT 1",
259 )
260 .fetch_optional(&mut *tx)
261 .await
262 .map_err(map_sqlx_error)?;
263 if let Some((active_kid,)) = active {
264 tx.rollback().await.ok();
265 return Err(EngineError::Validation {
266 kind: ff_core::engine_error::ValidationKind::InvalidInput,
267 detail: format!(
268 "seed_waitpoint_hmac_secret: a different kid {active_kid:?} is already active; \
269 use rotate_waitpoint_hmac_secret_all to change kid"
270 ),
271 });
272 }
273
274 sqlx::query(
275 "INSERT INTO ff_waitpoint_hmac (kid, secret, rotated_at_ms, active) \
276 VALUES ($1, $2, $3, TRUE)",
277 )
278 .bind(&args.kid)
279 .bind(&secret_bytes)
280 .bind(now_ms)
281 .execute(&mut *tx)
282 .await
283 .map_err(map_sqlx_error)?;
284
285 tx.commit().await.map_err(map_sqlx_error)?;
286 Ok(SeedOutcome::Seeded { kid: args.kid })
287}
288
289// HMAC round-trip + tamper tests live in `ff_core::crypto::hmac`
290// alongside the extracted primitive.