#![cfg(feature = "core")]
use std::sync::Arc;
use ff_backend_sqlite::SqliteBackend;
use ff_core::backend::{CancelFlowPolicy, CancelFlowWait, CapabilitySet, ClaimPolicy};
use ff_core::contracts::{CancelFlowResult, CreateExecutionArgs};
use ff_core::engine_backend::EngineBackend;
use ff_core::types::{
ExecutionId, FlowId, LaneId, Namespace, TimestampMs, WorkerId, WorkerInstanceId,
};
use serial_test::serial;
use sqlx::Row;
use uuid::Uuid;
fn now_ms() -> i64 {
use std::time::{SystemTime, UNIX_EPOCH};
i64::try_from(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis())
.unwrap_or(0),
)
.unwrap_or(0)
}
fn uuid_like() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
let tid = std::thread::current().id();
format!("{ns}-{tid:?}").replace([':', ' '], "-")
}
async fn fresh_backend() -> Arc<SqliteBackend> {
unsafe {
std::env::set_var("FF_DEV_MODE", "1");
}
let uri = format!(
"file:rfc-023-wave9-followups-{}?mode=memory&cache=shared",
uuid_like()
);
SqliteBackend::new(&uri).await.expect("backend")
}
fn new_exec_id() -> ExecutionId {
ExecutionId::parse(&format!("{{fp:0}}:{}", Uuid::new_v4())).expect("exec id")
}
fn uuid_of(eid: &ExecutionId) -> Uuid {
Uuid::parse_str(eid.as_str().split_once("}:").unwrap().1).unwrap()
}
async fn create_runnable(b: &Arc<SqliteBackend>, lane: &LaneId) -> ExecutionId {
let exec_id = new_exec_id();
let args = CreateExecutionArgs {
execution_id: exec_id.clone(),
namespace: Namespace::new("default"),
lane_id: lane.clone(),
execution_kind: "op".into(),
input_payload: b"hello".to_vec(),
payload_encoding: None,
priority: 0,
creator_identity: "test".into(),
idempotency_key: None,
tags: Default::default(),
policy: None,
delay_until: None,
execution_deadline_at: None,
partition_id: 0,
now: TimestampMs::from_millis(now_ms()),
};
b.create_execution(args).await.expect("create");
let exec_uuid = uuid_of(&exec_id);
sqlx::query(
"UPDATE ff_exec_core SET lifecycle_phase='runnable', eligibility_state='eligible_now', \
attempt_state='initial' WHERE partition_key=0 AND execution_id=?1",
)
.bind(exec_uuid)
.execute(b.pool_for_test())
.await
.unwrap();
exec_id
}
async fn read_started_at_ms(b: &Arc<SqliteBackend>, exec_uuid: Uuid) -> Option<i64> {
let row = sqlx::query(
"SELECT started_at_ms FROM ff_exec_core WHERE partition_key=0 AND execution_id=?1",
)
.bind(exec_uuid)
.fetch_one(b.pool_for_test())
.await
.unwrap();
row.try_get::<Option<i64>, _>("started_at_ms").unwrap()
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn sqlite_first_claim_populates_started_at_ms_set_once() {
let b = fresh_backend().await;
let lane_id = LaneId::new(format!("wave9-356-{}", uuid_like()).as_str());
let exec_id = create_runnable(&b, &lane_id).await;
let exec_uuid = uuid_of(&exec_id);
assert_eq!(read_started_at_ms(&b, exec_uuid).await, None);
let policy = ClaimPolicy::new(
WorkerId::new("w356"),
WorkerInstanceId::new("w356-i1"),
30_000,
None,
);
let _h1 = b
.claim(&lane_id, &CapabilitySet::default(), policy.clone())
.await
.expect("claim 1")
.expect("handle 1");
let first = read_started_at_ms(&b, exec_uuid).await.expect("populated");
sqlx::query(
"UPDATE ff_exec_core SET lifecycle_phase='runnable', ownership_state='unowned', \
eligibility_state='eligible_now', attempt_state='initial', \
attempt_index=attempt_index+1 \
WHERE partition_key=0 AND execution_id=?1",
)
.bind(exec_uuid)
.execute(b.pool_for_test())
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
let _h2 = b
.claim(&lane_id, &CapabilitySet::default(), policy)
.await
.expect("claim 2")
.expect("handle 2");
let second = read_started_at_ms(&b, exec_uuid).await.expect("populated");
assert_eq!(first, second, "set-once: reclaim must not overwrite started_at_ms (#356)");
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn sqlite_cancel_flow_clears_member_attempt_outcome() {
let b = fresh_backend().await;
let lane_id = LaneId::new("wave9-355-cancel-flow");
let exec_id = create_runnable(&b, &lane_id).await;
let exec_uuid = uuid_of(&exec_id);
let flow_id = FlowId::new();
let flow_uuid = flow_id.0;
sqlx::query(
"INSERT INTO ff_flow_core \
(partition_key, flow_id, graph_revision, public_flow_state, created_at_ms) \
VALUES (0, ?1, 0, 'open', ?2)",
)
.bind(flow_uuid)
.bind(now_ms())
.execute(b.pool_for_test())
.await
.unwrap();
sqlx::query("UPDATE ff_exec_core SET flow_id=?1 WHERE partition_key=0 AND execution_id=?2")
.bind(flow_uuid)
.bind(exec_uuid)
.execute(b.pool_for_test())
.await
.unwrap();
sqlx::query(
"INSERT INTO ff_attempt \
(partition_key, execution_id, attempt_index, lease_epoch, outcome) \
VALUES (0, ?1, 0, 1, 'retry')",
)
.bind(exec_uuid)
.execute(b.pool_for_test())
.await
.unwrap();
let pre: Option<String> = sqlx::query_scalar(
"SELECT outcome FROM ff_attempt WHERE partition_key=0 AND execution_id=?1 AND attempt_index=0",
)
.bind(exec_uuid)
.fetch_one(b.pool_for_test())
.await
.unwrap();
assert_eq!(pre.as_deref(), Some("retry"));
let r = b
.cancel_flow(&flow_id, CancelFlowPolicy::CancelAll, CancelFlowWait::NoWait)
.await
.expect("cancel_flow ok");
assert!(
matches!(r, CancelFlowResult::Cancelled { .. }),
"got {r:?}"
);
let post: Option<String> = sqlx::query_scalar(
"SELECT outcome FROM ff_attempt WHERE partition_key=0 AND execution_id=?1 AND attempt_index=0",
)
.bind(exec_uuid)
.fetch_one(b.pool_for_test())
.await
.unwrap();
assert_eq!(post, None, "post-cancel: attempt.outcome must be NULL (#355)");
}