use core::pin::Pin;
use futures::{Stream, StreamExt, stream};
use std::{sync::Arc, time::Duration};
use tokio::sync::Notify;
use tsoracle_consensus::{ConsensusDriver, ConsensusError, LeaderState};
use tsoracle_core::Epoch;
use tsoracle_proto::v1::{GetTsRequest, tso_service_client::TsoServiceClient};
use tsoracle_server::Server;
use tsoracle_server::test_fakes::InMemoryDriver;
use tsoracle_server::test_support::{
boot_router, wait_for_grpc_handshake, wait_until_not_serving, wait_until_serving,
};
/// Happy path: build a server on an `InMemoryDriver`, turn it into a router,
/// boot that router with `boot_router`, promote the driver to leader at
/// epoch 1, and check that a single-timestamp `GetTs` call succeeds.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn embedded_router_serves_via_caller_owned_listener() {
    let driver = Arc::new(InMemoryDriver::new());
    let tsoracle = Server::builder()
        .consensus_driver(driver.clone())
        .build()
        .unwrap();
    // Subscribe before `into_router` consumes the server.
    let mut state_rx = tsoracle.subscribe();
    // The guard is bound to a named `_leader_watch` (not `_`) so it lives until
    // the end of the test; the sibling test in this file shows that dropping
    // it moves the serving state to NotServing.
    let (router, _leader_watch) = tsoracle
        .into_router()
        .expect("into_router is infallible without the reflection feature");
    let booted = boot_router(router).await;

    driver.become_leader(Epoch(1));
    // Bound the wait so a fence that never reaches Serving fails the test with
    // a message instead of hanging the run (same pattern as the EOF test).
    tokio::time::timeout(Duration::from_secs(5), wait_until_serving(&mut state_rx))
        .await
        .expect("fence never reached Serving after become_leader");
    wait_for_grpc_handshake(booted.addr, Duration::from_secs(5))
        .await
        .expect("tonic never accepted gRPC handshake");

    let mut client = TsoServiceClient::connect(format!("http://{}", booted.addr))
        .await
        .unwrap();
    let resp = client
        .get_ts(GetTsRequest { count: 1 })
        .await
        .unwrap()
        .into_inner();

    assert_eq!(resp.count, 1);
    // NOTE(review): (0, 1) presumably is Epoch(1) split into high/low halves —
    // confirm against the proto definition.
    assert_eq!((resp.epoch_hi, resp.epoch_lo), (0, 1));

    booted.shutdown().await.unwrap();
}
/// When the driver's leadership stream ends, the serving state must leave
/// Serving and subsequent `GetTs` calls must be rejected with
/// `FAILED_PRECONDITION`.
///
/// Uses `LeaderThenGatedEofDriver`, whose stream emits one `Leader` event and
/// only terminates after `eof_gate` is notified, so the test controls exactly
/// when EOF happens.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn embedded_router_poisons_when_leadership_stream_closes() {
    // Every bounded wait in this test shares the same budget.
    let step_timeout = Duration::from_secs(5);

    let eof_gate = Arc::new(Notify::new());
    let driver: Arc<dyn ConsensusDriver> = Arc::new(LeaderThenGatedEofDriver {
        epoch: Epoch(1),
        eof_gate: Arc::clone(&eof_gate),
    });

    let server = Server::builder().consensus_driver(driver).build().unwrap();
    let mut serving_rx = server.subscribe();
    let (router, _leader_watch) = server
        .into_router()
        .expect("into_router is infallible without the reflection feature");
    let booted = boot_router(router).await;
    let addr = booted.addr;

    // Phase 1: the initial Leader event should bring the fence to Serving.
    let became_serving =
        tokio::time::timeout(step_timeout, wait_until_serving(&mut serving_rx)).await;
    became_serving.expect("fence never reached Serving after Leader event");

    // Phase 2: open the gate so the leadership stream reaches EOF, then wait
    // for the state to move away from Serving.
    eof_gate.notify_one();
    let left_serving =
        tokio::time::timeout(step_timeout, wait_until_not_serving(&mut serving_rx)).await;
    left_serving.expect("watch task did not publish NotServing after leadership stream ended");

    // Phase 3: the transport is still reachable, but the RPC itself must fail.
    wait_for_grpc_handshake(addr, step_timeout)
        .await
        .expect("tonic never accepted gRPC handshake");
    let endpoint = format!("http://{}", addr);
    let mut client = TsoServiceClient::connect(endpoint).await.unwrap();
    let status = client
        .get_ts(GetTsRequest { count: 1 })
        .await
        .expect_err("GetTs must fail fast once the watch task has terminated");
    assert_eq!(
        status.code(),
        tonic::Code::FailedPrecondition,
        "post-EOF GetTs must surface FAILED_PRECONDITION; got {status:?}",
    );

    booted.shutdown().await.unwrap();
}
/// Dropping the guard returned by `into_router` must stop the leader-watch
/// task: the serving state has to move to NotServing and `GetTs` must then be
/// rejected with `FAILED_PRECONDITION`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn dropping_watch_guard_cancels_leader_watch_and_poisons() {
    let driver = Arc::new(InMemoryDriver::new());
    let tsoracle = Server::builder()
        .consensus_driver(driver.clone())
        .build()
        .unwrap();
    let mut state_rx = tsoracle.subscribe();
    let (router, leader_watch) = tsoracle
        .into_router()
        .expect("into_router is infallible without the reflection feature");
    let booted = boot_router(router).await;

    // Reach a healthy Serving state first so the later NotServing transition
    // can only be attributed to the guard drop.
    driver.become_leader(Epoch(1));
    // Bound the wait so a fence that never reaches Serving fails the test with
    // a message instead of hanging the run (same pattern as the EOF test).
    tokio::time::timeout(Duration::from_secs(5), wait_until_serving(&mut state_rx))
        .await
        .expect("fence never reached Serving after become_leader");
    wait_for_grpc_handshake(booted.addr, Duration::from_secs(5))
        .await
        .expect("tonic never accepted gRPC handshake");

    // The action under test.
    drop(leader_watch);
    tokio::time::timeout(
        Duration::from_secs(5),
        wait_until_not_serving(&mut state_rx),
    )
    .await
    .expect("dropping the WatchGuard did not stop the leader-watch task");

    let mut client = TsoServiceClient::connect(format!("http://{}", booted.addr))
        .await
        .unwrap();
    let status = client
        .get_ts(GetTsRequest { count: 1 })
        .await
        .expect_err("GetTs must fail fast once the watch task has been cancelled");
    assert_eq!(
        status.code(),
        tonic::Code::FailedPrecondition,
        "post-cancel GetTs must surface FAILED_PRECONDITION; got {status:?}",
    );

    booted.shutdown().await.unwrap();
}
/// Calling `shutdown()` on the watch guard of a healthy, serving instance must
/// complete within the time budget, return `Ok`, and leave the published
/// serving state at `NotServing`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn watch_guard_shutdown_returns_ok_and_poisons() {
    let driver = Arc::new(InMemoryDriver::new());
    let tsoracle = Server::builder()
        .consensus_driver(driver.clone())
        .build()
        .unwrap();
    let mut state_rx = tsoracle.subscribe();
    let (router, leader_watch) = tsoracle
        .into_router()
        .expect("into_router is infallible without the reflection feature");
    // Bound to a named variable so the booted router is not dropped before the
    // end of the test.
    let _booted = boot_router(router).await;

    driver.become_leader(Epoch(1));
    // Bound the wait so a fence that never reaches Serving fails the test with
    // a message instead of hanging the run (same pattern as the EOF test).
    tokio::time::timeout(Duration::from_secs(5), wait_until_serving(&mut state_rx))
        .await
        .expect("fence never reached Serving after become_leader");

    // The outer `expect` covers the timeout; `outcome` is shutdown's own result.
    let outcome = tokio::time::timeout(Duration::from_secs(5), leader_watch.shutdown())
        .await
        .expect("WatchGuard::shutdown did not complete");
    assert!(
        outcome.is_ok(),
        "cooperative shutdown of a healthy watch task must return Ok(()); got {outcome:?}",
    );

    // Read the current value directly: by the time shutdown() has returned the
    // state is expected to have been published already, so no waiting here.
    assert!(
        matches!(
            &*state_rx.borrow(),
            tsoracle_server::ServingState::NotServing { .. }
        ),
        "shutdown() must leave serving state NotServing",
    );
}
/// After `abort()` on the watch guard, a later leadership gain must not bring
/// the serving state to Serving.
///
/// This is a negative test: it passes when `wait_until_serving` is still
/// pending once the two-second timeout elapses.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn watch_guard_abort_stops_the_task() {
    use tokio::time::{sleep, timeout};

    let driver = Arc::new(InMemoryDriver::new());
    let server = Server::builder()
        .consensus_driver(driver.clone())
        .build()
        .unwrap();
    let mut serving_rx = server.subscribe();
    let (router, leader_watch) = server
        .into_router()
        .expect("into_router is infallible without the reflection feature");
    let _booted = boot_router(router).await;

    leader_watch.abort();
    // NOTE(review): the pause presumably gives the abort time to take effect
    // before leadership is granted — confirm there is no signal to await instead.
    sleep(Duration::from_millis(200)).await;

    driver.become_leader(Epoch(1));

    // `Err` here means the timeout fired, i.e. Serving was never observed.
    let wait_result = timeout(Duration::from_secs(2), wait_until_serving(&mut serving_rx)).await;
    let timed_out = wait_result.is_err();
    assert!(
        timed_out,
        "an aborted watch task must not transition to Serving on a later leadership gain",
    );
}
/// Test-only consensus driver whose leadership stream reports leadership once
/// and then ends on demand.
///
/// Its `leadership_events` stream yields a single `LeaderState::Leader` for
/// `epoch`, then stays pending until `eof_gate` is notified, then terminates
/// without yielding anything further.
struct LeaderThenGatedEofDriver {
    /// Notified by the test to let the leadership stream reach end-of-stream.
    eof_gate: Arc<Notify>,
    /// Epoch carried by the single `Leader` event.
    epoch: Epoch,
}
#[async_trait::async_trait]
impl ConsensusDriver for LeaderThenGatedEofDriver {
fn leadership_events(&self) -> Pin<Box<dyn Stream<Item = LeaderState> + Send>> {
let epoch = self.epoch;
let gate = self.eof_gate.clone();
let leader = stream::iter(vec![LeaderState::Leader { epoch }]);
let wait_then_end = stream::once(async move {
gate.notified().await;
})
.flat_map(|()| stream::empty::<LeaderState>());
Box::pin(leader.chain(wait_then_end))
}
async fn load_high_water(&self) -> Result<u64, ConsensusError> {
Ok(0)
}
async fn persist_high_water(
&self,
at_least: u64,
_epoch: Epoch,
) -> Result<u64, ConsensusError> {
Ok(at_least)
}
}