#![cfg(feature = "core")]
use ff_backend_sqlite::SqliteBackend;
use ff_core::backend::{CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy};
use ff_core::contracts::{
AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
ApplyDependencyToChildResult, CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs,
CreateFlowResult, StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
use ff_core::engine_backend::EngineBackend;
use ff_core::types::{
EdgeId, ExecutionId, FlowId, LaneId, Namespace, TimestampMs, WorkerId, WorkerInstanceId,
};
use serial_test::serial;
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
async fn fresh_backend() -> Arc<SqliteBackend> {
unsafe {
std::env::set_var("FF_DEV_MODE", "1");
}
let uri = format!(
"file:rfc-023-producer-{}?mode=memory&cache=shared",
uuid_like()
);
SqliteBackend::new(&uri).await.expect("construct backend")
}
fn uuid_like() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
let tid = std::thread::current().id();
format!("{ns}-{tid:?}").replace([':', ' '], "-")
}
fn new_exec_id() -> ExecutionId {
ExecutionId::parse(&format!("{{fp:0}}:{}", Uuid::new_v4())).expect("exec id")
}
fn create_execution_args(exec_id: &ExecutionId, caps: &[&str]) -> CreateExecutionArgs {
let mut policy = ff_core::policy::ExecutionPolicy::default();
if !caps.is_empty() {
let rr = ff_core::policy::RoutingRequirements {
required_capabilities: caps.iter().map(|s| (*s).to_owned()).collect(),
..Default::default()
};
policy.routing_requirements = Some(rr);
}
CreateExecutionArgs {
execution_id: exec_id.clone(),
namespace: Namespace::new("default"),
lane_id: LaneId::new("default"),
execution_kind: "op".into(),
input_payload: b"hello".to_vec(),
payload_encoding: None,
priority: 0,
creator_identity: "test".into(),
idempotency_key: None,
tags: HashMap::new(),
policy: Some(policy),
delay_until: None,
execution_deadline_at: None,
partition_id: 0,
now: TimestampMs::from_millis(1_000),
}
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn create_execution_seeds_exec_core_and_capability_junction() {
let backend = fresh_backend().await;
let eid = new_exec_id();
let res = backend
.create_execution(create_execution_args(&eid, &["capA", "capB"]))
.await
.expect("create_execution");
match res {
CreateExecutionResult::Created { execution_id, .. } => {
assert_eq!(execution_id, eid);
}
other => panic!("expected Created, got {other:?}"),
}
let res2 = backend
.create_execution(create_execution_args(&eid, &["capA", "capB"]))
.await
.expect("create_execution replay");
assert!(matches!(res2, CreateExecutionResult::Duplicate { .. }));
let pool = backend.pool_for_test();
let exec_uuid = Uuid::parse_str(eid.as_str().split_once("}:").unwrap().1).unwrap();
let caps: Vec<String> = sqlx::query_scalar(
"SELECT capability FROM ff_execution_capabilities WHERE execution_id = ?1 ORDER BY capability",
)
.bind(exec_uuid)
.fetch_all(pool)
.await
.expect("read junction");
assert_eq!(caps, vec!["capA".to_string(), "capB".to_string()]);
let lane: Option<String> = sqlx::query_scalar(
"SELECT lane_id FROM ff_lane_registry WHERE lane_id = ?1",
)
.bind("default")
.fetch_optional(pool)
.await
.expect("read lane registry");
assert_eq!(lane.as_deref(), Some("default"));
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn create_flow_idempotent() {
let backend = fresh_backend().await;
let fid = FlowId::from_uuid(Uuid::new_v4());
let args = CreateFlowArgs {
flow_id: fid.clone(),
flow_kind: "dag".into(),
namespace: Namespace::new("default"),
now: TimestampMs::from_millis(100),
};
let r = backend.create_flow(args.clone()).await.expect("create_flow");
assert!(matches!(r, CreateFlowResult::Created { .. }));
let r2 = backend.create_flow(args).await.expect("create_flow replay");
assert!(matches!(r2, CreateFlowResult::AlreadySatisfied { .. }));
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn flow_build_cancel_roundtrip() {
let backend = fresh_backend().await;
let fid = FlowId::from_uuid(Uuid::new_v4());
backend
.create_flow(CreateFlowArgs {
flow_id: fid.clone(),
flow_kind: "dag".into(),
namespace: Namespace::new("default"),
now: TimestampMs::from_millis(0),
})
.await
.expect("create_flow");
let mut members: Vec<ExecutionId> = Vec::new();
for _ in 0..3 {
let eid = new_exec_id();
backend
.create_execution(create_execution_args(&eid, &[]))
.await
.expect("create_execution");
let added = backend
.add_execution_to_flow(AddExecutionToFlowArgs {
flow_id: fid.clone(),
execution_id: eid.clone(),
now: TimestampMs::from_millis(10),
})
.await
.expect("add_execution_to_flow");
assert!(matches!(added, AddExecutionToFlowResult::Added { .. }));
members.push(eid);
}
let replay = backend
.add_execution_to_flow(AddExecutionToFlowArgs {
flow_id: fid.clone(),
execution_id: members[2].clone(),
now: TimestampMs::from_millis(11),
})
.await
.expect("add_execution_to_flow replay");
assert!(matches!(replay, AddExecutionToFlowResult::AlreadyMember { .. }));
let staged1 = backend
.stage_dependency_edge(StageDependencyEdgeArgs {
flow_id: fid.clone(),
edge_id: EdgeId::from_uuid(Uuid::new_v4()),
upstream_execution_id: members[0].clone(),
downstream_execution_id: members[2].clone(),
dependency_kind: "success_only".into(),
data_passing_ref: None,
expected_graph_revision: 3,
now: TimestampMs::from_millis(20),
})
.await
.expect("stage edge 1");
let StageDependencyEdgeResult::Staged { new_graph_revision: rev1, edge_id: eid1 } = staged1;
assert_eq!(rev1, 4);
let staged2 = backend
.stage_dependency_edge(StageDependencyEdgeArgs {
flow_id: fid.clone(),
edge_id: EdgeId::from_uuid(Uuid::new_v4()),
upstream_execution_id: members[1].clone(),
downstream_execution_id: members[2].clone(),
dependency_kind: "success_only".into(),
data_passing_ref: None,
expected_graph_revision: 4,
now: TimestampMs::from_millis(21),
})
.await
.expect("stage edge 2");
let StageDependencyEdgeResult::Staged { new_graph_revision: rev2, edge_id: eid2 } = staged2;
assert_eq!(rev2, 5);
let applied = backend
.apply_dependency_to_child(ApplyDependencyToChildArgs {
flow_id: fid.clone(),
edge_id: eid1.clone(),
downstream_execution_id: members[2].clone(),
upstream_execution_id: members[0].clone(),
graph_revision: rev2,
dependency_kind: "success_only".into(),
data_passing_ref: None,
now: TimestampMs::from_millis(30),
})
.await
.expect("apply edge 1");
match applied {
ApplyDependencyToChildResult::Applied { unsatisfied_count } => {
assert_eq!(unsatisfied_count, 1);
}
other => panic!("expected Applied, got {other:?}"),
}
let replay = backend
.apply_dependency_to_child(ApplyDependencyToChildArgs {
flow_id: fid.clone(),
edge_id: eid1,
downstream_execution_id: members[2].clone(),
upstream_execution_id: members[0].clone(),
graph_revision: rev2,
dependency_kind: "success_only".into(),
data_passing_ref: None,
now: TimestampMs::from_millis(31),
})
.await
.expect("apply replay");
assert!(matches!(replay, ApplyDependencyToChildResult::AlreadyApplied));
let applied2 = backend
.apply_dependency_to_child(ApplyDependencyToChildArgs {
flow_id: fid.clone(),
edge_id: eid2,
downstream_execution_id: members[2].clone(),
upstream_execution_id: members[1].clone(),
graph_revision: rev2,
dependency_kind: "success_only".into(),
data_passing_ref: None,
now: TimestampMs::from_millis(32),
})
.await
.expect("apply edge 2");
match applied2 {
ApplyDependencyToChildResult::Applied { unsatisfied_count } => {
assert_eq!(unsatisfied_count, 2);
}
other => panic!("expected Applied(2), got {other:?}"),
}
let res = backend
.cancel_flow(&fid, CancelFlowPolicy::CancelAll, CancelFlowWait::NoWait)
.await
.expect("cancel_flow");
match res {
ff_core::contracts::CancelFlowResult::Cancelled {
cancellation_policy,
member_execution_ids,
} => {
assert_eq!(cancellation_policy, "cancel_all");
assert_eq!(member_execution_ids.len(), 3);
}
other => panic!("expected Cancelled, got {other:?}"),
}
let pool = backend.pool_for_test();
let state: String = sqlx::query_scalar(
"SELECT public_flow_state FROM ff_flow_core WHERE partition_key=0 AND flow_id=?1",
)
.bind(fid.0)
.fetch_one(pool)
.await
.expect("flow state");
assert_eq!(state, "cancelled");
for eid in &members {
let exec_uuid = Uuid::parse_str(eid.as_str().split_once("}:").unwrap().1).unwrap();
let lp: String = sqlx::query_scalar(
"SELECT lifecycle_phase FROM ff_exec_core WHERE partition_key=0 AND execution_id=?1",
)
.bind(exec_uuid)
.fetch_one(pool)
.await
.expect("exec lifecycle_phase");
assert_eq!(lp, "cancelled");
}
let completion_rows: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM ff_completion_event WHERE flow_id = ?1 AND outcome='cancelled'",
)
.bind(fid.0)
.fetch_one(pool)
.await
.expect("count completion");
assert_eq!(completion_rows, 3);
let lease_rows: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM ff_lease_event WHERE event_type='revoked'",
)
.fetch_one(pool)
.await
.expect("count lease");
assert_eq!(lease_rows, 3);
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn stage_dependency_edge_rejects_stale_revision() {
let backend = fresh_backend().await;
let fid = FlowId::from_uuid(Uuid::new_v4());
backend
.create_flow(CreateFlowArgs {
flow_id: fid.clone(),
flow_kind: "dag".into(),
namespace: Namespace::new("default"),
now: TimestampMs::from_millis(0),
})
.await
.unwrap();
let up = new_exec_id();
let down = new_exec_id();
for eid in [&up, &down] {
backend
.create_execution(create_execution_args(eid, &[]))
.await
.unwrap();
backend
.add_execution_to_flow(AddExecutionToFlowArgs {
flow_id: fid.clone(),
execution_id: eid.clone(),
now: TimestampMs::from_millis(10),
})
.await
.unwrap();
}
let err = backend
.stage_dependency_edge(StageDependencyEdgeArgs {
flow_id: fid,
edge_id: EdgeId::from_uuid(Uuid::new_v4()),
upstream_execution_id: up,
downstream_execution_id: down,
dependency_kind: "success_only".into(),
data_passing_ref: None,
expected_graph_revision: 99,
now: TimestampMs::from_millis(20),
})
.await
.expect_err("expect stale rev");
assert!(
matches!(
err,
ff_core::engine_error::EngineError::Contention(
ff_core::engine_error::ContentionKind::StaleGraphRevision
)
),
"unexpected error: {err:?}"
);
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn complete_writes_outbox_row_and_wakes_broadcast_subscriber() {
let backend = fresh_backend().await;
let pool = backend.pool_for_test();
let mut rx = backend.subscribe_completion_for_test();
let eid = new_exec_id();
backend
.create_execution(create_execution_args(&eid, &[]))
.await
.expect("create_execution");
let exec_uuid = Uuid::parse_str(eid.as_str().split_once("}:").unwrap().1).unwrap();
sqlx::query(
"UPDATE ff_exec_core SET lifecycle_phase='runnable', public_state='pending', \
attempt_state='initial' WHERE partition_key=0 AND execution_id=?1",
)
.bind(exec_uuid)
.execute(pool)
.await
.unwrap();
let policy = ClaimPolicy::new(
WorkerId::new("w"),
WorkerInstanceId::new("wi"),
30_000,
None,
);
let caps = CapabilitySet::new::<_, &str>([]);
let h = backend
.claim(&LaneId::new("default"), &caps, policy)
.await
.expect("claim")
.expect("some handle");
backend.complete(&h, None).await.expect("complete");
let (outcome, db_event_id): (String, i64) = sqlx::query_as(
"SELECT outcome, event_id FROM ff_completion_event WHERE execution_id = ?1",
)
.bind(exec_uuid)
.fetch_one(pool)
.await
.expect("read completion event");
assert_eq!(outcome, "success");
let ev = tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv())
.await
.expect("broadcast wakeup within 500ms")
.expect("broadcast channel healthy");
assert_eq!(ev.event_id, db_event_id);
assert_eq!(ev.partition_key, 0);
}