Skip to main content

ff_backend_postgres/
scheduler.rs

1//! Admission-pipeline for the Postgres backend (RFC-v0.7 Wave 5b).
2//!
3//! Mirrors `ff-scheduler::claim::Scheduler::claim_for_worker` on Valkey:
4//! find an eligible execution, match capabilities, admit under budget,
5//! admit under quota, and issue a signed [`ClaimGrant`]. Returns
6//! `Ok(None)` when no candidate is admissible.
7//!
8//! # Pipeline
9//!
10//! All six pipeline stages run in one `READ COMMITTED` transaction
11//! (Q11):
12//!
13//! 1. **Eligible pick.** `SELECT ... FROM ff_exec_core WHERE lane_id =
14//!    $1 AND lifecycle_phase = 'runnable' AND eligibility_state =
15//!    'eligible_now' ORDER BY priority DESC, created_at_ms ASC FOR
16//!    UPDATE SKIP LOCKED LIMIT N`. Over-fetches so capability-subset
17//!    filtering has candidates after per-row rejects.
18//! 2. **Capability subset-match.** Each over-fetched row is filtered
19//!    via [`ff_core::caps::matches`]. First matching row wins.
20//! 3. **Budget admission.** If the exec row carries `budget_ids`,
21//!    each referenced [`ff_budget_policy`] row is `FOR SHARE`-locked,
22//!    its `policy_json` parsed for `hard_limit` + `dimension`, and
23//!    the current `ff_budget_usage` value compared. Breach →
24//!    `Ok(None)`, row left eligible for another worker/tick.
25//! 4. **Quota admission.** There is no quota schema in 0001/0002.
26//!    This stage is a no-op and reported as "skipped — quota schema
27//!    not yet migrated" in the return channel. Grep of the migrations
28//!    directory confirms: only budget + core + suspension + stream
29//!    tables exist.
30//! 5. **Issue ClaimGrant.** Uses the Wave-4d global
31//!    [`ff_waitpoint_hmac`] keystore via [`hmac_sign`] — the same
32//!    primitive signing waitpoint tokens. The signed blob rides
33//!    inside [`ClaimGrant::grant_key`] as
34//!    `pg:<hash-tag>:<uuid>:<expires_ms>:<kid>:<hex>`. The grant
//!    itself is persisted into the `ff_claim_grant` table (RFC-024
//!    PR-D); the legacy `ff_exec_core.raw_fields.claim_grant` JSON
//!    stash remains in place for one release for backfill safety but
//!    is no longer consulted by the read path.
38//! 6. **Commit + return grant.**
39//!
40//! # Isolation (Q11)
41//!
42//! READ COMMITTED + `FOR UPDATE SKIP LOCKED` on the eligible pick +
43//! `FOR SHARE` on budget policy rows. No SERIALIZABLE retries.
44//!
45//! # Scheduler integration
46//!
47//! `ff-scheduler` today is Valkey-specific (`ferriskey::Client`
48//! embedded in `Scheduler`). Rather than add `ff-backend-postgres`
49//! (and its sqlx transitive graph) as a dep of ff-scheduler — which
50//! would pollute every consumer (ff-server, ff-observability,
51//! ff-test, ff-readiness-tests, ff-script, ff-backend-valkey) — the
52//! Postgres variant lives here as a free-standing [`PostgresScheduler`]
53//! struct. ff-server dispatches against the concrete backend type it
54//! constructed (Valkey → `ff_scheduler::Scheduler`, Postgres →
55//! `ff_backend_postgres::scheduler::PostgresScheduler`); no trait-
56//! object indirection needed because the engine already decides
57//! backend at boot.
58
59use std::collections::BTreeSet;
60use std::time::{SystemTime, UNIX_EPOCH};
61
62use ff_core::caps::{matches as caps_matches, CapabilityRequirement};
63use ff_core::contracts::ClaimGrant;
64use ff_core::engine_error::{EngineError, ValidationKind};
65use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
66use ff_core::types::{ExecutionId, LaneId, WorkerId, WorkerInstanceId};
67use serde_json::Value as JsonValue;
68use sqlx::{PgPool, Row};
69use uuid::Uuid;
70
71use crate::error::map_sqlx_error;
72use crate::signal::{current_active_kid, hmac_sign};
73
/// Over-fetch size for the eligible pick (pipeline stage 1). Fetching
/// more than one row gives the per-row capability filter (stage 2)
/// some headroom before giving up — matches the Valkey path's
/// "pick 10 and filter" shape.
const ELIGIBLE_OVERFETCH: i64 = 10;
78
/// Postgres admission pipeline. See the module rustdoc for
/// pipeline + isolation notes.
pub struct PostgresScheduler {
    // Shared connection pool; each admission attempt opens its own
    // transaction on it.
    pool: PgPool,
}
84
85impl PostgresScheduler {
86    pub fn new(pool: PgPool) -> Self {
87        Self { pool }
88    }
89
90    /// Find an eligible execution, admit it against budget + quota,
91    /// and issue a signed [`ClaimGrant`]. Returns `Ok(None)` when no
92    /// candidate is admissible on this lane right now.
93    pub async fn claim_for_worker(
94        &self,
95        lane: &LaneId,
96        worker_id: &WorkerId,
97        worker_instance_id: &WorkerInstanceId,
98        worker_capabilities: &BTreeSet<String>,
99        grant_ttl_ms: u64,
100    ) -> Result<Option<ClaimGrant>, EngineError> {
101        // Read the active HMAC kid up-front — if the keystore is
102        // empty we refuse to issue grants (fail-closed, matches the
103        // Valkey path where ff_issue_claim_grant requires a secret).
104        let (kid, secret) = match current_active_kid(&self.pool).await? {
105            Some(v) => v,
106            None => {
107                return Err(EngineError::Unavailable {
108                    op: "claim_for_worker: ff_waitpoint_hmac keystore empty",
109                });
110            }
111        };
112
113        // Iterate partitions 0..256. Over each partition we run the
114        // full admission tx. Matches the Valkey scheduler's partition-
115        // walk shape; the bounded-scan / rotation-cursor machinery is
116        // Valkey-specific (lives in ff-scheduler) and not ported here.
117        const TOTAL_PARTITIONS: i16 = 256;
118        for part in 0..TOTAL_PARTITIONS {
119            if let Some(grant) = self
120                .try_claim_in_partition(
121                    part,
122                    lane,
123                    worker_id,
124                    worker_instance_id,
125                    worker_capabilities,
126                    grant_ttl_ms,
127                    &kid,
128                    &secret,
129                )
130                .await?
131            {
132                return Ok(Some(grant));
133            }
134        }
135        Ok(None)
136    }
137
138    #[allow(clippy::too_many_arguments)]
139    async fn try_claim_in_partition(
140        &self,
141        part: i16,
142        lane: &LaneId,
143        worker_id: &WorkerId,
144        worker_instance_id: &WorkerInstanceId,
145        worker_capabilities: &BTreeSet<String>,
146        grant_ttl_ms: u64,
147        kid: &str,
148        secret: &[u8],
149    ) -> Result<Option<ClaimGrant>, EngineError> {
150        let mut tx = self.pool.begin().await.map_err(map_sqlx_error)?;
151
152        // ── Stage 1: eligible pick (FOR UPDATE SKIP LOCKED) ──
153        let rows = sqlx::query(
154            r#"
155            SELECT execution_id, required_capabilities, raw_fields
156              FROM ff_exec_core
157             WHERE partition_key = $1
158               AND lane_id = $2
159               AND lifecycle_phase = 'runnable'
160               AND eligibility_state = 'eligible_now'
161             ORDER BY priority DESC, created_at_ms ASC
162             FOR UPDATE SKIP LOCKED
163             LIMIT $3
164            "#,
165        )
166        .bind(part)
167        .bind(lane.as_str())
168        .bind(ELIGIBLE_OVERFETCH)
169        .fetch_all(&mut *tx)
170        .await
171        .map_err(map_sqlx_error)?;
172
173        if rows.is_empty() {
174            tx.rollback().await.map_err(map_sqlx_error)?;
175            return Ok(None);
176        }
177
178        // ── Stage 2: capability subset-match ──
179        let mut picked: Option<(Uuid, JsonValue)> = None;
180        for row in &rows {
181            let required: Vec<String> = row
182                .try_get::<Vec<String>, _>("required_capabilities")
183                .map_err(map_sqlx_error)?;
184            let req = CapabilityRequirement::new(required);
185            let worker_set = ff_core::backend::CapabilitySet::new(worker_capabilities.iter().cloned());
186            if !caps_matches(&req, &worker_set) {
187                continue;
188            }
189            let eid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
190            let raw: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
191            picked = Some((eid, raw));
192            break;
193        }
194        let Some((exec_uuid, raw_fields)) = picked else {
195            tx.rollback().await.map_err(map_sqlx_error)?;
196            return Ok(None);
197        };
198
199        // ── Stage 3: budget admission ──
200        // `budget_ids` is stashed in raw_fields as a comma-separated
201        // string (matches the Valkey HGET shape). Missing field → no
202        // budget attached; empty string → no budget attached.
203        let budget_ids: Vec<String> = raw_fields
204            .get("budget_ids")
205            .and_then(JsonValue::as_str)
206            .map(|s| {
207                s.split(',')
208                    .map(str::trim)
209                    .filter(|s| !s.is_empty())
210                    .map(str::to_owned)
211                    .collect()
212            })
213            .unwrap_or_default();
214
215        for bid in &budget_ids {
216            if !admit_budget(&mut tx, bid).await? {
217                // Breach — leave the row eligible (rollback releases
218                // the FOR UPDATE lock without mutating the row).
219                tx.rollback().await.map_err(map_sqlx_error)?;
220                return Ok(None);
221            }
222        }
223
224        // ── Stage 4: quota admission (no schema → skipped) ──
225        // Quota tables are not in 0001_initial.sql or 0002_budget.sql.
226        // A future migration (0003_quota.sql) would add
227        // ff_quota_policy + ff_quota_usage; this stage becomes a
228        // FOR SHARE policy-read + current-count compare at that
229        // point. Keeping the slot in the pipeline as a doc/ack:
230        let _quota_skipped_no_schema = ();
231
232        // ── Stage 5: issue signed ClaimGrant ──
233        let now = now_ms();
234        let expires_at_ms = now.saturating_add_unsigned(grant_ttl_ms.min(i64::MAX as u64));
235
236        // Sign over the fence-relevant fields. Verification later
237        // re-constructs the same message.
238        let partition = Partition {
239            family: PartitionFamily::Execution,
240            index: part as u16,
241        };
242        let hash_tag = partition.hash_tag();
243        let message = format!(
244            "{hash_tag}|{exec_uuid}|{wid}|{wiid}|{exp}",
245            wid = worker_id.as_str(),
246            wiid = worker_instance_id.as_str(),
247            exp = expires_at_ms,
248        );
249        let sig = hmac_sign(secret, kid, message.as_bytes());
250        let grant_key = format!("pg:{hash_tag}:{exec_uuid}:{expires_at_ms}:{sig}");
251
252        // RFC-024 PR-D: persist the grant into `ff_claim_grant` (the
253        // properly-shaped table with `kind` discriminator). Pre-PR-D
254        // this used the JSON stash at `ff_exec_core.raw_fields.claim_grant`
255        // — the JSON column is left in place for one release for
256        // backfill safety (RFC §5 / §10) but is no longer consulted
257        // by the read path.
258        crate::claim_grant::write_claim_grant(
259            &mut tx,
260            part,
261            &grant_key,
262            exec_uuid,
263            worker_id.as_str(),
264            worker_instance_id.as_str(),
265            grant_ttl_ms,
266            now,
267            expires_at_ms,
268        )
269        .await?;
270        sqlx::query(
271            r#"
272            UPDATE ff_exec_core
273               SET eligibility_state = 'pending_claim'
274             WHERE partition_key = $1 AND execution_id = $2
275            "#,
276        )
277        .bind(part)
278        .bind(exec_uuid)
279        .execute(&mut *tx)
280        .await
281        .map_err(map_sqlx_error)?;
282
283        tx.commit().await.map_err(map_sqlx_error)?;
284
285        // Build ClaimGrant wire-type.
286        let eid = ExecutionId::parse(&format!("{{fp:{part}}}:{exec_uuid}")).map_err(|e| {
287            EngineError::Validation {
288                kind: ValidationKind::Corruption,
289                detail: format!("scheduler: reassembling exec id: {e}"),
290            }
291        })?;
292        Ok(Some(ClaimGrant::new(
293            eid,
294            PartitionKey::from(&partition),
295            grant_key,
296            expires_at_ms as u64,
297        )))
298    }
299}
300
301/// Verify a grant produced by [`PostgresScheduler::claim_for_worker`].
302///
303/// Returns `Ok(())` iff the signature embedded in `grant.grant_key`
304/// verifies under the kid stored with it and `expires_at_ms` is in
305/// the future. Exposed for test / consumer use; a production
306/// `claim_from_grant` on Postgres will extend this with fence-check
307/// into `ff_attempt`.
308pub async fn verify_grant(pool: &PgPool, grant: &ClaimGrant) -> Result<(), GrantVerifyError> {
309    // Parse: pg:<hash-tag>:<uuid>:<expires_ms>:<kid>:<hex>
310    let s = grant.grant_key.as_str();
311    let rest = s.strip_prefix("pg:").ok_or(GrantVerifyError::Malformed)?;
312    // hash_tag contains ':' internally ("{fp:7}"), so split from the
313    // right: kid:hex is the final `kid:hex` segment, preceded by
314    // expires_ms, preceded by uuid, preceded by the hash tag.
315    let mut parts: Vec<&str> = rest.rsplitn(4, ':').collect(); // [hex, kid, expires_ms, hash_tag:uuid?]
316    // rsplitn from the right yields in reverse order; we need 4
317    // segments: hex, kid, expires, and the left-over prefix
318    // hash_tag:uuid (which still contains one `:`).
319    if parts.len() != 4 {
320        return Err(GrantVerifyError::Malformed);
321    }
322    let hex_part = parts.remove(0);
323    let kid = parts.remove(0);
324    let expires_str = parts.remove(0);
325    let left = parts.remove(0); // "{fp:N}:<uuid>"
326    let expires_at_ms: i64 = expires_str.parse().map_err(|_| GrantVerifyError::Malformed)?;
327    if expires_at_ms <= now_ms() {
328        return Err(GrantVerifyError::Expired);
329    }
330    // Split left into hash_tag and uuid. hash_tag ends at `}`.
331    let close = left.find("}:").ok_or(GrantVerifyError::Malformed)?;
332    let hash_tag = &left[..=close]; // includes `}`
333    let uuid_str = &left[close + 2..];
334
335    // Look up the kid's secret (may be inactive — grace window).
336    let secret = crate::signal::fetch_kid(pool, kid)
337        .await
338        .map_err(|_| GrantVerifyError::Transport)?
339        .ok_or(GrantVerifyError::UnknownKid)?;
340
341    // Reconstruct the signed message.
342    let wid_wiid = read_grant_identity(pool, grant).await?;
343    let message = format!(
344        "{hash_tag}|{uuid_str}|{wid}|{wiid}|{expires_at_ms}",
345        wid = wid_wiid.0,
346        wiid = wid_wiid.1,
347    );
348    let token = format!("{kid}:{hex_part}");
349    crate::signal::hmac_verify(&secret, kid, message.as_bytes(), &token)
350        .map_err(|_| GrantVerifyError::SignatureMismatch)?;
351    Ok(())
352}
353
354/// Read the worker identity for a claim grant by (partition_key,
355/// execution_id). RFC-024 PR-D: reads from the new `ff_claim_grant`
356/// table (kind='claim'); pre-PR-D this read from
357/// `ff_exec_core.raw_fields.claim_grant`.
358async fn read_grant_identity(
359    pool: &PgPool,
360    grant: &ClaimGrant,
361) -> Result<(String, String), GrantVerifyError> {
362    let partition = grant.partition().map_err(|_| GrantVerifyError::Malformed)?;
363    let part = partition.index as i16;
364    let uuid_str = grant
365        .execution_id
366        .as_str()
367        .split_once("}:")
368        .map(|(_, u)| u)
369        .ok_or(GrantVerifyError::Malformed)?;
370    let exec_uuid = Uuid::parse_str(uuid_str).map_err(|_| GrantVerifyError::Malformed)?;
371    let ident = crate::claim_grant::read_claim_grant_identity(pool, part, exec_uuid)
372        .await
373        .map_err(|_| GrantVerifyError::Transport)?
374        .ok_or(GrantVerifyError::UnknownGrant)?;
375    Ok(ident)
376}
377
378/// Errors from [`verify_grant`].
379#[derive(Debug, thiserror::Error)]
380pub enum GrantVerifyError {
381    #[error("grant_key malformed")]
382    Malformed,
383    #[error("grant expired")]
384    Expired,
385    #[error("unknown kid in grant")]
386    UnknownKid,
387    #[error("unknown grant — no row with matching claim_grant in exec_core")]
388    UnknownGrant,
389    #[error("signature verification failed")]
390    SignatureMismatch,
391    #[error("transport error while verifying grant")]
392    Transport,
393}
394
/// Budget admission for a single budget_id. Returns `Ok(true)` when
/// the budget admits (or is unknown — fail-open on missing policy,
/// matching the Valkey fallback on non-UUID test IDs), `Ok(false)`
/// on hard breach.
///
/// Runs inside the caller's admission transaction; both the policy
/// and usage rows are `FOR SHARE`-locked so concurrent writers block
/// until this transaction resolves.
async fn admit_budget(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    budget_id: &str,
) -> Result<bool, EngineError> {
    // Compute partition for this budget. The id might be a UUID or a
    // test literal — fall back to partition 0 on parse failure, same
    // as the Valkey BudgetChecker.
    let partition_key: i16 = ff_core::types::BudgetId::parse(budget_id)
        .map(|bid| {
            ff_core::partition::budget_partition(&bid, &ff_core::partition::PartitionConfig::default())
                .index as i16
        })
        .unwrap_or(0);

    // FOR SHARE on the policy row — protects against concurrent
    // rotation while we read hard_limit.
    let policy: Option<JsonValue> = sqlx::query_scalar(
        r#"
        SELECT policy_json FROM ff_budget_policy
         WHERE partition_key = $1 AND budget_id = $2
         FOR SHARE
        "#,
    )
    .bind(partition_key)
    .bind(budget_id)
    .fetch_optional(&mut **tx)
    .await
    .map_err(map_sqlx_error)?;
    let Some(policy) = policy else {
        // No policy row — nothing to enforce.
        return Ok(true);
    };

    // Extract hard_limit + dimension. Shape matches the Wave 4f
    // upsert_policy_for_test fixture: { "dimension": "tokens",
    // "hard_limit": <u64> } (either at top-level or under "hard").
    // NOTE(review): the "hard" fallback takes the FIRST value of the
    // object in map order — fine for a single-entry fixture, but
    // indeterminate if a policy ever carries several hard dimensions;
    // confirm against the fixture shape before relying on it.
    let hard_limit = policy
        .get("hard_limit")
        .and_then(JsonValue::as_u64)
        .or_else(|| {
            policy
                .get("hard")
                .and_then(JsonValue::as_object)
                .and_then(|o| o.values().next())
                .and_then(JsonValue::as_u64)
        });
    let dimension = policy
        .get("dimension")
        .and_then(JsonValue::as_str)
        .map(str::to_owned)
        .unwrap_or_else(|| "default".to_owned());
    let Some(hard_limit) = hard_limit else {
        // Policy row without a parseable hard limit — nothing to
        // enforce (fail-open, same as the missing-policy case).
        return Ok(true);
    };

    // Read the single usage row keyed by (budget, dimension) — not a
    // sum over rows. A missing usage row means 0 used; negative
    // stored values are clamped to 0 before the compare.
    let current: Option<i64> = sqlx::query_scalar(
        r#"
        SELECT current_value FROM ff_budget_usage
         WHERE partition_key = $1 AND budget_id = $2 AND dimensions_key = $3
         FOR SHARE
        "#,
    )
    .bind(partition_key)
    .bind(budget_id)
    .bind(&dimension)
    .fetch_optional(&mut **tx)
    .await
    .map_err(map_sqlx_error)?;
    let current = current.unwrap_or(0).max(0) as u64;

    // Hard-limit rule: must have room for at least one more unit. No
    // pre-reservation — the worker reports usage after execution.
    Ok(current < hard_limit)
}
475
/// Wall-clock time in milliseconds since the Unix epoch, clamped to
/// `i64`: a clock set before the epoch reads as 0, and a millisecond
/// count too large for `i64` saturates at `i64::MAX`.
fn now_ms() -> i64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0u128, |d| d.as_millis());
    i64::try_from(millis).unwrap_or(i64::MAX)
}