use std::{sync::Arc, time::Duration};
use tsoracle_core::{Epoch, PeerEndpoint};
use tsoracle_proto::v1::{
GetCurrentMaxSafeRequest, GetTsRequest, tso_service_client::TsoServiceClient,
};
use tsoracle_server::Server;
use tsoracle_server::test_fakes::InMemoryDriver;
use tsoracle_server::test_support::{boot_server, wait_for_grpc_handshake, wait_until_serving};
const WINDOW_AHEAD: Duration = Duration::from_millis(500);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_current_max_safe_returns_zero_on_follower() {
let driver = Arc::new(InMemoryDriver::new());
let server = Server::builder()
.consensus_driver(driver.clone())
.window_ahead(WINDOW_AHEAD)
.failover_advance(Duration::from_millis(200))
.build()
.unwrap();
let booted = boot_server(server).await;
driver.become_follower(Some(PeerEndpoint::try_from("10.9.8.7:50551").unwrap()));
wait_for_grpc_handshake(booted.addr, Duration::from_secs(5))
.await
.expect("tonic never accepted gRPC handshake");
let mut client = TsoServiceClient::connect(format!("http://{}", booted.addr))
.await
.unwrap();
let resp = client
.get_current_max_safe(GetCurrentMaxSafeRequest {})
.await
.unwrap()
.into_inner();
assert_eq!(
resp.max_safe_physical_ms, 0,
"follower must report safe-point 0",
);
assert_eq!(
(resp.epoch_hi, resp.epoch_lo),
(0, 0),
"follower must report epoch zero",
);
booted.shutdown().await.unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_current_max_safe_advances_and_is_bounded() {
let driver = Arc::new(InMemoryDriver::new());
let server = Server::builder()
.consensus_driver(driver.clone())
.window_ahead(WINDOW_AHEAD)
.failover_advance(Duration::from_millis(200))
.build()
.unwrap();
let mut booted = boot_server(server).await;
driver.become_leader(Epoch(1));
wait_until_serving(&mut booted.state_rx).await;
wait_for_grpc_handshake(booted.addr, Duration::from_secs(5))
.await
.expect("tonic never accepted gRPC handshake");
let mut client = TsoServiceClient::connect(format!("http://{}", booted.addr))
.await
.unwrap();
for _ in 0..4 {
client.get_ts(GetTsRequest { count: 64 }).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
}
let safe_a = client
.get_current_max_safe(GetCurrentMaxSafeRequest {})
.await
.unwrap()
.into_inner();
tokio::time::sleep(Duration::from_millis(200)).await;
let safe_b = client
.get_current_max_safe(GetCurrentMaxSafeRequest {})
.await
.unwrap()
.into_inner();
assert!(
safe_a.max_safe_physical_ms > 0,
"safe-point did not advance after issued timestamps",
);
assert!(
safe_b.max_safe_physical_ms >= safe_a.max_safe_physical_ms,
"safe-point regressed: {} -> {}",
safe_a.max_safe_physical_ms,
safe_b.max_safe_physical_ms,
);
assert_eq!((safe_a.epoch_hi, safe_a.epoch_lo), (0, 1));
booted.shutdown().await.unwrap();
}