#![cfg(feature = "yieldpoints")]
use std::sync::Arc;
use std::time::Duration;
use tsoracle_consensus::ConsensusDriver;
use tsoracle_core::Epoch;
use tsoracle_server::test_fakes::InMemoryDriver;
use tsoracle_server::{Server, ServingState};
use tsoracle_yieldpoint as yieldpoint;
// Name of the yield point this test configures with `yieldpoint::cfg` and
// later clears with `yieldpoint::remove`.
// NOTE(review): judging by the name, the yield point presumably sits in the
// server's fence path between loading state and persisting it — confirm
// against the server crate.
const FENCE_GATE: &str = "server::fence::after_load_before_persist";
/// Checks that the server stays in `ServingState::NotServing` while the
/// `FENCE_GATE` yield point is configured and un-notified, and that it reaches
/// `ServingState::Serving` within two seconds once the gate is notified.
///
/// Sequence:
/// 1. Seed the in-memory driver with a high-water mark and build a server
///    around it with a zero failover advance.
/// 2. Configure the gate, spawn the leader watch, and make the driver leader.
/// 3. After a 200 ms grace period, assert the state is still `NotServing`.
/// 4. Notify the gate and wait (bounded by 2 s) for `Serving`.
///
/// The gate configuration is removed through a drop guard so that a failing
/// assertion cannot leave it installed for other code in the same process.
#[tokio::test]
async fn fence_parks_at_after_load_yieldpoint_until_released() {
    // Removes the `FENCE_GATE` configuration when dropped. Using a guard
    // (instead of a trailing call only) means the removal also runs while
    // unwinding from a failed assertion or timeout below.
    struct GateCleanup;

    impl Drop for GateCleanup {
        fn drop(&mut self) {
            yieldpoint::remove(FENCE_GATE);
        }
    }

    // Driver pre-seeded with a persisted high-water mark at epoch 0.
    let driver = Arc::new(InMemoryDriver::new());
    driver.persist_high_water(1_000, Epoch(0)).await.unwrap();

    let server = Arc::new(
        Server::builder()
            .consensus_driver(driver.clone())
            .failover_advance(Duration::from_millis(0))
            .build()
            .unwrap(),
    );

    // Subscribe before any leadership change so no state transition is missed.
    let mut state_rx = server.subscribe();

    // Configure the gate before the leader watch starts; the guard is created
    // immediately afterwards so every exit path from here on clears it.
    let gate = yieldpoint::cfg(FENCE_GATE);
    let gate_cleanup = GateCleanup;

    let watch_server = server.clone();
    let watch_handle =
        tokio::spawn(async move { watch_server.run_leader_watch_for_tests().await });

    driver.become_leader(Epoch(1));

    // Time-based negative check: give the watch task 200 ms to run, then
    // verify it has not progressed to `Serving`. This can only show that the
    // transition did not happen within the grace period.
    tokio::time::sleep(Duration::from_millis(200)).await;
    let state = state_rx.borrow().clone();
    assert!(
        matches!(state, ServingState::NotServing { .. }),
        "fence must still be parked at the yield point; observed {state:?}"
    );

    // Release the gate and wait for the state to become `Serving`.
    gate.notify_one();
    tokio::time::timeout(Duration::from_secs(2), async {
        loop {
            // The borrow guard is a temporary of the `if` condition, so it is
            // released before `changed()` is awaited.
            if matches!(*state_rx.borrow_and_update(), ServingState::Serving) {
                return;
            }
            state_rx.changed().await.unwrap();
        }
    })
    .await
    .expect("fence must reach Serving within 2s of yield-point release");

    // Success path: keep the original order — clear the gate, then stop the
    // background leader watch.
    drop(gate_cleanup);
    watch_handle.abort();
}