ff_backend_postgres/scheduler.rs
1//! Admission-pipeline for the Postgres backend (RFC-v0.7 Wave 5b).
2//!
3//! Mirrors `ff-scheduler::claim::Scheduler::claim_for_worker` on Valkey:
4//! find an eligible execution, match capabilities, admit under budget,
5//! admit under quota, and issue a signed [`ClaimGrant`]. Returns
6//! `Ok(None)` when no candidate is admissible.
7//!
8//! # Pipeline
9//!
10//! All six pipeline stages run in one `READ COMMITTED` transaction
11//! (Q11):
12//!
13//! 1. **Eligible pick.** `SELECT ... FROM ff_exec_core WHERE lane_id =
14//! $1 AND lifecycle_phase = 'runnable' AND eligibility_state =
15//! 'eligible_now' ORDER BY priority DESC, created_at_ms ASC FOR
16//! UPDATE SKIP LOCKED LIMIT N`. Over-fetches so capability-subset
17//! filtering has candidates after per-row rejects.
18//! 2. **Capability subset-match.** Each over-fetched row is filtered
19//! via [`ff_core::caps::matches`]. First matching row wins.
20//! 3. **Budget admission.** If the exec row carries `budget_ids`,
21//! each referenced [`ff_budget_policy`] row is `FOR SHARE`-locked,
22//! its `policy_json` parsed for `hard_limit` + `dimension`, and
23//! the current `ff_budget_usage` value compared. Breach →
24//! `Ok(None)`, row left eligible for another worker/tick.
25//! 4. **Quota admission.** There is no quota schema in 0001/0002.
26//! This stage is a no-op and reported as "skipped — quota schema
27//! not yet migrated" in the return channel. Grep of the migrations
28//! directory confirms: only budget + core + suspension + stream
29//! tables exist.
30//! 5. **Issue ClaimGrant.** Uses the Wave-4d global
31//! [`ff_waitpoint_hmac`] keystore via [`hmac_sign`] — the same
32//! primitive signing waitpoint tokens. The signed blob rides
33//! inside [`ClaimGrant::grant_key`] as
//!    `pg:<hash-tag>:<uuid>:<expires_ms>:<kid>:<hex>`. The grant
//!    itself is persisted into the `ff_claim_grant` table (RFC-024
//!    PR-D). Earlier waves stashed it in `ff_exec_core.raw_fields.claim_grant`;
//!    that JSON stash is kept for one release for backfill safety but is no longer read.
38//! 6. **Commit + return grant.**
39//!
40//! # Isolation (Q11)
41//!
42//! READ COMMITTED + `FOR UPDATE SKIP LOCKED` on the eligible pick +
43//! `FOR SHARE` on budget policy rows. No SERIALIZABLE retries.
44//!
45//! # Scheduler integration
46//!
47//! `ff-scheduler` today is Valkey-specific (`ferriskey::Client`
48//! embedded in `Scheduler`). Rather than add `ff-backend-postgres`
49//! (and its sqlx transitive graph) as a dep of ff-scheduler — which
50//! would pollute every consumer (ff-server, ff-observability,
51//! ff-test, ff-readiness-tests, ff-script, ff-backend-valkey) — the
52//! Postgres variant lives here as a free-standing [`PostgresScheduler`]
53//! struct. ff-server dispatches against the concrete backend type it
54//! constructed (Valkey → `ff_scheduler::Scheduler`, Postgres →
55//! `ff_backend_postgres::scheduler::PostgresScheduler`); no trait-
56//! object indirection needed because the engine already decides
57//! backend at boot.
58
59use std::collections::BTreeSet;
60use std::time::{SystemTime, UNIX_EPOCH};
61
62use ff_core::caps::{matches as caps_matches, CapabilityRequirement};
63use ff_core::contracts::ClaimGrant;
64use ff_core::engine_error::{EngineError, ValidationKind};
65use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
66use ff_core::types::{ExecutionId, LaneId, WorkerId, WorkerInstanceId};
67use serde_json::Value as JsonValue;
68use sqlx::{PgPool, Row};
69use uuid::Uuid;
70
71use crate::error::map_sqlx_error;
72use crate::signal::{current_active_kid, hmac_sign};
73
/// Row limit for the stage-1 eligible pick. Fetching more than one row
/// leaves the per-row capability filter some candidates to fall back on
/// after rejects — the same "pick 10 and filter" shape as the Valkey path.
const ELIGIBLE_OVERFETCH: i64 = 10;
78
79/// Postgres admission pipeline. See the module rustdoc for
80/// pipeline + isolation notes.
81pub struct PostgresScheduler {
82 pool: PgPool,
83}
84
85impl PostgresScheduler {
86 pub fn new(pool: PgPool) -> Self {
87 Self { pool }
88 }
89
90 /// Find an eligible execution, admit it against budget + quota,
91 /// and issue a signed [`ClaimGrant`]. Returns `Ok(None)` when no
92 /// candidate is admissible on this lane right now.
93 pub async fn claim_for_worker(
94 &self,
95 lane: &LaneId,
96 worker_id: &WorkerId,
97 worker_instance_id: &WorkerInstanceId,
98 worker_capabilities: &BTreeSet<String>,
99 grant_ttl_ms: u64,
100 ) -> Result<Option<ClaimGrant>, EngineError> {
101 // Read the active HMAC kid up-front — if the keystore is
102 // empty we refuse to issue grants (fail-closed, matches the
103 // Valkey path where ff_issue_claim_grant requires a secret).
104 let (kid, secret) = match current_active_kid(&self.pool).await? {
105 Some(v) => v,
106 None => {
107 return Err(EngineError::Unavailable {
108 op: "claim_for_worker: ff_waitpoint_hmac keystore empty",
109 });
110 }
111 };
112
113 // Iterate partitions 0..256. Over each partition we run the
114 // full admission tx. Matches the Valkey scheduler's partition-
115 // walk shape; the bounded-scan / rotation-cursor machinery is
116 // Valkey-specific (lives in ff-scheduler) and not ported here.
117 const TOTAL_PARTITIONS: i16 = 256;
118 for part in 0..TOTAL_PARTITIONS {
119 if let Some(grant) = self
120 .try_claim_in_partition(
121 part,
122 lane,
123 worker_id,
124 worker_instance_id,
125 worker_capabilities,
126 grant_ttl_ms,
127 &kid,
128 &secret,
129 )
130 .await?
131 {
132 return Ok(Some(grant));
133 }
134 }
135 Ok(None)
136 }
137
138 #[allow(clippy::too_many_arguments)]
139 async fn try_claim_in_partition(
140 &self,
141 part: i16,
142 lane: &LaneId,
143 worker_id: &WorkerId,
144 worker_instance_id: &WorkerInstanceId,
145 worker_capabilities: &BTreeSet<String>,
146 grant_ttl_ms: u64,
147 kid: &str,
148 secret: &[u8],
149 ) -> Result<Option<ClaimGrant>, EngineError> {
150 let mut tx = self.pool.begin().await.map_err(map_sqlx_error)?;
151
152 // ── Stage 1: eligible pick (FOR UPDATE SKIP LOCKED) ──
153 let rows = sqlx::query(
154 r#"
155 SELECT execution_id, required_capabilities, raw_fields
156 FROM ff_exec_core
157 WHERE partition_key = $1
158 AND lane_id = $2
159 AND lifecycle_phase = 'runnable'
160 AND eligibility_state = 'eligible_now'
161 ORDER BY priority DESC, created_at_ms ASC
162 FOR UPDATE SKIP LOCKED
163 LIMIT $3
164 "#,
165 )
166 .bind(part)
167 .bind(lane.as_str())
168 .bind(ELIGIBLE_OVERFETCH)
169 .fetch_all(&mut *tx)
170 .await
171 .map_err(map_sqlx_error)?;
172
173 if rows.is_empty() {
174 tx.rollback().await.map_err(map_sqlx_error)?;
175 return Ok(None);
176 }
177
178 // ── Stage 2: capability subset-match ──
179 let mut picked: Option<(Uuid, JsonValue)> = None;
180 for row in &rows {
181 let required: Vec<String> = row
182 .try_get::<Vec<String>, _>("required_capabilities")
183 .map_err(map_sqlx_error)?;
184 let req = CapabilityRequirement::new(required);
185 let worker_set = ff_core::backend::CapabilitySet::new(worker_capabilities.iter().cloned());
186 if !caps_matches(&req, &worker_set) {
187 continue;
188 }
189 let eid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
190 let raw: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
191 picked = Some((eid, raw));
192 break;
193 }
194 let Some((exec_uuid, raw_fields)) = picked else {
195 tx.rollback().await.map_err(map_sqlx_error)?;
196 return Ok(None);
197 };
198
199 // ── Stage 3: budget admission ──
200 // `budget_ids` is stashed in raw_fields as a comma-separated
201 // string (matches the Valkey HGET shape). Missing field → no
202 // budget attached; empty string → no budget attached.
203 let budget_ids: Vec<String> = raw_fields
204 .get("budget_ids")
205 .and_then(JsonValue::as_str)
206 .map(|s| {
207 s.split(',')
208 .map(str::trim)
209 .filter(|s| !s.is_empty())
210 .map(str::to_owned)
211 .collect()
212 })
213 .unwrap_or_default();
214
215 for bid in &budget_ids {
216 if !admit_budget(&mut tx, bid).await? {
217 // Breach — leave the row eligible (rollback releases
218 // the FOR UPDATE lock without mutating the row).
219 tx.rollback().await.map_err(map_sqlx_error)?;
220 return Ok(None);
221 }
222 }
223
224 // ── Stage 4: quota admission (no schema → skipped) ──
225 // Quota tables are not in 0001_initial.sql or 0002_budget.sql.
226 // A future migration (0003_quota.sql) would add
227 // ff_quota_policy + ff_quota_usage; this stage becomes a
228 // FOR SHARE policy-read + current-count compare at that
229 // point. Keeping the slot in the pipeline as a doc/ack:
230 let _quota_skipped_no_schema = ();
231
232 // ── Stage 5: issue signed ClaimGrant ──
233 let now = now_ms();
234 let expires_at_ms = now.saturating_add_unsigned(grant_ttl_ms.min(i64::MAX as u64));
235
236 // Sign over the fence-relevant fields. Verification later
237 // re-constructs the same message.
238 let partition = Partition {
239 family: PartitionFamily::Execution,
240 index: part as u16,
241 };
242 let hash_tag = partition.hash_tag();
243 let message = format!(
244 "{hash_tag}|{exec_uuid}|{wid}|{wiid}|{exp}",
245 wid = worker_id.as_str(),
246 wiid = worker_instance_id.as_str(),
247 exp = expires_at_ms,
248 );
249 let sig = hmac_sign(secret, kid, message.as_bytes());
250 let grant_key = format!("pg:{hash_tag}:{exec_uuid}:{expires_at_ms}:{sig}");
251
252 // RFC-024 PR-D: persist the grant into `ff_claim_grant` (the
253 // properly-shaped table with `kind` discriminator). Pre-PR-D
254 // this used the JSON stash at `ff_exec_core.raw_fields.claim_grant`
255 // — the JSON column is left in place for one release for
256 // backfill safety (RFC §5 / §10) but is no longer consulted
257 // by the read path.
258 crate::claim_grant::write_claim_grant(
259 &mut tx,
260 part,
261 &grant_key,
262 exec_uuid,
263 worker_id.as_str(),
264 worker_instance_id.as_str(),
265 grant_ttl_ms,
266 now,
267 expires_at_ms,
268 )
269 .await?;
270 sqlx::query(
271 r#"
272 UPDATE ff_exec_core
273 SET eligibility_state = 'pending_claim'
274 WHERE partition_key = $1 AND execution_id = $2
275 "#,
276 )
277 .bind(part)
278 .bind(exec_uuid)
279 .execute(&mut *tx)
280 .await
281 .map_err(map_sqlx_error)?;
282
283 tx.commit().await.map_err(map_sqlx_error)?;
284
285 // Build ClaimGrant wire-type.
286 let eid = ExecutionId::parse(&format!("{{fp:{part}}}:{exec_uuid}")).map_err(|e| {
287 EngineError::Validation {
288 kind: ValidationKind::Corruption,
289 detail: format!("scheduler: reassembling exec id: {e}"),
290 }
291 })?;
292 Ok(Some(ClaimGrant::new(
293 eid,
294 PartitionKey::from(&partition),
295 grant_key,
296 expires_at_ms as u64,
297 )))
298 }
299}
300
301/// Verify a grant produced by [`PostgresScheduler::claim_for_worker`].
302///
303/// Returns `Ok(())` iff the signature embedded in `grant.grant_key`
304/// verifies under the kid stored with it and `expires_at_ms` is in
305/// the future. Exposed for test / consumer use; a production
306/// `claim_from_grant` on Postgres will extend this with fence-check
307/// into `ff_attempt`.
308pub async fn verify_grant(pool: &PgPool, grant: &ClaimGrant) -> Result<(), GrantVerifyError> {
309 // Parse: pg:<hash-tag>:<uuid>:<expires_ms>:<kid>:<hex>
310 let s = grant.grant_key.as_str();
311 let rest = s.strip_prefix("pg:").ok_or(GrantVerifyError::Malformed)?;
312 // hash_tag contains ':' internally ("{fp:7}"), so split from the
313 // right: kid:hex is the final `kid:hex` segment, preceded by
314 // expires_ms, preceded by uuid, preceded by the hash tag.
315 let mut parts: Vec<&str> = rest.rsplitn(4, ':').collect(); // [hex, kid, expires_ms, hash_tag:uuid?]
316 // rsplitn from the right yields in reverse order; we need 4
317 // segments: hex, kid, expires, and the left-over prefix
318 // hash_tag:uuid (which still contains one `:`).
319 if parts.len() != 4 {
320 return Err(GrantVerifyError::Malformed);
321 }
322 let hex_part = parts.remove(0);
323 let kid = parts.remove(0);
324 let expires_str = parts.remove(0);
325 let left = parts.remove(0); // "{fp:N}:<uuid>"
326 let expires_at_ms: i64 = expires_str.parse().map_err(|_| GrantVerifyError::Malformed)?;
327 if expires_at_ms <= now_ms() {
328 return Err(GrantVerifyError::Expired);
329 }
330 // Split left into hash_tag and uuid. hash_tag ends at `}`.
331 let close = left.find("}:").ok_or(GrantVerifyError::Malformed)?;
332 let hash_tag = &left[..=close]; // includes `}`
333 let uuid_str = &left[close + 2..];
334
335 // Look up the kid's secret (may be inactive — grace window).
336 let secret = crate::signal::fetch_kid(pool, kid)
337 .await
338 .map_err(|_| GrantVerifyError::Transport)?
339 .ok_or(GrantVerifyError::UnknownKid)?;
340
341 // Reconstruct the signed message.
342 let wid_wiid = read_grant_identity(pool, grant).await?;
343 let message = format!(
344 "{hash_tag}|{uuid_str}|{wid}|{wiid}|{expires_at_ms}",
345 wid = wid_wiid.0,
346 wiid = wid_wiid.1,
347 );
348 let token = format!("{kid}:{hex_part}");
349 crate::signal::hmac_verify(&secret, kid, message.as_bytes(), &token)
350 .map_err(|_| GrantVerifyError::SignatureMismatch)?;
351 Ok(())
352}
353
354/// Read the worker identity for a claim grant by (partition_key,
355/// execution_id). RFC-024 PR-D: reads from the new `ff_claim_grant`
356/// table (kind='claim'); pre-PR-D this read from
357/// `ff_exec_core.raw_fields.claim_grant`.
358async fn read_grant_identity(
359 pool: &PgPool,
360 grant: &ClaimGrant,
361) -> Result<(String, String), GrantVerifyError> {
362 let partition = grant.partition().map_err(|_| GrantVerifyError::Malformed)?;
363 let part = partition.index as i16;
364 let uuid_str = grant
365 .execution_id
366 .as_str()
367 .split_once("}:")
368 .map(|(_, u)| u)
369 .ok_or(GrantVerifyError::Malformed)?;
370 let exec_uuid = Uuid::parse_str(uuid_str).map_err(|_| GrantVerifyError::Malformed)?;
371 let ident = crate::claim_grant::read_claim_grant_identity(pool, part, exec_uuid)
372 .await
373 .map_err(|_| GrantVerifyError::Transport)?
374 .ok_or(GrantVerifyError::UnknownGrant)?;
375 Ok(ident)
376}
377
378/// Errors from [`verify_grant`].
379#[derive(Debug, thiserror::Error)]
380pub enum GrantVerifyError {
381 #[error("grant_key malformed")]
382 Malformed,
383 #[error("grant expired")]
384 Expired,
385 #[error("unknown kid in grant")]
386 UnknownKid,
387 #[error("unknown grant — no row with matching claim_grant in exec_core")]
388 UnknownGrant,
389 #[error("signature verification failed")]
390 SignatureMismatch,
391 #[error("transport error while verifying grant")]
392 Transport,
393}
394
395/// Budget admission for a single budget_id. Returns `Ok(true)` when
396/// the budget admits (or is unknown — fail-open on missing policy,
397/// matching the Valkey fallback on non-UUID test IDs), `Ok(false)`
398/// on hard breach.
399async fn admit_budget(
400 tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
401 budget_id: &str,
402) -> Result<bool, EngineError> {
403 // Compute partition for this budget. The id might be a UUID or a
404 // test literal — fall back to partition 0 on parse failure, same
405 // as the Valkey BudgetChecker.
406 let partition_key: i16 = ff_core::types::BudgetId::parse(budget_id)
407 .map(|bid| {
408 ff_core::partition::budget_partition(&bid, &ff_core::partition::PartitionConfig::default())
409 .index as i16
410 })
411 .unwrap_or(0);
412
413 // FOR SHARE on the policy row — protects against concurrent
414 // rotation while we read hard_limit.
415 let policy: Option<JsonValue> = sqlx::query_scalar(
416 r#"
417 SELECT policy_json FROM ff_budget_policy
418 WHERE partition_key = $1 AND budget_id = $2
419 FOR SHARE
420 "#,
421 )
422 .bind(partition_key)
423 .bind(budget_id)
424 .fetch_optional(&mut **tx)
425 .await
426 .map_err(map_sqlx_error)?;
427 let Some(policy) = policy else {
428 // No policy row — nothing to enforce.
429 return Ok(true);
430 };
431
432 // Extract hard_limit + dimension. Shape matches the Wave 4f
433 // upsert_policy_for_test fixture: { "dimension": "tokens",
434 // "hard_limit": <u64> } (either at top-level or under "hard").
435 let hard_limit = policy
436 .get("hard_limit")
437 .and_then(JsonValue::as_u64)
438 .or_else(|| {
439 policy
440 .get("hard")
441 .and_then(JsonValue::as_object)
442 .and_then(|o| o.values().next())
443 .and_then(JsonValue::as_u64)
444 });
445 let dimension = policy
446 .get("dimension")
447 .and_then(JsonValue::as_str)
448 .map(str::to_owned)
449 .unwrap_or_else(|| "default".to_owned());
450 let Some(hard_limit) = hard_limit else {
451 return Ok(true);
452 };
453
454 // Sum current_value across dimension rows. A missing usage row
455 // means 0 used.
456 let current: Option<i64> = sqlx::query_scalar(
457 r#"
458 SELECT current_value FROM ff_budget_usage
459 WHERE partition_key = $1 AND budget_id = $2 AND dimensions_key = $3
460 FOR SHARE
461 "#,
462 )
463 .bind(partition_key)
464 .bind(budget_id)
465 .bind(&dimension)
466 .fetch_optional(&mut **tx)
467 .await
468 .map_err(map_sqlx_error)?;
469 let current = current.unwrap_or(0).max(0) as u64;
470
471 // Hard-limit rule: must have room for at least one more unit. No
472 // pre-reservation — the worker reports usage after execution.
473 Ok(current < hard_limit)
474}
475
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock reading before the epoch is reported as `0`; a millisecond
/// count that does not fit in `i64` is clamped to `i64::MAX`.
fn now_ms() -> i64 {
    let millis = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    };
    i64::try_from(millis).unwrap_or(i64::MAX)
}