use ff_backend_sqlite::SqliteBackend;
use serial_test::serial;
async fn fresh_backend() -> std::sync::Arc<SqliteBackend> {
unsafe {
std::env::set_var("FF_DEV_MODE", "1");
}
let uri = format!(
"file:rfc-023-migrations-{}?mode=memory&cache=shared",
uuid_like()
);
SqliteBackend::new(&uri)
.await
.expect("construct with migrations")
}
fn uuid_like() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
let tid = std::thread::current().id();
format!("{ns}-{tid:?}").replace([':', ' '], "-")
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn migrations_apply_and_schema_is_present() {
let backend = fresh_backend().await;
let pool = backend.pool_for_test();
let mut rows =
sqlx::query_as::<_, (String,)>("SELECT name FROM sqlite_master WHERE type='table'")
.fetch_all(pool)
.await
.expect("query sqlite_master");
rows.sort_by(|a, b| a.0.cmp(&b.0));
let names: Vec<String> = rows.into_iter().map(|(n,)| n).collect();
for expected in [
"ff_waitpoint_hmac",
"ff_lane_registry",
"ff_partition_config",
"ff_migration_annotation",
"ff_exec_core",
"ff_execution_capabilities",
"ff_flow_core",
"ff_attempt",
"ff_edge",
"ff_edge_group",
"ff_suspension_current",
"ff_waitpoint_pending",
"ff_stream_frame",
"ff_completion_event",
"ff_budget_policy",
"ff_budget_usage",
"ff_budget_usage_dedup",
"ff_suspend_dedup",
"ff_lease_event",
"ff_signal_event",
"ff_operator_event",
"ff_quota_policy",
"ff_quota_window",
"ff_quota_admitted",
"ff_cancel_backlog",
"ff_cancel_backlog_member",
"ff_claim_grant",
] {
assert!(
names.iter().any(|n| n == expected),
"expected table `{expected}` not present; got {names:?}"
);
}
assert!(
names.iter().any(|n| n == "_sqlx_migrations"),
"sqlx migration metadata table missing; got {names:?}"
);
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn sqlx_migration_ledger_records_all_14() {
let backend = fresh_backend().await;
let pool = backend.pool_for_test();
let (count,): (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM _sqlx_migrations WHERE success = 1")
.fetch_one(pool)
.await
.expect("query _sqlx_migrations");
assert_eq!(
count, 16,
"expected 16 successful migrations (0001..=0014 + 0016 + 0017), got {count}"
);
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn ff_execution_capabilities_junction_schema() {
let backend = fresh_backend().await;
let pool = backend.pool_for_test();
let cols: Vec<(i64, String, String, i64, Option<String>, i64)> =
sqlx::query_as("SELECT cid, name, type, \"notnull\", dflt_value, pk FROM pragma_table_info('ff_execution_capabilities')")
.fetch_all(pool)
.await
.expect("pragma_table_info");
let by_name: std::collections::HashMap<_, _> =
cols.iter().map(|c| (c.1.as_str(), c)).collect();
let exec = by_name
.get("execution_id")
.expect("execution_id column present");
assert_eq!(exec.2, "BLOB");
assert_eq!(exec.3, 1, "execution_id NOT NULL");
assert_eq!(exec.5, 1, "execution_id part of PK");
let cap = by_name
.get("capability")
.expect("capability column present");
assert_eq!(cap.2, "TEXT");
assert_eq!(cap.3, 1, "capability NOT NULL");
assert_eq!(cap.5, 2, "capability second PK column");
let (idx_count,): (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM sqlite_master \
WHERE type='index' AND name='ff_execution_capabilities_capability_idx'",
)
.fetch_one(pool)
.await
.expect("query reverse index");
assert_eq!(idx_count, 1, "capability reverse index missing");
}
#[tokio::test]
#[serial(ff_dev_mode)]
async fn migration_annotation_ledger_populated() {
let backend = fresh_backend().await;
let pool = backend.pool_for_test();
let rows: Vec<(i64,)> =
sqlx::query_as("SELECT version FROM ff_migration_annotation ORDER BY version")
.fetch_all(pool)
.await
.expect("query annotations");
let versions: Vec<i64> = rows.into_iter().map(|(v,)| v).collect();
let mut expected: Vec<i64> = (1..=14).collect();
expected.push(16);
expected.push(17);
assert_eq!(versions, expected);
}