use ff_backend_postgres::signal::{
current_active_kid, fetch_kid, hmac_sign, hmac_verify,
rotate_waitpoint_hmac_secret_all_impl, seed_waitpoint_hmac_secret_impl, HmacVerifyError,
};
use ff_backend_postgres::PostgresBackend;
use ff_core::backend::{BackendTag, Handle, HandleKind};
use ff_core::contracts::{
ClaimResumedExecutionArgs, ClaimResumedExecutionResult, CompositeBody, CountKind,
DeliverSignalArgs, DeliverSignalResult, IdempotencyKey, ResumeCondition, ResumePolicy,
RotateWaitpointHmacSecretAllArgs, RotateWaitpointHmacSecretOutcome, SeedOutcome,
SeedWaitpointHmacSecretArgs, SignalMatcher,
SuspendArgs, SuspendOutcome, SuspensionReasonCode, WaitpointBinding,
};
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::{ConflictKind, ContentionKind, EngineError};
use ff_core::handle_codec::{encode as encode_opaque, HandlePayload};
use ff_core::partition::PartitionConfig;
use ff_core::types::{
AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseFence, LeaseId, SignalId,
SuspensionId, TimestampMs, WaitpointId, WaitpointToken, WorkerId, WorkerInstanceId,
};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use uuid::Uuid;
async fn setup_or_skip() -> Option<PgPool> {
let url = std::env::var("FF_PG_TEST_URL").ok()?;
let bootstrap = PgPoolOptions::new()
.max_connections(1)
.connect(&url)
.await
.expect("connect to FF_PG_TEST_URL");
ff_backend_postgres::apply_migrations(&bootstrap)
.await
.expect("apply_migrations clean");
let schema = format!("ffpg_test_{}", Uuid::new_v4().simple());
sqlx::query(&format!("CREATE SCHEMA {schema}"))
.execute(&bootstrap)
.await
.expect("create per-test schema");
sqlx::query(&format!(
"CREATE TABLE {schema}.ff_waitpoint_hmac \
(LIKE public.ff_waitpoint_hmac INCLUDING ALL)"
))
.execute(&bootstrap)
.await
.expect("create per-test ff_waitpoint_hmac");
bootstrap.close().await;
let schema_for_hook = schema.clone();
let pool = PgPoolOptions::new()
.max_connections(8)
.after_connect(move |conn, _meta| {
let schema = schema_for_hook.clone();
Box::pin(async move {
sqlx::query(&format!("SET search_path TO {schema}, public"))
.execute(conn)
.await?;
Ok(())
})
})
.connect(&url)
.await
.expect("connect to FF_PG_TEST_URL");
Some(pool)
}
fn args(new_kid: &str, new_secret_hex: &str) -> RotateWaitpointHmacSecretAllArgs {
RotateWaitpointHmacSecretAllArgs::new(new_kid, new_secret_hex, 0)
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn rotate_all_happy_path() {
let Some(pool) = setup_or_skip().await else {
eprintln!("FF_PG_TEST_URL not set — skipping");
return;
};
let res = rotate_waitpoint_hmac_secret_all_impl(
&pool,
args("kid-1", &"11".repeat(32)),
1_000_000,
)
.await
.expect("rotate_all");
assert_eq!(res.entries.len(), 1, "one global entry per Q4");
let entry = res.entries.into_iter().next().unwrap();
assert_eq!(entry.partition, 0, "partition=0 on Postgres");
match entry.result.expect("rotate_all success") {
RotateWaitpointHmacSecretOutcome::Rotated {
previous_kid,
new_kid,
gc_count,
} => {
assert_eq!(previous_kid, None, "bootstrap has no prior kid");
assert_eq!(new_kid, "kid-1");
assert_eq!(gc_count, 0);
}
other => panic!("expected Rotated, got {other:?}"),
}
let (active_kid, _secret) =
current_active_kid(&pool).await.unwrap().expect("active kid present");
assert_eq!(active_kid, "kid-1");
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn rotate_all_replay_is_noop() {
let Some(pool) = setup_or_skip().await else {
return;
};
let secret = "aa".repeat(32);
rotate_waitpoint_hmac_secret_all_impl(&pool, args("kid-1", &secret), 1_000_000)
.await
.expect("first rotate")
.entries
.into_iter()
.next()
.unwrap()
.result
.expect("first rotate ok");
let res = rotate_waitpoint_hmac_secret_all_impl(&pool, args("kid-1", &secret), 2_000_000)
.await
.expect("second rotate");
let outcome = res.entries.into_iter().next().unwrap().result.unwrap();
assert!(matches!(outcome, RotateWaitpointHmacSecretOutcome::Noop { .. }));
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn rotate_all_kid_conflict_rejects() {
let Some(pool) = setup_or_skip().await else {
return;
};
rotate_waitpoint_hmac_secret_all_impl(
&pool,
args("kid-1", &"aa".repeat(32)),
1_000_000,
)
.await
.unwrap();
let res = rotate_waitpoint_hmac_secret_all_impl(
&pool,
args("kid-1", &"bb".repeat(32)),
2_000_000,
)
.await
.unwrap();
let err = res.entries.into_iter().next().unwrap().result.unwrap_err();
assert!(matches!(
err,
EngineError::Conflict(ConflictKind::RotationConflict(_))
));
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn seed_happy_path_installs_new_kid() {
let Some(pool) = setup_or_skip().await else {
return;
};
let secret = "cc".repeat(32);
let out = seed_waitpoint_hmac_secret_impl(
&pool,
SeedWaitpointHmacSecretArgs::new("kid-seed", secret.clone()),
1_000_000,
)
.await
.expect("seed ok");
assert!(matches!(out, SeedOutcome::Seeded { ref kid } if kid == "kid-seed"));
let (active_kid, _) = current_active_kid(&pool).await.unwrap().expect("active present");
assert_eq!(active_kid, "kid-seed");
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn seed_replay_same_kid_and_secret_reports_already_seeded_matching() {
let Some(pool) = setup_or_skip().await else {
return;
};
let secret = "cc".repeat(32);
seed_waitpoint_hmac_secret_impl(
&pool,
SeedWaitpointHmacSecretArgs::new("kid-seed", secret.clone()),
1_000_000,
)
.await
.unwrap();
let out = seed_waitpoint_hmac_secret_impl(
&pool,
SeedWaitpointHmacSecretArgs::new("kid-seed", secret),
2_000_000,
)
.await
.unwrap();
assert!(matches!(
out,
SeedOutcome::AlreadySeeded { ref kid, same_secret: true } if kid == "kid-seed"
));
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn seed_replay_same_kid_different_secret_reports_mismatch() {
let Some(pool) = setup_or_skip().await else {
return;
};
seed_waitpoint_hmac_secret_impl(
&pool,
SeedWaitpointHmacSecretArgs::new("kid-seed", "aa".repeat(32)),
1_000_000,
)
.await
.unwrap();
let out = seed_waitpoint_hmac_secret_impl(
&pool,
SeedWaitpointHmacSecretArgs::new("kid-seed", "bb".repeat(32)),
2_000_000,
)
.await
.unwrap();
assert!(matches!(
out,
SeedOutcome::AlreadySeeded { same_secret: false, .. }
));
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn seed_different_active_kid_is_validation_error() {
let Some(pool) = setup_or_skip().await else {
return;
};
seed_waitpoint_hmac_secret_impl(
&pool,
SeedWaitpointHmacSecretArgs::new("kid-one", "aa".repeat(32)),
1_000_000,
)
.await
.unwrap();
let err = seed_waitpoint_hmac_secret_impl(
&pool,
SeedWaitpointHmacSecretArgs::new("kid-two", "bb".repeat(32)),
2_000_000,
)
.await
.unwrap_err();
assert!(matches!(err, EngineError::Validation { .. }));
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn seed_rejects_invalid_secret_hex_length() {
let Some(pool) = setup_or_skip().await else {
return;
};
let err = seed_waitpoint_hmac_secret_impl(
&pool,
SeedWaitpointHmacSecretArgs::new("kid-seed", "shortsecret"),
1_000_000,
)
.await
.unwrap_err();
assert!(matches!(err, EngineError::Validation { .. }));
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn hmac_sign_verify_round_trip_against_rotated_kid() {
let Some(pool) = setup_or_skip().await else {
return;
};
let secret_hex = "cafebabe".repeat(8); let secret_bytes = hex::decode(&secret_hex).unwrap();
rotate_waitpoint_hmac_secret_all_impl(&pool, args("kid-v1", &secret_hex), 1_000_000)
.await
.unwrap();
let (kid, resolved_secret) = current_active_kid(&pool).await.unwrap().unwrap();
assert_eq!(kid, "kid-v1");
assert_eq!(resolved_secret, secret_bytes);
let token = hmac_sign(&resolved_secret, &kid, b"exec-42:wp-7");
hmac_verify(&resolved_secret, &kid, b"exec-42:wp-7", &token)
.expect("token verifies under current kid");
rotate_waitpoint_hmac_secret_all_impl(
&pool,
args("kid-v2", &"00".repeat(32)),
2_000_000,
)
.await
.unwrap();
let kid_v1_secret = fetch_kid(&pool, "kid-v1").await.unwrap().unwrap();
assert_eq!(kid_v1_secret, secret_bytes, "prior kid retained for grace");
hmac_verify(&kid_v1_secret, "kid-v1", b"exec-42:wp-7", &token)
.expect("token signed under kid-v1 still verifies post-rotation");
let err = hmac_verify(&kid_v1_secret, "kid-v1", b"tampered", &token).unwrap_err();
assert!(matches!(err, HmacVerifyError::SignatureMismatch));
}
struct ExecFixture {
backend: std::sync::Arc<dyn EngineBackend>,
pool: PgPool,
exec_id: ExecutionId,
part: i16,
exec_uuid: Uuid,
handle: Handle,
attempt_index: AttemptIndex,
lease_epoch: LeaseEpoch,
lane: LaneId,
}
async fn setup_exec_or_skip() -> Option<ExecFixture> {
let pool = setup_or_skip().await?;
rotate_waitpoint_hmac_secret_all_impl(
&pool,
RotateWaitpointHmacSecretAllArgs::new("kid-suspend-1", "ab".repeat(32), 0),
1_000_000,
)
.await
.unwrap();
let lane = LaneId::new("default");
let exec_id = ExecutionId::solo(&lane, &PartitionConfig::default());
let part = exec_id.partition() as i16;
let exec_uuid = Uuid::parse_str(
exec_id.as_str().split_once("}:").unwrap().1,
)
.unwrap();
let now = TimestampMs::now().0;
sqlx::query(
"INSERT INTO ff_exec_core \
(partition_key, execution_id, lane_id, attempt_index, \
lifecycle_phase, ownership_state, eligibility_state, \
public_state, attempt_state, created_at_ms) \
VALUES ($1, $2, $3, 0, 'active', 'leased', 'not_applicable', \
'running', 'running_attempt', $4)",
)
.bind(part)
.bind(exec_uuid)
.bind(lane.as_str())
.bind(now)
.execute(&pool)
.await
.expect("insert exec_core");
let attempt_index = AttemptIndex::new(0);
let lease_epoch = LeaseEpoch(1);
sqlx::query(
"INSERT INTO ff_attempt \
(partition_key, execution_id, attempt_index, worker_id, \
worker_instance_id, lease_epoch, lease_expires_at_ms, started_at_ms) \
VALUES ($1, $2, 0, 'w1', 'wi1', 1, $3, $4)",
)
.bind(part)
.bind(exec_uuid)
.bind(now + 30_000)
.bind(now)
.execute(&pool)
.await
.expect("insert attempt");
let payload = HandlePayload::new(
exec_id.clone(),
attempt_index,
AttemptId::new(),
LeaseId::new(),
lease_epoch,
30_000,
lane.clone(),
WorkerInstanceId::new("wi1"),
);
let opaque = encode_opaque(BackendTag::Postgres, &payload);
let handle = Handle::new(BackendTag::Postgres, HandleKind::Fresh, opaque);
let backend = PostgresBackend::from_pool(pool.clone(), PartitionConfig::default())
as std::sync::Arc<dyn EngineBackend>;
Some(ExecFixture {
backend,
pool,
exec_id,
part,
exec_uuid,
handle,
attempt_index,
lease_epoch,
lane,
})
}
fn make_suspend_args(wp_key: &str) -> (SuspendArgs, WaitpointId) {
let wp_id = WaitpointId::new();
let binding = WaitpointBinding::Fresh {
waitpoint_id: wp_id.clone(),
waitpoint_key: wp_key.to_owned(),
};
let cond = ResumeCondition::Single {
waitpoint_key: wp_key.to_owned(),
matcher: SignalMatcher::ByName("ready".into()),
};
let args = SuspendArgs::new(
SuspensionId::new(),
binding,
cond,
ResumePolicy::normal(),
SuspensionReasonCode::WaitingForSignal,
TimestampMs::now(),
);
(args, wp_id)
}
fn deliver_signal_args(
exec_id: &ExecutionId,
wp_id: &WaitpointId,
signal_name: &str,
source_identity: &str,
token: &WaitpointToken,
) -> DeliverSignalArgs {
DeliverSignalArgs {
execution_id: exec_id.clone(),
waitpoint_id: wp_id.clone(),
signal_id: SignalId::new(),
signal_name: signal_name.to_owned(),
signal_category: "external".to_owned(),
source_type: "worker".to_owned(),
source_identity: source_identity.to_owned(),
payload: None,
payload_encoding: None,
correlation_id: None,
idempotency_key: None,
target_scope: "execution".to_owned(),
created_at: None,
dedup_ttl_ms: None,
resume_delay_ms: None,
max_signals_per_execution: None,
signal_maxlen: None,
waitpoint_token: token.clone(),
now: TimestampMs::now(),
}
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn suspend_then_deliver_single_resumes_execution() {
let Some(fx) = setup_exec_or_skip().await else {
return;
};
let (args, wp_id) = make_suspend_args("wpk:single");
let outcome = fx
.backend
.suspend(&fx.handle, args)
.await
.expect("suspend ok");
let SuspendOutcome::Suspended { details, .. } = outcome.clone() else {
panic!("expected Suspended, got {outcome:?}");
};
let token = details.waitpoint_token.token().clone();
let ds1 = deliver_signal_args(&fx.exec_id, &wp_id, "other", "w1", &token);
let r1 = fx.backend.deliver_signal(ds1).await.expect("deliver1");
match r1 {
DeliverSignalResult::Accepted { effect, .. } => {
assert_eq!(effect, "appended_to_waitpoint")
}
other => panic!("unexpected {other:?}"),
}
let ds2 = deliver_signal_args(&fx.exec_id, &wp_id, "ready", "w1", &token);
let r2 = fx.backend.deliver_signal(ds2).await.expect("deliver2");
match r2 {
DeliverSignalResult::Accepted { effect, .. } => {
assert_eq!(effect, "resume_condition_satisfied")
}
other => panic!("unexpected {other:?}"),
}
let (count,): (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM ff_completion_event WHERE execution_id = $1",
)
.bind(fx.exec_uuid)
.fetch_one(&fx.pool)
.await
.unwrap();
assert_eq!(count, 1);
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn composite_count_distinct_sources_requires_distinct() {
let Some(fx) = setup_exec_or_skip().await else {
return;
};
let wp_id = WaitpointId::new();
let wp_key = "wpk:count";
let binding = WaitpointBinding::Fresh {
waitpoint_id: wp_id.clone(),
waitpoint_key: wp_key.to_owned(),
};
let cond = ResumeCondition::Composite(CompositeBody::Count {
n: 2,
count_kind: CountKind::DistinctSources,
matcher: None,
waitpoints: vec![wp_key.to_owned()],
});
let args = SuspendArgs::new(
SuspensionId::new(),
binding,
cond,
ResumePolicy::normal(),
SuspensionReasonCode::WaitingForSignal,
TimestampMs::now(),
);
let outcome = fx
.backend
.suspend(&fx.handle, args)
.await
.expect("suspend ok");
let token = outcome.details().waitpoint_token.token().clone();
for _ in 0..2 {
let r = fx
.backend
.deliver_signal(deliver_signal_args(
&fx.exec_id,
&wp_id,
"x",
"src1",
&token,
))
.await
.expect("deliver same-source");
match r {
DeliverSignalResult::Accepted { effect, .. } => {
assert_eq!(effect, "appended_to_waitpoint");
}
other => panic!("unexpected {other:?}"),
}
}
let r = fx
.backend
.deliver_signal(deliver_signal_args(
&fx.exec_id,
&wp_id,
"x",
"src2",
&token,
))
.await
.expect("deliver distinct source");
match r {
DeliverSignalResult::Accepted { effect, .. } => {
assert_eq!(effect, "resume_condition_satisfied");
}
other => panic!("unexpected {other:?}"),
}
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn allof_two_waitpoints_requires_both() {
let Some(fx) = setup_exec_or_skip().await else {
return;
};
let wp_a = WaitpointId::new();
let wp_b = WaitpointId::new();
let cond = ResumeCondition::all_of_waitpoints(["wpk:a", "wpk:b"]);
let args = SuspendArgs::new(
SuspensionId::new(),
WaitpointBinding::Fresh {
waitpoint_id: wp_a.clone(),
waitpoint_key: "wpk:a".into(),
},
cond,
ResumePolicy::normal(),
SuspensionReasonCode::WaitingForSignal,
TimestampMs::now(),
)
.with_waitpoint(WaitpointBinding::Fresh {
waitpoint_id: wp_b.clone(),
waitpoint_key: "wpk:b".into(),
});
let outcome = fx
.backend
.suspend(&fx.handle, args)
.await
.expect("suspend ok");
let primary_token = outcome.details().waitpoint_token.token().clone();
let extra_token = outcome.details().additional_waitpoints[0]
.waitpoint_token
.token()
.clone();
let r = fx
.backend
.deliver_signal(deliver_signal_args(
&fx.exec_id,
&wp_a,
"x",
"src",
&primary_token,
))
.await
.expect("deliver a");
match r {
DeliverSignalResult::Accepted { effect, .. } => {
assert_eq!(effect, "appended_to_waitpoint")
}
other => panic!("{other:?}"),
}
let r = fx
.backend
.deliver_signal(deliver_signal_args(
&fx.exec_id,
&wp_b,
"x",
"src",
&extra_token,
))
.await
.expect("deliver b");
match r {
DeliverSignalResult::Accepted { effect, .. } => {
assert_eq!(effect, "resume_condition_satisfied")
}
other => panic!("{other:?}"),
}
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn idempotency_key_replay_returns_cached_outcome() {
let Some(fx) = setup_exec_or_skip().await else {
return;
};
let (mut args, _wp) = make_suspend_args("wpk:idem");
args = args.with_idempotency_key(IdempotencyKey::new("replay-key-1"));
let first = fx
.backend
.suspend(&fx.handle, args.clone())
.await
.expect("first suspend");
let second = fx
.backend
.suspend(&fx.handle, args)
.await
.expect("replay");
assert_eq!(
first.details().waitpoint_token.as_str(),
second.details().waitpoint_token.as_str()
);
assert_eq!(
first.details().suspension_id,
second.details().suspension_id
);
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn claim_resumed_execution_happy_plus_wrong_state() {
let Some(fx) = setup_exec_or_skip().await else {
return;
};
let (args, wp_id) = make_suspend_args("wpk:claim");
let outcome = fx.backend.suspend(&fx.handle, args).await.expect("suspend");
let token = outcome.details().waitpoint_token.token().clone();
let early = fx
.backend
.claim_resumed_execution(ClaimResumedExecutionArgs {
execution_id: fx.exec_id.clone(),
worker_id: WorkerId::new("w-new"),
worker_instance_id: WorkerInstanceId::new("wi-new"),
lane_id: fx.lane.clone(),
lease_id: LeaseId::new(),
lease_ttl_ms: 30_000,
current_attempt_index: fx.attempt_index,
remaining_attempt_timeout_ms: None,
now: TimestampMs::now(),
})
.await;
assert!(
matches!(
early,
Err(EngineError::Contention(ContentionKind::NotAResumedExecution))
),
"expected NotAResumedExecution, got {early:?}"
);
fx.backend
.deliver_signal(deliver_signal_args(
&fx.exec_id,
&wp_id,
"ready",
"src",
&token,
))
.await
.expect("satisfy");
let good = fx
.backend
.claim_resumed_execution(ClaimResumedExecutionArgs {
execution_id: fx.exec_id.clone(),
worker_id: WorkerId::new("w-new"),
worker_instance_id: WorkerInstanceId::new("wi-new"),
lane_id: fx.lane.clone(),
lease_id: LeaseId::new(),
lease_ttl_ms: 30_000,
current_attempt_index: fx.attempt_index,
remaining_attempt_timeout_ms: None,
now: TimestampMs::now(),
})
.await
.expect("claim resumed");
let ClaimResumedExecutionResult::Claimed(c) = good;
assert_eq!(c.execution_id, fx.exec_id);
assert!(c.lease_epoch.0 > fx.lease_epoch.0);
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn observe_signals_returns_delivered_signals() {
let Some(fx) = setup_exec_or_skip().await else {
return;
};
let (args, wp_id) = make_suspend_args("wpk:observe");
let outcome = fx.backend.suspend(&fx.handle, args).await.expect("suspend");
let token = outcome.details().waitpoint_token.token().clone();
let suspended_handle = match outcome {
SuspendOutcome::Suspended { handle, .. } => handle,
other => panic!("{other:?}"),
};
for name in ["a", "b", "ready"] {
fx.backend
.deliver_signal(deliver_signal_args(
&fx.exec_id,
&wp_id,
name,
"src",
&token,
))
.await
.expect("deliver");
}
let sigs = fx
.backend
.observe_signals(&suspended_handle)
.await
.expect("observe");
assert_eq!(sigs.len(), 3);
let names: std::collections::HashSet<_> =
sigs.iter().map(|s| s.signal_name.clone()).collect();
assert!(names.contains("a"));
assert!(names.contains("b"));
assert!(names.contains("ready"));
}
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn deliver_signal_retry_exhaustion_returns_retry_exhausted() {
let Some(fx) = setup_exec_or_skip().await else {
return;
};
let (args, wp_id) = make_suspend_args("wpk:retry");
let outcome = fx.backend.suspend(&fx.handle, args).await.expect("suspend");
let token = outcome.details().waitpoint_token.token().clone();
let wp_uuid = Uuid::parse_str(&wp_id.to_string()).unwrap();
let holder_pool = fx.pool.clone();
let hold_exec = fx.exec_uuid;
let hold_part = fx.part;
let (tx_done, rx_done) = tokio::sync::oneshot::channel::<()>();
let (tx_started, rx_started) =
tokio::sync::oneshot::channel::<()>();
let holder = tokio::spawn(async move {
let mut tx = holder_pool.begin().await.unwrap();
sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
.execute(&mut *tx)
.await
.unwrap();
let _: Option<(serde_json::Value,)> = sqlx::query_as(
"SELECT member_map FROM ff_suspension_current \
WHERE partition_key = $1 AND execution_id = $2",
)
.bind(hold_part)
.bind(hold_exec)
.fetch_optional(&mut *tx)
.await
.unwrap();
sqlx::query(
"UPDATE ff_suspension_current \
SET member_map = member_map || '{\"holder\":[]}'::jsonb \
WHERE partition_key = $1 AND execution_id = $2",
)
.bind(hold_part)
.bind(hold_exec)
.execute(&mut *tx)
.await
.unwrap();
let _ = tx_started.send(());
let _ = tokio::time::timeout(std::time::Duration::from_millis(500), rx_done).await;
tx.commit().await.ok();
});
rx_started.await.unwrap();
let result = fx
.backend
.deliver_signal(deliver_signal_args(
&fx.exec_id,
&wp_id,
"ready",
"src",
&token,
))
.await;
let _ = tx_done.send(());
holder.await.ok();
match result {
Ok(_) => {
eprintln!("deliver_signal succeeded after holder released (budget ok)");
}
Err(EngineError::Contention(ContentionKind::RetryExhausted)) => {
eprintln!("deliver_signal exhausted 3-attempt retry budget (Q11)");
}
Err(EngineError::Transport { .. }) => {
eprintln!("deliver_signal surfaced a Transport fault during retry");
}
Err(other) => panic!("unexpected retry-test error: {other:?}"),
}
assert_eq!(ff_backend_postgres::signal::SERIALIZABLE_RETRY_BUDGET, 3);
let _ = wp_uuid; }
#[tokio::test]
#[ignore = "requires a live Postgres; set FF_PG_TEST_URL"]
async fn suspend_by_triple_fresh_returns_suspended() {
let Some(fx) = setup_exec_or_skip().await else {
return;
};
let triple = LeaseFence {
lease_id: LeaseId::new(),
lease_epoch: fx.lease_epoch,
attempt_id: AttemptId::new(),
};
let (args, _wp_id) = make_suspend_args("wpk:triple");
let suspension_id = args.suspension_id.clone();
let outcome = fx
.backend
.suspend_by_triple(fx.exec_id.clone(), triple, args)
.await
.expect("suspend_by_triple ok");
match outcome {
SuspendOutcome::Suspended { details, handle } => {
assert_eq!(details.suspension_id, suspension_id);
assert_eq!(details.waitpoint_key, "wpk:triple");
assert_eq!(handle.kind, HandleKind::Suspended);
assert_eq!(handle.backend, BackendTag::Postgres);
}
other => panic!("expected Suspended, got {other:?}"),
}
let stale = LeaseFence {
lease_id: LeaseId::new(),
lease_epoch: fx.lease_epoch,
attempt_id: AttemptId::new(),
};
let (args2, _) = make_suspend_args("wpk:triple-replay");
let err = fx
.backend
.suspend_by_triple(fx.exec_id.clone(), stale, args2)
.await
.expect_err("stale epoch must reject");
assert!(
matches!(err, EngineError::Contention(ContentionKind::LeaseConflict))
|| matches!(err, EngineError::Contention(ContentionKind::RetryExhausted)),
"expected LeaseConflict or RetryExhausted, got {err:?}"
);
let _ = fx.exec_uuid; let _ = fx.part;
let _ = fx.attempt_index;
let _ = fx.lane;
let _ = fx.pool;
}