use std::time::Duration;
use tsoracle_driver_file::FileDriver;
use tsoracle_proto::v1::{GetSeqRequest, tso_service_client::TsoServiceClient};
use tsoracle_server::Server;
use tsoracle_server::test_support::{boot_server, wait_for_grpc_handshake, wait_until_serving};
/// Boots a [`Server`] on top of a [`FileDriver`] that lives in a freshly
/// created temporary directory, and returns it with a connected client.
///
/// The helper does not return until `wait_until_serving` has completed and
/// `wait_for_grpc_handshake` has succeeded within five seconds, so callers can
/// issue RPCs immediately.
///
/// Returns the booted server handle, a `TsoServiceClient` connected to the
/// server's address, and the `TempDir` backing the driver. The directory is
/// handed back so the caller controls how long it stays alive.
///
/// # Panics
///
/// Panics if the temporary directory, the driver, the server build, the gRPC
/// handshake, or the client connection fails.
async fn boot_file_server() -> (
    tsoracle_server::test_support::BootedServer,
    TsoServiceClient<tonic::transport::Channel>,
    tempfile::TempDir,
) {
    let state_dir = tempfile::tempdir().unwrap();

    let server = {
        let driver = FileDriver::open_or_init(state_dir.path()).unwrap();
        Server::builder()
            .consensus_driver(driver)
            .window_ahead(Duration::from_secs(1))
            .failover_advance(Duration::from_millis(500))
            .build()
            .unwrap()
    };

    let mut booted = boot_server(server).await;
    wait_until_serving(&mut booted.state_rx).await;

    let addr = booted.addr;
    let handshake = wait_for_grpc_handshake(addr, Duration::from_secs(5)).await;
    handshake.expect("tonic never accepted gRPC handshake");

    let endpoint = format!("http://{}", addr);
    let client = TsoServiceClient::connect(endpoint).await.unwrap();

    (booted, client, state_dir)
}
/// Two back-to-back `GetSeq` calls on the same key must hand out adjacent
/// ranges: the first block starts at 0, the second starts right after the
/// first, and both report the same epoch.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_seq_returns_contiguous_blocks() {
    // `_tempdir` is bound (not `_`) so the directory lives until the test ends.
    let (booted, mut client, _tempdir) = boot_file_server().await;

    // Builds a request for the shared "orders" key with the given block size.
    let orders = |count| GetSeqRequest {
        key: "orders".to_string(),
        count,
    };

    // First allocation: five values starting from zero at the zero epoch.
    let first_reply = client.get_seq(orders(5)).await.unwrap();
    let first = first_reply.into_inner();
    assert_eq!(first.key, "orders");
    assert_eq!(first.start, 0);
    assert_eq!(first.count, 5);
    let epoch = first.epoch.expect("epoch must be present on success");
    assert_eq!((epoch.hi, epoch.lo), (0, 0));

    // Second allocation: must continue exactly where the first one stopped.
    let second_reply = client.get_seq(orders(3)).await.unwrap();
    let second = second_reply.into_inner();
    assert_eq!(second.start, 5);
    assert_eq!(second.count, 3);
    assert_eq!(second.epoch, first.epoch);

    booted.shutdown().await.unwrap();
}
/// A `GetSeq` request asking for zero values must be rejected with
/// `InvalidArgument`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_seq_count_zero_returns_invalid_argument() {
    // `_tempdir` is bound (not `_`) so the directory lives until the test ends.
    let (booted, mut client, _tempdir) = boot_file_server().await;

    let empty_block = GetSeqRequest {
        key: "orders".to_string(),
        count: 0,
    };
    let err = client.get_seq(empty_block).await.unwrap_err();

    let code = err.code();
    assert_eq!(
        code,
        tonic::Code::InvalidArgument,
        "count=0 must return InvalidArgument, got: {err:?}"
    );

    booted.shutdown().await.unwrap();
}
/// A `GetSeq` request whose key is the empty string must be rejected with
/// `InvalidArgument`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_seq_empty_key_returns_invalid_argument() {
    // `_tempdir` is bound (not `_`) so the directory lives until the test ends.
    let (booted, mut client, _tempdir) = boot_file_server().await;

    let keyless = GetSeqRequest {
        key: String::new(),
        count: 1,
    };
    let err = client.get_seq(keyless).await.unwrap_err();

    let code = err.code();
    assert_eq!(
        code,
        tonic::Code::InvalidArgument,
        "empty key must return InvalidArgument, got: {err:?}"
    );

    booted.shutdown().await.unwrap();
}
/// Stateless test double for `tsoracle_consensus::ConsensusDriver` whose impl
/// in this file does not define `advance_dense`.
///
/// `get_seq_unsupported_driver_returns_unimplemented` uses it to check that the
/// server answers `GetSeq` with `UNIMPLEMENTED` for such a driver.
struct DenseUnsupportedDriver;
/// Stateless test double for `tsoracle_consensus::ConsensusDriver` whose
/// `advance_dense` always fails with `ConsensusError::DenseNotActivated`.
///
/// `get_seq_dense_not_activated_returns_failed_precondition` uses it to check
/// that the server maps that error to `FAILED_PRECONDITION`.
struct DenseNotActivatedDriver;
/// Driver stub that is always the leader at `Epoch::ZERO` and refuses every
/// dense advance with `DenseNotActivated { required: 5, active: 4 }`.
#[async_trait::async_trait]
impl tsoracle_consensus::ConsensusDriver for DenseNotActivatedDriver {
    /// Yields exactly one `Leader` state at `Epoch::ZERO`, then stays pending
    /// forever so the stream never ends.
    fn leadership_events(
        &self,
    ) -> std::pin::Pin<Box<dyn futures::Stream<Item = tsoracle_consensus::LeaderState> + Send>>
    {
        use futures::StreamExt;

        let elected = tsoracle_consensus::LeaderState::Leader {
            epoch: tsoracle_core::Epoch::ZERO,
        };
        let announce_once = futures::stream::once(futures::future::ready(elected));
        let never_again = futures::stream::pending();
        Box::pin(announce_once.chain(never_again))
    }

    /// Reports a high-water mark of zero.
    async fn load_high_water(&self) -> Result<u64, tsoracle_consensus::ConsensusError> {
        Ok(0)
    }

    /// Accepts any request and echoes `at_least` back; nothing is stored.
    async fn persist_high_water(
        &self,
        at_least: u64,
        _epoch: tsoracle_core::Epoch,
    ) -> Result<u64, tsoracle_consensus::ConsensusError> {
        Ok(at_least)
    }

    /// Always fails with `DenseNotActivated`, regardless of the arguments.
    async fn advance_dense(
        &self,
        _key: &tsoracle_core::SeqKey,
        _count: u32,
        _epoch: tsoracle_core::Epoch,
    ) -> Result<u64, tsoracle_consensus::ConsensusError> {
        let refusal = tsoracle_consensus::ConsensusError::DenseNotActivated {
            required: 5,
            active: 4,
        };
        Err(refusal)
    }
}
/// Driver stub that is always the leader at `Epoch::ZERO` and implements only
/// the leadership stream and the high-water methods; `advance_dense` is
/// deliberately left undefined here.
#[async_trait::async_trait]
impl tsoracle_consensus::ConsensusDriver for DenseUnsupportedDriver {
    /// Yields exactly one `Leader` state at `Epoch::ZERO`, then stays pending
    /// forever so the stream never ends.
    fn leadership_events(
        &self,
    ) -> std::pin::Pin<Box<dyn futures::Stream<Item = tsoracle_consensus::LeaderState> + Send>>
    {
        use futures::StreamExt;

        let elected = tsoracle_consensus::LeaderState::Leader {
            epoch: tsoracle_core::Epoch::ZERO,
        };
        let announce_once = futures::stream::once(futures::future::ready(elected));
        let never_again = futures::stream::pending();
        Box::pin(announce_once.chain(never_again))
    }

    /// Reports a high-water mark of zero.
    async fn load_high_water(&self) -> Result<u64, tsoracle_consensus::ConsensusError> {
        Ok(0)
    }

    /// Accepts any request and echoes `at_least` back; nothing is stored.
    async fn persist_high_water(
        &self,
        at_least: u64,
        _epoch: tsoracle_core::Epoch,
    ) -> Result<u64, tsoracle_consensus::ConsensusError> {
        Ok(at_least)
    }
}
/// When the leader's driver is `DenseUnsupportedDriver`, a valid `GetSeq`
/// request must come back as `UNIMPLEMENTED`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_seq_unsupported_driver_returns_unimplemented() {
    let server = {
        let driver = std::sync::Arc::new(DenseUnsupportedDriver);
        Server::builder()
            .consensus_driver(driver)
            .window_ahead(Duration::from_secs(1))
            .failover_advance(Duration::from_millis(500))
            .build()
            .unwrap()
    };

    // Bring the server up and make sure it is reachable before connecting.
    let mut booted = boot_server(server).await;
    wait_until_serving(&mut booted.state_rx).await;
    let handshake = wait_for_grpc_handshake(booted.addr, Duration::from_secs(5)).await;
    handshake.expect("tonic never accepted gRPC handshake");

    let endpoint = format!("http://{}", booted.addr);
    let mut client = TsoServiceClient::connect(endpoint).await.unwrap();

    let request = GetSeqRequest {
        key: "orders".to_string(),
        count: 1,
    };
    let err = client.get_seq(request).await.unwrap_err();

    let code = err.code();
    assert_eq!(
        code,
        tonic::Code::Unimplemented,
        "a leader without dense support must return UNIMPLEMENTED, got: {err:?}"
    );

    booted.shutdown().await.unwrap();
}
/// When the driver's `advance_dense` fails with `DenseNotActivated`, `GetSeq`
/// must return `FAILED_PRECONDITION` with a message containing
/// "not yet activated".
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_seq_dense_not_activated_returns_failed_precondition() {
    let server = {
        let driver = std::sync::Arc::new(DenseNotActivatedDriver);
        Server::builder()
            .consensus_driver(driver)
            .window_ahead(Duration::from_secs(1))
            .failover_advance(Duration::from_millis(500))
            .build()
            .unwrap()
    };

    // Bring the server up and make sure it is reachable before connecting.
    let mut booted = boot_server(server).await;
    wait_until_serving(&mut booted.state_rx).await;
    let handshake = wait_for_grpc_handshake(booted.addr, Duration::from_secs(5)).await;
    handshake.expect("tonic never accepted gRPC handshake");

    let endpoint = format!("http://{}", booted.addr);
    let mut client = TsoServiceClient::connect(endpoint).await.unwrap();

    let request = GetSeqRequest {
        key: "orders".to_string(),
        count: 1,
    };
    let err = client.get_seq(request).await.unwrap_err();

    // The status code carries the machine-readable outcome.
    let code = err.code();
    assert_eq!(
        code,
        tonic::Code::FailedPrecondition,
        "DenseNotActivated must return FAILED_PRECONDITION, got: {err:?}"
    );

    // The message must tell a human reader why the call was refused.
    let message = err.message();
    assert!(
        message.contains("not yet activated"),
        "message must mention 'not yet activated', got: {:?}",
        message
    );

    booted.shutdown().await.unwrap();
}
/// With `max_seq_count(2)` configured, a request for three values must be
/// rejected with `InvalidArgument`, while a request for exactly two must be
/// served starting at 0.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_seq_honours_configured_max_seq_count() {
    // `dir` stays bound until the end of the test so the directory outlives
    // the server.
    let dir = tempfile::tempdir().unwrap();
    let server = {
        let driver = FileDriver::open_or_init(dir.path()).unwrap();
        Server::builder()
            .consensus_driver(driver)
            .window_ahead(Duration::from_secs(1))
            .failover_advance(Duration::from_millis(500))
            .max_seq_count(2)
            .build()
            .unwrap()
    };

    // Bring the server up and make sure it is reachable before connecting.
    let mut booted = boot_server(server).await;
    wait_until_serving(&mut booted.state_rx).await;
    let handshake = wait_for_grpc_handshake(booted.addr, Duration::from_secs(5)).await;
    handshake.expect("tonic never accepted gRPC handshake");

    let endpoint = format!("http://{}", booted.addr);
    let mut client = TsoServiceClient::connect(endpoint).await.unwrap();

    // Builds a request for the shared "orders" key with the given block size.
    let orders = |count| GetSeqRequest {
        key: "orders".to_string(),
        count,
    };

    // One above the cap: refused.
    let err = client.get_seq(orders(3)).await.unwrap_err();
    let code = err.code();
    assert_eq!(
        code,
        tonic::Code::InvalidArgument,
        "count above the configured max_seq_count must be rejected, got: {err:?}"
    );

    // Exactly at the cap: served, and the rejected call consumed nothing.
    let served = client
        .get_seq(orders(2))
        .await
        .expect("count at the configured cap must be served")
        .into_inner();
    assert_eq!(served.count, 2);
    assert_eq!(served.start, 0);

    booted.shutdown().await.unwrap();
}