Skip to main content

ff_backend_postgres/
claim_grant.rs

1//! RFC-024 PR-D — `ff_claim_grant` table accessors + the two new
2//! RFC-024 methods (`issue_reclaim_grant`, `reclaim_execution`).
3//!
4//! Replaces the pre-RFC JSON stash at
5//! `ff_exec_core.raw_fields.claim_grant` (see `scheduler.rs:252` pre-
6//! PR-D). The scheduler's write path now calls
7//! [`write_claim_grant`] below; the scheduler's verify/read path
8//! (`scheduler::verify_grant`) reads from the same table.
9//!
10//! # Isolation
11//!
12//! Both RFC-024 methods run under SERIALIZABLE + a 3-attempt retry
13//! loop (mirrors `operator::cancel_execution_impl`).
14//!
15//! # Grant identity
16//!
17//! `grant_id` (BYTEA PK) is the sha256 of the Valkey/HMAC-signed
18//! `grant_key` string. Deterministic, stable across retries, and
19//! decoupled from the HMAC encoding (no need to re-sign on backfill).
20
21use std::time::Duration;
22
23use ff_core::backend::{BackendTag, Handle, HandleKind};
24use ff_core::contracts::{
25    IssueReclaimGrantArgs, IssueReclaimGrantOutcome, ReclaimExecutionArgs,
26    ReclaimExecutionOutcome, ReclaimGrant,
27};
28use ff_core::engine_error::{ContentionKind, EngineError, ValidationKind};
29use ff_core::handle_codec::{encode as encode_opaque, HandlePayload};
30use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
31use ff_core::types::{AttemptIndex, ExecutionId, LeaseEpoch};
32use sha2::{Digest, Sha256};
33use sqlx::{PgPool, Postgres, Row};
34use uuid::Uuid;
35
36use crate::error::map_sqlx_error;
37use crate::lease_event;
38use crate::signal::{current_active_kid, hmac_sign};
39
/// Upper bound on how many times each public `*_impl` entry point in this
/// file runs its `*_once` body before giving up with
/// `ContentionKind::RetryExhausted`.
const MAX_ATTEMPTS: u32 = 3;

/// Default per-Rust-surface ceiling per RFC-024 §4.6.
///
/// `issue_reclaim_grant_once` refuses to issue a grant once the stored
/// `lease_reclaim_count` reaches this value, and `reclaim_execution_once`
/// falls back to it when `args.max_reclaim_count` is `None`.
pub const DEFAULT_MAX_RECLAIM_COUNT: u32 = 1000;
44
45/// sha256(grant_key) → 32-byte grant_id.
46pub fn grant_id_from_key(grant_key: &str) -> Vec<u8> {
47    let mut h = Sha256::new();
48    h.update(grant_key.as_bytes());
49    h.finalize().to_vec()
50}
51
52fn capabilities_jsonb(caps: &std::collections::BTreeSet<String>) -> serde_json::Value {
53    serde_json::Value::Array(
54        caps.iter()
55            .map(|c| serde_json::Value::String(c.clone()))
56            .collect(),
57    )
58}
59
60/// Insert a fresh claim grant row. Called by the scheduler's Stage 5.
61#[allow(clippy::too_many_arguments)]
62pub async fn write_claim_grant<'c>(
63    tx: &mut sqlx::Transaction<'c, Postgres>,
64    partition_key: i16,
65    grant_key: &str,
66    execution_id: Uuid,
67    worker_id: &str,
68    worker_instance_id: &str,
69    grant_ttl_ms: u64,
70    issued_at_ms: i64,
71    expires_at_ms: i64,
72) -> Result<(), EngineError> {
73    let grant_id = grant_id_from_key(grant_key);
74    sqlx::query(
75        r#"
76        INSERT INTO ff_claim_grant (
77            partition_key, grant_id, execution_id, kind,
78            worker_id, worker_instance_id, lane_id,
79            capability_hash, worker_capabilities,
80            route_snapshot_json, admission_summary,
81            grant_ttl_ms, issued_at_ms, expires_at_ms
82        ) VALUES (
83            $1, $2, $3, 'claim',
84            $4, $5, NULL,
85            NULL, '[]'::jsonb,
86            NULL, NULL,
87            $6, $7, $8
88        )
89        ON CONFLICT (partition_key, grant_id) DO UPDATE SET
90            worker_id = EXCLUDED.worker_id,
91            worker_instance_id = EXCLUDED.worker_instance_id,
92            grant_ttl_ms = EXCLUDED.grant_ttl_ms,
93            issued_at_ms = EXCLUDED.issued_at_ms,
94            expires_at_ms = EXCLUDED.expires_at_ms
95        "#,
96    )
97    .bind(partition_key)
98    .bind(&grant_id)
99    .bind(execution_id)
100    .bind(worker_id)
101    .bind(worker_instance_id)
102    .bind(i64::try_from(grant_ttl_ms).unwrap_or(i64::MAX))
103    .bind(issued_at_ms)
104    .bind(expires_at_ms)
105    .execute(&mut **tx)
106    .await
107    .map_err(map_sqlx_error)?;
108
109    // RFC-024 PR-D rollout dual-write — mirror the grant into the
110    // legacy `ff_exec_core.raw_fields.claim_grant` JSON blob so an
111    // older-version reader still in the rolling-deploy window can
112    // verify grants issued by the new code. The new code reads from
113    // `ff_claim_grant` exclusively; this write exists only for
114    // mixed-version read compat and is dropped in a future
115    // v0.13+ migration (0018) once the deploy window closes.
116    //
117    // Shape matches the pre-PR-D payload at `scheduler.rs:252`:
118    //   { grant_key, worker_id, worker_instance_id,
119    //     expires_at_ms, issued_at_ms, kid }
120    // `kid` is not available here (sig lives inside `grant_key`);
121    // legacy verify code only needs the 5 fields it actually reads.
122    let grant_json = serde_json::json!({
123        "grant_key": grant_key,
124        "worker_id": worker_id,
125        "worker_instance_id": worker_instance_id,
126        "expires_at_ms": expires_at_ms,
127        "issued_at_ms": issued_at_ms,
128    });
129    sqlx::query(
130        r#"
131        UPDATE ff_exec_core
132           SET raw_fields = jsonb_set(
133                   COALESCE(raw_fields, '{}'::jsonb),
134                   '{claim_grant}',
135                   $3::jsonb,
136                   true)
137         WHERE partition_key = $1 AND execution_id = $2
138        "#,
139    )
140    .bind(partition_key)
141    .bind(execution_id)
142    .bind(grant_json)
143    .execute(&mut **tx)
144    .await
145    .map_err(map_sqlx_error)?;
146
147    Ok(())
148}
149
150/// Read a claim-grant row (kind='claim') for verification /
151/// diagnostics. Returns `Ok(None)` when absent.
152pub async fn read_claim_grant_identity(
153    pool: &PgPool,
154    partition_key: i16,
155    execution_id: Uuid,
156) -> Result<Option<(String, String)>, EngineError> {
157    let row = sqlx::query(
158        r#"
159        SELECT worker_id, worker_instance_id
160          FROM ff_claim_grant
161         WHERE partition_key = $1
162           AND execution_id = $2
163           AND kind = 'claim'
164         ORDER BY issued_at_ms DESC
165         LIMIT 1
166        "#,
167    )
168    .bind(partition_key)
169    .bind(execution_id)
170    .fetch_optional(pool)
171    .await
172    .map_err(map_sqlx_error)?;
173    Ok(row.map(|r| {
174        (
175            r.get::<String, _>("worker_id"),
176            r.get::<String, _>("worker_instance_id"),
177        )
178    }))
179}
180
181// ─── retry helpers ──────────────────────────────────────────────────
182
183fn is_retryable_serialization(err: &EngineError) -> bool {
184    matches!(err, EngineError::Contention(ContentionKind::LeaseConflict))
185}
186
187async fn begin_serializable(pool: &PgPool) -> Result<sqlx::Transaction<'_, Postgres>, EngineError> {
188    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
189    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
190        .execute(&mut *tx)
191        .await
192        .map_err(map_sqlx_error)?;
193    Ok(tx)
194}
195
196fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
197    let s = eid.as_str();
198    let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
199        kind: ValidationKind::InvalidInput,
200        detail: format!("execution_id missing `{{fp:` prefix: {s}"),
201    })?;
202    let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
203        kind: ValidationKind::InvalidInput,
204        detail: format!("execution_id missing `}}:`: {s}"),
205    })?;
206    // Parse the wire-shape `u16` partition index and cast to the
207    // schema's `smallint` column type. Matches the
208    // `partition.index as i16` convention used in
209    // `budget.rs:221`/`exec_core.rs:318`. Parsing directly as `i16`
210    // would accept negatives and wrap under cast — wrong hash-tag
211    // routing on negative input.
212    let part_u: u16 = rest[..close].parse().map_err(|_| EngineError::Validation {
213        kind: ValidationKind::InvalidInput,
214        detail: format!("execution_id partition index not u16: {s}"),
215    })?;
216    let part = part_u as i16;
217    let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
218        kind: ValidationKind::InvalidInput,
219        detail: format!("execution_id UUID invalid: {s}"),
220    })?;
221    Ok((part, uuid))
222}
223
/// Current wall-clock time as non-negative milliseconds since the Unix
/// epoch; yields 0 if the system clock reads earlier than the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => (since_epoch.as_millis() as i64).max(0),
        Err(_) => 0,
    }
}
230
231// ─── issue_reclaim_grant ─────────────────────────────────────────────
232
/// Single attempt at issuing a reclaim grant for `args.execution_id`.
///
/// Fetches the active HMAC key, then inside one SERIALIZABLE transaction:
/// locks the `ff_exec_core` row, applies the reclaim-count cap and the
/// RFC §4.2 admission gate, inserts a `kind = 'reclaim'` row into
/// `ff_claim_grant`, and commits.
///
/// # Returns
///
/// - `Granted` — carries the signed grant key and its expiry.
/// - `ReclaimCapExceeded` — the stored `lease_reclaim_count` has already
///   reached [`DEFAULT_MAX_RECLAIM_COUNT`].
/// - `NotReclaimable` — the admission gate rejected the execution.
///
/// # Errors
///
/// - `Validation` when the execution id cannot be split.
/// - `Unavailable` when `current_active_kid` yields no key.
/// - `NotFound` when there is no `ff_exec_core` row for the execution.
/// - Any sqlx failure, mapped through `map_sqlx_error`.
async fn issue_reclaim_grant_once(
    pool: &PgPool,
    args: &IssueReclaimGrantArgs,
) -> Result<IssueReclaimGrantOutcome, EngineError> {
    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
    // The signing key is looked up on the pool, before the transaction is
    // opened.
    let (kid, secret) = current_active_kid(pool)
        .await?
        .ok_or(EngineError::Unavailable {
            op: "issue_reclaim_grant: ff_waitpoint_hmac keystore empty",
        })?;

    let mut tx = begin_serializable(pool).await?;

    // RFC §4.2 admission gate — lifecycle_phase='active' AND either
    // a reclaimable ownership_state literal OR the pre-RFC implicit
    // "lease expired / released" shape (lease_expires_at_ms <= now OR
    // worker_instance_id NULL).
    let row = sqlx::query(
        r#"
        SELECT ec.lifecycle_phase,
               ec.ownership_state,
               ec.lease_reclaim_count,
               a.lease_expires_at_ms,
               a.worker_instance_id
          FROM ff_exec_core ec
          LEFT JOIN ff_attempt a
            ON a.partition_key = ec.partition_key
           AND a.execution_id  = ec.execution_id
           AND a.attempt_index = ec.attempt_index
         WHERE ec.partition_key = $1 AND ec.execution_id = $2
         FOR NO KEY UPDATE OF ec
        "#,
    )
    .bind(part)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let Some(row) = row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::NotFound {
            entity: "execution",
        });
    };

    let lifecycle_phase: String = row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
    let ownership_state: String = row.try_get("ownership_state").map_err(map_sqlx_error)?;
    let reclaim_count: i32 = row.try_get("lease_reclaim_count").map_err(map_sqlx_error)?;
    // Both attempt columns are nullable here: the LEFT JOIN yields NULLs
    // when no attempt row matches the execution's current attempt_index.
    let lease_expires_at: Option<i64> = row
        .try_get::<Option<i64>, _>("lease_expires_at_ms")
        .map_err(map_sqlx_error)?;
    let worker_instance_id: Option<String> = row
        .try_get::<Option<String>, _>("worker_instance_id")
        .map_err(map_sqlx_error)?;

    // Negative stored counts are treated as 0 before the cap comparison.
    let reclaim_count_u = u32::try_from(reclaim_count.max(0)).unwrap_or(0);
    if reclaim_count_u >= DEFAULT_MAX_RECLAIM_COUNT {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(IssueReclaimGrantOutcome::ReclaimCapExceeded {
            execution_id: args.execution_id.clone(),
            reclaim_count: reclaim_count_u,
        });
    }

    let now = now_ms();
    // With a recorded expiry, only the timestamp decides. Without one, the
    // lease counts as expired only when no worker instance is recorded
    // (NULL or empty string).
    let lease_expired = match lease_expires_at {
        Some(exp) => exp <= now,
        None => worker_instance_id.is_none() || worker_instance_id.as_deref() == Some(""),
    };
    let reclaimable_state = matches!(
        ownership_state.as_str(),
        "lease_expired_reclaimable" | "lease_revoked"
    );
    let phase_active = lifecycle_phase == "active";
    if !(phase_active && (reclaimable_state || lease_expired)) {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(IssueReclaimGrantOutcome::NotReclaimable {
            execution_id: args.execution_id.clone(),
            detail: format!(
                "execution not reclaimable: lifecycle_phase={lifecycle_phase}, ownership_state={ownership_state}"
            ),
        });
    }

    let partition = Partition {
        family: PartitionFamily::Execution,
        // Reverses the `u16 as i16` reinterpretation done in `split_exec_id`.
        index: part as u16,
    };
    let hash_tag = partition.hash_tag();
    // TTL is clamped to `i64::MAX` and added with saturation, so the expiry
    // cannot overflow.
    let expires_at_ms = now.saturating_add_unsigned(args.grant_ttl_ms.min(i64::MAX as u64));
    // Signed payload: partition hash tag, execution UUID, worker identity,
    // expiry, and a literal `reclaim` tag.
    let message = format!(
        "{hash_tag}|{exec_uuid}|{wid}|{wiid}|{exp}|reclaim",
        wid = args.worker_id.as_str(),
        wiid = args.worker_instance_id.as_str(),
        exp = expires_at_ms,
    );
    let sig = hmac_sign(&secret, &kid, message.as_bytes());
    let grant_key = format!("pg:reclaim:{hash_tag}:{exec_uuid}:{expires_at_ms}:{sig}");
    let grant_id = grant_id_from_key(&grant_key);

    // Plain INSERT (no ON CONFLICT clause).
    sqlx::query(
        r#"
        INSERT INTO ff_claim_grant (
            partition_key, grant_id, execution_id, kind,
            worker_id, worker_instance_id, lane_id,
            capability_hash, worker_capabilities,
            route_snapshot_json, admission_summary,
            grant_ttl_ms, issued_at_ms, expires_at_ms
        ) VALUES (
            $1, $2, $3, 'reclaim',
            $4, $5, $6,
            $7, $8,
            $9, $10,
            $11, $12, $13
        )
        "#,
    )
    .bind(part)
    .bind(&grant_id)
    .bind(exec_uuid)
    .bind(args.worker_id.as_str())
    .bind(args.worker_instance_id.as_str())
    .bind(args.lane_id.as_str())
    .bind(args.capability_hash.as_deref())
    .bind(capabilities_jsonb(&args.worker_capabilities))
    .bind(args.route_snapshot_json.as_deref())
    .bind(args.admission_summary.as_deref())
    .bind(i64::try_from(args.grant_ttl_ms).unwrap_or(i64::MAX))
    .bind(now)
    .bind(expires_at_ms)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;

    Ok(IssueReclaimGrantOutcome::Granted(ReclaimGrant::new(
        args.execution_id.clone(),
        PartitionKey::from(&partition),
        grant_key,
        expires_at_ms as u64,
        args.lane_id.clone(),
    )))
}
378
379pub async fn issue_reclaim_grant_impl(
380    pool: &PgPool,
381    args: IssueReclaimGrantArgs,
382) -> Result<IssueReclaimGrantOutcome, EngineError> {
383    let mut last: Option<EngineError> = None;
384    for attempt in 0..MAX_ATTEMPTS {
385        match issue_reclaim_grant_once(pool, &args).await {
386            Ok(r) => return Ok(r),
387            Err(err) if is_retryable_serialization(&err) => {
388                if attempt + 1 < MAX_ATTEMPTS {
389                    let ms = 5u64 * (1u64 << attempt);
390                    tokio::time::sleep(Duration::from_millis(ms)).await;
391                }
392                last = Some(err);
393                continue;
394            }
395            Err(err) => return Err(err),
396        }
397    }
398    let _ = last;
399    Err(EngineError::Contention(ContentionKind::RetryExhausted))
400}
401
402// ─── reclaim_execution ─────────────────────────────────────────────
403
/// Single attempt at consuming a reclaim grant and starting a new attempt
/// for `args.execution_id`, all inside one SERIALIZABLE transaction.
///
/// Steps, in order:
///
/// 1. Lock the most recently issued `kind = 'reclaim'` grant row for the
///    execution and check worker identity and expiry.
/// 2. Lock the `ff_exec_core` row and read the current attempt's epoch.
/// 3. If the bumped reclaim count exceeds the cap
///    (`args.max_reclaim_count`, defaulting to
///    [`DEFAULT_MAX_RECLAIM_COUNT`]): mark the execution terminal/failed,
///    delete the grant, emit a completion event and a lease-revoked
///    event, commit.
/// 4. Otherwise: mark the prior attempt `interrupted_reclaimed`, insert the
///    new attempt row, flip `ff_exec_core` to active/leased, delete the
///    grant, emit a lease-reclaimed event, commit, and mint a handle.
///
/// # Returns
///
/// - `Claimed` — handle for the new attempt.
/// - `GrantNotFound` — no reclaim grant row, or the newest one has expired.
/// - `NotReclaimable` — `lifecycle_phase` is `terminal` or `cancelled`.
/// - `ReclaimCapExceeded` — the cap branch in step 3 was taken.
///
/// # Errors
///
/// - `Validation` when the execution id cannot be split, or when the
///   grant's `worker_id` differs from `args.worker_id`.
/// - `NotFound` when there is no `ff_exec_core` row for the execution.
/// - Any sqlx failure, mapped through `map_sqlx_error`, and any error from
///   `lease_event::emit`.
async fn reclaim_execution_once(
    pool: &PgPool,
    args: &ReclaimExecutionArgs,
) -> Result<ReclaimExecutionOutcome, EngineError> {
    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
    let max_reclaim_count = args.max_reclaim_count.unwrap_or(DEFAULT_MAX_RECLAIM_COUNT);

    let mut tx = begin_serializable(pool).await?;

    // Newest reclaim grant for this execution, row-locked. `lane_id` is
    // selected but not read below.
    let grant_row = sqlx::query(
        r#"
        SELECT grant_id, worker_id, expires_at_ms, lane_id
          FROM ff_claim_grant
         WHERE partition_key = $1
           AND execution_id = $2
           AND kind = 'reclaim'
         ORDER BY issued_at_ms DESC
         FOR UPDATE
         LIMIT 1
        "#,
    )
    .bind(part)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let Some(grant_row) = grant_row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(ReclaimExecutionOutcome::GrantNotFound {
            execution_id: args.execution_id.clone(),
        });
    };
    let grant_id: Vec<u8> = grant_row.try_get("grant_id").map_err(map_sqlx_error)?;
    let grant_worker_id: String = grant_row.try_get("worker_id").map_err(map_sqlx_error)?;
    let grant_expires_at_ms: i64 = grant_row
        .try_get("expires_at_ms")
        .map_err(map_sqlx_error)?;

    // RFC-024 §4.4 worker identity contract: worker_id must match;
    // worker_instance_id is informational only.
    if grant_worker_id != args.worker_id.as_str() {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!(
                "reclaim_execution: grant.worker_id={grant_worker_id} != args.worker_id={}",
                args.worker_id.as_str()
            ),
        });
    }
    let now = now_ms();
    // An expired grant is reported the same way as a missing one; the row
    // itself is left in place (the transaction is rolled back).
    if grant_expires_at_ms <= now {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(ReclaimExecutionOutcome::GrantNotFound {
            execution_id: args.execution_id.clone(),
        });
    }

    let core_row = sqlx::query(
        r#"
        SELECT ec.lifecycle_phase, ec.attempt_index, ec.lease_reclaim_count,
               COALESCE(a.lease_epoch, 0) AS prior_lease_epoch
          FROM ff_exec_core ec
          LEFT JOIN ff_attempt a
            ON a.partition_key = ec.partition_key
           AND a.execution_id  = ec.execution_id
           AND a.attempt_index = ec.attempt_index
         WHERE ec.partition_key = $1 AND ec.execution_id = $2
         FOR NO KEY UPDATE OF ec
        "#,
    )
    .bind(part)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let Some(core_row) = core_row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::NotFound {
            entity: "execution",
        });
    };
    let lifecycle_phase: String = core_row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
    let cur_attempt_index: i32 = core_row.try_get("attempt_index").map_err(map_sqlx_error)?;
    let cur_reclaim_count: i32 = core_row
        .try_get("lease_reclaim_count")
        .map_err(map_sqlx_error)?;
    // Prior-attempt lease epoch drives the monotonic bump for the new
    // attempt's `lease_epoch` (Bug B parity — Valkey Lua
    // `flowfabric.lua:3106` + SQLite `reclaim.rs::reclaim_execution_inner`
    // both use `prior + 1`). LEFT JOIN tolerates a missing
    // prior-attempt row (COALESCE → 0, bump → 1).
    let prior_lease_epoch: i64 = core_row
        .try_get("prior_lease_epoch")
        .map_err(map_sqlx_error)?;

    if lifecycle_phase == "terminal" || lifecycle_phase == "cancelled" {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(ReclaimExecutionOutcome::NotReclaimable {
            execution_id: args.execution_id.clone(),
            detail: format!("execution terminal: lifecycle_phase={lifecycle_phase}"),
        });
    }

    // Column is `INTEGER NOT NULL DEFAULT 0` so the row value SHOULD
    // always be ≥ 0. Clamp defensively anyway: a negative value would
    // wrap under `as u32` and skip the cap check.
    let next_reclaim_count = (cur_reclaim_count.max(0) as u32).saturating_add(1);
    if next_reclaim_count > max_reclaim_count {
        // Cap exceeded: the execution is moved to a terminal failed state
        // and the (bumped) reclaim count is persisted with it.
        sqlx::query(
            r#"
            UPDATE ff_exec_core
               SET lifecycle_phase    = 'terminal',
                   ownership_state    = 'unowned',
                   eligibility_state  = 'not_applicable',
                   public_state       = 'failed',
                   attempt_state      = 'terminal_failed',
                   terminal_at_ms     = COALESCE(terminal_at_ms, $3),
                   lease_reclaim_count = $4,
                   cancellation_reason = COALESCE(cancellation_reason, 'reclaim_cap_exceeded')
             WHERE partition_key = $1 AND execution_id = $2
            "#,
        )
        .bind(part)
        .bind(exec_uuid)
        .bind(now)
        .bind(i32::try_from(next_reclaim_count).unwrap_or(i32::MAX))
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        // The grant is consumed on this path as well.
        sqlx::query("DELETE FROM ff_claim_grant WHERE partition_key = $1 AND grant_id = $2")
            .bind(part)
            .bind(&grant_id)
            .execute(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;

        // Bug A parity fix — RFC-024 §4.2.7 / RFC-019 outbox matrix:
        // every terminal transition emits BOTH a completion_event and a
        // lease_event. Cap-exceeded is `terminal_failed`, so both fire.
        // Mirrors SQLite `reclaim.rs::reclaim_execution_inner` cap-
        // exceeded branch (lines 376-384, post-PR-E `d16ad68`).
        sqlx::query(
            r#"
            INSERT INTO ff_completion_event (
                partition_key, execution_id, flow_id, outcome,
                namespace, instance_tag, occurred_at_ms
            )
            SELECT partition_key, execution_id, flow_id, 'failed',
                   NULL, NULL, $3
              FROM ff_exec_core
             WHERE partition_key = $1 AND execution_id = $2
            "#,
        )
        .bind(part)
        .bind(exec_uuid)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        lease_event::emit(
            &mut tx,
            part,
            exec_uuid,
            None,
            lease_event::EVENT_REVOKED,
            now,
        )
        .await?;

        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(ReclaimExecutionOutcome::ReclaimCapExceeded {
            execution_id: args.execution_id.clone(),
            reclaim_count: next_reclaim_count,
        });
    }

    // Mark prior attempt `interrupted_reclaimed`.
    sqlx::query(
        r#"
        UPDATE ff_attempt
           SET outcome = 'interrupted_reclaimed',
               terminal_at_ms = COALESCE(terminal_at_ms, $4)
         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
        "#,
    )
    .bind(part)
    .bind(exec_uuid)
    .bind(cur_attempt_index)
    .bind(now)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Insert NEW attempt row (attempt_index = cur+1, epoch = prior+1,
    // attempt_type = 'reclaim' in raw_fields). Bug B parity fix: the
    // new lease_epoch is derived from the prior attempt's epoch so
    // fencing remains monotonic across successive reclaims (matches
    // Valkey Lua `flowfabric.lua:3106` + SQLite `reclaim.rs`).
    let new_attempt_index = cur_attempt_index + 1;
    // Defensive clamp: `ff_attempt.lease_epoch` is `bigint` with no
    // CHECK constraint, so a malformed row could surface a negative
    // value. Clamp before bump so the DB-persisted epoch matches the
    // handle's non-negative u64 value (no DB/handle divergence).
    let new_lease_epoch: i64 = prior_lease_epoch.max(0).saturating_add(1);
    let lease_ttl_ms = i64::try_from(args.lease_ttl_ms).unwrap_or(i64::MAX);
    let new_lease_expires = now.saturating_add(lease_ttl_ms);
    // Note the placeholder order in VALUES: `lease_epoch` is `$8`, while
    // `lease_expires_at_ms` / `started_at_ms` are `$6` / `$7`, matching the
    // bind order below.
    sqlx::query(
        r#"
        INSERT INTO ff_attempt (
            partition_key, execution_id, attempt_index,
            worker_id, worker_instance_id,
            lease_epoch, lease_expires_at_ms, started_at_ms,
            raw_fields
        ) VALUES (
            $1, $2, $3,
            $4, $5,
            $8, $6, $7,
            jsonb_build_object('attempt_type', 'reclaim')
        )
        "#,
    )
    .bind(part)
    .bind(exec_uuid)
    .bind(new_attempt_index)
    .bind(args.worker_id.as_str())
    .bind(args.worker_instance_id.as_str())
    .bind(new_lease_expires)
    .bind(now)
    .bind(new_lease_epoch)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Flip exec_core to active on the new attempt; bump reclaim count.
    sqlx::query(
        r#"
        UPDATE ff_exec_core
           SET lifecycle_phase   = 'active',
               ownership_state   = 'leased',
               eligibility_state = 'not_applicable',
               public_state      = 'running',
               attempt_state     = 'running_attempt',
               attempt_index     = $3,
               lease_reclaim_count = $4,
               started_at_ms     = COALESCE(started_at_ms, $5)
         WHERE partition_key = $1 AND execution_id = $2
        "#,
    )
    .bind(part)
    .bind(exec_uuid)
    .bind(new_attempt_index)
    .bind(i32::try_from(next_reclaim_count).unwrap_or(i32::MAX))
    .bind(now)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Consume the grant.
    sqlx::query("DELETE FROM ff_claim_grant WHERE partition_key = $1 AND grant_id = $2")
        .bind(part)
        .bind(&grant_id)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    // RFC-019 Stage B outbox: lease reclaimed.
    lease_event::emit(
        &mut tx,
        part,
        exec_uuid,
        None,
        lease_event::EVENT_RECLAIMED,
        now,
    )
    .await?;

    tx.commit().await.map_err(map_sqlx_error)?;

    // RFC-024 §3.3 / §4.4: the HandlePayload carries the
    // caller-supplied `attempt_id` + `lease_id`. Valkey PR-F round-
    // trips these through `ff_reclaim_execution` ARGV[5] (lease_id) +
    // ARGV[7] (attempt_id) — the Lua echoes them back in the success
    // tuple, so the handle the Valkey surface mints embeds the
    // caller's identifiers. Postgres has no Lua round-trip; we use
    // the args values directly to preserve cross-backend parity +
    // idempotency + lease fencing.
    //
    // The TTL placed in the payload is capped at `u32::MAX` ms, whereas the
    // expiry written to `ff_attempt` above was computed from the TTL clamped
    // only to `i64::MAX`.
    let payload = HandlePayload::new(
        args.execution_id.clone(),
        AttemptIndex::new(u32::try_from(new_attempt_index.max(0)).unwrap_or(0)),
        args.attempt_id.clone(),
        args.lease_id.clone(),
        LeaseEpoch(u64::try_from(new_lease_epoch.max(0)).unwrap_or(0)),
        u32::try_from(args.lease_ttl_ms.min(u32::MAX as u64)).unwrap_or(u32::MAX) as u64,
        args.lane_id.clone(),
        args.worker_instance_id.clone(),
    );
    Ok(ReclaimExecutionOutcome::Claimed(mint_handle(
        payload,
        HandleKind::Reclaimed,
    )))
}
707
708pub async fn reclaim_execution_impl(
709    pool: &PgPool,
710    args: ReclaimExecutionArgs,
711) -> Result<ReclaimExecutionOutcome, EngineError> {
712    let mut last: Option<EngineError> = None;
713    for attempt in 0..MAX_ATTEMPTS {
714        match reclaim_execution_once(pool, &args).await {
715            Ok(r) => return Ok(r),
716            Err(err) if is_retryable_serialization(&err) => {
717                if attempt + 1 < MAX_ATTEMPTS {
718                    let ms = 5u64 * (1u64 << attempt);
719                    tokio::time::sleep(Duration::from_millis(ms)).await;
720                }
721                last = Some(err);
722                continue;
723            }
724            Err(err) => return Err(err),
725        }
726    }
727    let _ = last;
728    Err(EngineError::Contention(ContentionKind::RetryExhausted))
729}
730
731fn mint_handle(payload: HandlePayload, kind: HandleKind) -> Handle {
732    let op = encode_opaque(BackendTag::Postgres, &payload);
733    Handle::new(BackendTag::Postgres, kind, op)
734}