use std::sync::Arc;
use std::time::Duration;
use ff_backend_valkey::ValkeyBackend;
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::EngineError;
use ff_core::partition::PartitionConfig;
async fn construct_backend_with_permits(permits: u32) -> Arc<ValkeyBackend> {
let dial = tokio::time::timeout(
Duration::from_millis(500),
ferriskey::ClientBuilder::new()
.host("127.0.0.1", 6379)
.connect_timeout(Duration::from_millis(400))
.request_timeout(Duration::from_millis(400))
.build(),
)
.await;
let client = match dial {
Ok(Ok(c)) => c,
Ok(Err(e)) => panic!("dial 127.0.0.1:6379 failed: {e}"),
Err(_) => panic!("dial 127.0.0.1:6379 timed out"),
};
let mut backend =
ValkeyBackend::from_client_and_partitions(client, PartitionConfig::default());
assert!(
backend.with_stream_semaphore_permits(permits),
"size stream-op pool on fresh Arc"
);
backend
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore]
async fn shutdown_prepare_drains_in_flight_tails_within_grace() {
const PERMITS: u32 = 16;
let backend = construct_backend_with_permits(PERMITS).await;
let sem = backend.stream_semaphore_clone_for_tests();
let mut held = Vec::with_capacity(PERMITS as usize);
for _ in 0..PERMITS {
held.push(sem.clone().try_acquire_owned().expect("acquire"));
}
assert_eq!(backend.stream_semaphore_available(), 0);
let handle = tokio::spawn(async move {
let mut held = held;
for i in 0..(PERMITS as u64) {
tokio::time::sleep(Duration::from_millis(25 + i * 10)).await;
held.pop();
}
});
let start = std::time::Instant::now();
let res = backend
.shutdown_prepare(Duration::from_secs(5))
.await;
let elapsed = start.elapsed();
assert!(res.is_ok(), "happy-path shutdown_prepare Ok: {res:?}");
assert!(
elapsed < Duration::from_secs(5),
"drain completed inside 5s grace; elapsed={elapsed:?}"
);
let post_close = backend
.read_stream(
&ff_core::types::ExecutionId::solo(
&ff_core::types::LaneId::new("post_close"),
&PartitionConfig::default(),
),
ff_core::types::AttemptIndex::new(0),
ff_core::contracts::StreamCursor::Start,
ff_core::contracts::StreamCursor::End,
10,
)
.await;
assert!(
matches!(
&post_close,
Err(EngineError::Unavailable { op: "stream_ops" })
),
"post-close acquire → Unavailable, got {post_close:?}"
);
handle.await.expect("background releaser joined");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore]
async fn shutdown_prepare_times_out_when_grace_too_short() {
const PERMITS: u32 = 4;
let backend = construct_backend_with_permits(PERMITS).await;
let sem = backend.stream_semaphore_clone_for_tests();
let mut held = Vec::with_capacity(PERMITS as usize);
for _ in 0..PERMITS {
held.push(sem.clone().try_acquire_owned().expect("acquire"));
}
let res = backend
.shutdown_prepare(Duration::from_millis(100))
.await;
assert!(
matches!(&res, Err(EngineError::Timeout { op: "shutdown_prepare", .. })),
"starved drain → Timeout, got {res:?}"
);
drop(held);
}