#![cfg(feature = "core")]
use std::sync::Arc;
use std::time::Duration;
use ff_backend_sqlite::SqliteBackend;
use ff_core::backend::{CapabilitySet, ClaimPolicy};
use ff_core::contracts::{
CancelExecutionArgs, CancelExecutionResult, CancelFlowArgs, CancelFlowHeader,
ChangePriorityArgs, ChangePriorityResult, CreateExecutionArgs, CreateExecutionResult,
ReplayExecutionArgs, ReplayExecutionResult, RevokeLeaseArgs, RevokeLeaseResult,
};
use ff_core::types::FlowId;
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::{ContentionKind, EngineError, StateKind};
use ff_core::state::PublicState;
use ff_core::types::{
CancelSource, ExecutionId, LaneId, LeaseEpoch, Namespace, TimestampMs, WorkerId,
WorkerInstanceId,
};
use serial_test::serial;
use uuid::Uuid;
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reads earlier than the epoch or
/// when the millisecond count does not fit in an `i64`.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a `u128`; narrow it checked rather than with `as`.
        Ok(elapsed) => i64::try_from(elapsed.as_millis()).unwrap_or(0),
        Err(_) => 0,
    }
}
/// Builds a process-local identifier from the current epoch nanoseconds and
/// the debug form of the current thread id, with every `:` and space replaced
/// by `-`.
///
/// The nanosecond component falls back to `0` if the clock reads before the
/// Unix epoch.
fn uuid_like() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let raw = format!("{nanos}-{:?}", std::thread::current().id());
    raw.chars()
        .map(|c| if c == ':' || c == ' ' { '-' } else { c })
        .collect()
}
/// Constructs a `SqliteBackend` over a uniquely named, shared-cache,
/// in-memory SQLite URI (`file:rfc-023-wave9-<uuid_like()>?mode=memory&cache=shared`).
///
/// Sets `FF_DEV_MODE=1` in the process environment first — presumably the
/// backend requires it; confirm against `SqliteBackend::new`.
///
/// # Panics
/// Panics with "construct backend" if `SqliteBackend::new` returns an error.
async fn fresh_backend() -> Arc<SqliteBackend> {
    // SAFETY: mutating the environment is only sound while no other thread
    // reads or writes it. Every test visible in this file is annotated
    // `#[serial(ff_dev_mode)]`, so these callers do not overlap.
    // NOTE(review): confirm nothing else in this test binary touches the
    // environment concurrently.
    unsafe {
        std::env::set_var("FF_DEV_MODE", "1");
    }
    let db_name = uuid_like();
    let uri = format!("file:rfc-023-wave9-{db_name}?mode=memory&cache=shared");
    let constructed = SqliteBackend::new(&uri).await;
    constructed.expect("construct backend")
}
/// Produces a fresh `ExecutionId` parsed from `{fp:0}:<random v4 UUID>`.
///
/// # Panics
/// Panics with "exec id" if `ExecutionId::parse` rejects the generated string.
fn new_exec_id() -> ExecutionId {
    let raw = format!("{{fp:0}}:{}", Uuid::new_v4());
    ExecutionId::parse(&raw).expect("exec id")
}
/// Builds the `CreateExecutionArgs` used by these tests: namespace `default`,
/// kind `op`, payload `hello`, priority 0, partition 0, creator `test`, no
/// optional fields set, and `now` taken from the wall clock.
fn create_args(exec_id: &ExecutionId, lane_id: &LaneId) -> CreateExecutionArgs {
    let created_at = TimestampMs::from_millis(now_ms());
    CreateExecutionArgs {
        // Identity and routing.
        execution_id: exec_id.clone(),
        lane_id: lane_id.clone(),
        namespace: Namespace::new("default"),
        partition_id: 0,
        // Payload.
        execution_kind: "op".into(),
        input_payload: b"hello".to_vec(),
        payload_encoding: None,
        // Scheduling.
        priority: 0,
        delay_until: None,
        execution_deadline_at: None,
        policy: None,
        // Bookkeeping.
        creator_identity: "test".into(),
        idempotency_key: None,
        tags: Default::default(),
        now: created_at,
    }
}
async fn create_runnable(b: &Arc<SqliteBackend>) -> (ExecutionId, LaneId) {
let lane_id = LaneId::new(format!("lane-{}", Uuid::new_v4()));
let exec_id = new_exec_id();
let args = create_args(&exec_id, &lane_id);
let r = b.create_execution(args).await.expect("create");
assert!(matches!(r, CreateExecutionResult::Created { .. }));
let exec_uuid = Uuid::parse_str(exec_id.as_str().split_once("}:").unwrap().1).unwrap();
sqlx::query(
"UPDATE ff_exec_core SET lifecycle_phase='runnable', public_state='pending', \
attempt_state='initial' WHERE partition_key=0 AND execution_id=?1",
)
.bind(exec_uuid)
.execute(b.pool_for_test())
.await
.unwrap();
(exec_id, lane_id)
}
/// Creates a runnable execution via `create_runnable` and claims it on its
/// lane as worker `w1` / instance `w1-i1` with default capabilities.
///
/// Returns the execution id and the claim handle.
///
/// # Panics
/// Panics with "claim" if the claim call errors and with "handle" if it
/// returns no handle.
async fn create_and_claim(
    b: &Arc<SqliteBackend>,
) -> (ExecutionId, ff_core::backend::Handle) {
    let (exec_id, lane_id) = create_runnable(b).await;

    let worker = WorkerId::new("w1");
    let instance = WorkerInstanceId::new("w1-i1");
    // NOTE(review): 30_000 is presumably a lease duration in ms — confirm
    // against `ClaimPolicy::new`.
    let policy = ClaimPolicy::new(worker, instance, 30_000, None);
    let capabilities = CapabilitySet::default();

    let claimed = b
        .claim(&lane_id, &capabilities, policy)
        .await
        .expect("claim");
    let handle = claimed.expect("handle");
    (exec_id, handle)
}
/// Extracts the UUID that follows the first `}:` in the execution id's string
/// form.
///
/// # Panics
/// Panics if the id contains no `}:` separator or the remainder is not a
/// valid UUID.
fn uuid_of(eid: &ExecutionId) -> Uuid {
    let (_, uuid_part) = eid.as_str().split_once("}:").unwrap();
    Uuid::parse_str(uuid_part).unwrap()
}
/// Counts the rows of `table` whose `execution_id` equals the string form of
/// `exec_uuid` and whose `partition_key` is 0.
///
/// `table` is interpolated into the SQL text, so pass only trusted literal
/// table names.
///
/// # Panics
/// Panics if the query fails.
async fn count_outbox(
    b: &Arc<SqliteBackend>,
    table: &str,
    exec_uuid: Uuid,
) -> i64 {
    let id_text = exec_uuid.to_string();
    let sql = format!(
        "SELECT COUNT(*) FROM {table} WHERE execution_id = ?1 AND partition_key = 0"
    );
    let rows: i64 = sqlx::query_scalar(&sql)
        .bind(id_text)
        .fetch_one(b.pool_for_test())
        .await
        .unwrap();
    rows
}
/// Reads the `lifecycle_phase` column of the `ff_exec_core` row for
/// `exec_uuid` in partition 0.
///
/// # Panics
/// Panics if the query fails or returns no row.
async fn read_lifecycle_phase(b: &Arc<SqliteBackend>, exec_uuid: Uuid) -> String {
    let phase: String = sqlx::query_scalar(
        "SELECT lifecycle_phase FROM ff_exec_core WHERE partition_key=0 AND execution_id=?1",
    )
    .bind(exec_uuid)
    .fetch_one(b.pool_for_test())
    .await
    .unwrap();
    phase
}
/// Operator-override cancel of a claimed execution: the call reports
/// `Cancelled` with public state `Cancelled`, the core row reads `cancelled`,
/// no operator-event row exists, and two lease-event rows exist.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn cancel_execution_happy_path() {
    let backend = fresh_backend().await;
    // `_handle` stays bound so the claim handle lives until the test ends.
    let (exec_id, _handle) = create_and_claim(&backend).await;
    let exec_uuid = uuid_of(&exec_id);

    let outcome = backend
        .cancel_execution(CancelExecutionArgs {
            execution_id: exec_id.clone(),
            reason: "operator test".into(),
            source: CancelSource::OperatorOverride,
            lease_id: None,
            lease_epoch: None,
            attempt_id: None,
            now: TimestampMs::from_millis(now_ms()),
        })
        .await
        .expect("cancel");
    assert!(matches!(
        outcome,
        CancelExecutionResult::Cancelled {
            public_state: PublicState::Cancelled,
            ..
        }
    ));

    let phase = read_lifecycle_phase(&backend, exec_uuid).await;
    assert_eq!(phase, "cancelled");

    let operator_events = count_outbox(&backend, "ff_operator_event", exec_uuid).await;
    assert_eq!(operator_events, 0);

    let lease_events = count_outbox(&backend, "ff_lease_event", exec_uuid).await;
    assert_eq!(lease_events, 2, "acquired + revoked lease events expected");
}
/// Cancelling the same execution twice with identical arguments succeeds both
/// times, and the second call still reports `Cancelled`.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn cancel_execution_idempotent_when_already_cancelled() {
    let backend = fresh_backend().await;
    let (exec_id, _handle) = create_and_claim(&backend).await;

    let request = CancelExecutionArgs {
        execution_id: exec_id.clone(),
        reason: "first".into(),
        source: CancelSource::OperatorOverride,
        lease_id: None,
        lease_epoch: None,
        attempt_id: None,
        now: TimestampMs::from_millis(now_ms()),
    };

    // The first cancel only has to succeed; its payload is not inspected.
    let _ = backend
        .cancel_execution(request.clone())
        .await
        .expect("cancel 1");

    let second = backend.cancel_execution(request).await.expect("cancel 2");
    assert!(matches!(second, CancelExecutionResult::Cancelled { .. }));
}
/// A `LeaseHolder` cancel that supplies no lease id, epoch or attempt id is
/// rejected with `EngineError::Validation`.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn cancel_execution_lease_fence_required_when_not_operator_override() {
    let backend = fresh_backend().await;
    let (exec_id, _handle) = create_and_claim(&backend).await;

    let unfenced = CancelExecutionArgs {
        source: CancelSource::LeaseHolder,
        reason: "no fence".into(),
        execution_id: exec_id.clone(),
        // All fence fields deliberately absent.
        lease_id: None,
        lease_epoch: None,
        attempt_id: None,
        now: TimestampMs::from_millis(now_ms()),
    };

    let err = backend.cancel_execution(unfenced).await.unwrap_err();
    let is_validation = matches!(err, EngineError::Validation { .. });
    assert!(is_validation, "got {err:?}");
}
/// A `LeaseHolder` cancel carrying lease epoch 999_999 is rejected with
/// `EngineError::State(StateKind::StaleLease)`.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn cancel_execution_stale_lease_fence_rejects() {
    let backend = fresh_backend().await;
    let (exec_id, _handle) = create_and_claim(&backend).await;

    // An epoch far from anything a single fresh claim would hold.
    let bogus_epoch = LeaseEpoch(999_999);
    let stale = CancelExecutionArgs {
        source: CancelSource::LeaseHolder,
        reason: "bad fence".into(),
        execution_id: exec_id.clone(),
        lease_id: None,
        lease_epoch: Some(bogus_epoch),
        attempt_id: None,
        now: TimestampMs::from_millis(now_ms()),
    };

    let err = backend.cancel_execution(stale).await.unwrap_err();
    let is_stale_lease = matches!(err, EngineError::State(StateKind::StaleLease));
    assert!(is_stale_lease, "got {err:?}");
}
/// Revoking the lease of a claimed execution as the claiming worker instance
/// reports `Revoked`, leaves two lease-event rows, and the core row reads
/// `runnable`.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn revoke_lease_happy_path() {
    let backend = fresh_backend().await;
    let (exec_id, _handle) = create_and_claim(&backend).await;
    let exec_uuid = uuid_of(&exec_id);

    let r = backend
        .revoke_lease(RevokeLeaseArgs {
            execution_id: exec_id.clone(),
            expected_lease_id: None,
            // Same instance id that `create_and_claim` claimed with.
            worker_instance_id: WorkerInstanceId::new("w1-i1"),
            reason: "operator revoke".into(),
        })
        .await
        .expect("revoke");
    assert!(matches!(r, RevokeLeaseResult::Revoked { .. }), "got {r:?}");

    let lease_events = count_outbox(&backend, "ff_lease_event", exec_uuid).await;
    assert_eq!(
        lease_events, 2,
        "expected acquired (from claim) + revoked (from revoke_lease)"
    );

    let phase = read_lifecycle_phase(&backend, exec_uuid).await;
    assert_eq!(phase, "runnable");
}
/// Revoking a lease on an execution that was never claimed reports
/// `AlreadySatisfied` with reason `no_active_lease`.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn revoke_lease_no_active_lease_returns_already_satisfied() {
    let backend = fresh_backend().await;
    // Runnable but unclaimed, so there is no lease to revoke.
    let (exec_id, _lane) = create_runnable(&backend).await;

    let r = backend
        .revoke_lease(RevokeLeaseArgs {
            execution_id: exec_id.clone(),
            expected_lease_id: None,
            worker_instance_id: WorkerInstanceId::new("w1-i1"),
            reason: "no lease".into(),
        })
        .await
        .expect("revoke");

    let reason = match r {
        RevokeLeaseResult::AlreadySatisfied { reason } => reason,
        other => panic!("expected AlreadySatisfied, got {other:?}"),
    };
    assert_eq!(reason, "no_active_lease");
}
/// Revoking with a worker instance id (`w2-i1`) other than the one that
/// claimed (`w1-i1`) reports `AlreadySatisfied` with reason
/// `different_worker_instance_id`.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn revoke_lease_different_worker_returns_already_satisfied() {
    let backend = fresh_backend().await;
    let (exec_id, _handle) = create_and_claim(&backend).await;

    let r = backend
        .revoke_lease(RevokeLeaseArgs {
            execution_id: exec_id.clone(),
            expected_lease_id: None,
            // Deliberately not the claiming instance.
            worker_instance_id: WorkerInstanceId::new("w2-i1"),
            reason: "wrong worker".into(),
        })
        .await
        .expect("revoke");

    let reason = match r {
        RevokeLeaseResult::AlreadySatisfied { reason } => reason,
        other => panic!("expected AlreadySatisfied, got {other:?}"),
    };
    assert_eq!(reason, "different_worker_instance_id");
}
/// Changing the priority of a runnable execution whose row has been marked
/// `eligible_now` reports `Changed`, stores priority 42, and leaves one
/// operator-event row.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn change_priority_happy_path() {
    let backend = fresh_backend().await;
    let (exec_id, _lane) = create_runnable(&backend).await;
    let exec_uuid = uuid_of(&exec_id);

    // Mark the row eligible directly in the table before calling the API.
    sqlx::query(
        "UPDATE ff_exec_core SET eligibility_state='eligible_now' \
         WHERE partition_key=0 AND execution_id=?1",
    )
    .bind(exec_uuid)
    .execute(backend.pool_for_test())
    .await
    .unwrap();

    let outcome = backend
        .change_priority(ChangePriorityArgs {
            execution_id: exec_id.clone(),
            new_priority: 42,
            lane_id: LaneId::new("lane-ignored-on-sqlite"),
            now: TimestampMs::from_millis(now_ms()),
        })
        .await
        .expect("change_priority");
    assert!(matches!(outcome, ChangePriorityResult::Changed { .. }));

    let stored_priority: i64 = sqlx::query_scalar(
        "SELECT priority FROM ff_exec_core WHERE partition_key=0 AND execution_id=?1",
    )
    .bind(exec_uuid)
    .fetch_one(backend.pool_for_test())
    .await
    .unwrap();
    assert_eq!(stored_priority, 42);

    let operator_events = count_outbox(&backend, "ff_operator_event", exec_uuid).await;
    assert_eq!(operator_events, 1);
}
/// Changing the priority of an execution that was only created (its row was
/// never rewritten by the test) fails with
/// `EngineError::Contention(ContentionKind::ExecutionNotEligible)`.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn change_priority_ineligible_execution_returns_not_eligible() {
    let backend = fresh_backend().await;
    let exec_id = new_exec_id();
    let lane = LaneId::new(format!("lane-{}", Uuid::new_v4()));

    backend
        .create_execution(create_args(&exec_id, &lane))
        .await
        .expect("create");

    let request = ChangePriorityArgs {
        execution_id: exec_id.clone(),
        new_priority: 10,
        lane_id: lane,
        now: TimestampMs::from_millis(now_ms()),
    };
    let err = backend.change_priority(request).await.unwrap_err();

    let not_eligible = matches!(
        err,
        EngineError::Contention(ContentionKind::ExecutionNotEligible)
    );
    assert!(not_eligible, "got {err:?}");
}
/// Creates and claims an execution, completes it through its handle, and
/// asserts that the core row's `lifecycle_phase` reads `terminal`.
///
/// Returns the execution id and its UUID.
///
/// # Panics
/// Panics with "complete" if completion fails, or if the phase is not
/// `terminal` afterwards.
async fn create_complete_terminal(
    b: &Arc<SqliteBackend>,
) -> (ExecutionId, Uuid) {
    let (exec_id, handle) = create_and_claim(b).await;
    let exec_uuid = uuid_of(&exec_id);

    b.complete(&handle, None).await.expect("complete");

    let phase = read_lifecycle_phase(b, exec_uuid).await;
    assert_eq!(phase, "terminal");
    (exec_id, exec_uuid)
}
/// Replaying a completed execution reports `Replayed` with public state
/// `Waiting`, puts the row back into `runnable`, stores `replay_count = 1` in
/// `raw_fields`, and leaves one operator-event row.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn replay_execution_normal_branch() {
    let backend = fresh_backend().await;
    let (exec_id, exec_uuid) = create_complete_terminal(&backend).await;

    let r = backend
        .replay_execution(ReplayExecutionArgs {
            execution_id: exec_id.clone(),
            now: TimestampMs::from_millis(now_ms()),
        })
        .await
        .expect("replay");
    let resumed_to_waiting = matches!(
        r,
        ReplayExecutionResult::Replayed {
            public_state: PublicState::Waiting,
        }
    );
    assert!(
        resumed_to_waiting,
        "normal branch must resume to Waiting, got {r:?}"
    );

    let phase = read_lifecycle_phase(&backend, exec_uuid).await;
    assert_eq!(phase, "runnable");

    // The replay counter is read out of the JSON `raw_fields` column.
    let replay_count: i64 = sqlx::query_scalar(
        "SELECT json_extract(raw_fields, '$.replay_count') \
         FROM ff_exec_core WHERE partition_key=0 AND execution_id=?1",
    )
    .bind(exec_uuid)
    .fetch_one(backend.pool_for_test())
    .await
    .unwrap();
    assert_eq!(replay_count, 1);

    let operator_events = count_outbox(&backend, "ff_operator_event", exec_uuid).await;
    assert_eq!(operator_events, 1);
}
/// Replaying a terminal execution that has been hand-edited to look like a
/// skipped flow member (flow id set, attempt outcome `skipped`, one inbound
/// edge, one edge-group row) reports `Replayed` with `WaitingChildren`, zeroes
/// the edge group's fail/skip/running counters while keeping `success_count`,
/// and records `skipped_flow_member` in the `replayed` operator event details.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn replay_execution_skipped_flow_member_resets_edge_group_counters() {
    let backend = fresh_backend().await;
    let (exec_id, exec_uuid) = create_complete_terminal(&backend).await;
    let pool = backend.pool_for_test();

    // Attach the execution to a made-up flow.
    let fake_flow = Uuid::new_v4();
    sqlx::query(
        "UPDATE ff_exec_core SET flow_id=?1 \
         WHERE partition_key=0 AND execution_id=?2",
    )
    .bind(fake_flow)
    .bind(exec_uuid)
    .execute(pool)
    .await
    .unwrap();

    // Mark its attempt rows as skipped.
    sqlx::query(
        "UPDATE ff_attempt SET outcome='skipped' \
         WHERE partition_key=0 AND execution_id=?1",
    )
    .bind(exec_uuid)
    .execute(pool)
    .await
    .unwrap();

    // One inbound edge from a random upstream id.
    let edge_id = Uuid::new_v4();
    let upstream = Uuid::new_v4();
    sqlx::query(
        "INSERT INTO ff_edge (partition_key, flow_id, edge_id, upstream_eid, \
         downstream_eid, policy, policy_version) \
         VALUES (0, ?1, ?2, ?3, ?4, '{}', 0)",
    )
    .bind(fake_flow)
    .bind(edge_id)
    .bind(upstream)
    .bind(exec_uuid)
    .execute(pool)
    .await
    .unwrap();

    // Edge group seeded with distinct non-zero counters (3, 5, 7, 2) so a
    // reset of each one is individually observable.
    sqlx::query(
        "INSERT INTO ff_edge_group (partition_key, flow_id, downstream_eid, \
         policy, k_target, success_count, fail_count, skip_count, running_count) \
         VALUES (0, ?1, ?2, '{}', 1, 3, 5, 7, 2)",
    )
    .bind(fake_flow)
    .bind(exec_uuid)
    .execute(pool)
    .await
    .unwrap();

    let r = backend
        .replay_execution(ReplayExecutionArgs {
            execution_id: exec_id.clone(),
            now: TimestampMs::from_millis(now_ms()),
        })
        .await
        .expect("replay");
    let resumed_to_waiting_children = matches!(
        r,
        ReplayExecutionResult::Replayed {
            public_state: PublicState::WaitingChildren,
        }
    );
    assert!(
        resumed_to_waiting_children,
        "skipped-flow-member branch resumes to WaitingChildren, got {r:?}"
    );

    let (success_count, fail_count, skip_count, running_count): (i64, i64, i64, i64) =
        sqlx::query_as(
            "SELECT success_count, fail_count, skip_count, running_count \
             FROM ff_edge_group WHERE partition_key=0 AND flow_id=?1 AND downstream_eid=?2",
        )
        .bind(fake_flow)
        .bind(exec_uuid)
        .fetch_one(pool)
        .await
        .unwrap();
    assert_eq!(success_count, 3, "success_count preserved");
    assert_eq!(fail_count, 0, "fail_count reset");
    assert_eq!(skip_count, 0, "skip_count reset");
    assert_eq!(running_count, 0, "running_count reset");

    let details: String = sqlx::query_scalar(
        "SELECT details FROM ff_operator_event \
         WHERE execution_id=?1 AND event_type='replayed'",
    )
    .bind(exec_uuid.to_string())
    .fetch_one(pool)
    .await
    .unwrap();
    let marks_branch = details.contains("skipped_flow_member");
    assert!(
        marks_branch,
        "details should mark skipped_flow_member branch: {details}"
    );
}
/// Replaying an execution that is merely runnable (never completed) fails
/// with `EngineError::State(StateKind::ExecutionNotTerminal)`.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn replay_execution_non_terminal_returns_not_terminal() {
    let backend = fresh_backend().await;
    let (exec_id, _lane) = create_runnable(&backend).await;

    let request = ReplayExecutionArgs {
        now: TimestampMs::from_millis(now_ms()),
        execution_id: exec_id.clone(),
    };
    let err = backend.replay_execution(request).await.unwrap_err();

    let not_terminal = matches!(err, EngineError::State(StateKind::ExecutionNotTerminal));
    assert!(not_terminal, "got {err:?}");
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn concurrent_cancel_serialization() {
let b = fresh_backend().await;
let (exec_id, _handle) = create_and_claim(&b).await;
let args = CancelExecutionArgs {
execution_id: exec_id.clone(),
reason: "concurrent".into(),
source: CancelSource::OperatorOverride,
lease_id: None,
lease_epoch: None,
attempt_id: None,
now: TimestampMs::from_millis(now_ms()),
};
let b2 = b.clone();
let args2 = args.clone();
let (r1, r2) = tokio::join!(
b.cancel_execution(args),
b2.cancel_execution(args2),
);
assert!(
matches!(r1, Ok(CancelExecutionResult::Cancelled { .. })),
"first cancel must succeed: {r1:?}"
);
assert!(
matches!(r2, Ok(CancelExecutionResult::Cancelled { .. })),
"second cancel must idempotently succeed: {r2:?}"
);
tokio::time::sleep(Duration::from_millis(20)).await;
}
/// Inserts an `open` flow row for `flow_id` into `ff_flow_core`, followed by
/// `n` member rows in `ff_exec_core` (partition 0, lane `default`, phase
/// `active`, ownership `leased`, public state `running`), each with a fresh
/// random UUID.
///
/// Returns the member UUIDs in insertion order.
///
/// # Panics
/// Panics if any insert fails.
async fn seed_flow_with_members(
    b: &Arc<SqliteBackend>,
    flow_id: &FlowId,
    n: usize,
) -> Vec<Uuid> {
    const INSERT_FLOW: &str = "INSERT INTO ff_flow_core \
         (partition_key, flow_id, graph_revision, public_flow_state, created_at_ms, raw_fields) \
         VALUES (0, ?1, 0, 'open', ?2, '{}')";
    const INSERT_MEMBER: &str = "INSERT INTO ff_exec_core \
         (partition_key, execution_id, flow_id, lane_id, attempt_index, \
         lifecycle_phase, ownership_state, eligibility_state, \
         public_state, attempt_state, priority, created_at_ms, raw_fields) \
         VALUES (0, ?1, ?2, 'default', 0, \
         'active', 'leased', 'eligible_now', 'running', 'running_attempt', \
         0, ?3, '{}')";

    let flow_uuid = flow_id.0;
    sqlx::query(INSERT_FLOW)
        .bind(flow_uuid)
        .bind(now_ms())
        .execute(b.pool_for_test())
        .await
        .unwrap();

    let mut seeded = Vec::with_capacity(n);
    for _ in 0..n {
        let member_uuid = Uuid::new_v4();
        sqlx::query(INSERT_MEMBER)
            .bind(member_uuid)
            .bind(flow_uuid)
            .bind(now_ms())
            .execute(b.pool_for_test())
            .await
            .unwrap();
        seeded.push(member_uuid);
    }
    seeded
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn cancel_flow_header_happy_path_creates_backlog_and_flips_members() {
let b = fresh_backend().await;
let flow_id = FlowId::new();
let members = seed_flow_with_members(&b, &flow_id, 3).await;
let r = b
.cancel_flow_header(CancelFlowArgs {
flow_id: flow_id.clone(),
reason: "op-shutdown".into(),
cancellation_policy: "cancel_all".into(),
now: TimestampMs::from_millis(now_ms()),
})
.await
.expect("cancel_flow_header ok");
match r {
CancelFlowHeader::Cancelled {
cancellation_policy,
member_execution_ids,
} => {
assert_eq!(cancellation_policy, "cancel_all");
assert_eq!(member_execution_ids.len(), 3);
}
other => panic!("expected Cancelled, got {other:?}"),
}
let header_count: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM ff_cancel_backlog WHERE flow_id=?1",
)
.bind(flow_id.0)
.fetch_one(b.pool_for_test())
.await
.unwrap();
assert_eq!(header_count, 1);
let member_rows: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM ff_cancel_backlog_member WHERE flow_id=?1",
)
.bind(flow_id.0)
.fetch_one(b.pool_for_test())
.await
.unwrap();
assert_eq!(member_rows, 3);
let cancelled: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM ff_exec_core \
WHERE flow_id=?1 AND lifecycle_phase='cancelled'",
)
.bind(flow_id.0)
.fetch_one(b.pool_for_test())
.await
.unwrap();
assert_eq!(cancelled, 3);
let fstate: String = sqlx::query_scalar(
"SELECT public_flow_state FROM ff_flow_core WHERE flow_id=?1",
)
.bind(flow_id.0)
.fetch_one(b.pool_for_test())
.await
.unwrap();
assert_eq!(fstate, "cancelled");
let op_events: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM ff_operator_event \
WHERE event_type='flow_cancel_requested' AND execution_id=?1",
)
.bind(flow_id.0.to_string())
.fetch_one(b.pool_for_test())
.await
.unwrap();
assert_eq!(op_events, 1);
let _ = members;
}
/// Calling `cancel_flow_header` on a flow row inserted already `cancelled`
/// reports `AlreadyTerminal`, echoing the policy and reason stored in
/// `raw_fields` (not the ones in the new request) and an empty member list.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn cancel_flow_header_already_terminal_is_idempotent() {
    let backend = fresh_backend().await;
    let flow_id = FlowId::new();

    // Seed a flow that is already cancelled, with a stored policy and reason.
    sqlx::query(
        "INSERT INTO ff_flow_core \
         (partition_key, flow_id, graph_revision, public_flow_state, created_at_ms, raw_fields) \
         VALUES (0, ?1, 0, 'cancelled', ?2, \
         '{\"cancellation_policy\":\"flow_only\",\"cancel_reason\":\"prior\"}')",
    )
    .bind(flow_id.0)
    .bind(now_ms())
    .execute(backend.pool_for_test())
    .await
    .unwrap();

    let retry = CancelFlowArgs {
        flow_id: flow_id.clone(),
        reason: "retry".into(),
        cancellation_policy: "cancel_all".into(),
        now: TimestampMs::from_millis(now_ms()),
    };
    let r = backend
        .cancel_flow_header(retry)
        .await
        .expect("idempotent ok");

    let (policy, reason, member_ids) = match r {
        CancelFlowHeader::AlreadyTerminal {
            stored_cancellation_policy,
            stored_cancel_reason,
            member_execution_ids,
        } => (
            stored_cancellation_policy,
            stored_cancel_reason,
            member_execution_ids,
        ),
        other => panic!("expected AlreadyTerminal, got {other:?}"),
    };
    assert_eq!(policy.as_deref(), Some("flow_only"));
    assert_eq!(reason.as_deref(), Some("prior"));
    assert!(member_ids.is_empty());
}
/// After cancelling a two-member flow, acknowledging the first member leaves
/// the backlog header and one member row; acknowledging the second removes
/// both the remaining member row and the header row.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn ack_cancel_member_drains_and_deletes_parent_when_last() {
    let backend = fresh_backend().await;
    let flow_id = FlowId::new();
    let _members = seed_flow_with_members(&backend, &flow_id, 2).await;

    let r = backend
        .cancel_flow_header(CancelFlowArgs {
            flow_id: flow_id.clone(),
            reason: "test".into(),
            cancellation_policy: "cancel_all".into(),
            now: TimestampMs::from_millis(now_ms()),
        })
        .await
        .expect("cancel ok");
    let member_ids = match r {
        CancelFlowHeader::Cancelled {
            member_execution_ids,
            ..
        } => member_execution_ids,
        other => panic!("expected Cancelled, got {other:?}"),
    };
    assert_eq!(member_ids.len(), 2);

    // --- First ack: header stays, one member row remains. ---
    let first_member = ExecutionId::parse(&member_ids[0]).expect("parse");
    backend
        .ack_cancel_member(&flow_id, &first_member)
        .await
        .expect("ack 1");

    let headers_after_first: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM ff_cancel_backlog WHERE flow_id=?1",
    )
    .bind(flow_id.0)
    .fetch_one(backend.pool_for_test())
    .await
    .unwrap();
    assert_eq!(headers_after_first, 1);

    let members_after_first: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM ff_cancel_backlog_member WHERE flow_id=?1",
    )
    .bind(flow_id.0)
    .fetch_one(backend.pool_for_test())
    .await
    .unwrap();
    assert_eq!(members_after_first, 1);

    // --- Second (final) ack: both tables are drained for this flow. ---
    let second_member = ExecutionId::parse(&member_ids[1]).expect("parse");
    backend
        .ack_cancel_member(&flow_id, &second_member)
        .await
        .expect("ack 2");

    let headers_after_second: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM ff_cancel_backlog WHERE flow_id=?1",
    )
    .bind(flow_id.0)
    .fetch_one(backend.pool_for_test())
    .await
    .unwrap();
    assert_eq!(
        headers_after_second, 0,
        "parent backlog row dropped on final ack"
    );

    let members_after_second: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM ff_cancel_backlog_member WHERE flow_id=?1",
    )
    .bind(flow_id.0)
    .fetch_one(backend.pool_for_test())
    .await
    .unwrap();
    assert_eq!(members_after_second, 0);
}
/// Acknowledging a member for a flow/execution pair that was never seeded
/// returns `Ok` rather than an error.
#[tokio::test]
#[serial(ff_dev_mode)]
async fn ack_cancel_member_idempotent_on_missing() {
    let backend = fresh_backend().await;
    // Neither id has any rows behind it in the fresh database.
    let unknown_flow = FlowId::new();
    let unknown_member = new_exec_id();

    let acked = backend
        .ack_cancel_member(&unknown_flow, &unknown_member)
        .await;
    acked.expect("idempotent no-op");
}