#![cfg(all(feature = "failpoints", feature = "test-support"))]
use std::sync::Arc;
use std::time::{Duration, Instant};
use tsoracle_core::Epoch;
use tsoracle_server::test_fakes::InMemoryDriver;
use tsoracle_server::test_support::{
boot_router, wait_for_grpc_handshake, wait_until, wait_until_serving,
};
use tsoracle_server::{Server, ServerError, ServingState};
/// Async mutex that every test in this file locks as its first step, so the
/// failpoint tests execute one at a time even when the test harness runs
/// tests concurrently.
///
/// A `tokio::sync::Mutex` is used (rather than `std::sync::Mutex`) so the
/// guard can be held across the `.await` points in each test body.
// NOTE(review): presumably required because failpoints configured through
// `fail::cfg` live in a process-wide registry and would otherwise leak
// between concurrently running tests — confirm against the `fail` crate docs.
static FAILPOINT_TEST_SERIAL: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
/// Arms the `server::fence::after_load_before_persist` failpoint with
/// `return(transient)`, promotes the in-memory driver to leader at epoch 1,
/// and checks that:
///
/// * the watch task terminates within two seconds without panicking,
/// * its result is `Err(ServerError::Consensus(_))`, and
/// * the driver's high-water mark is still `0`.
#[tokio::test]
async fn fence_aborted_after_load_does_not_advance_to_serving() {
    const FAILPOINT: &str = "server::fence::after_load_before_persist";
    const ACTION: &str = "return(transient)";

    // Serialise against the other failpoint tests, then open a fresh scenario.
    let _serial = FAILPOINT_TEST_SERIAL.lock().await;
    let _scenario = fail::FailScenario::setup();

    let driver = Arc::new(InMemoryDriver::new());
    let (_routes, watch_handle) = Server::builder()
        .consensus_driver(driver.clone())
        .build()
        .unwrap()
        .into_router();

    // The failpoint must be armed before leadership is granted.
    fail::cfg(FAILPOINT, ACTION).unwrap();
    driver.become_leader(Epoch(1));

    let joined = tokio::time::timeout(Duration::from_secs(2), watch_handle)
        .await
        .expect("watch task did not terminate within 2s");
    let result = joined.expect("watch task panicked");

    let is_consensus_error = matches!(result, Err(ServerError::Consensus(_)));
    assert!(
        is_consensus_error,
        "expected ServerError::Consensus, got {result:?}"
    );

    let high_water = driver.current_high_water();
    assert_eq!(high_water, 0);
}
/// Arms the `server::fence::after_persist_before_publish` failpoint with
/// `panic`, promotes the in-memory driver to leader at epoch 1, and checks
/// that:
///
/// * the watch task terminates within two seconds,
/// * joining it yields an error (the panic surfacing through the join
///   handle), and
/// * the driver's high-water mark is greater than `0`, i.e. the persist step
///   ran before the panic.
#[tokio::test]
async fn fence_panic_after_persist_advances_durable_but_not_serving() {
    const FAILPOINT: &str = "server::fence::after_persist_before_publish";

    // Serialise against the other failpoint tests, then open a fresh scenario.
    let _serial = FAILPOINT_TEST_SERIAL.lock().await;
    let _scenario = fail::FailScenario::setup();

    let driver = Arc::new(InMemoryDriver::new());
    let (_routes, watch_handle) = Server::builder()
        .consensus_driver(driver.clone())
        .build()
        .unwrap()
        .into_router();

    // The failpoint must be armed before leadership is granted.
    fail::cfg(FAILPOINT, "panic").unwrap();
    driver.become_leader(Epoch(1));

    let bounded_join = tokio::time::timeout(Duration::from_secs(2), watch_handle);
    let result = bounded_join
        .await
        .expect("watch task did not terminate within 2s");

    let join_failed = result.is_err();
    assert!(
        join_failed,
        "expected the panic to surface as a JoinError, got {result:?}"
    );

    let persisted = driver.current_high_water();
    assert!(
        persisted > 0,
        "persist should have advanced the driver's stored value before the panic"
    );
}
/// Drops the watch-task join handle up front, arms the
/// `server::fence::after_serving_published` failpoint with `panic`, promotes
/// the driver to leader at epoch 1, and then polls `GetTs` over gRPC.
///
/// The test passes once a call is rejected with
/// `tonic::Code::FailedPrecondition`. It fails if calls are still succeeding
/// two seconds after polling starts, or if any other gRPC status is returned.
#[tokio::test]
async fn panic_after_serving_published_poisons_state_when_handle_dropped() {
    use tsoracle_proto::v1::tso_service_client::TsoServiceClient;
    use tsoracle_proto::v1::GetTsRequest;

    // Serialise against the other failpoint tests, then open a fresh scenario.
    let _serial = FAILPOINT_TEST_SERIAL.lock().await;
    let _scenario = fail::FailScenario::setup();

    let driver = Arc::new(InMemoryDriver::new());
    let (routes, watch_handle) = Server::builder()
        .consensus_driver(driver.clone())
        .build()
        .unwrap()
        .into_router();

    // Nobody will ever join the watch task in this scenario.
    drop(watch_handle);

    let booted = boot_router(routes).await;

    // The failpoint must be armed before leadership is granted.
    fail::cfg("server::fence::after_serving_published", "panic").unwrap();
    driver.become_leader(Epoch(1));

    wait_for_grpc_handshake(booted.addr, Duration::from_secs(5))
        .await
        .expect("tonic never accepted gRPC handshake");

    let endpoint = format!("http://{}", booted.addr);
    let mut client = TsoServiceClient::connect(endpoint).await.unwrap();

    // NOTE(review): if the server also answers `FailedPrecondition` before it
    // has started serving, this loop could exit without the failpoint having
    // fired — confirm against the service's not-serving status code.
    let deadline = Instant::now() + Duration::from_secs(2);
    loop {
        let request = GetTsRequest { count: 1 };
        let reply = client.get_ts(request).await;
        match reply {
            // The state the test is waiting for: requests are refused.
            Err(status) if status.code() == tonic::Code::FailedPrecondition => break,
            Err(status) => panic!("unexpected gRPC status: {status:?}"),
            Ok(_) => {
                // Successes are tolerated only until the deadline passes.
                let still_within_deadline = Instant::now() < deadline;
                assert!(
                    still_within_deadline,
                    "GetTs continued to succeed after watch-task panic — \
                     serving state was never poisoned (the regression #27 pins)"
                );
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
        }
    }

    booted.abort();
}
/// Brings a server up to the serving state, arms the
/// `server::service::before_allocate` failpoint with `sleep(150)`, and issues
/// a single `get_ts` through `tsoracle_client::Client`.
///
/// The call must succeed and must take at least 120ms of wall-clock time
/// (the failpoint sleeps for 150ms; the assertion allows some slack).
#[tokio::test]
async fn before_allocate_sleep_delays_get_ts() {
    const FAILPOINT: &str = "server::service::before_allocate";

    // Serialise against the other failpoint tests, then open a fresh scenario.
    let _serial = FAILPOINT_TEST_SERIAL.lock().await;
    let _scenario = fail::FailScenario::setup();

    let driver = Arc::new(InMemoryDriver::new());
    let server = Server::builder()
        .consensus_driver(driver.clone())
        .build()
        .unwrap();
    // Grab the state receiver before `into_router` consumes the server.
    let mut state_rx = server.state_rx.clone();
    let (routes, _watch_handle) = server.into_router();
    let booted = boot_router(routes).await;

    driver.become_leader(Epoch(1));
    wait_until_serving(&mut state_rx).await;
    wait_for_grpc_handshake(booted.addr, Duration::from_secs(5))
        .await
        .expect("tonic never accepted gRPC handshake");

    let endpoint = format!("http://{}", booted.addr);
    let client = tsoracle_client::Client::connect(vec![endpoint])
        .await
        .unwrap();

    // Time exactly one request while the failpoint is armed, then disarm it
    // before any assertion can panic.
    fail::cfg(FAILPOINT, "sleep(150)").unwrap();
    let start = Instant::now();
    let result = client.get_ts().await;
    let elapsed = start.elapsed();
    fail::cfg(FAILPOINT, "off").unwrap();

    let err = result.err();
    assert!(
        err.is_none(),
        "get_ts failed under sleep(150) failpoint: {err:?} (elapsed {elapsed:?})"
    );

    let minimum_delay = Duration::from_millis(120);
    assert!(
        elapsed >= minimum_delay,
        "expected at least 120ms delay (sleep was 150ms), saw {elapsed:?}"
    );

    drop(client);
    booted.abort();
}
/// Brings a server (built with a 1ms `failover_advance`) up to
/// `ServingState::Serving`, arms the `server::service::extension_gate_held`
/// failpoint with `sleep(150)`, and issues a single `get_ts` through
/// `tsoracle_client::Client`.
///
/// The call must succeed and must take at least 120ms of wall-clock time
/// (the failpoint sleeps for 150ms; the assertion allows some slack).
// NOTE(review): the 1ms `failover_advance` presumably forces the request onto
// the code path that contains this failpoint — confirm against the service.
#[tokio::test]
async fn extension_gate_held_sleep_delays_get_ts() {
    const FAILPOINT: &str = "server::service::extension_gate_held";

    // Serialise against the other failpoint tests, then open a fresh scenario.
    let _serial = FAILPOINT_TEST_SERIAL.lock().await;
    let _scenario = fail::FailScenario::setup();

    let driver = Arc::new(InMemoryDriver::new());
    let server = Server::builder()
        .consensus_driver(driver.clone())
        .failover_advance(Duration::from_millis(1))
        .build()
        .unwrap();
    // Grab the state receiver before `into_router` consumes the server.
    let mut state_rx = server.state_rx.clone();
    let (routes, _watch_handle) = server.into_router();
    let booted = boot_router(routes).await;

    driver.become_leader(Epoch(1));
    wait_until(&mut state_rx, |state| {
        matches!(state, ServingState::Serving)
    })
    .await;
    wait_for_grpc_handshake(booted.addr, Duration::from_secs(5))
        .await
        .expect("tonic never accepted gRPC handshake");

    let endpoint = format!("http://{}", booted.addr);
    let client = tsoracle_client::Client::connect(vec![endpoint])
        .await
        .unwrap();

    // Time exactly one request while the failpoint is armed, then disarm it
    // before any assertion can panic.
    fail::cfg(FAILPOINT, "sleep(150)").unwrap();
    let start = Instant::now();
    let result = client.get_ts().await;
    let elapsed = start.elapsed();
    fail::cfg(FAILPOINT, "off").unwrap();

    let err = result.err();
    assert!(
        err.is_none(),
        "get_ts failed under sleep(150) failpoint: {err:?} (elapsed {elapsed:?})"
    );

    let minimum_delay = Duration::from_millis(120);
    assert!(
        elapsed >= minimum_delay,
        "expected at least 120ms delay (sleep was 150ms), saw {elapsed:?}"
    );

    drop(client);
    booted.abort();
}